SOFA-JRaft 源码解析:日志复制机制

与上一篇介绍的主节点选举一样,日志复制(Log Replication)同样是 Raft 算法的核心组成部分,是支撑 Raft 节点达成共识的基础。Raft 中的日志主要可以分为两类:一类是协议自身运行所生成的日志,例如集群节点配置变更信息;另外一类就是用户向集群提交的指令所生成的日志。为了让集群中的各个节点达成共识,Leader 节点需要将日志数据复制给集群中的各个节点,并采用投票机制让这些节点决定是否许可日志对应的操作。对于被许可的操作日志,各个节点会严格按照相同的顺序在本地进行存储,并重放日志对应的操作,以此实现节点之间的共识。

JRaft 在设计和实现层面为每个 Follower 和 Learner 节点都绑定了一个复制器 Replicator 实例,由 Replicator 负责向目标节点复制日志数据,Replicator 实例之间彼此相互隔离,互不影响,并由 ReplicatorGroup 进行统一管理。日志复制需要涉及到集群中节点之间的频繁通信和数据传输,所以需要保证复制操作的高性能,并且不允许出现乱序和断层。为此,JRaft 引入了多种优化策略,包括:Follower 节点之间并发复制、批量发送,以及 Pipeline 机制等。

日志复制从广义层面而言除了复制单条的 LogEntry 外,还包含向目标节点复制快照数据。本文我们重点关注对于 LogEntry 的复制,而对于快照数据的复制则留到下一篇介绍快照机制时再展开分析。

日志生成

在开始分析日志复制的运行机制之前,我打算先用一小节的篇幅介绍一下 JRaft 生成日志的过程。毕竟日志生成和复制是紧密关联的,了解 JRaft 如何生成一条日志有利于更好的理解后续复制日志的过程。前面曾介绍过,JRaft 中的日志主要可以分为两大类:一类是系统内部运行产生的日志;另外一类是用户主动往 JRaft 集群提交指令所产生的日志。本小节我们以后者为例,介绍 JRaft 日志从生成到写入本地存储系统的过程。

JRaft 提供了 Node#apply 交互接口以让业务向 JRaft 集群提交操作指令,这些指令以 Task 的形式在集群中流转,并以日志的形式记录到 Leader 节点中,同时同步给集群中所有的 Follower 节点,并最终透传给所有成功完成日志复制的集群节点状态机。

方法 Node#apply 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public void apply(final Task task) {
// 当前节点被关闭
if (this.shutdownLatch != null) {
Utils.runClosureInThread(task.getDone(), new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
throw new IllegalStateException("Node is shutting down");
}
Requires.requireNonNull(task, "Null task");

// 创建一个 LogEntry 对象,用于封装 Task 中的数据
final LogEntry entry = new LogEntry();
entry.setData(task.getData());
int retryTimes = 0;
try {
// 将 Task 及其对应的 LogEntry 对象以事件的形式投递给 Disruptor 队列
final EventTranslator<LogEntryAndClosure> translator = (event, sequence) -> {
event.reset();
event.done = task.getDone();
event.entry = entry;
event.expectedTerm = task.getExpectedTerm();
};
while (true) {
if (this.applyQueue.tryPublishEvent(translator)) {
break;
} else {
// 重试 3 次
retryTimes++;
if (retryTimes > MAX_APPLY_RETRY_TIMES) {
Utils.runClosureInThread(task.getDone(),
new Status(RaftError.EBUSY, "Node is busy, has too many tasks."));
LOG.warn("Node {} applyQueue is overload.", getNodeId());
this.metrics.recordTimes("apply-task-overload-times", 1);
return;
}
ThreadHelper.onSpinWait();
}
}

} catch (final Exception e) {
LOG.error("Fail to apply task.", e);
Utils.runClosureInThread(task.getDone(), new Status(RaftError.EPERM, "Node is down."));
}
}

上述实现只是简单的将承载用户操作指令的 Task 封装成 LogEntry 对象,并以事件的形式投递给 Disruptor 队列进行异步处理,用户可以通过 Task 的 Task#done 字段感知任务被状态机处理的响应状态。LogEntryAndClosureHandler 实现了 EventHandler 接口,用于消费 Disruptor 队列中的事件,具体的处理逻辑由 NodeImpl#executeApplyingTasks 方法完成:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) {
this.writeLock.lock();
try {
final int size = tasks.size();
// 只有 Leader 节点允许处理 Task
if (this.state != State.STATE_LEADER) {
final Status st = new Status();
if (this.state != State.STATE_TRANSFERRING) {
st.setError(RaftError.EPERM, "Is not leader.");
} else {
st.setError(RaftError.EBUSY, "Is transferring leadership.");
}
LOG.debug("Node {} can't apply, status={}.", getNodeId(), st);
final List<LogEntryAndClosure> savedTasks = new ArrayList<>(tasks);
// 快速失败
Utils.runInThread(() -> {
for (int i = 0; i < size; i++) {
savedTasks.get(i).done.run(st);
}
});
return;
}
final List<LogEntry> entries = new ArrayList<>(size);
// 遍历处理 Task 集合
for (int i = 0; i < size; i++) {
final LogEntryAndClosure task = tasks.get(i);
// 如果 Task 期望校验 term 值,则校验当前节点的 term 值是否是期望的 term 值
if (task.expectedTerm != -1 && task.expectedTerm != this.currTerm) {
LOG.debug("Node {} can't apply task whose expectedTerm={} doesn't match currTerm={}.",
getNodeId(), task.expectedTerm, this.currTerm);
if (task.done != null) {
final Status st = new Status(RaftError.EPERM,
"expected_term=%d doesn't match current_term=%d", task.expectedTerm, this.currTerm);
Utils.runClosureInThread(task.done, st);
}
continue;
}
// 为每个 task 创建并初始化对应的选票,用于决策对应的 LogEntry 是否允许被提交
if (!this.ballotBox.appendPendingTask(this.conf.getConf(),
this.conf.isStable() ? null : this.conf.getOldConf(), task.done)) {
Utils.runClosureInThread(task.done, new Status(RaftError.EINTERNAL, "Fail to append task."));
continue;
}
// set task entry info before adding to list.
task.entry.getId().setTerm(this.currTerm);
task.entry.setType(EnumOutter.EntryType.ENTRY_TYPE_DATA);
entries.add(task.entry);
}
// 追加日志数据到本地文件系统,完成之后回调 LeaderStableClosure
this.logManager.appendEntries(entries, new LeaderStableClosure(entries));
// update conf.first
checkAndSetConfiguration(true);
} finally {
this.writeLock.unlock();
}
}

对于往 Raft 集群提交的指令只允许由 Leader 节点进行处理,这点无可厚非。上述实现会对 Task 进行简单的校验,主要是验证当前节点的 term 值是否是 Task 期望的 term 值,对于通过校验的 Task 则会为其创建并初始化对应的选票,并转化为 LogEntry 对象写入本地存储系统。因为每个 Task 最终都会被转换为对应的日志复制给集群中的所有节点,所以需要创建对应的选票,以实现当集群中的大部分节点都成功完成对该日志的复制操作之后,将对应的日志标记为 committed。创建并初始化选票的过程由 BallotBox#appendPendingTask 方法实现,后续我们会再次提及该方法,这里暂且跳过。

将日志数据写入本地存储系统的过程则由 LogManager#appendEntries 方法实现,该方法接收一个 LeaderStableClosure 类型的回调,当数据被处理完成之后会触发执行该回调。下面来看一下 LogManager#appendEntries 方法的执行流程,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public void appendEntries(final List<LogEntry> entries, final StableClosure done) {
Requires.requireNonNull(done, "done");
// 运行发生错误
if (this.hasError) {
entries.clear();
Utils.runClosureInThread(done, new Status(RaftError.EIO, "Corrupted LogStorage"));
return;
}
boolean doUnlock = true;
this.writeLock.lock();
try {
// 对于 Leader 节点而言,基于本地 lastLogIndex 值设置各个 LogEntry 的 logIndex
// 对于 Follower 节点而言,检查待复制的日志与本地已有的日志是否存在冲突,如果存在冲突则强行覆盖本地日志
if (!entries.isEmpty() && !checkAndResolveConflict(entries, done)) {
// If checkAndResolveConflict returns false, the done will be called in it.
entries.clear();
return;
}
for (int i = 0; i < entries.size(); i++) {
final LogEntry entry = entries.get(i);
// Set checksum after checkAndResolveConflict
if (this.raftOptions.isEnableLogEntryChecksum()) {
// 设置 checksum 值
entry.setChecksum(entry.checksum());
}

// 对于 ENTRY_TYPE_CONFIGURATION 类型的 LogEntry,记录集群配置信息
if (entry.getType() == EntryType.ENTRY_TYPE_CONFIGURATION) {
Configuration oldConf = new Configuration();
if (entry.getOldPeers() != null) {
oldConf = new Configuration(entry.getOldPeers(), entry.getOldLearners());
}
final ConfigurationEntry conf = new ConfigurationEntry(entry.getId(),
new Configuration(entry.getPeers(), entry.getLearners()), oldConf);
this.configManager.add(conf);
}
}

// 更新内存数据
if (!entries.isEmpty()) {
done.setFirstLogIndex(entries.get(0).getId().getIndex());
this.logsInMemory.addAll(entries);
}
done.setEntries(entries);

// 将修正后的 LogEntry 数据封装成事件投递给 Disruptor 队列,事件类型为 OTHER
int retryTimes = 0;
final EventTranslator<StableClosureEvent> translator = (event, sequence) -> {
event.reset();
event.type = EventType.OTHER;
event.done = done;
};
while (true) {
if (tryOfferEvent(done, translator)) {
break;
} else {
retryTimes++;
// 最大重试 50 次
if (retryTimes > APPEND_LOG_RETRY_TIMES) {
reportError(RaftError.EBUSY.getNumber(), "LogManager is busy, disk queue overload.");
return;
}
ThreadHelper.onSpinWait();
}
}
doUnlock = false;
// 尝试触发等待新的可复制数据的回调,以继续向目标 Follower 节点发送数据
if (!wakeupAllWaiter(this.writeLock)) {
notifyLastLogIndexListeners();
}
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}

往本地追加日志数据的操作除了会被 Leader 节点执行,也会被 Follower 或 Learner 节点所执行,所以上述方法会在各个节点上被调用。因为 Leader 节点的更替,这其中免不了会存在日志数据的冲突,而解决冲突的 LogManagerImpl#checkAndResolveConflict 方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
private boolean checkAndResolveConflict(final List<LogEntry> entries, final StableClosure done) {
final LogEntry firstLogEntry = ArrayDeque.peekFirst(entries);
// Leader 节点,基于 lastLogIndex 设置 logIndex 值
if (firstLogEntry.getId().getIndex() == 0) {
// Node is currently the leader and |entries| are from the user who
// don't know the correct indexes the logs should assign to.
// So we have to assign indexes to the appending entries
for (int i = 0; i < entries.size(); i++) {
entries.get(i).getId().setIndex(++this.lastLogIndex);
}
return true;
}
// Follower 节点
else {
// Node is currently a follower and |entries| are from the leader.
// We should check and resolve the conflicts between the local logs and |entries|
if (firstLogEntry.getId().getIndex() > this.lastLogIndex + 1) {
// 待写入的日志与本地已有的日志之间存在断层
Utils.runClosureInThread(done, new Status(RaftError.EINVAL,
"There's gap between first_index=%d and last_log_index=%d",
firstLogEntry.getId().getIndex(), this.lastLogIndex));
return false;
}

// 待写入的所有日志的 logIndex 都小于已经应用的日志的最大 logIndex,直接返回
final long appliedIndex = this.appliedId.getIndex();
final LogEntry lastLogEntry = ArrayDeque.peekLast(entries);
if (lastLogEntry.getId().getIndex() <= appliedIndex) {
LOG.warn(
"Received entries of which the lastLog={} is not greater than appliedIndex={}, return immediately with nothing changed.",
lastLogEntry.getId().getIndex(), appliedIndex);
// Replicate old logs before appliedIndex should be considered successfully, response OK.
Utils.runClosureInThread(done);
return false;
}

// 待追加的日志与本地已有的日志之前正好衔接上,直接更新 lastLogIndex
if (firstLogEntry.getId().getIndex() == this.lastLogIndex + 1) {
// fast path
this.lastLogIndex = lastLogEntry.getId().getIndex();
}
// 说明待追加的日志与本地已有的日志之间存在交叉
else {
// Appending entries overlap the local ones. We should find if there
// is a conflicting index from which we should truncate the local ones.
int conflictingIndex = 0;
// 从头开始遍历寻找第一个 term 值不匹配的 logIndex
for (; conflictingIndex < entries.size(); conflictingIndex++) {
if (unsafeGetTerm(entries.get(conflictingIndex).getId().getIndex())
!= entries.get(conflictingIndex).getId().getTerm()) {
break;
}
}
// 日志数据存在冲突,将本地冲突之后的日志数据阶段
if (conflictingIndex != entries.size()) {
if (entries.get(conflictingIndex).getId().getIndex() <= this.lastLogIndex) {
// Truncate all the conflicting entries to make local logs consensus with the leader.
unsafeTruncateSuffix(entries.get(conflictingIndex).getId().getIndex() - 1);
}
this.lastLogIndex = lastLogEntry.getId().getIndex();
}
// else this is a duplicated AppendEntriesRequest, we have nothing to do besides releasing all the entries
// 将已经写入本地的日志数据从请求中剔除
if (conflictingIndex > 0) {
// Remove duplication
entries.subList(0, conflictingIndex).clear();
}
}
return true;
}
}

对于 Leader 节点而言,因为 Raft 算法的强 Leader 设计,所以 Leader 的日志数据是整个集群日志数据的标杆,不存在冲突一说。因此,对于 Leader 节点而言,上述实现只是简单的为当前 LogEntry 对象修正对应的 logIndex 值,但是对于 Follower 和 Learner 节点而言则免不了出现日志数据冲突,分为以下几种情况:

  • 待写入的日志数据与本地已有的日志数据存在断层,此时只能返回错误。
  • 待写入的日志数据相对于本地已有的日志数据更老,即最大的 logIndex 小于等于本地已经写入的日志数据的 logIndex,直接忽略。
  • 待写入的日志数据与本地已有的日志数据正好衔接上,直接递增 lastLogIndex 即可。
  • 待写入的日志数据与本地已有的日志数据存在重叠,此时需要判断是否存在冲突,并强行覆盖本地存在冲突的数据。

完成对于冲突数据的处理,LogManager 会先将日志数据写入内存,并将日志数据以 OTHER 类型事件的形式提交给 Disruptor 队列,用于实现异步刷盘。方法 LogManagerImpl#wakeupAllWaiter 用于通知那些等待新数据到达的复制器 Replicator 实例,这些 Replicator 在向目标 Follower 或 Learner 节点复制日志数据时可能出现没有数据可以复制的情况,此时这些复制器 Replicator 会注册一个回调监听新的数据到来,而通知这些监听器的时机则位于此,关于这一部分的具体流程留到下一小节再展开介绍。

下面来看一下对于 Disruptor 队列中 OTHER 类型事件的处理过程,即异步刷盘的过程,由 StableClosureEventHandler 实现。StableClosureEventHandler 定义了一个 AppendBatcher 类型的字段,用于缓存待写入的数据。方法 AppendBatcher#flush 用于执行将缓存的数据写入存储系统,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
LogId flush() {
if (this.size > 0) {
// 将数据落盘,并返回最新的 LogId
this.lastId = appendToStorage(this.toAppend);
for (int i = 0; i < this.size; i++) {
// 清空缓存的 LogEntry 数据
this.storage.get(i).getEntries().clear();
Status st = null;
try {
if (LogManagerImpl.this.hasError) {
// LogManager 运行异常
st = new Status(RaftError.EIO, "Corrupted LogStorage");
} else {
st = Status.OK();
}
// 应用回调
this.storage.get(i).run(st);
} catch (Throwable t) {
LOG.error("Fail to run closure with status: {}.", st, t);
}
}
this.toAppend.clear();
this.storage.clear();

}
this.size = 0;
this.bufferSize = 0;
return this.lastId;
}

具体执行过程如上述代码注释,其中方法 LogManagerImpl#appendToStorage 实现了将数据写入存储系统的逻辑,默认也就是写入 RocksDB 存储引擎,实现比较直观,不再展开。

在完成对一个批次日志数据的处理之后,下面来看一下针对回调 LeaderStableClosure 的处理逻辑,实现如下:

1
2
3
4
5
6
7
8
9
public void run(final Status status) {
if (status.isOk()) {
NodeImpl.this.ballotBox.commitAt(
this.firstLogIndex, this.firstLogIndex + this.nEntries - 1, NodeImpl.this.serverId);
} else {
LOG.error("Node {} append [{}, {}] failed, status={}.",
getNodeId(), this.firstLogIndex, this.firstLogIndex + this.nEntries - 1, status);
}
}

如果响应状态是 OK,则上述回调会执行 BallotBox#commitAt 方法检查该批次中的日志数据是否被过半数的节点所成功复制,如果存在复制成功的日志数据,则递增 lastCommittedIndex 值,并向状态机发布 COMMITTED 事件。方法 BallotBox#commitAt 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final PeerId peer) {
// TODO use lock-free algorithm here?
final long stamp = this.stampedLock.writeLock();
long lastCommittedIndex = 0;
try {
if (this.pendingIndex == 0) {
return false;
}
if (lastLogIndex < this.pendingIndex) {
return true;
}

if (lastLogIndex >= this.pendingIndex + this.pendingMetaQueue.size()) {
throw new ArrayIndexOutOfBoundsException();
}

final long startAt = Math.max(this.pendingIndex, firstLogIndex);
Ballot.PosHint hint = new Ballot.PosHint();
// 遍历检查当前批次中的 LogEntry 是否有成功被过半数节点复制的
for (long logIndex = startAt; logIndex <= lastLogIndex; logIndex++) {
final Ballot bl = this.pendingMetaQueue.get((int) (logIndex - this.pendingIndex));
hint = bl.grant(peer, hint);
// 当前 LogEntry 被过半数节点成功复制,记录 lastCommittedIndex
if (bl.isGranted()) {
lastCommittedIndex = logIndex;
}
}
// 没有一条日志被过半数节点所成功复制,先返回
if (lastCommittedIndex == 0) {
return true;
}
// When removing a peer off the raft group which contains even number of peers,
// the quorum would decrease by 1, e.g. 3 of 4 changes to 2 of 3. In this case,
// the log after removal may be committed before some previous logs,
// since we use the new configuration to deal the quorum of the removal request,
// we think it's safe to commit all the uncommitted previous logs, which is not well proved right now
// 剔除已经被过半数节点复制的 LogIndex 对应的选票,
// Raft 保证一个 LogEntry 被提交之后,在此之前的 LogEntry 一定是 committed 状态
this.pendingMetaQueue.removeFromFirst((int) (lastCommittedIndex - this.pendingIndex) + 1);
LOG.debug("Committed log fromIndex={}, toIndex={}.", this.pendingIndex, lastCommittedIndex);
this.pendingIndex = lastCommittedIndex + 1;
// 更新集群的 lastCommittedIndex 值
this.lastCommittedIndex = lastCommittedIndex;
} finally {
this.stampedLock.unlockWrite(stamp);
}

// 向状态机发布 COMMITTED 事件
this.waiter.onCommitted(lastCommittedIndex);
return true;
}

可能读者会疑惑这里只有 Leader 节点执行了写入日志操作,当前日志怎么可能会被 granted 呢?读者需要明白的一点是,Leader 节点在将数据写入内存之后,即通知对应的复制器 Replicator 开始往目标 Follower 节点复制数据(Replicator 优先从内存中读取待复制的日志数据)。到日志数据在 Leader 节点被落盘之后回调上述 BallotBox#commitAt 方法这中间是有一个时间差的,所以此时 Leader 节点执行 BallotBox#commitAt 操作有可能对应的日志数据已被过半数节点所复制。此外,方法 BallotBox#commitAt 除了会被 Leader 节点调用,也会在 Follower 节点完成日志数据复制的 AppendEntries 请求响应处理期间被调用,此时也会触发检查 granted 操作,具体在下一小节介绍日志复制机制时会展开说明。

关于日志生成的过程到这里也就基本介绍完了,本小节的最后我们来看一下 FSMCaller#onCommitted 方法到底做了哪些事情。该方法接收一个 committedIndex 参数,在 LogEntry 被提交时触发,会向对应的 Disruptor 队列中发布一个 COMMITTED 类型的事件,关于该事件的处理由 FSMCallerImpl#doCommitted 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
private void doCommitted(final long committedIndex) {
// 状态机调度器运行异常
if (!this.error.getStatus().isOk()) {
return;
}
// 获取最新被状态机应用的 LogEntry 对应的 logIndex 值
final long lastAppliedIndex = this.lastAppliedIndex.get();
// We can tolerate the disorder of committed_index
if (lastAppliedIndex >= committedIndex) {
// 当前 committedIndex 对应的 LogEntry 已经被处理过,无需重复处理
return;
}
final long startMs = Utils.monotonicMs();
try {
final List<Closure> closures = new ArrayList<>();
final List<TaskClosure> taskClosures = new ArrayList<>();
// 获取 committedIndex 之前的 Task 的回调列表,填充到 closures 集合中,
// 如果是 TaskClosure 类型,则顺便记录到 taskClosures 中,主要是为了回调 TaskClosure#onCommitted 方法
final long firstClosureIndex = this.closureQueue.popClosureUntil(committedIndex, closures, taskClosures);

// 对于 TaskClosure 类型的 Task 回调,应用 TaskClosure#onCommitted 方法
onTaskCommitted(taskClosures);

Requires.requireTrue(firstClosureIndex >= 0, "Invalid firstClosureIndex");
// 迭代器,用于迭代 LogEntry
final IteratorImpl iterImpl = new IteratorImpl(
this.fsm, this.logManager, closures, firstClosureIndex, lastAppliedIndex, committedIndex, this.applyingIndex);

// 如果是 good,则说明还有可以继续处理的日志
while (iterImpl.isGood()) {
// 获取当前待处理的 LogEntry 对象
final LogEntry logEntry = iterImpl.entry();
// 系统内部的 LogEntry 对象
if (logEntry.getType() != EnumOutter.EntryType.ENTRY_TYPE_DATA) {
if (logEntry.getType() == EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
if (logEntry.getOldPeers() != null && !logEntry.getOldPeers().isEmpty()) {
// Joint stage is not supposed to be noticeable by end users.
this.fsm.onConfigurationCommitted(new Configuration(iterImpl.entry().getPeers()));
}
}
if (iterImpl.done() != null) {
// For other entries, we have nothing to do besides flush the
// pending tasks and run this closure to notify the caller that the
// entries before this one were successfully committed and applied.
iterImpl.done().run(Status.OK());
}
iterImpl.next();
continue;
}

// 连续处理一批业务操作产生的日志,应用 StateMachine#onApply 方法
doApplyTasks(iterImpl);
}

// 发生错误,将错误透传给业务和当前节点
if (iterImpl.hasError()) {
setError(iterImpl.getError());
iterImpl.runTheRestClosureWithError();
}
final long lastIndex = iterImpl.getIndex() - 1;
final long lastTerm = this.logManager.getTerm(lastIndex);
final LogId lastAppliedId = new LogId(lastIndex, lastTerm);
// 更新最新应用的日志对应的 logIndex 和 term 值
this.lastAppliedIndex.set(lastIndex);
this.lastAppliedTerm = lastTerm;
// 通知 LogManager,这些已经被应用的 LogEntry 可以从内存中移除了
this.logManager.setAppliedId(lastAppliedId);
notifyLastAppliedIndexUpdated(lastIndex);
} finally {
this.nodeMetrics.recordLatency("fsm-commit", Utils.monotonicMs() - startMs);
}
}

业务向 JRaft 集群提交的 Task 在被转换成日志并成功复制给集群中的过半数以上节点(即对应的日志被提交)之后,接下去就需要将这些日志中存储的指令透传给业务状态机,相应的实现由上述方法完成。

FSMCaller 本地维护了一个 lastAppliedIndex 字段,用于记录已经被应用(即已将日志中的指令透传给业务状态机)的 LogEntry 对应的 logIndex 值。因为 Raft 算法能够保证某个 committedIndex 之前的所有 LogEntry 都是被提交的,所以即使 committedIndex 的到达顺序出现乱序也不会影响正常的运行逻辑。

我们在调用 Node#apply 方法向 JRaft 集群提交 Task 时,一般都会给 Task 设置一个回调,即给 Task#done 字段赋值。所以,FSMCaller 对于给定的 committedIndex,首先会调用 ClosureQueueImpl#popClosureUntil 方法获取到这些已经被提交的 LogEntry 对应的 Task 的回调。这些回调最终会透传给业务状态机,由业务决定是响应成功还是失败。那么这些回调是什么时候被记录的呢?

还记得我们在介绍 JRaft 节点初始化过程时曾提及过 FSMCaller 和 BallotBox 所持有的 ClosureQueue 实例是同一个吗?这些 Task 回调正是在前面调用 BallotBox#appendPendingTask 方法时记录的,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public boolean appendPendingTask(final Configuration conf, final Configuration oldConf, final Closure done) {
// 创建并初始化选票
final Ballot bl = new Ballot();
if (!bl.init(conf, oldConf)) {
LOG.error("Fail to init ballot.");
return false;
}
final long stamp = this.stampedLock.writeLock();
try {
// 节点成功 Leader 之后必须调用 BallotBox#resetPendingIndex 方法重置 pendingIndex 值
if (this.pendingIndex <= 0) {
LOG.error("Fail to appendingTask, pendingIndex={}.", this.pendingIndex);
return false;
}
// 记录选票,用于检查是否赢得过半数选票
this.pendingMetaQueue.add(bl);
// 记录 Task 的回调 done 对象,当对应的日志被 committed 时触发执行
this.closureQueue.appendPendingClosure(done);
return true;
} finally {
this.stampedLock.unlockWrite(stamp);
}
}

接下来,FSMCaller 采用了迭代器模式将需要处理的日志封装成迭代器对象,并对于业务操作产生的日志调用 FSMCallerImpl#doApplyTasks 方法将一批连续的 ENTRY_TYPE_DATA 类型日志透传给状态机:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private void doApplyTasks(final IteratorImpl iterImpl) {
final IteratorWrapper iter = new IteratorWrapper(iterImpl);
final long startApplyMs = Utils.monotonicMs();
final long startIndex = iter.getIndex();
try {
// 应用 StateMachine#onApply 方法
this.fsm.onApply(iter);
} finally {
this.nodeMetrics.recordLatency("fsm-apply-tasks", Utils.monotonicMs() - startApplyMs);
this.nodeMetrics.recordSize("fsm-apply-tasks-count", iter.getIndex() - startIndex);
}
// 迭代器中的日志还没有被处理完,但是业务已经退出了 onApply 方法
if (iter.hasNext()) {
LOG.error("Iterator is still valid, did you return before iterator reached the end?");
}
// Try move to next in case that we pass the same log twice.
iter.next();
}

而状态机 StateMachine 的核心方法 StateMachine#onApply 也正是在此处被调用。

日志复制

上一小节我们以业务操作日志为例,从 Leader 节点视角分析了 JRaft 是如何产生和处理一条日志的。在将用户操作指令封装成 LogEntry 写入内存之后,日志复制的进程即开始了,与此同时,Leader 节点会以异步的方式将数据落盘。日志复制仍然采用投票机制,当一条日志被集群中过半数以上的节点成功复制之后,这条日志会被打上 committed 标签。此类日志中承载的操作指令最后会被透传给状态机,由业务负责执行。

本小节侧重于分析 Leader 节点将日志数据复制给集群中的 Follower 节点的运行机制。Leader 节点针对每个 Follower 节点都会在本地为其创建并启动一个复制器 Replicator 实例,而日志复制的过程则全权由 Replicator 负责,各 Replicator 之间相互独立,彼此互不影响。

JRaft 还设计了 ReplicatorGroup 类,由名称我们可以推断出该类用于实现对于同一个 group 下的所有 Replicator 实例进行管理,例如启停到目标 Follower 或 Learner 节点的 Replicator 实例、检查到目标 Follower 或 Learner 节点的复制关系,以及主动向目标节点发送心跳请求等。

Pipeline 机制

Leader 节点将日志数据复制给 Follower 节点的过程必须保证日志数据的顺序性和连续性,这一点是毋庸置疑的。为了达到此目的,最简单的交互模式就是“request -> response -> request”,即每次发送出去一个请求之后必须等待接收并处理完对应的响应之后再发送下一个请求,从交互上保证日志复制的严格串行化。这一设计的优点在于实现简单,但是性能上却不尽如人意。

日志数据复制在 Raft 算法的运行过程中是一项频繁的操作,为了在保证日志复制顺序和连续的前提下尽量提升复制的性能,除了并发的向各个 Follower 或 Learner 节点批量发送数据之外,JRaft 在实现上还引入了 pipeline 机制。这一机制简单而言就是将请求和响应从串行改为并行,请求和响应彼此之间互不阻塞。Leader 节点可以连续的向 Follower 节点发送请求,对于那些已经发送出去还未收到响应的请求,或者已经收到但是还没来得及处理的响应对应的请求将其标记为 inflight,并在成功处理完对应的响应之后去除请求的 inflight 标记。如果期间发生错误或者 Leader 节点宕机,对于这些 inflight 请求会尝试重新发送,以此保证日志数据在复制期间不会漏传给 Follower 节点。Pipeline 机制与 TCP 协议中的滑动窗口算法思想相通,是分布式系统中提升吞吐量的惯用策略,例如 Kafka 生产者在往服务端发送消息时同样采用了类似的机制。

然而,我们也看到这一机制可能会导致同一个 LogEntry 被多次复制给 Follower 节点,好在 Raft 算法要求日志中的指令必须是幂等的,同时 Raft 算法针对日志数据的冲突解决机制能够保证重复复制的 LogEntry 能够被最后一次复制的 LogEntry 所覆盖。

image

上图描绘了 JRaft Pipeline 机制的设计,其中 inflight request queue 中的青色实心圆点表示已经发送出去还未接收到响应的请求(即 inflight 请求),而 pending response queue 中的黄色实心圆点表示已经收到的响应,黄色空心圆点表示还未收到的响应。对于响应而言,由于网络原因到达顺序不一定与请求顺序相吻合,JRaft 对于提前到达的响应会先将其缓存起来,并按照请求的顺序对响应按顺序进行处理。如果因为等待的某个响应迟迟不能到达导致 inflight 请求越积越多,或者某个响应异常,则 Leader 节点会清空 inflight request queue 中的请求,并重新发送这些请求。这一机制能够实现将发送请求和处理响应并行化,并且由于 Raft 算法要求日志承载的指令必须是幂等的,所以重试策略不会破坏数据的最终状态。

下面来看一下 JRaft 对于 pipeline 机制的实现。首先来了解几个相关的字段定义,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/** 记录最近的 inflight RPC 请求 */
private Inflight rpcInFly;
/** FIFO 队列,记录 inflight RPC 请求列表 */
private final ArrayDeque<Inflight> inflights = new ArrayDeque<>();
/** 请求序列 */
private int reqSeq = 0;
/** 期望的响应序列 */
private int requiredNextSeq = 0;
/** 状态版本,当重置 inflight 请求队列时会递增,以实现忽略版本不匹配的 inflight 请求响应 */
private int version = 0;
/**
* 记录已经收到但是还没有被处理的响应,按照请求序列从小到大排序,
* 响应的顺序是未知的,但是需要保证处理的顺序
*/
private final PriorityQueue<RpcResponse> pendingResponses = new PriorityQueue<>(50);

各字段的作用如代码注释,其中 Inflight 类用于描述一个 inflight 请求,定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static class Inflight {
/** 请求中的 LogEntry 数目 */
final int count;
/** 请求对应的起始 nextIndex 值 */
final long startIndex;
/** LogEntry 的总字节长度 */
final int size;
/** RPC future */
final Future<Message> rpcFuture;
/** 请求类型:复制日志 or 安装快照 */
final RequestType requestType;
/** 请求序列,用于匹配请求和响应,保证按照请求的顺序处理响应 */
final int seq;

// ... 省略构造函数和 toString 方法

boolean isSendingLogEntries() {
return this.requestType == RequestType.AppendEntries && this.count > 0;
}
}

Replicator 在成功发送一个 RPC 请求之后会调用 Replicator#addInflight 方法将请求相关的信息封装成 Inflight 对象记录到 inflight 队列中:

1
2
3
4
5
6
7
8
9
10
11
12
private void addInflight(final RequestType reqType,
final long startIndex,
final int count,
final int size,
final int seq,
final Future<Message> rpcInfly) {
// 更新本地记录的最近一次发送的 inflight RPC 请求
this.rpcInFly = new Inflight(reqType, startIndex, count, size, seq, rpcInfly);
// 标记当前请求为 inflight
this.inflights.add(this.rpcInFly);
this.nodeMetrics.recordSize("replicate-inflights-count", this.inflights.size());
}

当接收到请求对应的响应时,Replicator 会执行 Replicator#onRpcReturned 方法处理响应,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
static void onRpcReturned(final ThreadId id,
final RequestType reqType,
final Status status,
final Message request,
final Message response,
final int seq,
final int stateVersion,
final long rpcSendTime) {
if (id == null) {
return;
}
final long startTimeMs = Utils.nowMs();
Replicator r;
// 获取当前 Replicator 对应的不可重入锁
if ((r = (Replicator) id.lock()) == null) {
return;
}

// 状态版本发生变化,说明 inflight 队列被重置过,忽略重置之前请求对应的响应
if (stateVersion != r.version) {
LOG.debug("Replicator {} ignored old version response {}, current version is {}, request is {}\n, and response is {}\n, status is {}.",
r, stateVersion, r.version, request, response, status);
id.unlock();
return;
}

// 获取等待处理的响应优先级队列,按照请求序列从小到大排序
final PriorityQueue<RpcResponse> holdingQueue = r.pendingResponses;
holdingQueue.add(new RpcResponse(reqType, seq, status, request, response, rpcSendTime));

// 太多等待处理的响应(默认为 256 个),而期望请求序列对应的响应迟迟不来,重置请求 inflight 队列,重新发送探针请求
if (holdingQueue.size() > r.raftOptions.getMaxReplicatorInflightMsgs()) {
LOG.warn("Too many pending responses {} for replicator {}, maxReplicatorInflightMsgs={}",
holdingQueue.size(), r.options.getPeerId(), r.raftOptions.getMaxReplicatorInflightMsgs());
r.resetInflights();
r.state = State.Probe;
r.sendEmptyEntries(false);
return;
}

// 标识是否继续发送 AppendEntries 请求
boolean continueSendEntries = false;

final boolean isLogDebugEnabled = LOG.isDebugEnabled();
StringBuilder sb = null;
if (isLogDebugEnabled) {
sb = new StringBuilder("Replicator ").append(r).append(" is processing RPC responses, ");
}
try {
// 记录已经处理的响应数
int processed = 0;
// 遍历处理响应
while (!holdingQueue.isEmpty()) {
// 获取收到的请求序列最小的响应
final RpcResponse queuedPipelinedResponse = holdingQueue.peek();

// Sequence mismatch, waiting for next response.
// 响应乱序,继续等待期望序列的响应
if (queuedPipelinedResponse.seq != r.requiredNextSeq) {
if (processed > 0) {
if (isLogDebugEnabled) {
sb.append("has processed ").append(processed).append(" responses, ");
}
break;
} else {
// Do not processed any responses, UNLOCK id and return.
continueSendEntries = false;
id.unlock();
return;
}
}

/* 开始处理请求对应的响应 */

holdingQueue.remove();
processed++;
// 获取 inflight 请求
final Inflight inflight = r.pollInflight();
if (inflight == null) {
// 响应对应的请求已经被清除,忽略当前响应
if (isLogDebugEnabled) {
sb.append("ignore response because request not found: ").append(queuedPipelinedResponse).append(",\n");
}
continue;
}
// 请求序列与响应中记录的请求序列匹配不上,重置请求 inflight 队列,阻塞一会后重新发送探针请求
if (inflight.seq != queuedPipelinedResponse.seq) {
// reset state
LOG.warn("Replicator {} response sequence out of order, expect {}, but it is {}, reset state to try again.",
r, inflight.seq, queuedPipelinedResponse.seq);
r.resetInflights();
r.state = State.Probe;
continueSendEntries = false;
r.block(Utils.nowMs(), RaftError.EREQUEST.getNumber());
return;
}

// 依据响应类型分别处理
try {
switch (queuedPipelinedResponse.requestType) {
// 处理 AppendEntries 请求
case AppendEntries:
continueSendEntries = onAppendEntriesReturned(
id,
inflight,
queuedPipelinedResponse.status,
(AppendEntriesRequest) queuedPipelinedResponse.request,
(AppendEntriesResponse) queuedPipelinedResponse.response,
rpcSendTime,
startTimeMs,
r);
break;
// 处理 InstallSnapshot 请求
case Snapshot:
continueSendEntries = onInstallSnapshotReturned(
id,
r,
queuedPipelinedResponse.status,
(InstallSnapshotRequest) queuedPipelinedResponse.request,
(InstallSnapshotResponse) queuedPipelinedResponse.response);
break;
}
} finally {
if (continueSendEntries) {
// Success, increase the response sequence.
r.getAndIncrementRequiredNextSeq();
} else {
// The id is already unlocked in onAppendEntriesReturned/onInstallSnapshotReturned, we SHOULD break out.
break;
}
}
}
} finally {
if (isLogDebugEnabled) {
sb.append("after processed, continue to send entries: ").append(continueSendEntries);
LOG.debug(sb.toString());
}
// 继续发送 AppendEntries 请求
if (continueSendEntries) {
// unlock in sendEntries.
r.sendEntries();
}
}
}

由上述实现我们可以总结 pipeline 机制在处理响应时需要考虑以下几点:

  • 因为 inflight 请求本质上是一种未完成的请求,有重试的可能,所以当重新发送请求时,之前请求对应的响应即使收到了也应该被忽略。
  • 响应的顺序是未知的,但是需要保证处理的顺序,所以对于提前收到的响应需要先缓存起来,必须按照请求发送的顺序而非响应到达的顺序进行处理。
  • 需要保证请求序列和响应序列相匹配。

针对上述中的第一点,JRaft 在实现上通过版本策略予以实现。Replicator 定义了一个 Replicator#version 字段,用于标识当前 inflight 队列的版本。当重置 inflight 队列时会自增该版本号,并清空 inflight 队列和响应队列等。Replicator 执行此操作的目的在于丢弃那些 inflight 请求以重新发送,但是这些已经发送出去的 inflight 请求对应的响应可能正在赶来的路上,当节点收到这些响应时需要予以忽略,而忽略的依据就是版本不匹配。

发送探针

完成了对于 Pipeline 机制的介绍,下面开始分析 JRaft 复制日志数据的过程。上一篇曾介绍过,当一个节点竞选 Leader 成功之后会调用 ReplicatorGroup#addReplicator 方法建立到各个 Follower 节点之间的复制关系,本文我们将从这里切入分析 JRaft 的日志复制机制。

方法 ReplicatorGroup#addReplicator 接收两个参数,用于指定目标 Follower 或 Learner 节点,以及节点类型,具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public boolean addReplicator(final PeerId peer, // 目标 Follower 或 Learner 节点
final ReplicatorType replicatorType // 节点类型
) {
// 在此之前应该先调用 ReplicatorGroup#resetTerm 方法
Requires.requireTrue(this.commonOptions.getTerm() != 0);
this.failureReplicators.remove(peer);
// 已建立复制关系,避免重复
if (this.replicatorMap.containsKey(peer)) {
return true;
}
final ReplicatorOptions opts = this.commonOptions == null ? new ReplicatorOptions() : this.commonOptions.copy();
opts.setReplicatorType(replicatorType);
opts.setPeerId(peer);
// 创建并启动到目标节点的复制器
final ThreadId rid = Replicator.start(opts, this.raftOptions);
if (rid == null) {
LOG.error("Fail to start replicator to peer={}, replicatorType={}.", peer, replicatorType);
this.failureReplicators.put(peer, replicatorType);
return false;
}
return this.replicatorMap.put(peer, rid) == null;
}

上述实现的核心在于调用 Replicator#start 方法创建并启动到目标节点的复制器 Replicator 实例。该方法返回一个 ThreadId 对象,用于为对应的 Replicator 对象提供不可重入锁支持,其中不可重入锁基于 AQS 实现。

方法 Replicator#start 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public static ThreadId start(final ReplicatorOptions opts, final RaftOptions raftOptions) {
if (opts.getLogManager() == null || opts.getBallotBox() == null || opts.getNode() == null) {
throw new IllegalArgumentException("Invalid ReplicatorOptions.");
}
// 创建复制器 Replicator 对象
final Replicator r = new Replicator(opts, raftOptions);
// 检查到目标节点的连通性
if (!r.rpcService.connect(opts.getPeerId().getEndpoint())) {
LOG.error("Fail to init sending channel to {}.", opts.getPeerId());
// Return and it will be retried later.
return null;
}

// ... register replicator metric set

// Start replication
r.id = new ThreadId(r, r);
// 获取与当前 Replicator 绑定的不可重入锁
r.id.lock();
// 发布 CREATED 事件
notifyReplicatorStatusListener(r, ReplicatorEvent.CREATED);
LOG.info("Replicator={}@{} is started", r.id, r.options.getPeerId());
r.catchUpClosure = null;
// 更新最近一次发送 RPC 请求的时间戳
r.lastRpcSendTimestamp = Utils.monotonicMs();
// 启动心跳超时计时器
r.startHeartbeatTimer(Utils.nowMs());
// 发送探针请求,以获取接下去发往目标节点的正确 logIndex 位置,并启动日志复制进程
// id.unlock in sendEmptyEntries
r.sendEmptyEntries(false);
return r.id;
}

其中方法 Replicator#sendEmptyEntries 用于向目标节点发送一个空的 AppendEntries 请求,此类请求可以是一个探针(probe)请求,也可以是一个心跳请求。关于心跳请求的发送和响应过程,我们将在后面的小节专门介绍,下面来看一下探针请求的发送和响应过程。Leader 节点在通过复制器 Replicator 与目标 Follower 节点建立连接后,需要发送一个探针请求,目的是获取 Follower 节点已经拥有的日志位置,以便于接下去向 Follower 节点发送后续的日志数据。

方法 Replicator#sendEmptyEntries 接收两个参数,当发送探针请求时会设置参数 isHeartbeat = false,同时设置参数 heartBeatClosure = null,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
private void sendEmptyEntries(final boolean isHeartbeat,
final RpcResponseClosure<AppendEntriesResponse> heartBeatClosure) {
// 构建 AppendEntries 请求
final AppendEntriesRequest.Builder rb = AppendEntriesRequest.newBuilder();
// 为 AppendEntries 请求填充基础参数,包括当前节点的 term 值、groupId、节点 ID,以及 committedLogIndex 等等
// 如果返回 false 说明待发送的部分日志已经变为快照,需要先给目标节点安装快照
if (!fillCommonFields(rb, this.nextIndex - 1, isHeartbeat)) {
// id is unlock in installSnapshot
installSnapshot();
if (isHeartbeat && heartBeatClosure != null) {
RpcUtils.runClosureInThread(heartBeatClosure,
new Status(RaftError.EAGAIN, "Fail to send heartbeat to peer %s", this.options.getPeerId()));
}
return;
}
try {
final long monotonicSendTimeMs = Utils.monotonicMs();
final AppendEntriesRequest request = rb.build();

// 心跳请求
if (isHeartbeat) {
// ... 省略心跳请求相关实现
}
// 探针请求
else {
// Sending a probe request.
this.statInfo.runningState = RunningState.APPENDING_ENTRIES;
this.statInfo.firstLogIndex = this.nextIndex;
this.statInfo.lastLogIndex = this.nextIndex - 1;
this.appendEntriesCounter++;
this.state = State.Probe;
final int stateVersion = this.version;
// 递增请求序列
final int seq = getAndIncrementReqSeq();
// 向目标节点发送 AppendEntries 请求
final Future<Message> rpcFuture = this.rpcService.appendEntries(
this.options.getPeerId().getEndpoint(),
request,
-1,
new RpcResponseClosureAdapter<AppendEntriesResponse>() {

@Override
public void run(final Status status) {
// 处理响应
onRpcReturned(Replicator.this.id,
RequestType.AppendEntries,
status,
request,
getResponse(),
seq,
stateVersion,
monotonicSendTimeMs);
}

});

// 将当前请求标记为 inflight,并记录到 inflight 队列中
addInflight(RequestType.AppendEntries, this.nextIndex, 0, 0, seq, rpcFuture);
}
LOG.debug("Node {} send HeartbeatRequest to {} term {} lastCommittedIndex {}",
this.options.getNode().getNodeId(), this.options.getPeerId(), this.options.getTerm(), request.getCommittedIndex());
} finally {
this.id.unlock();
}
}

上述方法的核心逻辑在于构造并向目标节点发送 AppendEntries 请求,方法 Replicator#fillCommonFields 会往 AppendEntries 请求对象中填充一些基础数据,包括当前节点的 term 值、groupId、节点 ID、最近一次复制成功的日志对应的 logIndex 和 term 值、目标节点 ID,以及当前节点最新的 committedIndex 值。如果该方法返回 false,则说明需要发送给目标节点的日志已经变为快照形式存储,需要转为走快照机制为目标节点安装快照,关于快照相关的实现我们将在后面用专门的文章进行介绍,这里暂且跳过。

在将 AppendEntries 请求发送出去之后,JRaft 会将请求标记为 inflight 并记录到 inflight 队列中,然后等待目标节点响应,关于请求的 pipeline 机制在前面已经专门介绍过。下面来看一下目标节点对于 AppendEntries 探针请求的处理过程,由 NodeImpl#handleAppendEntriesRequest 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
public Message handleAppendEntriesRequest(final AppendEntriesRequest request, final RpcRequestClosure done) {
boolean doUnlock = true;
final long startMs = Utils.monotonicMs();
this.writeLock.lock();
final int entriesCount = request.getEntriesCount();
try {
// 当前节点处于非活跃状态,响应错误
if (!this.state.isActive()) {
LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
return RpcFactoryHelper //
.responseFactory() //
.newResponse(AppendEntriesResponse.getDefaultInstance(), RaftError.EINVAL,
"Node %s is not in active state, state %s.", getNodeId(), this.state.name());
}

// 解析请求来源节点 ID
final PeerId serverId = new PeerId();
if (!serverId.parse(request.getServerId())) {
// 解析失败,响应错误
LOG.warn("Node {} received AppendEntriesRequest from {} serverId bad format.", getNodeId(), request.getServerId());
return RpcFactoryHelper //
.responseFactory() //
.newResponse(AppendEntriesResponse.getDefaultInstance(),
RaftError.EINVAL, "Parse serverId failed: %s.", request.getServerId());
}

// 校验请求中的 term 值,如果小于当前节点,则拒绝请求并返回自己当前的 term 值
if (request.getTerm() < this.currTerm) {
LOG.warn("Node {} ignore stale AppendEntriesRequest from {}, term={}, currTerm={}.",
getNodeId(), request.getServerId(), request.getTerm(), this.currTerm);
return AppendEntriesResponse.newBuilder() //
.setSuccess(false) //
.setTerm(this.currTerm) //
.build();
}

// 基于请求和节点本地状态判断是否需要执行 stepdown
checkStepDown(request.getTerm(), serverId);

// 请求来源节点并不是当前节点所知道的 Leader 节点,
// 可能出现网络分区,尝试将 term 值加 1,以触发 Leader 节点 stepdown
if (!serverId.equals(this.leaderId)) {
LOG.error("Another peer {} declares that it is the leader at term {} which was occupied by leader {}.",
serverId, this.currTerm, this.leaderId);
// Increase the term by 1 and make both leaders step down to minimize the loss of split brain
stepDown(request.getTerm() + 1, false,
new Status(RaftError.ELEADERCONFLICT, "More than one leader in the same term."));
return AppendEntriesResponse.newBuilder() //
.setSuccess(false) //
.setTerm(request.getTerm() + 1) //
.build();
}

// 更新本地记录的最近一次收到来自 Leader 节点请求的时间戳
updateLastLeaderTimestamp(Utils.monotonicMs());

// 当前是复制日志的 AppendEntries 请求,但是本地正在安装快照,响应错误
if (entriesCount > 0 && this.snapshotExecutor != null && this.snapshotExecutor.isInstallingSnapshot()) {
LOG.warn("Node {} received AppendEntriesRequest while installing snapshot.", getNodeId());
return RpcFactoryHelper //
.responseFactory() //
.newResponse(AppendEntriesResponse.getDefaultInstance(), RaftError.EBUSY,
"Node %s:%s is installing snapshot.", this.groupId, this.serverId);
}

final long prevLogIndex = request.getPrevLogIndex();
final long prevLogTerm = request.getPrevLogTerm();
final long localPrevLogTerm = this.logManager.getTerm(prevLogIndex);
// 请求中 logIndex 对应的 term 值与本地不匹配
if (localPrevLogTerm != prevLogTerm) {
final long lastLogIndex = this.logManager.getLastLogIndex();
LOG.warn("Node {} reject term_unmatched AppendEntriesRequest from {}, " +
"term={}, prevLogIndex={}, prevLogTerm={}, localPrevLogTerm={}, lastLogIndex={}, entriesSize={}.",
getNodeId(), request.getServerId(), request.getTerm(), prevLogIndex, prevLogTerm, localPrevLogTerm, lastLogIndex, entriesCount);

return AppendEntriesResponse.newBuilder() //
.setSuccess(false) //
.setTerm(this.currTerm) //
.setLastLogIndex(lastLogIndex) //
.build();
}

// 心跳或者探针请求
if (entriesCount == 0) {
// 返回本地当前的 term 值以及对应的 logIndex
final AppendEntriesResponse.Builder respBuilder = AppendEntriesResponse.newBuilder() //
.setSuccess(true) //
.setTerm(this.currTerm) //
.setLastLogIndex(this.logManager.getLastLogIndex());
doUnlock = false;
this.writeLock.unlock();
// see the comments at FollowerStableClosure#run()
// 基于 Leader 的 committedIndex 更新本地的 lastCommittedIndex 值
this.ballotBox.setLastCommittedIndex(Math.min(request.getCommittedIndex(), prevLogIndex));
return respBuilder.build();
}

// ... 省略复制日志数据请求的处理逻辑

} finally {
if (doUnlock) {
this.writeLock.unlock();
}
// ... metrics
}
}

Follower 节点对于探针请求的整体响应流程可以概括为:

  1. 如果当前节点处于非活跃状态,则响应错误;
  2. 否则,解析请求来源节点的节点 ID,如果解析失败则响应错误;
  3. 否则,校验请求中的 term 值是否小于当前节点,如果是则拒绝请求;
  4. 否则,基于请求和当前节点本地状态判断是否需要执行 stepdown 操作;
  5. 判断请求来源节点是否是当前节点所认可的 Leader 节点,如果不是则说明可能出现网络分区,尝试将响应中的 term 值加 1,以触发请求节点执行 stepdown 操作;
  6. 否则,更新本地记录的最近一次收到来自 Leader 节点的时间戳;
  7. 校验最近一次完成复制的 LogEntry 对应的 term 值是否与本地相匹配,如果不匹配则拒绝请求,并返回本地已知的最新 logIndex 值;
  8. 否则,依据请求中的 committedIndex 值更新本地的 committedIndex 值,同时响应请求,返回本地已知的最新 logIndex 和 term 值。

通过这一系列的校验过程,Follower 节点会在针对当前探针请求的响应中附上本地已知的最新 logIndex 和 term 值,而 Leader 节点会依据响应选择正确位置的 LogEntry 发送给当前 Follower 节点。

下面来看一下 Leader 节点对于上述 Follower 节点响应的处理过程,因为探针请求本质上是一类 AppendEntries 请求,所以由 Replicator#onAppendEntriesReturned 方法实现(省略了部分 DEBUG 日志打印):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
private static boolean onAppendEntriesReturned(final ThreadId id,
final Inflight inflight,
final Status status,
final AppendEntriesRequest request,
final AppendEntriesResponse response,
final long rpcSendTime,
final long startTimeMs,
final Replicator r) {
// inflight 请求与响应中记录的请求对应的 logIndex 不匹配,重置请求 inflight 队列,重新发送探针请求
if (inflight.startIndex != request.getPrevLogIndex() + 1) {
LOG.warn("Replicator {} received invalid AppendEntriesResponse, in-flight startIndex={}, request prevLogIndex={}, reset the replicator state and probe again.",
r, inflight.startIndex, request.getPrevLogIndex());
r.resetInflights();
r.state = State.Probe;
// unlock id in sendEmptyEntries
r.sendEmptyEntries(false);
return false;
}

// ... metrics

// 目标 Follower 发生错误,重置请求 inflight 队列,重新发送探针请求
if (!status.isOk()) {
// If the follower crashes, any RPC to the follower fails immediately,
// so we need to block the follower for a while instead of looping until it comes back or be removed
// dummy_id is unlock in block
notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
if (++r.consecutiveErrorTimes % 10 == 0) {
LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}",
r.options.getPeerId(), r.consecutiveErrorTimes, status);
}
// 重置 inflight 队列,阻塞一会儿重新发送探针请求
r.resetInflights();
r.state = State.Probe;
// unlock in in block
r.block(startTimeMs, status.getCode());
return false;
}

/* 目标 Follower 节点运行正常 */

r.consecutiveErrorTimes = 0;
// 目标 Follower 节点拒绝响应
if (!response.getSuccess()) {
// Follower 节点的 term 值更大
if (response.getTerm() > r.options.getTerm()) {
final NodeImpl node = r.options.getNode();
r.notifyOnCaughtUp(RaftError.EPERM.getNumber(), true);
// 销毁当前复制器 Replicator
r.destroy();
// 提升当前节点的 term 值,并执行 stepdown
node.increaseTermTo(response.getTerm(), new Status(RaftError.EHIGHERTERMRESPONSE,
"Leader receives higher term heartbeat_response from peer:%s", r.options.getPeerId()));
return false;
}
// 更新最近一次向目标节点发送 RPC 请求的时间戳
if (rpcSendTime > r.lastRpcSendTimestamp) {
r.lastRpcSendTimestamp = rpcSendTime;
}
// 重置 inflight 队列,调整 nextIndex 之后重新发送探针请求
r.resetInflights();
// prev_log_index and prev_log_term doesn't match
if (response.getLastLogIndex() + 1 < r.nextIndex) {
LOG.debug("LastLogIndex at peer={} is {}", r.options.getPeerId(), response.getLastLogIndex());
// The peer contains less logs than leader
r.nextIndex = response.getLastLogIndex() + 1;
}
// Follower 节点本地的 logIndex 更大,可能包含老的 Leader 节点复制的日志,
// 递减 nextIndex 之后重试,直到找到两个节点相同日志的交叉点为止
else {
// The peer contains logs from old term which should be truncated,
// decrease _last_log_at_peer by one to test the right index to keep
if (r.nextIndex > 1) {
LOG.debug("logIndex={} dismatch", r.nextIndex);
r.nextIndex--;
} else {
LOG.error("Peer={} declares that log at index=0 doesn't match, which is not supposed to happen", r.options.getPeerId());
}
}
// dummy_id is unlock in _send_heartbeat
// 重新发送探针请求
r.sendEmptyEntries(false);
return false;
}

/* 目标 Follower 节点响应成功 */

// 请求期间 term 值已经发生变化,当前节点可能已经不是 Leader 节点,清空 inflight 队列
if (response.getTerm() != r.options.getTerm()) {
r.resetInflights();
r.state = State.Probe;
LOG.error("Fail, response term {} dismatch, expect term {}", response.getTerm(), r.options.getTerm());
id.unlock();
return false;
}
// 更新最近一次向目标节点发送 RPC 请求的时间戳
if (rpcSendTime > r.lastRpcSendTimestamp) {
r.lastRpcSendTimestamp = rpcSendTime;
}
final int entriesSize = request.getEntriesCount();
// 如果是复制日志请求,当 Follower 节点复制成功之后需要尝试执行 BallotBox#commitAt 以检测当前日志是否被过半数的节点成功复制
if (entriesSize > 0) {
if (r.options.getReplicatorType().isFollower()) {
// Only commit index when the response is from follower.
r.options.getBallotBox().commitAt(r.nextIndex, r.nextIndex + entriesSize - 1, r.options.getPeerId());
}
}

r.state = State.Replicate;
r.blockTimer = null;
// 更新待发送的下一个 logIndex 位置
r.nextIndex += entriesSize;
r.hasSucceeded = true;
r.notifyOnCaughtUp(RaftError.SUCCESS.getNumber(), false);
// dummy_id is unlock in _send_entries
if (r.timeoutNowIndex > 0 && r.timeoutNowIndex < r.nextIndex) {
r.sendTimeoutNow(false, false);
}
return true;
}

Leader 节点对于 Follower 节点的探针请求响应处理流程可以概括如下:

  1. 校验 inflight 请求与响应中记录的请求对应的已经完成复制的 logIndex 是否一致,如果不是则需要重置 inflight 队列,并重新发送探针请求;
  2. 否则,如果目标 Follower 节点运行异常,则同样需要重置 inflight 队列,并重新发送探针请求;
  3. 否则,说明目标 Follower 节点运行正常,但是目标节点可以同意当前请求,也可以拒绝当前请求,需要分别处理。

如果目标 Follower 节点拒绝当前请求 ,按照之前对于 Follower 节点处理 AppendEntries 探针请求过程的分析可知可能包含以下原因:

  1. Follower 节点本地的 term 值相对于当前 Leader 节点更大。
  2. Follower 节点本地记录的 Leader 节点 ID 并不是当前 Leader 节点,即可能出现网络分区。
  3. Follower 节点与当前 Leader 节点的日志数据存在冲突。

针对原因 1 和 2,说明集群中已经有更新的 Leader 节点,此时当前节点需要销毁对应的复制器 Replicator 实例,并执行 stepdown 操作。

针对原因 3 需要分为两类情况:

  1. 如果目标 Follower 节点本地最新的 logIndex 相对于当前复制器 Replicator 记录的 nextIndex 要小,则需要修正 nextIndex 之后重新发送探针请求。
  2. 如果目标 Follower 节点本地最新的 logIndex 相对于当前复制器 Replicator 记录的 nextIndex 相等或更大,说明目标 Follower 节点包含老的 Leader 节点复制的日志,此时需要递减 nextIndex 值并重新发送探针请求,以解决日志冲突问题。

如果目标 Follower 节点同意当前请求 ,则说明 Follower 节点确认当前复制器 Replicator 实例记录的 nextIndex 值是正确的,无需修正 nextIndex 值,接下去可以继续执行往目标 Follower 节点复制日志的操作。

复制日志

上一小节介绍了复制器 Replicator 在启动时会向目标 Follower 节点发送探针请求,以获取目标 Follower 节点已经拥有的日志位置,以便于接下去向 Follower 节点发送后续的日志数据。如果刚刚分析的 Replicator#onAppendEntriesReturned 方法处理探针请求对应的响应正常,即返回 true,那么接下去就会触发日志复制的进程,即调用 Replicator#sendEntries 方法开始往目标 Follower 节点复制日志数据。

方法 Replicator#sendEntries 会尝试计算接下去待发送的 LogEntry 对应的 logIndex 值,如果当前复制器 Replicator 负载较高,则会尝试暂停发送。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
void sendEntries() {
boolean doUnlock = true;
try {
long prevSendIndex = -1;
while (true) {
// 获取下一个待发送的 LogEntry 对应的 logIndex 值,如果返回 -1 表示暂停复制
final long nextSendingIndex = getNextSendIndex();
if (nextSendingIndex > prevSendIndex) {
// 向目标节点复制 nextSendingIndex 位置之后的 LogEntry 数据
if (sendEntries(nextSendingIndex)) {
prevSendIndex = nextSendingIndex;
} else {
doUnlock = false;
// id already unlock in sendEntries when it returns false.
break;
}
} else {
break;
}
}
} finally {
if (doUnlock) {
this.id.unlock();
}
}
}

long getNextSendIndex() {
// 没有 inflight 请求,从 nextIndex 开始发送
if (this.inflights.isEmpty()) {
return this.nextIndex;
}
// 太多 inflight 请求,暂停发送新的 AppendEntries 请求
if (this.inflights.size() > this.raftOptions.getMaxReplicatorInflightMsgs()) {
return -1L;
}
// Last request should be a AppendEntries request and has some entries.
// 最近一次发送的 RPC 请求是携带 LogEntry 的 AppendEntries 请求
if (this.rpcInFly != null && this.rpcInFly.isSendingLogEntries()) {
// 计算并返回接下去待发送的 LogEntry 对应的 logIndex 值
return this.rpcInFly.startIndex + this.rpcInFly.count;
}
return -1L;
}

方法 Replicator#sendEntries 的重载版本 Replicator#sendEntries(long) 用于从指定 logIndex 位置开始从本地获取对应的 LogEntry 数据并复制给目标 Follower 节点,整体过程与前面介绍的发送探针请求 Replicator#sendEmptyEntries 方法基本类似。这里重点关注一下从本地加载日志数据并填充到 AppendEntries 请求对象中的过程,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
ByteBufferCollector dataBuf = null;
// 获取单次批量发送 LogEntry 的数目上线
final int maxEntriesSize = this.raftOptions.getMaxEntriesSize();
final RecyclableByteBufferList byteBufList = RecyclableByteBufferList.newInstance();
try {
for (int i = 0; i < maxEntriesSize; i++) {
final RaftOutter.EntryMeta.Builder emb = RaftOutter.EntryMeta.newBuilder();
// 获取指定 logIndex 的 LogEntry 数据,填充到 emb 和 byteBufList 中,
// 如果返回 false 说明容量已满
if (!prepareEntry(nextSendingIndex, i, emb, byteBufList)) {
break;
}
rb.addEntries(emb.build());
}

// 未获取到任何 LogEntry 数据,可能目标数据已经变为快照了,也可能是真的没有数据可以复制
if (rb.getEntriesCount() == 0) {
// nextSendingIndex < firstLogIndex,说明对应区间的数据已变为快照,需要先给目标节点安装快照
if (nextSendingIndex < this.options.getLogManager().getFirstLogIndex()) {
installSnapshot();
return false;
}
// 说明没有新的数据可以复制,设置一个回调等待新的数据到来之后重新触发 sendEntries 操作
waitMoreEntries(nextSendingIndex);
return false;
}

// 将日志数据填充到 AppendEntries 请求中
if (byteBufList.getCapacity() > 0) {
dataBuf = ByteBufferCollector.allocateByRecyclers(byteBufList.getCapacity());
for (final ByteBuffer b : byteBufList) {
dataBuf.put(b);
}
final ByteBuffer buf = dataBuf.getBuffer();
buf.flip();
rb.setData(ZeroByteStringHelper.wrap(buf));
}
} finally {
RecycleUtil.recycle(byteBufList);
}

JRaft 在 AppendEntries 请求的设计上采用了将 LogEntry 的元数据和数据体相分离的策略,所以上述从本地加载日志数据的过程会先填充元数据,再填充数据体。实现上使用 RecyclableByteBufferList 作为数据体的载体,RecyclableByteBufferList 可以看做是一个可回收利用的 ByteBuffer 链表,实现层面借鉴了 Netty 的轻量级对象池的设计思想。

下面来看一下从本地加载日志数据并填充 AppendEntries 请求的过程,由 Replicator#prepareEntry 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
boolean prepareEntry(final long nextSendingIndex,
final int offset,
final RaftOutter.EntryMeta.Builder emb,
final RecyclableByteBufferList dataBuffer) {
// 数据量已经超过阈值
if (dataBuffer.getCapacity() >= this.raftOptions.getMaxBodySize()) {
return false;
}
// 基于偏移量计算当前处理的 LogEntry 的 logIndex 值
final long logIndex = nextSendingIndex + offset;
// 从本地获取对应的 LogEntry 数据
final LogEntry entry = this.options.getLogManager().getEntry(logIndex);
if (entry == null) {
return false;
}
// 将 LogEntry 拆分为元数据和数据体分别填充 EntryMeta 和 RecyclableByteBufferList
emb.setTerm(entry.getId().getTerm());
// 设置 checksum
if (entry.hasChecksum()) {
emb.setChecksum(entry.getChecksum()); // since 1.2.6
}
// 设置 LogEntry 类型
emb.setType(entry.getType());
if (entry.getPeers() != null) {
Requires.requireTrue(!entry.getPeers().isEmpty(), "Empty peers at logIndex=%d", logIndex);
fillMetaPeers(emb, entry);
} else {
Requires.requireTrue(entry.getType() != EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION,
"Empty peers but is ENTRY_TYPE_CONFIGURATION type at logIndex=%d", logIndex);
}
// 设置数据长度
final int remaining = entry.getData() != null ? entry.getData().remaining() : 0;
emb.setDataLen(remaining);
// 填充数据到 dataBuffer
if (entry.getData() != null) {
// should slice entry data
dataBuffer.add(entry.getData().slice());
}
return true;
}

上述实现依据偏移量计算得到最终的 logIndex 值,然后调用 LogManagerImpl#getEntry 方法从文件系统加载获取到对应的 LogEntry 数据,最后将 LogEntry 中的元数据和数据体拆分后分别填充到 EntryMeta 和 RecyclableByteBufferList 对象中。JRaft 默认采用 RocksDB 作为日志数据的存储引擎,其中 key 就是 LogEntry 对应的 logIndex 值,所以方法 LogManagerImpl#getEntry 的执行过程简单来说就是依据 logIndex 从 RocksDB 中获取对应数据的过程。

如果从本地获取不到 logIndex 对应的日志数据,那么可能存在两种原因:

  1. 需要复制的数据已经变为快照形式存储。
  2. 没有可以复制的数据。

针对第一种情况直接给目标 Follower 节点安装快照即可,针对第二种情况则立即退出当前 Replicator#sendEntries 方法,并设置一个回调等待新的日志数据。关于快照机制将在后面用专门的篇章进行介绍,这里我们主要来看一下针对情况二的处理过程,位于 Replicator#waitMoreEntries 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private void waitMoreEntries(final long nextWaitIndex) {
try {
LOG.debug("Node {} waits more entries", this.options.getNode().getNodeId());
// 已经设置过等待
if (this.waitId >= 0) {
return;
}
// 设置一个回调,当有可复制日志时触发再次往目标 Follower 节点发送数据
this.waitId = this.options.getLogManager().wait(
nextWaitIndex - 1,
(arg, errorCode) -> continueSending((ThreadId) arg, errorCode), this.id);
this.statInfo.runningState = RunningState.IDLE;
} finally {
this.id.unlock();
}
}

// com.alipay.sofa.jraft.storage.impl.LogManagerImpl#wait
public long wait(final long expectedLastLogIndex, final NewLogCallback cb, final Object arg) {
final WaitMeta wm = new WaitMeta(cb, arg, 0);
return notifyOnNewLog(expectedLastLogIndex, wm);
}

private long notifyOnNewLog(final long expectedLastLogIndex, final WaitMeta wm) {
this.writeLock.lock();
try {
// 已经有新的日志可复制,或者当前 LogManager 已被停止
if (expectedLastLogIndex != this.lastLogIndex || this.stopped) {
wm.errorCode = this.stopped ? RaftError.ESTOP.getNumber() : 0;
Utils.runInThread(() -> runOnNewLog(wm));
return 0L;
}
if (this.nextWaitId == 0) { //skip 0
++this.nextWaitId;
}
final long waitId = this.nextWaitId++;
// 记录等待的信息
this.waitMap.put(waitId, wm);
return waitId;
} finally {
this.writeLock.unlock();
}
}

上述实现会针对期望的 logIndex 设置一个回调,如果本地最新的 logIndex 超过该期望值则说明有新的日志数据可以被复制,会触发执行 Replicator#continueSending 操作,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
static boolean continueSending(final ThreadId id, final int errCode) {
// 当前 Replicator 已被销毁
if (id == null) {
// It was destroyed already
return true;
}
final Replicator r = (Replicator) id.lock();
if (r == null) {
return false;
}
r.waitId = -1;
// 超时,重新发送探针请求
if (errCode == RaftError.ETIMEDOUT.getNumber()) {
r.blockTimer = null;
// Send empty entries after block timeout to check the correct
// _next_index otherwise the replicator is likely waits in executor.shutdown();
// _wait_more_entries and no further logs would be replicated even if the
// last_index of this followers is less than |next_index - 1|
r.sendEmptyEntries(false);
}
// LogManager 正常运行,继续尝试向目标 Follower 节点发送数据
else if (errCode != RaftError.ESTOP.getNumber()) {
// id is unlock in _send_entries
r.sendEntries();
}
// LogManager 被停止,停止向目标节点发送日志数据
else {
LOG.warn("Replicator {} stops sending entries.", id);
id.unlock();
}
return true;
}

由此可见,回调逻辑会再次尝试执行前面所介绍的 Replicator#sendEntries 逻辑。上述回调另外一个被触发的场景则在于调用 LogManager#appendEntries 追加新的日志数据的时候,这也是比较容易理解的。

完成了对于 AppendEntries 请求的构造,接下去复制器 Replicator 会采用 RPC 的方式将该请求发送给目标 Follower 节点。下面来看一下 Follower 节点对于复制日志数据 AppendEntries 请求的处理过程,位于 NodeImpl#handleAppendEntriesRequest 方法中。关于该方法我们前面在分析探针请求时已经介绍过,所以下面重点来看一下 Follower 节点针对日志数据复制操作相关的处理逻辑,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// Parse request
long index = prevLogIndex;
final List<LogEntry> entries = new ArrayList<>(entriesCount);
ByteBuffer allData = null;
if (request.hasData()) {
allData = request.getData().asReadOnlyByteBuffer();
}

final List<RaftOutter.EntryMeta> entriesList = request.getEntriesList();
// 遍历逐一解析请求中的 LogEntry 数据,记录到 entries 列表中
for (int i = 0; i < entriesCount; i++) {
index++;
// 获取 LogEntry 元数据信息
final RaftOutter.EntryMeta entry = entriesList.get(i);

// 基于元数据和数据体构造 LogEntry 对象
final LogEntry logEntry = logEntryFromMeta(index, allData, entry);

if (logEntry != null) {
// 如果启用了 checksum 机制,则校验 checksum 值
if (this.raftOptions.isEnableLogEntryChecksum() && logEntry.isCorrupted()) {
// checksum 值不匹配,说明数据可能被篡改
long realChecksum = logEntry.checksum();
LOG.error(
"Corrupted log entry received from leader, index={}, term={}, expectedChecksum={}, realChecksum={}",
logEntry.getId().getIndex(), logEntry.getId().getTerm(), logEntry.getChecksum(), realChecksum);
return RpcFactoryHelper //
.responseFactory() //
.newResponse(AppendEntriesResponse.getDefaultInstance(), RaftError.EINVAL,
"The log entry is corrupted, index=%d, term=%d, expectedChecksum=%d, realChecksum=%d",
logEntry.getId().getIndex(), logEntry.getId().getTerm(), logEntry.getChecksum(), realChecksum);
}
entries.add(logEntry);
}
}

final FollowerStableClosure closure = new FollowerStableClosure(request,
AppendEntriesResponse.newBuilder().setTerm(this.currTerm), this, done, this.currTerm);
// 将 LogEntry 数据写入本地磁盘
this.logManager.appendEntries(entries, closure);
// update configuration after _log_manager updated its memory status
checkAndSetConfiguration(true);
return null;

针对复制日志数据的 AppendEntries 请求,Follower 节点会基于请求中的 LogEntry 元数据和数据体信息逐一解析构造对应的 LogEntry 对象(实现如下),并调用 LogManager#appendEntries 方法批量的将日志数据写入本地存储系统,关于 LogManager#appendEntries 方法已在前面分析过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
private LogEntry logEntryFromMeta(final long index, final ByteBuffer allData, final RaftOutter.EntryMeta entry) {
// 忽略 ENTRY_TYPE_UNKNOWN 类型的 LogEntry 数据
if (entry.getType() != EnumOutter.EntryType.ENTRY_TYPE_UNKNOWN) {
// 给 LogEntry 对象填充基本的元数据信息
final LogEntry logEntry = new LogEntry();
logEntry.setId(new LogId(index, entry.getTerm()));
logEntry.setType(entry.getType());
if (entry.hasChecksum()) {
logEntry.setChecksum(entry.getChecksum()); // since 1.2.6
}

// 基于元数据中记录的数据长度获取对应的 LogEntry 数据体,并填充到 LogEntry 对象中
final long dataLen = entry.getDataLen();
if (dataLen > 0) {
final byte[] bs = new byte[(int) dataLen];
assert allData != null;
allData.get(bs, 0, bs.length);
logEntry.setData(ByteBuffer.wrap(bs));
}

// 针对 ENTRY_TYPE_CONFIGURATION 类型的 LogEntry,解析并填充集群节点配置数据

if (entry.getPeersCount() > 0) {
if (entry.getType() != EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
throw new IllegalStateException(
"Invalid log entry that contains peers but is not ENTRY_TYPE_CONFIGURATION type: " + entry.getType());
}

// 填充集群节点配置信息
fillLogEntryPeers(entry, logEntry);
} else if (entry.getType() == EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) {
throw new IllegalStateException(
"Invalid log entry that contains zero peers but is ENTRY_TYPE_CONFIGURATION type");
}
return logEntry;
}
return null;
}

日志数据写入磁盘是一个异步的过程,当日志数据成功在 Follower 节点落盘之后,Follower 节点会向 Leader 节点发送 AppendEntries 响应。最后来看一下 Leader 节点针对复制日志数据的 AppendEntries 请求响应的处理过程,由前面对于 JRaft Pipeline 机制的介绍可知这里处理 AppendEntries 请求响应的过程由 Replicator#onAppendEntriesReturned 方法实现。前面在介绍探针请求时同样分析过该方法的实现,针对复制日志数据的 AppendEntries 请求响应的处理重点关注下面这样一段逻辑:

1
2
3
4
5
6
7
8
9
10
// 如果是复制日志请求,当 Follower 节点复制成功之后需要尝试执行 BallotBox#commitAt 以检测当前日志是否被过半数的节点成功复制
if (entriesSize > 0) {
if (r.options.getReplicatorType().isFollower()) {
// Only commit index when the response is from follower.
r.options.getBallotBox().commitAt(r.nextIndex, r.nextIndex + entriesSize - 1, r.options.getPeerId());
}
if (LOG.isDebugEnabled()) {
LOG.debug("Replicated logs in [{}, {}] to peer {}", r.nextIndex, r.nextIndex + entriesSize - 1, r.options.getPeerId());
}
}

如果是针对复制日志数据的 AppendEntries 请求的响应,如果响应来自 Follower 节点则会触发执行 BallotBox#commitAt 方法以检查当前批次的日志数据是否能够被提交,即是否有超过半数的节点完成了对于该批次日志数据的复制操作,如果是则会触发 Leader 节点将该批次的日志数据标记为 committed。

前面在介绍日志数据生成流程时我们也曾遇到过该方法,当 Leader 节点完成对于该批次日志数据的落盘之后会回调该方法检查该批次的日志数据是否允许被提交,只不过那时是由 Leader 节点触发这一检查的过程,而这里则是由各个 Follower 节点所触发,本质上这也是一种投票机制。关于 BallotBox#commitAt 方法的实现已在前面介绍日志生成流程时分析过,不再重复介绍。

心跳机制

复制器 Replicator 中的字段 Replicator#lastRpcSendTimestamp 用于记录最近一次成功向目标 Follower 节点发送 RPC 请求的时间戳。上一篇我们在分析 JRaft 选主机制时曾介绍过 Leader 节点会基于该时间戳判断目标 Follower 节点是否处于活跃状态,所以我们可以认为该字段是目标 Follower 节点心跳正常的判定标志,对于一个心跳正常的 Follower 节点,该字段的值距离当前时间戳应该始终控制在一个合理的阈值范围内。

前面介绍的探针请求和复制日志数据请求都会在处理请求响应时更新该字段,不过仅靠这两类请求触发时间戳更新显然是不够的,毕竟整个 JRaft 集群不会始终处于频繁的日志数据复制状态。为此,JRaft 还在复制器 Replicator 中实现了一套心跳机制,本小节我们就深入分析这一套心跳机制的执行流程。

当启动一个复制器 Replicator 实例时(即调用 Replicator#start 方法)会执行 Replicator#startHeartbeatTimer 方法启动心跳计时器,该计时器会延迟指定时间(默认为 100ms)执行 Replicator#onTimeout 操作给当前复制器添加一个 ETIMEDOUT 事件,实现如下:

1
2
3
4
5
6
7
8
private static void onTimeout(final ThreadId id) {
if (id != null) {
// ETIMEDOUT 错误会触发再次向目标节点发送心跳请求
id.setError(RaftError.ETIMEDOUT.getNumber());
} else {
LOG.warn("Replicator id is null when timeout, maybe it's destroyed.");
}
}

而复制器 Replicator 对于此类事件的处理逻辑就是调用 Replicator#sendHeartbeat 方法向目标 Follower 节点发送心跳请求:

1
2
3
4
5
6
7
8
private static void sendHeartbeat(final ThreadId id) {
final Replicator r = (Replicator) id.lock();
if (r == null) {
return;
}
// 向目标 Follower 节点发送心跳请求
r.sendEmptyEntries(true);
}

由上述实现可以看到,所谓的心跳请求本质上还是一个空的 AppendEntries 请求。关于方法 Replicator#sendEmptyEntries 在前面介绍探针请求时已经分析过,而关于心跳请求的实现逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
if (isHeartbeat) {
// Sending a heartbeat request
this.heartbeatCounter++;
RpcResponseClosure<AppendEntriesResponse> heartbeatDone;
// 参数指定的响应回调优先
if (heartBeatClosure != null) {
heartbeatDone = heartBeatClosure;
}
// 设置默认的心跳请求响应回调
else {
heartbeatDone = new RpcResponseClosureAdapter<AppendEntriesResponse>() {

@Override
public void run(final Status status) {
onHeartbeatReturned(Replicator.this.id, status, request, getResponse(), monotonicSendTimeMs);
}
};
}
// 发送心跳请求
this.heartbeatInFly = this.rpcService.appendEntries(
this.options.getPeerId().getEndpoint(),
request,
this.options.getElectionTimeoutMs() / 2,
heartbeatDone);
}

Follower 节点对于心跳请求的处理逻辑与探针请求一致,所以下面来看一下 Leader 节点对于心跳请求响应的处理流程。对于发送心跳请求而言,JRaft 允许调用方自定义响应回调,目前这一块主要服务于线性一致性读操作,下面来分析一下默认的心跳请求响应回调逻辑,由 Replicator#onHeartbeatReturned 方法实现(省略部分 DEBUG 日志):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
static void onHeartbeatReturned(final ThreadId id,
final Status status,
final AppendEntriesRequest request,
final AppendEntriesResponse response,
final long rpcSendTime) {
// 复制器已经被销毁
if (id == null) {
// replicator already was destroyed.
return;
}
final long startTimeMs = Utils.nowMs();
Replicator r;
if ((r = (Replicator) id.lock()) == null) {
return;
}
boolean doUnlock = true;
try {
// Follower 节点运行异常
if (!status.isOk()) {
r.state = State.Probe;
notifyReplicatorStatusListener(r, ReplicatorEvent.ERROR, status);
if (++r.consecutiveErrorTimes % 10 == 0) {
LOG.warn("Fail to issue RPC to {}, consecutiveErrorTimes={}, error={}",
r.options.getPeerId(), r.consecutiveErrorTimes, status);
}
// 重新启动心跳计时器
r.startHeartbeatTimer(startTimeMs);
return;
}
r.consecutiveErrorTimes = 0;
// 目标 Follower 节点的 term 值更大,说明有新的 Leader 节点
if (response.getTerm() > r.options.getTerm()) {
final NodeImpl node = r.options.getNode();
r.notifyOnCaughtUp(RaftError.EPERM.getNumber(), true);
// 销毁当前复制器
r.destroy();
// 递增当前节点的 term 值
node.increaseTermTo(response.getTerm(), new Status(RaftError.EHIGHERTERMRESPONSE,
"Leader receives higher term heartbeat_response from peer:%s", r.options.getPeerId()));
return;
}
// Follower 节点拒绝响应,重新发送探针请求,并启动心跳计时器
if (!response.getSuccess() && response.hasLastLogIndex()) {
LOG.warn("Heartbeat to peer {} failure, try to send a probe request.", r.options.getPeerId());
doUnlock = false;
r.sendEmptyEntries(false);
r.startHeartbeatTimer(startTimeMs);
return;
}

// 更新 RPC 请求时间戳
if (rpcSendTime > r.lastRpcSendTimestamp) {
r.lastRpcSendTimestamp = rpcSendTime;
}
// 启动心跳计时器
r.startHeartbeatTimer(startTimeMs);
} finally {
if (doUnlock) {
id.unlock();
}
}
}

如果目标 Follower 节点运行异常,则不应该更新复制器 Replicator 的 Replicator#lastRpcSendTimestamp 字段,这无可厚非。如果目标 Follower 节点运行正常,但是拒绝当前的心跳请求,按照之前的总结分为以下三种原因:

  1. Follower 节点本地的 term 值相对于当前 Leader 节点更大。
  2. Follower 节点本地记录的 Leader 节点 ID 并不是当前 Leader 节点,即可能出现网络分区。
  3. Follower 节点与当前 Leader 节点的日志数据存在冲突。

其中只有第三种情况会在响应中携带 Follower 节点的最新 logIndex 值,此时心跳请求会触发向目标 Follower 节点发送探针请求,并在探针请求响应中更新 RPC 发送时间戳。然而,不管是上述哪种原因导致 Follower 节点拒绝响应,亦或是同意响应,复制器 Replicator 都会再次调用 Replicator#startHeartbeatTimer 方法进入下一轮心跳进程。

总结

本文分析了 JRaft 关于 Raft 算法日志数据复制机制的设计与实现,包括日志的生成过程、探寻待发送的日志位置、复制日志数据、心跳机制,以及 Pipeline 机制等。日志数据复制是 Raft 算法运行的基础,是一项重要且频繁的操作,实现层面的优劣直接影响着 Raft 算法库的运行效率。为此,JRaft 引入了多种优化策略,包括 Follower 节点之间并发复制、批量发送,以及 Pipeline 机制。

关于日志复制,实际上快照机制从广义上来说也属于日志复制范畴,准确来说是对日志复制的有一种优化手段,不过快照机制也有其自身独有的特点和应用场景,并且是一项可选的功能,所以我将其与常规的日志复制区别开来,下一篇将对该机制进行针对性的分析。

参考

  1. Raft Consensus Algorithm
  2. SOFA-JRaft 官网
  3. SOFA-JRaft 日志复制:pipeline 实现剖析