Redis学习记录:lettuce哨兵模式接入拓扑刷新源码分析

  • 最近遇到一个问题,用户在用lettuce接入容器化redis时,出现failover后lettuce无法感知导致readonly的问题。

  • 本篇文章简要记录一下排查过程和涉及到的部分源码分析。

一、问题背景

用户使用lettuce以哨兵接入,在手动执行sentinel failover命令后,业务客户端出现持续的readonly报错,且无法自动恢复。

经过日志排查可以确认,在sentinel failover命令执行后,redis-operator存在自愈操作,将故障转移过程中的双主变为单主,即对新从节点执行了slaveof操作。

停止redis-operator调谐,再次手动执行sentinel failover命令,业务客户端可以自动感知主从切换,不再出现readonly报错。

二、问题原因

经过对比停止redis-operator调谐后的日志差异,发现核心在于哨兵的+convert-to-slave消息。在哨兵发出此消息后,lettuce打印拓扑刷新日志,后续连接指向新的master。

问题原因和流程大致也可以确认:

  1. 当用户手动触发故障转移后,redis提升一个slave为master,此时存在两个master。

  2. redis-operator监控到存在两个master,触发自愈流程:将一个master降级为slave并断开此节点的所有连接,最后reset哨兵。

  3. lettuce客户端拓扑无感知,连接被operator断开后,触发重连,但重连地址仍为旧master节点。

  4. 此时lettuce客户端执行写操作会报错readonly,且由于拓扑未刷新,导致后续也无法重连至新master节点。

无operator介入的常规流程为:

  1. 当用户手动触发故障转移后,redis提升一个slave为master,此时存在两个master。

  2. operator未干预哨兵故障转移,哨兵正常执行故障转移,主从切换相关事件正常推送。

  3. lettuce客户端订阅到相关事件,刷新拓扑;但存量连接地址合法,不会主动断开连接。

  4. 哨兵故障转移结束后,检测到存在多主,触发自愈流程:将一个master降级为slave,断开此节点的所有连接,并推送相关事件。

  5. lettuce客户端连接被哨兵断开后,触发重连,重连地址为新master节点。

  6. 此时lettuce客户端正常读写。

由此可以确认是operator介入且未推送事件,导致lettuce无法感知拓扑刷新导致。

这个问题解决主要从两个方向入手:

  1. 调整operator自愈逻辑

  2. 客户端捕获异常并适配

经过测试都可以解决这个问题,由此问题最终解决。

三、相关源码分析

redis源码版本: 6.2.x
lettuce源码版本: 6.x

1. 哨兵提主逻辑

以下为sentinel.c中的部分源码。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
void sentinelRefreshInstanceInfo(sentinelRedisInstance *ri, const char *info) {
...
// 哨兵视角检测到节点预期状态为slave, 但是实际状态为master
if ((ri->flags & SRI_SLAVE) && role == SRI_MASTER) {
if ((ri->flags & SRI_PROMOTED) &&
(ri->master->flags & SRI_FAILOVER_IN_PROGRESS) &&
(ri->master->failover_state ==
SENTINEL_FAILOVER_STATE_WAIT_PROMOTION))
{
// 故障转移过程中
// 且此时新master尚未提升完成
// todo: 提主+更新状态
...
} else {
// 故障转移过程中新master已经稳定 / 非故障转移状态出现多主
// 实际表现就是存在预期master以外的非预期master
mstime_t wait_time = SENTINEL_PUBLISH_PERIOD*4;
if (!(ri->flags & SRI_PROMOTED) &&
sentinelMasterLooksSane(ri->master) &&
sentinelRedisInstanceNoDownFor(ri,wait_time) &&
mstime() - ri->role_reported_time > wait_time)
{
// 执行slaveof建立复制关系
int retval = sentinelSendSlaveOf(ri,ri->master->addr);
// 推送+convert-to-slave事件
if (retval == C_OK)
sentinelEvent(LL_NOTICE,"+convert-to-slave",ri,"%@");
}
}
}
...
}

哨兵建立复制关系的大致源码如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
int sentinelSendSlaveOf(sentinelRedisInstance *ri, const sentinelAddr *addr) {
...
// 这个函数内发起了一个事务, 主要执行流程如下:
// 1. 对此实例执行slaveof命令 建立复制关系
// 2. 重写此实例配置 刷到盘上
// 3. 杀掉此实例所有normal和pubsub类型的连接
retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri,"MULTI"));
if (retval == C_ERR) return retval;
ri->link->pending_commands++;

retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s %s %s",
sentinelInstanceMapCommand(ri,"SLAVEOF"),
host, portstr);
if (retval == C_ERR) return retval;
ri->link->pending_commands++;

retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s REWRITE",
sentinelInstanceMapCommand(ri,"CONFIG"));
if (retval == C_ERR) return retval;
ri->link->pending_commands++;

for (int type = 0; type < 2; type++) {
retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s KILL TYPE %s",
sentinelInstanceMapCommand(ri,"CLIENT"),
type == 0 ? "normal" : "pubsub");
if (retval == C_ERR) return retval;
ri->link->pending_commands++;
}

retval = redisAsyncCommand(ri->link->cc,
sentinelDiscardReplyCallback, ri, "%s",
sentinelInstanceMapCommand(ri,"EXEC"));
if (retval == C_ERR) return retval;
ri->link->pending_commands++;
return C_OK;
}

所以实际上,当哨兵检测到多主时,会把多余的master降级为slave,并杀掉所有的连接。当这一套操作执行成功后,哨兵会推送+convert-to-slave事件。

2. lettuce拓扑刷新逻辑

从拓扑刷新的地方入手,如下为拓扑刷新的地方,可以看到触发拓扑刷新后,最终会执行setKnownNodes()函数更新拓扑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// SentinelConnector.java
class SentinelConnector<K, V> implements MasterReplicaConnector<K, V> {
...
private Runnable getTopologyRefreshRunnable(MasterReplicaTopologyRefresh refresh,
MasterReplicaConnectionProvider<K, V> connectionProvider) {

return () -> {
try {

LOG.debug("Refreshing topology");
refresh.getNodes(redisURI).subscribe(nodes -> {

EventRecorder.getInstance().record(new MasterReplicaTopologyChangedEvent(redisURI, nodes));

if (nodes.isEmpty()) {
LOG.warn("Topology refresh returned no nodes from {}", redisURI);
}

LOG.debug("New topology: {}", nodes);
connectionProvider.setKnownNodes(nodes);

}, t -> LOG.error("Error during background refresh", t));

} catch (Exception e) {
LOG.error("Error during background refresh", e);
}
};
}
}

setKnownNodes()函数内容如下,主要就是把旧拓扑删除后,把新的拓扑加进去,然后调用closeStaleConnections()函数。

closeStaleConnections()函数内部会调用getStaleConnectionKeys()函数获取过期的连接,然后把过期的连接主动close掉,此段代码逻辑较为简单,跳过代码展示。

lettuce判定连接过期的标准为:连接的地址不在当前拓扑内。也就是说lettuce在拓扑刷新后并不会主动断开连接,这个操作得由其他人来做。根据上文中对redis源码的分析,可以发现哨兵在执行完slaveof以后会主动断开连接。此时拓扑刷新lettuce就可以连接到最新的master。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// MasterReplicaConnectionProvider.java
class MasterReplicaConnectionProvider<K, V> {
...
public void setKnownNodes(Collection<RedisNodeDescription> knownNodes) {
synchronized (stateLock) {

this.knownNodes.clear();
this.knownNodes.addAll(knownNodes);

closeStaleConnections();
}
}
...
}

那么拓扑刷新如何触发?可以看到在使用connectAsync()创建异步连接时,会获取runnable函数,即拓扑刷新函数。

这个拓扑刷新函数会通过initializeConnection()函数来绑定到SentinelTopologyRefresh对象上。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// SentinelConnector.java
class SentinelConnector<K, V> implements MasterReplicaConnector<K, V> {
...
public CompletableFuture<StatefulRedisMasterReplicaConnection<K, V>> connectAsync() {
...
SentinelTopologyRefresh sentinelTopologyRefresh = new SentinelTopologyRefresh(redisClient,
redisURI.getSentinelMasterId(), redisURI.getSentinels());

MasterReplicaTopologyRefresh refresh = new MasterReplicaTopologyRefresh(redisClient, topologyProvider);
MasterReplicaConnectionProvider<K, V> connectionProvider = new MasterReplicaConnectionProvider<>(redisClient, codec,
redisURI, Collections.emptyMap());

Runnable runnable = getTopologyRefreshRunnable(refresh, connectionProvider); // 获取拓扑刷新函数

return refresh.getNodes(redisURI).flatMap(nodes -> {

if (nodes.isEmpty()) {
return Mono.error(new RedisException(String.format("Cannot determine topology from %s", redisURI)));
}

return initializeConnection(codec, sentinelTopologyRefresh, connectionProvider, runnable, nodes);
}).onErrorMap(ExecutionException.class, Throwable::getCause).toFuture();
}
...
private Mono<StatefulRedisMasterReplicaConnection<K, V>> initializeConnection(RedisCodec<K, V> codec,
SentinelTopologyRefresh sentinelTopologyRefresh, MasterReplicaConnectionProvider<K, V> connectionProvider,
Runnable runnable, List<RedisNodeDescription> nodes) {
...
CompletionStage<Void> bind = sentinelTopologyRefresh.bind(runnable); // 绑定拓扑刷新函数
...
}
}

查看SentinelTopologyRefresh的定义,发现其存在成员topologyRefresh用于触发拓扑刷新。

1
2
3
4
5
6
7
8
9
10
11
12
// SentinelTopologyRefresh.java
class SentinelTopologyRefresh implements AsyncCloseable, Closeable {
...
private final PubSubMessageActionScheduler topologyRefresh;
...
SentinelTopologyRefresh(RedisClient redisClient, String masterId, List<RedisURI> sentinels) {
...
this.topologyRefresh = new PubSubMessageActionScheduler(redisClient.getResources().eventExecutorGroup(),
new TopologyRefreshMessagePredicate(masterId));
...
}
}

继续深入查找MessagePredicate的实现类TopologyRefreshMessagePredicate,发现内部实现了test()方法用于校验是否需要刷新拓扑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class SentinelTopologyRefresh implements AsyncCloseable, Closeable {
...
private static final Set<String> PROCESSING_CHANNELS = new HashSet<>(
Arrays.asList("failover-end", "failover-end-for-timeout"));
...
private static class TopologyRefreshMessagePredicate implements MessagePredicate {
...
private Set<String> TOPOLOGY_CHANGE_CHANNELS = new HashSet<>(
Arrays.asList("+slave", "+sdown", "-sdown", "fix-slave-config", "+convert-to-slave", "+role-change"));
...
@Override
public boolean test(String channel, String message) {

// trailing spaces after the master name are not bugs
if (channel.equals("+elected-leader") || channel.equals("+reset-master")) {
if (message.startsWith(String.format("master %s ", masterId))) {
return true;
}
}

if (TOPOLOGY_CHANGE_CHANNELS.contains(channel)) {
if (message.contains(String.format("@ %s ", masterId))) {
return true;
}
}

if (channel.equals("+switch-master")) {
if (message.startsWith(String.format("%s ", masterId))) {
return true;
}
}

return PROCESSING_CHANNELS.contains(channel);
}
}
...
}

至此哨兵接入模式的调用链路已经大致完全,lettuce哨兵这边会根据多个哨兵事件来刷新拓扑,但是假如地址仍合法则不会主动断开连接。

当redis哨兵介入提主后会主动kill掉存量连接,则此时lettuce感知到连接断开,会依据此前刷新的拓扑来重新连接,恢复正常。

假如operator介入自愈,则可能会主动reset哨兵,哨兵由此无法正常发送事件,导致lettuce拓扑刷新异常,最终无法连接到正确的节点。

不过lettuce这个逻辑感觉还是很多坑的,比如lettuce不会主动断开连接且为异步连接,则假如连接非正常断开,则lettuce这边可能就无法感知连接断开,导致连接一直异常不会触发重连。不过新版本的好像新增了tcp探活配置来解决这个问题。