Redis学习记录:过期删除相关源码分析

  • 最近工作中修复了一个跟过期删除逻辑相关的bug,在排查的过程中也是看了一下这块的源码。

  • 本篇文章简要记录一下我对这部分源码的分析,方便后续查阅。

一、前言

Redis实现中,主要支持如下两种过期删除策略:

  • 惰性过期: 执行命令涉及到的时候才删除
  • 定期删除: 后台跑cron任务定期遍历过期key删除

下文中会把二者涉及到的源码和流程进行梳理,正常来讲二者配合可以很好的在CPU使用和内存占用之间取得平衡。

而最近工作中就是因为错误逻辑,导致二者配合出现了点问题,从而引发读写异常。

注:以下代码基于redis 6.2.16版本分析

二、惰性过期

惰性过期的核心函数expireIfNeeded()内容如下,此函数内会校验key是否过期,随后确保由主节点执行实际删除操作。

这块逻辑在执行读写key命令前都会被间接触发,以保证惰性过期的有效性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// db.c
int expireIfNeeded(redisDb *db, robj *key) {
// 校验key是否过期
if (!keyIsExpired(db,key)) return 0;

// slave跳过这个逻辑,仅master执行实际删除
if (server.masterhost != NULL) return 1;

if (checkClientPauseTimeoutAndReturnIfPaused()) return 1;

/* Delete the key */
deleteExpiredKeyAndPropagate(db,key);
return 1;
}

删除逻辑如下,上文expireIfNeeded()校验key需要删除时,调用此处逻辑按序执行删除、通知、落盘、传播一条龙。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// db.c
/* Delete the specified expired key and propagate expire. */
void deleteExpiredKeyAndPropagate(redisDb *db, robj *keyobj) {
mstime_t expire_latency;
latencyStartMonitor(expire_latency);
if (server.lazyfree_lazy_expire)
dbAsyncDelete(db,keyobj); // 惰性释放
else
dbSyncDelete(db,keyobj); // 删除
latencyEndMonitor(expire_latency);
latencyAddSampleIfNeeded("expire-del",expire_latency);
notifyKeyspaceEvent(NOTIFY_EXPIRED,"expired",keyobj,db->id);
signalModifiedKey(NULL, db, keyobj);
propagateExpire(db,keyobj,server.lazyfree_lazy_expire); // 落盘&传播
server.stat_expiredkeys++;
}

三、 定期删除

定期过期的核心函数activeExpireCycle()中的核心流程如下。

可以看到实际上就是在函数中去遍历每个db的过期key字典执行删除操作,当然里面有很多细节就不展开来讲了。

其中函数内会维护全局变量来记录每次检查的进度,每次检查的时候,通过检测key的数量、检测的时间等来限制资源的使用量,避免影响redis本身的服务。

在redis6之前,过期key的遍历是通过随机选择来进行的,可能会导致key长期未被清除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
// expire.c
void activeExpireCycle(int type) {
...
// 全局变量用于后续调用
static unsigned int current_db = 0; /* Next DB to test. */
static int timelimit_exit = 0; /* Time limit hit in previous call? */
static long long last_fast_cycle = 0; /* When last fast cycle ran. */

int j, iteration = 0;
int dbs_per_call = CRON_DBS_PER_CALL;
long long start = ustime(), timelimit, elapsed;
...
// 确认要检测的db
if (dbs_per_call > server.dbnum || timelimit_exit)
dbs_per_call = server.dbnum;
...
long total_sampled = 0;
long total_expired = 0;

for (j = 0; j < dbs_per_call && timelimit_exit == 0; j++) {
unsigned long expired, sampled;
redisDb *db = server.db+(current_db % server.dbnum);
current_db++;

do {
...
// db没过期key直接跳过
if ((num = dictSize(db->expires)) == 0) {
db->avg_ttl = 0;
break;
}
...
// 此处会限制检测的key数量(sampled)
while (sampled < num && checked_buckets < max_buckets) {
for (int table = 0; table < 2; table++) {
if (table == 1 && !dictIsRehashing(db->expires)) break;

unsigned long idx = db->expires_cursor;
idx &= db->expires->ht[table].sizemask;
dictEntry *de = db->expires->ht[table].table[idx];
long long ttl;

// 遍历过期字典桶
checked_buckets++;
while(de) {
/* Get the next entry now since this entry may get
* deleted. */
dictEntry *e = de;
de = de->next;

ttl = dictGetSignedIntegerVal(e)-now;
// 尝试删除过期key 最终也是调用deleteExpiredKeyAndPropagate()函数
if (activeExpireCycleTryExpire(db,e,now)) expired++;
// 此处后续计算平均ttl用
if (ttl > 0) {
ttl_sum += ttl;
ttl_samples++;
}
sampled++;
}
}
db->expires_cursor++;
}
total_expired += expired;
total_sampled += sampled;

// 更新平均ttl
if (ttl_samples) {
long long avg_ttl = ttl_sum/ttl_samples;

/* Do a simple running average with a few samples.
* We just use the current estimate with a weight of 2%
* and the previous estimate with a weight of 98%. */
if (db->avg_ttl == 0) db->avg_ttl = avg_ttl;
db->avg_ttl = (db->avg_ttl/50)*49 + (avg_ttl/50);
}

// 计算本轮删除是否到时间
// 所以这个地方实际上是某个db每个哈希桶校验完成后进行时间校验的
if ((iteration & 0xf) == 0) { /* check once every 16 iterations. */
elapsed = ustime()-start;
if (elapsed > timelimit) {
timelimit_exit = 1;
server.stat_expired_time_cap_reached_count++;
break;
}
}
} while (sampled == 0 ||
(expired*100/sampled) > config_cycle_acceptable_stale);
}

elapsed = ustime()-start;
server.stat_expire_cycle_time_used += elapsed;
latencyAddSampleIfNeeded("expire-cycle",elapsed/1000);

/* Update our estimate of keys existing but yet to be expired.
* Running average with this sample accounting for 5%. */
double current_perc;
if (total_sampled) {
current_perc = (double)total_expired/total_sampled;
} else
current_perc = 0;
server.stat_expired_stale_perc = (current_perc*0.05)+
(server.stat_expired_stale_perc*0.95);
}

定期删除逻辑相关调用处如下,实际最终只有master会执行定期删除逻辑,执行频率是每秒HZ次。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// server.c
void databasesCron(void) {
/* Expire keys by random sampling. Not required for slaves
* as master will synthesize DELs for us. */
if (server.active_expire_enabled) {
if (iAmMaster()) {
activeExpireCycle(ACTIVE_EXPIRE_CYCLE_SLOW);
} else {
expireSlaveKeys();
}
}
...
}

int serverCron(struct aeEventLoop *eventLoop, long long id, void *clientData) {
...
/* Handle background operations on Redis databases. */
databasesCron();
...
}

四、拓展-惰性释放

这个其实跟上面的过期删除策略已经没关系了,这个主要是删除的方式。

常规来讲Redis的读写是线性的,如果某个key太过巨大,就有可能会导致线程阻塞,影响服务。

所以在Redis4版本中引入了新特性lazy free并提供了unlink命令。过期key的惰性释放可以通过配置lazyfree-lazy-expire来启用。

下面是异步删除入口函数dbAsyncDelete()的代码,相比于同步删除入口函数dbSyncDelete(),其实就是多了bioCreateLazyFreeJob()相关的逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// lazyfree.c
int dbAsyncDelete(redisDb *db, robj *key) {

if (dictSize(db->expires) > 0) dictDelete(db->expires,key->ptr);

// 获取key实体
dictEntry *de = dictUnlink(db->dict,key->ptr);
if (de) {
robj *val = dictGetVal(de);

/* Tells the module that the key has been unlinked from the database. */
moduleNotifyKeyUnlink(key,val);

// 计算key的释放权重(其实差不多就是看里面资源数量)
size_t free_effort = lazyfreeGetFreeEffort(key,val);

// 如果要删的value有点多,就塞到bio里异步删
if (free_effort > LAZYFREE_THRESHOLD && val->refcount == 1) {
atomicIncr(lazyfree_objects,1);
bioCreateLazyFreeJob(lazyfreeFreeObject,1, val);
dictSetVal(db->dict,de,NULL);// value置空 等待bio异步删除
}
}

// 实际删除key和value,
if (de) {
dictFreeUnlinkedEntry(db->dict,de);
if (server.cluster_enabled) slotToKeyDel(key->ptr);
return 1;
} else {
return 0;
}
}

顺便看一下unlink命令和del命令的实现,发现实际上最终也是走的dbAsyncDelete()dbSyncDelete()函数,豁然开朗。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
void delCommand(client *c) {
delGenericCommand(c,server.lazyfree_lazy_user_del);
}

void unlinkCommand(client *c) {
delGenericCommand(c,1);
}

/* This command implements DEL and LAZYDEL. */
void delGenericCommand(client *c, int lazy) {
int numdel = 0, j;

for (j = 1; j < c->argc; j++) {
expireIfNeeded(c->db,c->argv[j]);
// 此处
int deleted = lazy ? dbAsyncDelete(c->db,c->argv[j]) :
dbSyncDelete(c->db,c->argv[j]);
if (deleted) {
signalModifiedKey(c,c->db,c->argv[j]);
notifyKeyspaceEvent(NOTIFY_GENERIC,
"del",c->argv[j],c->db->id);
server.dirty++;
numdel++;
}
}
addReplyLongLong(c,numdel);
}