Kafka 源码解析:分区多副本容错机制

在分布式应用中,通常会引入冗余策略来保证集群中节点在宕机时的服务可用性,Kafka 在设计上也是如此。Kafka 会为每个 topic 分区创建多个副本,并将这些副本分散在多台 broker 节点上,以避免单点问题。一个分区的副本集合包含一个 leader 角色和多个 follower 角色,其中 leader 副本主要负责响应客户端对于指定 topic 分区消息的读写,并管理集合中的其它 follower 副本,而 follower 副本则主要负责与 leader 副本间保持数据同步,保证在 leader 副本失效时能够有新的 follower 选举成为新的 leader,以维持 Kafka 服务的正常运行。

Replica 组件

Replica 类用于定义 Kafka 中的副本,副本除了有前面介绍的 leader 和 follower 角色之分外,也区分 本地副本远程副本 ,其中本地副本是指与其关联的 Log 对象位于相同 broker 节点上,而远程副本的 Log 对象则位于其它 broker 节点上。对于远程副本而言,当前 broker 节点仅维护其 LEO 位置信息。 远程副本的主要作用在于协助 leader 副本维护分区的 HW 位置值 ,具体过程将在后面分析 HW 位置管理时进行说明。

在前面介绍 Kafka 的日志存储机制时我们知道一个 topic 分区对应一个 Log 对象,而在设计上为了避免单点问题,一个 topic 分区又会包含多个副本,这些副本分布在多个不相同的 broker 节点上,如果某个副本正好位于其所属的 Log 对象所在的 broker 节点上,我们称之为本地副本,否则即为远程副本。

下面来看一下 Replica 类的字段定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
class Replica(val brokerId: Int, // 当前副本所在的 broker 的 ID
val partition: Partition, // 当前副本所属的 topic 分区对象
time: Time = Time.SYSTEM, // 时间戳工具
initialHighWatermarkValue: Long = 0L, // 初始 HW
val log: Option[Log] = None // 当前副本所属的 Log 对象,如果是远程副本,该字段为空,通过该字段可以区分是本地副本还是远程副本
) extends Logging {

/**
* 记录副本的 HW 值,消费者只能读取 HW 之前的消息,之后的消息对消费者不可见,
* 由 leader 副本维护,当消息被 ISR 集合中所有副本成功同步时更新该字段。
*/
@volatile private[this] var highWatermarkMetadata = new LogOffsetMetadata(initialHighWatermarkValue)
/**
* 记录副本所属 Log 对象最后一条消息的 offset 值:
* - 如果是本地副本,可以直接从 Log#nextOffsetMetadata 字段中获取;
* - 如果是远程副本,则由其它 broker 发送请求来更新该值。
*/
@volatile private[this] var logEndOffsetMetadata = LogOffsetMetadata.UnknownOffsetMetadata
/** 缓存上次从 leader 拉取消息时 leader 副本的 LEO 值 */
@volatile private[this] var lastFetchLeaderLogEndOffset = 0L
/** 记录上次从 leader 拉取消息的时间戳 */
@volatile private[this] var lastFetchTimeMs = 0L
/** 记录当前 follower 从 leader 拉取消息的最近一次时间戳,用于标识当前 follower 滞后 leader 的程度 */
@volatile private[this] var _lastCaughtUpTimeMs = 0L
/** 副本所属 topic 分区对象 */
val topicPartition: TopicPartition = partition.topicPartition

// ... 省略方法定义

}

对于本地副本来说会持有所属 Log 对象的引用,可以基于这一点来判定当前副本是本地副本还是远程副本。此外,Replica 对象还记录了当前副本的 LEO 和 HW 值,以及最近一次从 leader 副本拉取消息的时间戳,同时还定义了相关方法用于维护这些信息,下面分别来看一下维护 LEO 和 HW 值的方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def updateLogReadResult(logReadResult: LogReadResult) {
// 更新 _lastCaughtUpTimeMs 值,记录了 follower 从 leader 拉取消息的最新时间
if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset)
_lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, logReadResult.fetchTimeMs)
else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)
_lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs)

// 如果当前副本是远程副本,则更新当前副本的 LEO 值
logEndOffset = logReadResult.info.fetchOffsetMetadata
// 更新本地记录的从 leader 拉取消息时 leader 副本的 LEO 值
lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset
// 更新本地记录的从 leader 拉取消息的时间戳
lastFetchTimeMs = logReadResult.fetchTimeMs
}

private def logEndOffset_=(newLogEndOffset: LogOffsetMetadata) {
if (isLocal) {
// 如果是本地副本无需更新 LEO 值,而是由对应 Log 对象的 Log#logEndOffsetMetadata 字段决定
throw new KafkaException(s"Should not set log end offset on partition $topicPartition's local replica $brokerId")
} else {
// 如果当前副本是远程副本,则更新副本的 LEO 值
logEndOffsetMetadata = newLogEndOffset
trace(s"Setting log end offset for replica $brokerId for partition $topicPartition to [$logEndOffsetMetadata]")
}
}

方法 Replica#updateLogReadResult 用于更新当前 Replica 对象的 LEO 值。对于 follower 来说,当从 leader 完成一次消息同步操作后,follower 会更新本地记录的 LEO 值,并更新相应的时间戳信息,其中 _lastCaughtUpTimeMs 字段用于记录 follower 最近一次成功从 leader 拉取消息的时间戳,可以标识当前 follower 相对于 leader 的滞后程度。

由上面的实现可以看出,只有远程副本需要更新 LEO 值,因为远程副本未持有所属 Log 对象的引用,需要通过本地字段缓存当前副本的 LEO 值。Replica 类定义了 Replica#logEndOffset 方法用于获取当前副本的 LEO 值:

1
def logEndOffset: LogOffsetMetadata = if (isLocal) log.get.logEndOffsetMetadata else logEndOffsetMetadata

对于本地副本来说,可以调用其持有的 Log 对象的 Log#logEndOffsetMetadata 方法直接获取对应的 LEO 值,而对于远程副本来说则返回本地缓存的 LEO 值。

对于 HW 值而言,Replica 同样提供了更新的方法(如下),需要注意的一点是这里仅更新本地副本的 HW 值,因为远程副本所在的 broker 节点仅维护副本的 LEO 位置信息 :

1
2
3
4
5
6
7
8
9
def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
if (isLocal) {
// 如果是本地副本,则更新对应的 HW 值
highWatermarkMetadata = newHighWatermark
trace(s"Setting high watermark for replica $brokerId partition $topicPartition to [$newHighWatermark]")
} else {
throw new KafkaException(s"Should not set high watermark on partition $topicPartition's non-local replica $brokerId")
}
}

同时,Replica 也提供了获取当前副本 HW 值的方法,实现如下:

1
def highWatermark: LogOffsetMetadata = highWatermarkMetadata

Partition 组件

Partition 类用于定义 Kafka 中的分区,一个 topic 可以设置多个分区。前面在介绍 Kafka 架构与核心概念时曾提及过,Kafka 之所以需要引入分区的概念,主要是希望利用分布式系统中的多节点来提升 Kafka 集群的性能和可扩展性。因为一个 topic 的各个分区可以分布在不同的 broker 节点上,进而就能将 topic 的消息数据分散在这些 broker 节点上存储,对于消息的读写压力就可以由这些节点进行分摊。当我们感知到一个 topic 的消息读写量较大时,我们可以适当增加分区的数目来实现扩容的目的。设想如果我们不引入分区策略,而是由一个 broker 节点完整负责一个 topic,考虑每个 topic 之间的消息数据量和读写量可能存在较大差别,那么各个 broker 节点在负载均衡性上也会有较大的差异,最终影响的是集群整体的可用性。

此外,为了保证高可用性,Kafka 会为每个分区设置多个副本,Partition 提供了管理这些副本的方法,包括执行副本角色切换、维护 ISR 集合、管理 HW 值和 LEO 值,以及调用日志存储系统写入日志数据等。

下面来看一下 Partition 类的字段定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class Partition(val topic: String, // 分区所属的 topic
val partitionId: Int, // 分区编号
time: Time, // 时间戳工具
replicaManager: ReplicaManager // 副本管理
) extends Logging with KafkaMetricsGroup {

/** topic 分区对象 */
val topicPartition = new TopicPartition(topic, partitionId)
/** 当前 broker 的 ID */
private val localBrokerId = replicaManager.config.brokerId
/** 管理分区日志数据 */
private val logManager = replicaManager.logManager
/** ZK 工具类 */
private val zkUtils = replicaManager.zkUtils
/** AR 集合,维护当前分区全部副本的集合,key 是副本 ID */
private val assignedReplicaMap = new Pool[Int, Replica]
/** leader 副本的年代信息 */
@volatile private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
/** leader 副本的 ID */
@volatile var leaderReplicaIdOpt: Option[Int] = None
/** 当前分区的 ISR 集合 */
@volatile var inSyncReplicas: Set[Replica] = Set.empty[Replica]
/** 当前集群控制器的年代信息,会在切换副本角色时进行更新 */
private var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1

// ... 省略方法定义

}

Partition 中提供了多种方法实现,按照功能划分可以将其中的核心方法划分为以下 5 类:

  1. 副本对象操作:getOrCreateReplica / getReplica / removeReplica
  2. 副本角色切换:makeLeader / makeFollower
  3. 日志数据操作:delete / appendRecordsToLeader
  4. ISR 集合管理:maybeExpandIsr / maybeShrinkIsr
  5. HW 和 LEO 位置管理:checkEnoughReplicasReachOffset / maybeIncrementLeaderHW / updateReplicaLogReadResult

下面按照分类对这些方法逐一进行分析。

副本对象操作

Partition 对象定义了 Partition#assignedReplicaMap 字段用于记录了隶属于当前分区的所有副本 Replica 对象,即 AR 集合,并提供了相关方法用于管理该字段。其中 Partition#getReplica 方法和 Partition#removeReplica 方法分别用于从字段中获取和移除指定副本 ID 对应的副本 Replica 对象,实现比较简单。

本小节我们主要对 Partition#getOrCreateReplica 方法进行分析,该方法相对于 Partition#getReplica 方法的区别在于当给定的副本 ID 在本地找不到对应的副本 Replica 对象时,会创建一个新的 Replica 对象。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
// 尝试从 AR 集合中获取 replicaId 对应的 Replica 对象,如果不存在则创建一个
assignedReplicaMap.getAndMaybePut(replicaId, {
// 如果是本地副本
if (this.isReplicaLocal(replicaId)) {
// 获取 log 相关配置信息,ZK 中的配置会覆盖默认配置
val config = LogConfig.fromProps(logManager.defaultConfig.originals, AdminUtils.fetchEntityConfig(zkUtils, ConfigType.Topic, topic))

// 创建 topic 分区对应的 Log 对象,如果已经存在则直接返回
val log = logManager.createLog(topicPartition, config)

// 加载对应 log 目录下的 replication-offset-checkpoint 文件,其中记录了每个 topic 分区的 HW 值
val checkpoint = replicaManager.highWatermarkCheckpoints(log.dir.getParentFile.getAbsolutePath)
val offsetMap = checkpoint.read()

// 获取当前 topic 分区对应的 HW 值,并与 LEO 比较,选择较小的值作为此副本的 HW 位置
val offset = math.min(offsetMap.getOrElse(topicPartition, 0L), log.logEndOffset)

// 创建 Replica 对象
new Replica(replicaId, this, time, offset, Some(log))
}
// 如果是远程副本,无需加载本地对应的日志数据
else new Replica(replicaId, this, time)
})
}

如果参数指定的副本 ID 对应的副本 Replica 对象在本地 AR 集合中不存在,则方法会执行创建对应的 Replica 对象。这里区分本地副本和远程副本,对于远程副本来说创建的过程如上述代码所示,比较简单,而对于本地副本来说,因为本地副本持有副本所属分区对应的 Log 对象,所以需要加载相关数据信息,包括配置、初始 HW 值,以及分区对应的 Log 对象。其中构造 Log 对象的过程由 LogManager#createLog 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def createLog(topicPartition: TopicPartition, config: LogConfig): Log = {
logCreationOrDeletionLock synchronized {
// 获取指定 topic 分区对应的 Log 对象
getLog(topicPartition).getOrElse {
// 如果存在多个 log 目录,则选择 Log 数目最少的目录
val dataDir = this.nextLogDir()
// 创建当前 topic 分区对应的日志目录
val dir = new File(dataDir, topicPartition.topic + "-" + topicPartition.partition)
dir.mkdirs()
// 创建 Log 对象
val log = new Log(dir, config, recoveryPoint = 0L, scheduler, time)
// 缓存到本地 logs 字段中
logs.put(topicPartition, log)
info("Created log for partition [%s,%d] in %s with properties {%s}."
.format(topicPartition.topic, topicPartition.partition, dataDir.getAbsolutePath, config.originals.asScala.mkString(", ")))
log
}
}
}

副本角色切换

副本有 leader 和 follower 角色之分,Partition 分别提供了 Partition#makeLeader 方法和 Partition#makeFollower 方法用于将本地副本切换成相应的 leader 和 follower 角色。

切换本地副本为 leader 角色

方法 Partition#makeLeader 用于将本地副本切换成 leader 角色,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def makeLeader(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
val (leaderHWIncremented, isNewLeader) = inWriteLock(leaderIsrUpdateLock) {
// 1. 更新本地记录的 controller 的年代信息
controllerEpoch = partitionStateInfo.controllerEpoch

// 2. 获取/创建请求信息中 AR 和 ISR 集合中所有副本对应的 Replica 对象
val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
allReplicas.foreach(replica => getOrCreateReplica(replica))
val newInSyncReplicas = partitionStateInfo.isr.asScala.map(r => getOrCreateReplica(r)).toSet

// 3. 移除本地缓存的所有已过期的的副本对象
(assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)

// 4. 更新本地记录的分区 leader 副本相关信息
inSyncReplicas = newInSyncReplicas // 更新 ISR 集合
leaderEpoch = partitionStateInfo.leaderEpoch // 更新 leader 副本的年代信息
zkVersion = partitionStateInfo.zkVersion // 更新 ZK 的版本信息

// 5. 检测分区 leader 副本是否发生变化
val isNewLeader =
if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == localBrokerId) {
false // 未发生变化
} else {
// leader 发生变化,更新分区 leader 副本 ID
leaderReplicaIdOpt = Some(localBrokerId)
true
}

// 6. 遍历所有的 follower 副本,更新对应副本的相关时间戳信息
val leaderReplica = getReplica().get // 获取 leader 副本 Replica 对象
val curLeaderLogEndOffset = leaderReplica.logEndOffset.messageOffset // 获取 leader 副本的 LEO 值
val curTimeMs = time.milliseconds
(assignedReplicas - leaderReplica).foreach { replica =>
val lastCaughtUpTimeMs = if (inSyncReplicas.contains(replica)) curTimeMs else 0L
replica.resetLastCaughtUpTime(curLeaderLogEndOffset, curTimeMs, lastCaughtUpTimeMs)
}

// 7. 如果当前 leader 是新选举出来的,则修正 leader 副本的 HW 值,并重置本地缓存的所有远程副本的相关信息
if (isNewLeader) {
// 尝试修正新 leader 副本的 HW 值
leaderReplica.convertHWToLocalOffsetMetadata()
// 重置本地缓存的所有远程副本的相关信息
assignedReplicas.filter(_.brokerId != localBrokerId).foreach(_.updateLogReadResult(LogReadResult.UnknownLogReadResult))
}

// 8. 尝试后移 leader 副本的 HW 值
(maybeIncrementLeaderHW(leaderReplica), isNewLeader)
}

// 9. 如果 leader 副本的 HW 值增加了,则尝试执行监听当前 topic 分区的 DelayedFetch 和 DelayedProduce 任务
if (leaderHWIncremented) tryCompleteDelayedRequests()

isNewLeader
}

切换副本为 leader 角色的整体流程可以概括为:

  1. 更新本地记录的 kafka controller 的年代信息;
  2. 获取分区新的 AR 集合和 ISR 集合中所有副本对应的 Replica 对象,如果不存在则创建;
  3. 移除本地缓存的对应分区已经过期的副本 Replica 对象;
  4. 更新本地记录的分区 leader 副本的相关信息,包括 ISR 集合、leader 副本的年代信息等;
  5. 检测分区 leader 副本是否发生变化,如果当前副本之前是 follower 角色,或者对应的 topic 分区的副本之前未分配给当前 broker 节点,则说明对应 topic 分区的 leader 副本发生了变化;
  6. 遍历所有的 follower 副本,更新对应副本的相关时间戳信息,包括最近一次从 leader 副本拉取消息的时间戳,以及 leader 副本的 LEO 值等;
  7. 如果当前 leader 副本是新选举出来的,则尝试修正对应副本的 HW 值,并重置本地缓存的所有远程副本的相关信息;
  8. 尝试后移 leader 副本的 HW 值;
  9. 如果上一步后移了 leader 副本的 HW 值,则尝试执行监听当前 topic 分区的 DelayedFetch 和 DelayedProduce 延时任务,因为等待的条件可能已经满足。

其中,方法 Partition#maybeIncrementLeaderHW 用于尝试向后移动 leader 副本的 HW 值,相关实现我们将在本篇的后续部分进行分析。

切换本地副本为 follower 角色

方法 Partition#makeFollower 用于将本地副本切换成 follower 角色,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def makeFollower(controllerId: Int, partitionStateInfo: PartitionState, correlationId: Int): Boolean = {
inWriteLock(leaderIsrUpdateLock) {
val allReplicas = partitionStateInfo.replicas.asScala.map(_.toInt)
val newLeaderBrokerId: Int = partitionStateInfo.leader

// 1. 更新本地记录的 controller 的年代信息
controllerEpoch = partitionStateInfo.controllerEpoch

// 2. 获取/创建请求信息中所有副本对应的 Replica 对象
allReplicas.foreach(r => getOrCreateReplica(r))

// 3. 移除本地缓存的所有已过期的的副本对象
(assignedReplicas.map(_.brokerId) -- allReplicas).foreach(removeReplica)

// 4. 更新本地记录的分区 leader 副本相关信息,其中 ISR 集合由 leader 副本维护,将 follower 副本上的 ISR 集合置空
inSyncReplicas = Set.empty[Replica]
leaderEpoch = partitionStateInfo.leaderEpoch // 更新 leader 副本的年代信息
zkVersion = partitionStateInfo.zkVersion // 更新 zk 版本信息

// 5. 检测分区 leader 副本是否发生变化,如果发生变化则更新本地记录的 ID 值
if (leaderReplicaIdOpt.isDefined && leaderReplicaIdOpt.get == newLeaderBrokerId) {
false
} else {
// 发生变化,更新本地记录的分区 leader 副本的 ID
leaderReplicaIdOpt = Some(newLeaderBrokerId)
true
}
}
}

切换副本为 follower 角色的整体流程可以概括为:

  1. 更新本地记录的 kafka controller 的年代信息;
  2. 获取分区新的 AR 集合中所有副本对应的 Replica 对象,如果不存在则创建;
  3. 移除本地缓存的已经过期的副本 Replica 对象;
  4. 更新本地记录的分区 leader 副本的相关信息,因为 ISR 集合由 leader 副本管理,所以需要将 follower 副本记录的 ISR 集合置为空;
  5. 检测分区 leader 副本是否发生变化,如果发生变化则需要更新本地记录的 leader 副本的 ID。

相对于切换成 leader 角色来说,将本地副本切换成 follower 的过程要简单许多。

日志数据操作

Partition 提供了 Partition#delete 方法和 Partition#appendRecordsToLeader 方法用于操作日志数据,其中前者用于清空当前分区记录的副本相关信息,包括 AR 集合、ISR 集合,以及 leader 副本的 ID 值等信息,并异步删除分区对应的日志文件和索引文件(由 LogManager#asyncDelete 方法实现,会将日志文件和索引文件添加 .delete 标记删除后缀,并交由定时任务执行删除操作),而后者用于往当前分区的 leader 副本追加消息。删除操作的实现比较简单,这里重点来看一下往 leader 副本追加消息的过程,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
def appendRecordsToLeader(records: MemoryRecords, requiredAcks: Int = 0): LogAppendInfo = {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
leaderReplicaIfLocal match {
// 只有 leader 副本支持追加消息操作
case Some(leaderReplica) =>
// 获取 leader 副本对应的 Log 对象
val log = leaderReplica.log.get
// 对应 min.insync.replicas 配置,表示 ISR 集合的最小值
val minIsr = log.config.minInSyncReplicas
// 获取当前分区 ISR 集合的大小
val inSyncSize = inSyncReplicas.size

// 如果用户指定 acks = -1,但是当前 ISR 集合小于允许的最小值,则不允许追加消息,防止数据丢失
if (inSyncSize < minIsr && requiredAcks == -1) {
throw new NotEnoughReplicasException(
"Number of insync replicas for partition %s is [%d], below required minimum [%s]".format(topicPartition, inSyncSize, minIsr))
}

// 往 leader 副本的 Log 对象中追加消息
val info = log.append(records)
// 有新的日志数据被追加,尝试执行监听当前 topic 分区的 DelayedFetch 延时任务
replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
// 尝试后移 leader 副本的 HW 值
(info, maybeIncrementLeaderHW(leaderReplica))

// 如果不是 leader 副本,则抛出异常
case None =>
throw new NotLeaderForPartitionException("Leader not local for partition %s on broker %d".format(topicPartition, localBrokerId))
}
}

// 如果 leader 副本的 HW 值增加了,则尝试执行监听当前 topic 分区的 DelayedFetch 和 DelayedProduce 任务
if (leaderHWIncremented) tryCompleteDelayedRequests()

info
}

首先我们多次提到的一点是,Kafka 只允许往目标 topic 分区的 leader 副本追加消息,而 follower 只能从 leader 副本同步消息,所以如果当前追加操作的是 follower 副本,则会抛出异常。

对于 leader 副本来说,在具体执行追加操作之前,如果用户指定了 acks 参数为 -1,即要求所有 ISR 副本在全部收到消息后才允许对客户端进行成功响应,那么会先检测当前分区的 ISR 集合中的副本数目是否大于等于配置的阈值(对应 min.insync.replicas 配置),如果数目不达标则会拒绝执行追加操作,防止数据丢失。具体追加消息数据的操作交由 Log#append 方法执行,该方法已经在前面的文章中分析过,这里不再重复撰述。完成了消息数据的追加操作后,Kafka 会立即尝试执行监听当前 topic 分区的 DelayedFetch 延时任务,避免让客户端和 follower 副本等待太久或超时,此外还会尝试后移 leader 副本的 HW 值。

ISR 集合管理

分区 leader 副本的一个重要的职责就是维护当前分区的 ISR 集合。在分布式应用中,考虑网络、机器性能等因素,follower 副本同步 leader 副本数据的状态是在动态变化的,如果一个 follower 副本与 leader 副本之间存在较大的同步延迟,则不应该被加入到 ISR 集合中,否则应该被纳入到 ISR 集合中的一员,从而能够在 leader 副本失效时,竞选成为新的 leader 副本,以保证 Kafka 服务的可用性。

Partition 类型分别定义了 Partition#maybeExpandIsr 方法和 Partition#maybeShrinkIsr 方法,用于将指定的副本在满足条件下加入到 ISR 集合中,以及依据给定的时间阈值将滞后于 leader 副本超过阈值时间的 follower 副本移出 ISR 集合。首先来看一下 Partition#maybeExpandIsr 方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def maybeExpandIsr(replicaId: Int, logReadResult: LogReadResult) {
val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
leaderReplicaIfLocal match {
// 只有当本地副本是 leader 副本时,才执行扩张操作,因为 ISR 集合由 leader 副本维护
case Some(leaderReplica) =>
// 获取目标 follower 副本对应的 Replica 对象
val replica = getReplica(replicaId).get
// 获取 leader 副本对应的 HW 值
val leaderHW = leaderReplica.highWatermark
// 判断当前 follower 是否应该被加入到 ISR 集合,并在成功加入后更新相关信息
if (!inSyncReplicas.contains(replica) // follower 副本不在 ISR 集合中
&& assignedReplicas.map(_.brokerId).contains(replicaId) // AR 集合中包含该 follower 副本
&& replica.logEndOffset.offsetDiff(leaderHW) >= 0) { // follower 副本的 LEO 已经追赶上 leader 副本的 HW 值
// 将 follower 副本添加到 ISR 集合中
val newInSyncReplicas = inSyncReplicas + replica
info(s"Expanding ISR for partition $topicPartition from ${inSyncReplicas.map(_.brokerId).mkString(",")} to ${newInSyncReplicas.map(_.brokerId).mkString(",")}")
// 更新 ZK 和本地记录的新的 ISR 集合信息
this.updateIsr(newInSyncReplicas)
replicaManager.isrExpandRate.mark()
}
// 尝试后移 leader 副本的 HW 值
this.maybeIncrementLeaderHW(leaderReplica, logReadResult.fetchTimeMs)

// 如果不是 leader 副本,啥也不干
case None => false
}
}

// 如果 leader 副本的 HW 值发生变化,尝试执行监听当前 topic 分区的 DelayedFetch 和 DelayedProduce 延时任务
if (leaderHWIncremented) tryCompleteDelayedRequests()
}

ISR 集合的扩张和收缩操作均由 leader 副本负责,对于给定的 follower 副本如果同时满足以下条件,则将其添加到 ISR 集合中:

  1. 目标 follower 副本不在当前分区的 ISR 集合中;
  2. 目标 follower 副本位于当前分区的 AR 集合中;
  3. 目标 follower 副本的 LEO 值已经追赶上对应 leader 副本的 HW 值。

对于同时满足上述条件的 follower 副本,Kafka 会将其添加到对应 topic 分区的 ISR 集合中,并将新的 ISR 集合信息记录到 ZK,同时更新 leader 副本本地记录的 ISR 集合。

方法 Partition#maybeShrinkIsr 用于收缩当前分区的 ISR 集合,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def maybeShrinkIsr(replicaMaxLagTimeMs: Long) {
val leaderHWIncremented = inWriteLock(leaderIsrUpdateLock) {
leaderReplicaIfLocal match {
// 只有当本地副本是 leader 副本时,才执行缩减操作,因为 ISR 集合由 leader 副本维护
case Some(leaderReplica) =>
// 从 ISR 集合中获取滞后的 follower 副本集合
val outOfSyncReplicas = this.getOutOfSyncReplicas(leaderReplica, replicaMaxLagTimeMs)
if (outOfSyncReplicas.nonEmpty) {
// 将滞后的 follower 副本从 ISR 集合中剔除
val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
assert(newInSyncReplicas.nonEmpty)
info("Shrinking ISR for partition [%s,%d] from %s to %s".format(topic, partitionId,
inSyncReplicas.map(_.brokerId).mkString(","), newInSyncReplicas.map(_.brokerId).mkString(",")))
// 将新的 ISR 集合信息上报给 ZK,同时更新本地记录的 ISR 集合信息
this.updateIsr(newInSyncReplicas)
replicaManager.isrShrinkRate.mark()
// 尝试后移 leader 副本的 HW 值
this.maybeIncrementLeaderHW(leaderReplica)
} else {
false
}
// 如果不是 leader 副本,则啥也不做
case None => false
}
}

// 如果 leader 副本的 HW 值发生变化,尝试执行监听当前 topic 分区的 DelayedFetch 和 DelayedProduce 延时任务
if (leaderHWIncremented) tryCompleteDelayedRequests()
}

def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
// 获取 ISR 集合中所有的 follower 副本
val candidateReplicas = inSyncReplicas - leaderReplica
// 获取超过给定时间(对应 replica.lag.time.max.ms 配置)未向 leader 副本请求拉取消息的 follower 副本集合
val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
laggingReplicas
}

对于 ISR 集合中的 follower 副本,如果其最近一次成功从 leader 副本拉取数据的时间戳相对于当前时间超过指定的阈值(对应 replica.lag.time.max.ms 配置,默认为 10 秒),则将其从 ISR 集合中移出,而不管当前 follower 副本与 leader 副本的数据延迟差异。一旦 follower 被从 ISR 踢出,Kafka 会将新的 ISR 集合信息上报给 ZK,同时更新 leader 副本本地记录的 ISR 集合。

HW 和 LEO 位置管理

Partition 定义了 Partition#checkEnoughReplicasReachOffset 方法和 Partition#maybeIncrementLeaderHW 方法,分别用于检测指定 offset 之前的消息是否已经被 ISR 集合中足够多的 follower 副本确认(ack),以及尝试向后移动 leader 副本的 HW 值。先来看一下 Partition#checkEnoughReplicasReachOffset 方法,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def checkEnoughReplicasReachOffset(requiredOffset: Long): (Boolean, Errors) = {
leaderReplicaIfLocal match {
// 如果当前副本是 leader 副本
case Some(leaderReplica) =>
// 获取 ISR 集合
val curInSyncReplicas = inSyncReplicas

// 对应 min.insync.replicas 配置
val minIsr = leaderReplica.log.get.config.minInSyncReplicas

// 如果当前请求的 offset 小于等于 HW 的 offset
if (leaderReplica.highWatermark.messageOffset >= requiredOffset) {
// 如果当前分区的 ISR 集合大小大于等于允许的最小值
if (minIsr <= curInSyncReplicas.size) (true, Errors.NONE)
// 否则返回 NOT_ENOUGH_REPLICAS_AFTER_APPEND 错误
else (true, Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND)
} else {
// 如果当前请求的 offset 大于 HW,则直接返回 false,因为 HW 之后的消息对于客户端不可见
(false, Errors.NONE)
}

// 如果当前副本是 follower 副本,则返回 NOT_LEADER_FOR_PARTITION 错误
case None => (false, Errors.NOT_LEADER_FOR_PARTITION)
}
}

方法 Partition#checkEnoughReplicasReachOffset 接收一个 requiredOffset 参数,用于检测该 offset 之前的消息是否已经被确认,本质上就是将该 offset 与 leader 副本的 HW 值进行比较,如果 leader 副本的 HW 值大于等于该 offset 值,则认为之前的消息已经全部被确认。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private def maybeIncrementLeaderHW(leaderReplica: Replica, curTime: Long = time.milliseconds): Boolean = {
// 获取位于 ISR 集合中,或最近一次从 leader 拉取消息的时间戳位于指定时间范围(对应 replica.lag.time.max.ms 配置)内的所有副本的 LEO 值
val allLogEndOffsets = assignedReplicas.filter { replica =>
curTime - replica.lastCaughtUpTimeMs <= replicaManager.config.replicaLagTimeMaxMs || inSyncReplicas.contains(replica)
}.map(_.logEndOffset)

// 以这些副本中最小的 LEO 值作为 leader 副本新的 HW 值
val newHighWatermark = allLogEndOffsets.min(new LogOffsetMetadata.OffsetOrdering)

// 比较新旧 HW 值,如果旧的 HW 小于新的 HW,或者旧的 HW 对应的 LogSegment 的 baseOffset 小于新的 HW 的 LogSegment 对象的 baseOffset,则更新
val oldHighWatermark = leaderReplica.highWatermark
if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {
leaderReplica.highWatermark = newHighWatermark
debug("High watermark for partition [%s,%d] updated to %s".format(topic, partitionId, newHighWatermark))
true
} else {
debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s"
.format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
false
}
}

上述方法曾在前面的分析中多次出现,用于尝试后移 leader 副本的 HW 位置,其核心思想是选取 ISR 集合中副本最小的 LEO 值作为 leader 副本的新 HW 值,如果计算出来的 HW 值大于 leader 副本当前的 HW 值,则进行更新。考虑到一些位于 ISR 集合之外但是有机会加入 ISR 集合的副本加入 ISR 集合有一个延迟的过程,所以这里也考虑了这些滞后于 leader 副本时间较小的 follower 副本。

前面我们曾提及过远程副本的作用在于协助 leader 副本更新分区 HW 值,这里我们具体说明一下这一过程。分区 leader 副本所在的 broker 节点以远程副本的形式记录着所有 follower 副本的 LEO 值,当 follower 副本从 leader 副本同步数据时会告知 leader 副本从什么位置开始拉取数据,leader 副本会使用该 offset 值更新远程副本的 LEO 位置值。当 leader 副本需要更新分区 HW 值时会从所有远程副本中筛选出那些位于 ISR 集合中,或者与 leader 副本之间同步时间间隔位于 replica.lag.time.max.ms 内的副本,当这些副本中最小的 LEO 值大于当前 leader 副本的 HW 值时,则更新 leader 副本的 HW 值。

Partition 提供了 Partition#updateReplicaLogReadResult 方法用于更新指定 follower 副本的 LEO 值(具体通过调用 Replica#updateLogReadResult 方法实现),并在完成更新之后尝试调用 Partition#maybeExpandIsr 方法来扩张 ISR 集合,整体过程实现比较简单,不再展开。

Follower 副本在与 leader 副本进行数据同步时,会将从 leader 副本获取到的 HW 值与当前副本的 LEO 值进行比对,并选择较小者作为当前 follower 副本的 HW 值。这样就产生了一个问题,即 follower 副本的 HW 值与 leader 副本的 HW 值是有差距的,当选举某个 HW 滞后的 follower 副本作为新的 leader 时需要对数据进行截断,从而存在丢失消息的风险。为此,Kafka 0.11 版本引入了 Leader Epoch 机制以解决这一问题,关于 Leader Epoch 机制我们以后再补充说明。

ReplicaManager 组件

ReplicaManager 类用于管理分布在当前 broker 节点上的所有分区的副本信息,主要提供了创建并获取指定 topic 分区对象、副本管理、日志数据读写、副本角色转换,以及更新当前 broker 节点缓存的整个集群中全部分区的状态信息等功能。ReplicaManager 的字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
class ReplicaManager(val config: KafkaConfig, // 相关配置对象
metrics: Metrics,
time: Time, // 时间戳工具
val zkUtils: ZkUtils, // ZK 工具类
scheduler: Scheduler, // 定时任务调度器
val logManager: LogManager, // 用于对分区日志数据执行读写操作
val isShuttingDown: AtomicBoolean, // 标记 kafka 服务是否正在执行关闭操作
quotaManager: ReplicationQuotaManager,
threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {

/**
* 记录 kafka controller 的年代信息,当重新选择 controller leader 时会递增该字段,
* 用于校验来自 controller 的请求的年代信息,防止处理来自老的 controller 的请求
*/
@volatile var controllerEpoch: Int = KafkaController.InitialControllerEpoch - 1
/** 本地 broker 的 ID */
private val localBrokerId = config.brokerId
/** 记录当前 broker 管理的所有分区信息,如果不存在则创建 */
private val allPartitions = new Pool[TopicPartition, Partition](Some(tp => new Partition(tp.topic, tp.partition, time, this)))
/** 管理向 leader 副本发送 FetchRequest 请求的 ReplicaFetcherThread 线程 */
val replicaFetcherManager = new ReplicaFetcherManager(config, this, metrics, time, threadNamePrefix, quotaManager)
/** 标记 highwatermark-checkpoint 定时任务是否已经启动 */
private val highWatermarkCheckPointThreadStarted = new AtomicBoolean(false)
/** 记录每个 log 目录与对应 topic 分区 HW 值的映射关系 */
val highWatermarkCheckpoints: Predef.Map[String, OffsetCheckpoint] = config.logDirs.map(dir =>
(new File(dir).getAbsolutePath, new OffsetCheckpoint(new File(dir, ReplicaManager.HighWatermarkFilename)))).toMap
/** 标记 highwatermark-checkpoint 定时任务是否已经启动 */
private var hwThreadInitialized = false
/** 记录 ISR 集合发生变化的 topic 分区信息 */
private val isrChangeSet: mutable.Set[TopicPartition] = new mutable.HashSet[TopicPartition]()
private val lastIsrChangeMs = new AtomicLong(System.currentTimeMillis())
private val lastIsrPropagationMs = new AtomicLong(System.currentTimeMillis())
/** 管理 DelayedProduce 延时任务的炼狱 */
val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce] = DelayedOperationPurgatory[DelayedProduce](
purgatoryName = "Produce", localBrokerId, config.producerPurgatoryPurgeIntervalRequests)
/** 管理 DelayedFetch 延时任务的炼狱 */
val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch] = DelayedOperationPurgatory[DelayedFetch](
purgatoryName = "Fetch", localBrokerId, config.fetchPurgatoryPurgeIntervalRequests)

// ... 省略相关方法定义

}

Kafka 服务在启动时会创建 ReplicaManager 对象,并调用 ReplicaManager#startup 方法启动 ReplicaManager 管理的定时任务,即 isr-expiration 和 isr-change-propagation 定时任务。实现如下:

1
2
3
4
5
6
def startup() {
// 定时检测当前 broker 节点管理的每个分区是否需要缩减 ISR 集合,并执行缩减操作
scheduler.schedule("isr-expiration", maybeShrinkIsr, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
// 定时将 ISR 集合发生变化的 topic 分区记录到 ZK
scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges, period = 2500L, unit = TimeUnit.MILLISECONDS)
}

定时任务 isr-expiration 周期性执行 ReplicaManager#maybeShrinkIsr 方法,尝试缩减当前 broker 节点管理的分区对应的 ISR 集合,具体缩减操作由 Partition#maybeShrinkIsr 方法实现,前面已经分析过,不再重复撰述。

定时任务 isr-change-propagation 周期性将 ISR 集合发生变化的 topic 副本信息更新到 ZK 相应节点下,Kafka 集群控制器基于 ZK 的 Watcher 机制监听相应节点,并在节点内容发生变化时向所有可用的 broker 节点发送 UpdateMetadataRequest 请求,以更新相应 broker 节点本地管理的整个集群中所有分区的状态信息。定时任务的执行逻辑由 ReplicaManager#maybePropagateIsrChanges 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def maybePropagateIsrChanges() {
val now = System.currentTimeMillis()
isrChangeSet synchronized {
// 定期将 ISR 集合发生变化的分区记录到 ZK,kafka controller 对相应 ZK 路径添加了 Watcher,
// 当 Watcher 被触发后会向所有可用的 broker 节点发送 UpdateMetadataRequest 请求,以更新 broker 节点缓存的所有分区状态信息
if (isrChangeSet.nonEmpty &&
// 最后一次有 ISR 集合发生变化的时间距离现在已经超过 5 秒
(lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now
// 上次写入 ZK 的时间距离现在已经超过 1 分钟
|| lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
// 将 ISR 集合发生变更的 topic 分区信息记录到 ZK
ReplicationUtils.propagateIsrChanges(zkUtils, isrChangeSet)
isrChangeSet.clear()
lastIsrPropagationMs.set(now)
}
}
}

为了避免频繁操作 ZK,上述方法在设计上添加了一定的过滤条件,只有当最近一次 ISR 集合变化的时间距离现在超过 5 秒,或者距离上一次操作 ZK 已经超过 1 分钟,才允许再次操作 ZK。Kafka Controller 在成为 leader 角色时会在相应 ZK 路径上注册 Watcher 监听器,当监听到有数据变化时,会构建 UpdateMetadataRequest 请求对象发送给所有可用的 broker 节点,以更新 broker 节点本地缓存的整个集群所有分区的状态信息。

ReplicaManager 提供了 ReplicaManager#maybeUpdateMetadataCache 方法来处理 UpdateMetadataRequest 请求,方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
def maybeUpdateMetadataCache(correlationId: Int,
updateMetadataRequest: UpdateMetadataRequest,
metadataCache: MetadataCache): Seq[TopicPartition] = {
replicaStateChangeLock synchronized {
// 校验 controller 的年代信息,避免处理来自已经过期的 controller 的请求
if (updateMetadataRequest.controllerEpoch < controllerEpoch) {
val stateControllerEpochErrorMessage = ("Broker %d received update metadata request with correlation id %d from an " +
"old controller %d with epoch %d. Latest known controller epoch is %d").format(localBrokerId,
correlationId, updateMetadataRequest.controllerId, updateMetadataRequest.controllerEpoch, controllerEpoch)
stateChangeLogger.warn(stateControllerEpochErrorMessage)
throw new ControllerMovedException(stateControllerEpochErrorMessage)
} else {
// 更新所有分区的状态信息,并返回需要被移除的分区集合
val deletedPartitions = metadataCache.updateCache(correlationId, updateMetadataRequest)
// 更新本地缓存的 controller 年代信息
controllerEpoch = updateMetadataRequest.controllerEpoch
deletedPartitions
}
}
}

上述方法首先会校验当前 UpdateMetadataRequest 请求的年代信息,避免处理那些来自老的 kafka controller 的请求。对于合法的 UpdateMetadataRequest 请求,则会调用 MetadataCache#updateCache 方法更新所有分区的状态信息,并返回需要被移除的分区集合,同时更新本地缓存的 kafka controller 的年代信息。关于 MetadataCache 类的实现,留到后面针对性分析,这里先不展开。

除了上面介绍的 2 个定时任务以外,ReplicaManager 还定义了另外一个定时任务 highwatermark-checkpoint,该任务周期性将当前 broker 节点管理的每个 topic 分区的 HW 值更新到对应 log 目录下的 replication-offset-checkpoint 文件中。相关逻辑由 ReplicaManager#startHighWaterMarksCheckPointThread 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def startHighWaterMarksCheckPointThread(): Unit = {
if (highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
scheduler.schedule(
"highwatermark-checkpoint",
checkpointHighWatermarks,
period = config.replicaHighWatermarkCheckpointIntervalMs,
unit = TimeUnit.MILLISECONDS)
}

def checkpointHighWatermarks() {
// 获取所有分区全部的本地副本 Replica 对象
val replicas = allPartitions.values.flatMap(_.getReplica(localBrokerId))
// 按照副本所在的 log 目录进行分组
val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
// 遍历将位于相同 log 目录下的分区 HW 值,写入到对应的 replication-offset-checkpoint 文件中
for ((dir, reps) <- replicasByDir) {
// 获取每个 topic 分区对应的 HW 值
val hwms: Map[TopicPartition, Long] = reps.map(r => r.partition.topicPartition -> r.highWatermark.messageOffset).toMap
try {
// 更新对应 log 目录下的 replication-offset-checkpoint 文件
highWatermarkCheckpoints(dir).write(hwms)
} catch {
case e: IOException =>
fatal("Error writing to highwatermark file: ", e)
Runtime.getRuntime.halt(1)
}
}
}

具体执行逻辑如代码注释,比较简单。该定时任务会在当前 ReplicaManager 首次收到来自 kafka controller 的 LeaderAndIsrRequest 请求时被启动。

消息同步机制

为了支持在 topic 分区 leader 副本失效时,有新的副本可以继续对外提供服务,Kafka 为副本引入了 leader/follower 模型设计,follower 副本在平时并不负责与客户端进行交互,主要职责在于从 leader 副本同步消息数据,以备在 leader 副本失效时可以从所有符合条件的 follower 副本中选举一个新的 leader 副本,从而避免对应 topic 的长时间停车,本小节我们重点来分析一下 follower 副本从 leader 副本同步消息的操作。

ReplicaManager 使用 ReplicaFetcherManager 管理 follower 副本与 leader 副本的同步工作,ReplicaFetcherManager 继承自 AbstractFetcherManager 抽象类。ReplicaFetcherManager 将当前 broker 节点管理的分区对应的副本按照一定的条件进行分组,并为每个组创建一个 fetcher 线程,用于从对应 leader 副本所在的 broker 节点拉取指定 offset 的消息数据。

Fetcher 线程由 ReplicaFetcherThread 实现,ReplicaFetcherThread 继承自 AbstractFetcherThread 抽象类。每个 ReplicaFetcherManager 维护了一个 HashMap[BrokerAndFetcherId, AbstractFetcherThread] 类型的 AbstractFetcherManager#fetcherThreadMap 集合,用于记录每个分组对应的 fetcher 线程对象,其中 BrokerAndFetcherId 封装了目标 broker 节点的 id、host、port,以及对应 fetcher 线程 ID 等信息。

ReplicaFetcherManager 提供了多个方法用于管理 AbstractFetcherManager#fetcherThreadMap 集合,主要包括:

  1. AbstractFetcherManager#addFetcherForPartitions:将指定的待同步 topic 分区分组,并为每个分组创建并启动一个 fetcher 线程,从指定的 offset 开始与 leader 副本进行同步。
  2. AbstractFetcherManager#removeFetcherForPartitions:停止对指定 topic 分区集合的副本同步任务。
  3. AbstractFetcherManager#shutdownIdleFetcherThreads:关闭空闲的 fetcher 线程,相应线程不再为任何 topic 分区执行同步工作。

上述方法中 2 和 3 在实现上都比较简单,下面重点来看一下方法 1,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def addFetcherForPartitions(partitionAndOffsets: Map[TopicPartition, BrokerAndInitialOffset]) {
mapLock synchronized {
val partitionsPerFetcher = partitionAndOffsets.groupBy {
case (topicPartition, brokerAndInitialOffset) =>
// 由分区所属的 topic 和分区编号计算得到对应的 fetcher 线程 ID,并与 broker 的网络位置信息组成 key,然后按 key 进行分组,
// 后面会为每组分配一个 fetcher 线程,每个线程只连接一个 broker,可以同时为组内多个分区的 follower 副本执行同步操作。
BrokerAndFetcherId(brokerAndInitialOffset.broker, this.getFetcherId(topicPartition.topic, topicPartition.partition))
}

// 启动所有的的 fetcher 线程,如果对应线程不存在,则创建并启动
for ((brokerAndFetcherId, partitionAndOffsets) <- partitionsPerFetcher) {
var fetcherThread: AbstractFetcherThread = null
fetcherThreadMap.get(brokerAndFetcherId) match {
case Some(f) => fetcherThread = f
case None =>
// 创建 ReplicaFetcherThread 线程对象,并记录到 fetcherThreadMap 集合中
fetcherThread = this.createFetcherThread(brokerAndFetcherId.fetcherId, brokerAndFetcherId.broker)
fetcherThreadMap.put(brokerAndFetcherId, fetcherThread)
fetcherThread.start() // 启动线程
}

// 将 topic 分区和同步起始位置传递给 fetcher 线程,并唤醒 fetcher 线程开始同步
fetcherThreadMap(brokerAndFetcherId).addPartitions(partitionAndOffsets.map {
case (tp, brokerAndInitOffset) => tp -> brokerAndInitOffset.initOffset
})
}
}
}

上述方法首先会考虑目标 broker 节点的网络位置信息(brokerId、host 和 port)和 fetcher 线程的 ID 对待同步的 topic 分区进行分组,并以这些信息作为对应 fetcher 线程对象在 AbstractFetcherManager#fetcherThreadMap 集合中的 key,如果 key 对应的 fetcher 线程对象不存在则会创建并启动新的线程,同时将待同步 topic 分区的同步起始 offset 传递给对应线程,然后唤醒线程执行。创建 fetcher 线程的实现如下:

1
2
3
4
5
6
7
override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
val threadName = threadNamePrefix match {
case None => "ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id)
case Some(p) => "%s:ReplicaFetcherThread-%d-%d".format(p, fetcherId, sourceBroker.id)
}
new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, brokerConfig, replicaMgr, metrics, time, quotaManager)
}

ReplicaFetcherThread 继承自 ShutdownableThread 抽象方法,所以在线程被启动之后会循环调度执行 AbstractFetcherThread#doWork 方法,该方法会构造 FetchRequest 请求从 leader 副本拉取指定 offset 对应的消息数据,并处理 FetchResponse 响应。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
override def doWork() {
val fetchRequest = inLock(partitionMapLock) {
// 创建 FetchRequest 请求对象
val fetchRequest = this.buildFetchRequest(partitionStates.partitionStates.asScala.map { state =>
state.topicPartition -> state.value
})
// 如果没有拉取消息的需求,则等待一会后重试
if (fetchRequest.isEmpty) {
trace("There are no active partitions. Back off for %d ms before sending a fetch request".format(fetchBackOffMs))
partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
}
fetchRequest
}
// 发送 FetchRequest 请求,并处理 FetchResponse 响应
if (!fetchRequest.isEmpty) this.processFetchRequest(fetchRequest)
}

上述方法仅仅是构造了 FetchRequest 请求,而发送和处理响应的过程则由 AbstractFetcherThread#processFetchRequest 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private def processFetchRequest(fetchRequest: REQ) {
val partitionsWithError = mutable.Set[TopicPartition]()
var responseData: Seq[(TopicPartition, PD)] = Seq.empty

// 1. 发送 FetchRequest 请求,并阻塞等待响应
try {
trace("Issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
responseData = this.fetch(fetchRequest) // 模板方法
} catch {
// ... 省略异常处理
}
fetcherStats.requestRate.mark()

// 2. 处理响应
if (responseData.nonEmpty) {
inLock(partitionMapLock) {
// 遍历处理每个 topic 分区对应的响应
responseData.foreach { case (topicPartition, partitionData) =>
val topic = topicPartition.topic
val partitionId = topicPartition.partition
Option(partitionStates.stateValue(topicPartition)).foreach(currentPartitionFetchState =>
// 如果从发送 FetchRequest 请求到收到响应期间,offset 没有发生变化,则追加收到的日志数据
if (fetchRequest.offset(topicPartition) == currentPartitionFetchState.offset) {
Errors.forCode(partitionData.errorCode) match {
case Errors.NONE =>
try {
// 获取返回的消息集合
val records = partitionData.toRecords
// 获取返回的最后一条消息的 offset 值
val newOffset = records.shallowEntries.asScala.lastOption.map(_.nextOffset).getOrElse(currentPartitionFetchState.offset)

fetcherLagStats.getAndMaybePut(topic, partitionId).lag = Math.max(0L, partitionData.highWatermark - newOffset)
// 将从 leader 副本获取到的消息追加到当前 follower 副本对应的 Log 对象中
this.processPartitionData(topicPartition, currentPartitionFetchState.offset, partitionData)

val validBytes = records.validBytes
if (validBytes > 0) {
// 更新本地缓存的 fetch 状态
partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
fetcherStats.byteRate.mark(validBytes)
}
} catch {
// ... 省略异常处理
}
// follower 请求的 offset 超出了 leader 的 LEO 值
case Errors.OFFSET_OUT_OF_RANGE =>
try {
// 计算有效的 offset,并更新本地缓存的 fetch 状态
val newOffset = this.handleOffsetOutOfRange(topicPartition)
partitionStates.updateAndMoveToEnd(topicPartition, new PartitionFetchState(newOffset))
error("Current offset %d for partition [%s,%d] out of range; reset offset to %d".format(currentPartitionFetchState.offset, topic, partitionId, newOffset))
} catch {
// ... 省略异常处理
}
// ... 其他异常
}
})
}
}
}

// 对于操作存在异常的 topic 分区,暂停发送 FetchRequest 请求,休息一会儿
if (partitionsWithError.nonEmpty) {
debug("handling partitions with error for %s".format(partitionsWithError))
this.handlePartitionsWithErrors(partitionsWithError)
}
}

由上述实现可以看到方法 AbstractFetcherThread#processFetchRequest 主要做了两件事情:发送 FetchRequest 请求并阻塞等待响应,以及处理响应。其中发送 FetchRequest 请求的过程由 ReplicaFetcherThread#fetch 方法实现,该方法使用 NetworkClient 的阻塞版本 NetworkClientBlockingOps 向目标 broker 节点发送 FetchRequest 请求,并阻塞等待响应结果,然后将针对每个 topic 分区的响应结果封装成 PartitionData 对象交由后续处理。

在遍历处理对于每个 topic 分区的 FetchResponse 响应时,分为 3 种情况:

  1. 正常响应,拉回指定 offset 对应的消息数据。
  2. 异常响应,请求的 offset 不在 leader 副本允许的范围内。
  3. 其它异常响应。

对于 第 1 种情况 来说,会调用 ReplicaFetcherThread#processPartitionData 方法将从对应 leader 副本拉取回来的消息数据写入 follower 副本对应的 Log 对象中,并更新本地缓存的对应分区的消息同步状态信息。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
def processPartitionData(topicPartition: TopicPartition, fetchOffset: Long, partitionData: PartitionData) {
try {
val replica = replicaMgr.getReplica(topicPartition).get
val records = partitionData.toRecords

// 如果拉取到的消息数据过大,则打印异常
this.maybeWarnIfOversizedRecords(records, topicPartition)

if (fetchOffset != replica.logEndOffset.messageOffset)
throw new RuntimeException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(topicPartition, fetchOffset, replica.logEndOffset.messageOffset))

// 将消息追加到 Log 中,因为 leader 已经为消息分配了 offset,所以 follower 无需在对消息分配 offset 值
replica.log.get.append(records, assignOffsets = false)

// 更新对应 follower 副本的 HW 值
val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.highWatermark)
replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)

if (quota.isThrottled(topicPartition)) quota.record(records.sizeInBytes)
} catch {
case e: KafkaStorageException =>
fatal(s"Disk error while replicating data for $topicPartition", e)
Runtime.getRuntime.halt(1)
}
}

追加消息数据由对应副本持有的 Log 对象的 Log#append 方法完成,如果成功追加则会更新副本对应的 HW 值。

如果在执行同步操作时,某个 topic 分区出现异常,则需要依据对应的异常类型分别处理,如果是除 OFFSET_OUT_OF_RANGE 以外的错误(对应 第 3 种情况 ),则会暂停到对应分区 leader 副本同步数据的请求,休整一段时间(对应 replica.fetch.backoff.ms 配置)之后再继续,对应 ReplicaFetcherThread#handlePartitionsWithErrors 方法实现,比较简单。

下面来看一下 第 2 种情况 ,如果同步操作请求的 offset 不合法,即位于 leader 副本的 [startOffset, LEO] 之外,则需要修正本地缓存的对应副本的同步状态信息,修正 offset 的过程由 ReplicaFetcherThread#handleOffsetOutOfRange 方法实现,这里需要区分 2 种情况:

  1. 请求同步的 offset 大于对应 leader 副本的 LEO 值。
  2. 请求同步的 offset 小于对应 leader 副本的 startOffset 值。

一般 follower 副本的 LEO 值都是小于等于 leader 副本的 LEO 值,但是如果发生以下场景(unclean leader election),则可能出现 follower 副本的 LEO 值大于 leader 副本的 LEO 值,此时如果 follower 副本请求同步 leader 副本就有可能出现请求的 offset 大于目标 leader 副本的 LEO 值的情况。这类场景的发生过程为(令场景中 follower 副本为 F):

  1. F 副本失效,期间 F 所属分区的 leader 副本继续追加消息数据;
  2. F 副本失效后恢复,继续从 leader 副本同步数据,但是在追赶上 leader 副本之前,所有 ISR 集合中的副本全部失效;
  3. 为了保证 Kafka 服务的正常运行,选举 F 成为对应 topic 分区新的 leader 副本,并开始负责处理来自生产者的消息读写请求;
  4. 上一任 leader 从失效中恢复,并成为 follower 角色,此时其 LEO 值很有可能大于 F 的 LEO 值。

针对这种情况简单的处理方式是将 follower 副本的消息进行截断,但是 Kafka 也提供了 unclean.leader.election.enable 配置,允许在发生这种情况时停服。相关实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = {
val replica = replicaMgr.getReplica(topicPartition).get

// 发送 ListOffsetRequest 请求,获取 leader 副本的 LEO 值
val leaderEndOffset: Long = this.earliestOrLatestOffset(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP, brokerConfig.brokerId)

// 如果 leader 副本的 LEO 值落后于 follower 副本的 LEO 值
if (leaderEndOffset < replica.logEndOffset.messageOffset) {
// 依据配置(unclean.leader.election.enable)决定是否需要停机
if (!LogConfig.fromProps(brokerConfig.originals,
AdminUtils.fetchEntityConfig(replicaMgr.zkUtils, ConfigType.Topic, topicPartition.topic)).uncleanLeaderElectionEnable) {
// Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
fatal("Exiting because log truncation is not allowed for partition %s,".format(topicPartition) +
" Current leader %d's latest offset %d is less than replica %d's latest offset %d"
.format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset))
System.exit(1)
}

warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d"
.format(brokerConfig.brokerId, topicPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderEndOffset))
// 将分区对应的 Log 截断到 leader 副本的 LEO 位置,从该位置开始重新与 leader 副本进行同步
replicaMgr.logManager.truncateTo(Map(topicPartition -> leaderEndOffset))

// 返回下次获取消息的 offset 位置
leaderEndOffset
} else {
// ... 第 2 种情况
}
}

有时候 leader 副本的 LEO 值也会明显领先于某个 follower 副本的 LEO 值,此时 follower 请求同步 leader 副本时可能出现请求同步的 offset 小于对应 leader 副本的 startOffset 值。出现这种情况的原因一般有以下 2 种:

  1. follower 副本长时间失效,期间 leader 副本不断在追加新的数据,等到 follower 再次上线时,leader 副本对应 offset 位置的日志数据已被定时任务清除。
  2. 出现前面介绍的 unclean leader election 场景,follower 在执行截断操作到 HW 位置后,offset 仍然大于新 leader 的 LEO 值,此时执行同步会导致 OffsetOutOfRangeException 异常,follower 在处理该异常的期间,leader 副本因为追加了大量的数据而导致 follower 再次请求同步时,offset 小于 leader 副本的 startOffset 值。

出现以上这 2 种情况只需要将 follower 同步请求同步的 offset 置为 leader 副本的 startOffset 即可,此外还需要清空 follower 副本的 Log 对象,因为其中的数据已经全部失效,没有继续保留的意义。相关实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def handleOffsetOutOfRange(topicPartition: TopicPartition): Long = {
val replica = replicaMgr.getReplica(topicPartition).get

// 发送 ListOffsetRequest 请求,获取 leader 副本的 LEO 值
val leaderEndOffset: Long = this.earliestOrLatestOffset(topicPartition, ListOffsetRequest.LATEST_TIMESTAMP, brokerConfig.brokerId)

// 如果 leader 副本的 LEO 值落后于 follower 副本的 LEO 值
if (leaderEndOffset < replica.logEndOffset.messageOffset) {
// ... 第 1 种情况
} else {
// 发送 ListOffsetRequest 请求,获取 leader 副本的 startOffset 值
val leaderStartOffset: Long = this.earliestOrLatestOffset(topicPartition, ListOffsetRequest.EARLIEST_TIMESTAMP, brokerConfig.brokerId)
warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
.format(brokerConfig.brokerId, topicPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
// 选择下次获取消息的起始 offset 值
val offsetToFetch = Math.max(leaderStartOffset, replica.logEndOffset.messageOffset)
// 如果当前 leader 的 startOffset 大于对应副本的 LEO 值,则将该副本的 Log 全部截断,并创建新的 activeSegment 对象
if (leaderStartOffset > replica.logEndOffset.messageOffset)
replicaMgr.logManager.truncateFullyAndStartAt(topicPartition, leaderStartOffset)

// 返回下次获取消息的 offset 位置
offsetToFetch
}
}

上述方法 AbstractFetcherThread#handleOffsetOutOfRange 还会在 AbstractFetcherThread#addPartitions 方法中被调用,该方法用于为每个 topic 分区构造合法的分区同步状态 PartitionFetchState 对象,并更新本地缓存,同时唤醒消息数据同步操作,前面分析过的 AbstractFetcherManager#addFetcherForPartitions 调用了该方法。实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def addPartitions(partitionAndOffsets: Map[TopicPartition, Long]) {
partitionMapLock.lockInterruptibly()
try {
// 基于指定的 offset 构造每个 topic 分区合法的 PartitionFetchState 对象,忽略已经存在的 topic 分区
val newPartitionToState = partitionAndOffsets
.filter { case (tp, _) => !partitionStates.contains(tp) }
.map { case (tp, offset) =>
// 基于指定的 offset 创建对应的 PartitionFetchState 对象,如果 offset 无效,则尝试解析得到合法的 offset 值
val fetchState =
if (PartitionTopicInfo.isOffsetInvalid(offset)) new PartitionFetchState(this.handleOffsetOutOfRange(tp))
else new PartitionFetchState(offset)
tp -> fetchState
}
// 获取并更新本地缓存的已有的 topic 分区与 PartitionFetchState 对象之间的映射关系
val existingPartitionToState = partitionStates.partitionStates.asScala.map { state => state.topicPartition -> state.value }.toMap
partitionStates.set((existingPartitionToState ++ newPartitionToState).asJava)
// 唤醒当前 fetcher 线程,执行同步操作
partitionMapCond.signalAll()
} finally {
partitionMapLock.unlock()
}
}

该方法中调用 AbstractFetcherThread#handleOffsetOutOfRange 方法的目的在于当参数未指定 offset 时,利用该方法获取合法的同步 offset 值。

副本角色切换

ReplicaManager 定义了 ReplicaManager#becomeLeaderOrFollower 方法,用于处理来自 kafka controller 的 LeaderAndIsrRequest 请求,指导位于当前 broker 节点上的相应分区副本的角色切换工作。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def becomeLeaderOrFollower(correlationId: Int,
leaderAndISRRequest: LeaderAndIsrRequest,
metadataCache: MetadataCache,
onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): BecomeLeaderOrFollowerResult = {

replicaStateChangeLock synchronized {
// 用于记录每个分区角色切换操作的状态码
val responseMap = new mutable.HashMap[TopicPartition, Short]
// 校验 controller 的年代信息,避免处理来自已经过期的 controller 的请求
if (leaderAndISRRequest.controllerEpoch < controllerEpoch) {
BecomeLeaderOrFollowerResult(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
} else {
val controllerId = leaderAndISRRequest.controllerId
// 1. 更新本地缓存的 kafka controller 的年代信息
controllerEpoch = leaderAndISRRequest.controllerEpoch

// 2. 校验请求的 leader 副本的年代信息,以及是否由当前 broker 节点管理,将满足条件的分区信息记录到 partitionState 集合中
val partitionState = new mutable.HashMap[Partition, PartitionState]()
leaderAndISRRequest.partitionStates.asScala.foreach { case (topicPartition, stateInfo) =>
// 获取/创建指定 topic 分区的 Partition 对象
val partition = this.getOrCreatePartition(topicPartition)
// 获取 leader 副本的年代信息
val partitionLeaderEpoch = partition.getLeaderEpoch
// 校验 leader 副本的年代信息,需要保证请求中的 leader 副本的年代信息大于本地缓存的 topic 分区 leader 副本的年代信息
if (partitionLeaderEpoch < stateInfo.leaderEpoch) {
// 如果请求的分区副本位于当前 broker 节点上,记录到 partitionState 集合中
if (stateInfo.replicas.contains(localBrokerId))
partitionState.put(partition, stateInfo)
else {
// 请求的分区副本不在当前 broker 节点上,响应 UNKNOWN_TOPIC_OR_PARTITION 错误
responseMap.put(topicPartition, Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
}
} else {
// 请求中的 leader 副本的年代信息小于等于本地记录的对应 topic 分区 leader 副本的年代信息,响应 STALE_CONTROLLER_EPOCH 错误
responseMap.put(topicPartition, Errors.STALE_CONTROLLER_EPOCH.code)
}
}

// 3. 将请求对象中的分区集合分割成 leader 和 follower 两类,并执行角色切换
val partitionsTobeLeader = partitionState.filter { case (_, stateInfo) => stateInfo.leader == localBrokerId }
val partitionsToBeFollower = partitionState -- partitionsTobeLeader.keys

// 3.1 将指定分区的副本切换成 leader 角色
val partitionsBecomeLeader = if (partitionsTobeLeader.nonEmpty)
this.makeLeaders(controllerId, controllerEpoch, partitionsTobeLeader, correlationId, responseMap)
else
Set.empty[Partition]

// 3.2 将指定分区的副本切换成 follower 角色
val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
this.makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap, metadataCache)
else
Set.empty[Partition]

// 4. 如果 highwatermark-checkpoint 定时任务尚未启动,则执行启动
if (!hwThreadInitialized) {
this.startHighWaterMarksCheckPointThread()
hwThreadInitialized = true
}

// 5. 关闭空闲的 fetcher 线程
replicaFetcherManager.shutdownIdleFetcherThreads()

// 6. 执行回调函数,完成 GroupCoordinator 的迁移操作
onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)

// 7. 封装结果对象返回
BecomeLeaderOrFollowerResult(responseMap, Errors.NONE.code)
}
}
}

副本角色切换的整体执行流程可以概括为:

  1. 更新本地缓存的 kafka controller 的年代信息;
  2. 校验请求的合法性,确保请求操作对应的分区 leader 副本年代信息合法,以及请求操作的分区副本位于当前 broker 节点上;
  3. 对请求的分区副本按照角色分类,并执行角色切换;
  4. 如果 highwatermark-checkpoint 定时任务尚未启动,则执行启动;
  5. 关闭空闲的副本数据同步 fetcher 线程;
  6. 因为副本角色发生变化,可能影响消费者的消费操作,尝试执行 GroupCoordinator 迁移操作;
  7. 封装响应结果返回。

上面的步骤中我们重点来看一下步骤 3,关于 GroupCoordinator 将留到后面的篇章中针对性分析。步骤 3 首先会将待处理的副本集合按照角色分为 leader 和 follower 两组,然后针对 leader 分组调用 ReplicaManager#makeLeaders 方法将对应的分区切换成 leader 角色,调用 ReplicaManager#makeFollowers 方法将对应的分区切换成 follower 角色。

方法 ReplicaManager#makeLeaders 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private def makeLeaders(controllerId: Int,
epoch: Int,
partitionState: Map[Partition, PartitionState], // 记录需要切换成 leader 角色的分区副本信息
correlationId: Int,
responseMap: mutable.Map[TopicPartition, Short]): Set[Partition] = {

// 初始化每个 topic 分区的错误码为 NONE
for (partition <- partitionState.keys)
responseMap.put(partition.topicPartition, Errors.NONE.code)

val partitionsToMakeLeaders: mutable.Set[Partition] = mutable.Set()
try {
// 如果对应的副本当前是 follower 角色,需要要先停止这些副本的消息同步工作
replicaFetcherManager.removeFetcherForPartitions(partitionState.keySet.map(_.topicPartition))

// 遍历处理 partitionState 集合,将其中记录的分区转换成 leader 角色
partitionState.foreach {
case (partition, partitionStateInfo) =>
// 调用 Partition#makeLeader 方法,将分区的本地副本切换成 leader 角色
if (partition.makeLeader(controllerId, partitionStateInfo, correlationId))
partitionsToMakeLeaders += partition // 记录成功完成 leader 角色切换的副本对应的分区
else
// ... 省略日志打点
}
} catch {
// ... 省略异常处理
}

partitionsToMakeLeaders
}

切换副本角色为 leader 的过程比较简单,首先停止这些待切换 follower 副本的数据同步 fetcher 线程,然后调用 Partition#makeLeader 方法逐个将副本切换成 leader 角色,该方法已在前面分析过,不再重复撰述。

方法 ReplicaManager#makeFollowers 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private def makeFollowers(controllerId: Int,
epoch: Int,
partitionState: Map[Partition, PartitionState],
correlationId: Int,
responseMap: mutable.Map[TopicPartition, Short],
metadataCache: MetadataCache): Set[Partition] = {

// 初始化每个 topic 分区的错误码为 NONE
for (partition <- partitionState.keys)
responseMap.put(partition.topicPartition, Errors.NONE.code)

val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
try {
partitionState.foreach {
case (partition, partitionStateInfo) =>
// 检测 leader 副本所在的 broker 是否可用
val newLeaderBrokerId = partitionStateInfo.leader
metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
// 仅对 leader 副本所在 broker 节点可用的副本执行角色切换
case Some(_) =>
// 调用 Partition#makeFollower 方法,将分区的本地副本切换成 follower 角色
if (partition.makeFollower(controllerId, partitionStateInfo, correlationId))
partitionsToMakeFollower += partition // 记录成功完成 follower 角色切换的副本对应的分区
else
// ... 省略日志打点
// 对应 leader 副本所在的 broker 节点失效
case None =>
// 即使 leader 副本所在的 broker 不可用,也要创建本地副本对象,主要是为了在 checkpoint 文件中记录此分区的 HW 值
partition.getOrCreateReplica()
}
}

// 停止与旧的 leader 副本同步的 fetcher 线程
replicaFetcherManager.removeFetcherForPartitions(partitionsToMakeFollower.map(_.topicPartition))

// 由于 leader 副本发生变化,所以新旧 leader 在 [HW, LEO] 之间的消息可能不一致,
// 但是 HW 之前的消息是一致的,所以将 Log 截断到 HW 位置,可能会出现 unclean leader election 的场景
logManager.truncateTo(partitionsToMakeFollower.map { partition =>
(partition.topicPartition, partition.getOrCreateReplica().highWatermark.messageOffset)
}.toMap)

// 尝试完成监听对应分区的 DelayedProduce 和 DelayedFetch 延时任务
partitionsToMakeFollower.foreach { partition =>
val topicPartitionOperationKey = new TopicPartitionOperationKey(partition.topicPartition)
this.tryCompleteDelayedProduce(topicPartitionOperationKey)
this.tryCompleteDelayedFetch(topicPartitionOperationKey)
}

// 检测 ReplicaManager 的运行状态
if (isShuttingDown.get()) {
// ... 省略日志打点
}
// 重新启用与新 leader 副本同步的 fetcher 线程
else {
val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
partition.topicPartition -> BrokerAndInitialOffset(
metadataCache.getAliveBrokers.find(_.id == partition.leaderReplicaIdOpt.get).get.getBrokerEndPoint(config.interBrokerListenerName),
partition.getReplica().get.logEndOffset.messageOffset)).toMap
// 为需要同步的分区创建并启动同步线程,从指定的 offset 开始与 leader 副本进行同步
replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
}
} catch {
// ... 省略异常处理
}

partitionsToMakeFollower
}

切换副本为 follower 角色的过程相对要复杂一些,整体执行流程可以概括为:

  1. 检测对应新的 leader 副本所在 broker 节点是否可用,如果不可用则无需执行切换操作,否则调用 Partition#makeFollower 方法执行副本角色切换;
  2. 停止待切换副本的数据同步 fetcher 线程;
  3. 由于 leader 副本发生变化,新旧 leader 在 [HW, LEO] 之间的数据可能不一致,所以需要将当前副本截断到 HW 位置,以保证数据一致性;
  4. 尝试完成监听对应分区的 DelayedProduce 和 DelayedFetch 延时任务;
  5. 为新的 follower 副本集合创建并启动对应的数据同步 fetcher 线程(如果已存在,则复用)。

上述过程中涉及到的相关方法已经在前面分析过,不再重复撰述。

分区与副本管理

ReplicaManager 定义了 ReplicaManager#getOrCreatePartition 方法和 ReplicaManager#getPartition 方法用于获取本地缓存的指定 topic 分区的 Partition 对象,二者的区别在于前者会在本地检索不到目标 topic 分区时创建对应的 Partition 对象。同时,ReplicaManager 还提供了 ReplicaManager#getReplicaOrExceptionReplicaManager#getLeaderReplicaIfLocal,以及 ReplicaManager#getReplica 方法用于获取指定 topic 分区的指定副本对象,实现上都比较简单,不展开分析。

下面来重点看一下关闭副本的 ReplicaManager#stopReplicas 方法实现,当 broker 节点收到来自 kafka controller 的 StopReplicaRequest 请求时,会关闭指定的副本,包括停止副本的数据同步 fetcher 线程,以及依据参数决定是否删除副本对应的 Log 对象和文件,并清空本地缓存的相关信息。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
def stopReplicas(stopReplicaRequest: StopReplicaRequest): (mutable.Map[TopicPartition, Short], Short) = {
replicaStateChangeLock synchronized {
val responseMap = new collection.mutable.HashMap[TopicPartition, Short]
// 校验 controller 的年代信息,避免处理来自已经过期的 controller 的请求
if (stopReplicaRequest.controllerEpoch() < controllerEpoch) {
(responseMap, Errors.STALE_CONTROLLER_EPOCH.code)
} else {
val partitions = stopReplicaRequest.partitions.asScala
// 更新本地记录的 kafka controller 的年代信息
controllerEpoch = stopReplicaRequest.controllerEpoch
// 停止对指定分区的数据同步 fetcher 线程
replicaFetcherManager.removeFetcherForPartitions(partitions)
for (topicPartition <- partitions) {
// 关闭指定分区的副本
val errorCode = this.stopReplica(topicPartition, stopReplicaRequest.deletePartitions())
responseMap.put(topicPartition, errorCode)
}
(responseMap, Errors.NONE.code)
}
}
}

def stopReplica(topicPartition: TopicPartition, deletePartition: Boolean): Short = {
val errorCode = Errors.NONE.code
getPartition(topicPartition) match {
case Some(_) =>
// 如果 deletePartition = true,则删除分区对应的副本及其日志和索引文件
if (deletePartition) {
// 从本地移除指定的 topic 分区
val removedPartition = allPartitions.remove(topicPartition)
if (removedPartition != null) {
// 删除分区的日志和索引文件,并清空本地缓存的相关信息
removedPartition.delete() // this will delete the local log
val topicHasPartitions = allPartitions.keys.exists(tp => topicPartition.topic == tp.topic)
if (!topicHasPartitions) BrokerTopicStats.removeMetrics(topicPartition.topic)
}
}
// 本地未缓存对应的分区(一般发生在对应的 topic 已经被删除,但是期间 broker 宕机了),直接尝试对应的删除日志和索引文件
case None =>
if (deletePartition && logManager.getLog(topicPartition).isDefined) logManager.asyncDelete(topicPartition)
}
errorCode
}

如果在 StopReplicaRequest 请求中指明了要删除对应 topic 分区的日志和索引文件,则方法会调用 Partition#delete 方法执行删除操作,并清空本地缓存的相关信息。如果某个 broker 节点在宕机中恢复后,之前管理的 topic 分区很可能已经被分配到新的 broker 节点上,此时该 broker 节点已经不再管理相应的 topic 分区对象,如果收到相应的 StopReplicaRequest 请求,则仍然会调用 LogManager#asyncDelete 方法尝试删除之前遗留的日志文件和索引文件。

日志数据读写

ReplicaManager 提供了 ReplicaManager#appendRecords 方法,用于处理 ProduceRequest 请求,将给定的日志数据追加到对应 topic 分区的 leader 副本中。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
def appendRecords(timeout: Long,
requiredAcks: Short,
internalTopicsAllowed: Boolean,
entriesPerPartition: Map[TopicPartition, MemoryRecords],
responseCallback: Map[TopicPartition, PartitionResponse] => Unit) {

// 如果 acks 参数合法
if (this.isValidRequiredAcks(requiredAcks)) {
val sTime = time.milliseconds
// 将消息追加到 Log 对象中
val localProduceResults = this.appendToLocalLog(internalTopicsAllowed, entriesPerPartition, requiredAcks)
debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

// 封装数据追加结果
val produceStatus = localProduceResults.map { case (topicPartition, result) =>
topicPartition -> ProducePartitionStatus(
result.info.lastOffset + 1, // 下一次请求日志的 offset 值
new PartitionResponse(result.error, result.info.firstOffset, result.info.logAppendTime)) // response status
}

// 如果需要生成 DelayedProduce 延时任务
if (this.delayedRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
// 创建 DelayedProduce 延时任务对象,将回调响应函数封装到延时任务对象中
val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback)

// 创建当前延时任务监听的一系列 key 对象,监听本次追加操作的所有 topic 分区
val producerRequestKeys = entriesPerPartition.keys.map(new TopicPartitionOperationKey(_)).toSeq

// 尝试执行延时任务,如果还未到期则将任务交由炼狱管理
delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
} else {
// 无需生成 DelayedProduce 延时任务,立即响应
val produceResponseStatus = produceStatus.mapValues(status => status.responseStatus)
responseCallback(produceResponseStatus)
}
} else {
// 对应的 acks 参数错误,构造 INVALID_REQUIRED_ACKS 响应
val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
topicPartition -> new PartitionResponse(
Errors.INVALID_REQUIRED_ACKS, LogAppendInfo.UnknownLogAppendInfo.firstOffset, Record.NO_TIMESTAMP)
}
// 回调响应
responseCallback(responseStatus)
}
}

如果请求的 acks 参数合法,则会调用 ReplicaManager#appendToLocalLog 方法往相应 leader 副本对应的 Log 对象中追加日志数据,并依据以下条件决定是否延时响应:

  1. acks 参数为 -1,表示需要 ISR 集合中全部的 follower 副本确认追加的消息数据;
  2. 请求添加的消息数据不为空;
  3. 至少有一个 topic 分区的消息追加成功。

如果上面 3 个条件同时满足,则方法会创建对应的 DelayedProduce 延时任务对象,并交由相应的炼狱进行管理。DelayedProduce 对象封装了响应回调函数(即 KafkaApis#handleProducerRequest 方法中定义的 sendResponseCallback 方法),当 ISR 集合中所有的 follower 副本完成对本次追加的日志数据的同步操作之后会触发响应操作,这里延时任务监听的 key 是 topic 分区对象,当某个 topic 分区完成消息追加操作时可以提前触发延时任务执行。关于 DelayedProduce 延时任务我们已经在前面分析过,读者可以将上述逻辑与上一篇中对 DelayedProduce 的分析结合起来进一步加深理解。

方法 ReplicaManager#appendToLocalLog 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
private def appendToLocalLog(internalTopicsAllowed: Boolean, // 是否允许往内部 topic 追加消息
entriesPerPartition: Map[TopicPartition, MemoryRecords], // 对应分区需要追加的消息数据
requiredAcks: Short // acks
): Map[TopicPartition, LogAppendResult] = {

// 遍历处理每个 topic 分区及其待追加的消息数据
entriesPerPartition.map { case (topicPartition, records) =>
// 如果追加的对象是内部 topic,依据参数 internalTopicsAllowed 决定是否追加
if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
(topicPartition, LogAppendResult(
LogAppendInfo.UnknownLogAppendInfo,
Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
} else {
try {
// 获取 topic 分区对应的 Partition 对象
val partitionOpt = this.getPartition(topicPartition)
val info = partitionOpt match {
// 往 leader 副本对应的 Log 对象中追加消息数据
case Some(partition) => partition.appendRecordsToLeader(records, requiredAcks)
// 找不到 topic 分区对应的 Partition 对象
case None => throw new UnknownTopicOrPartitionException("Partition %s doesn't exist on %d".format(topicPartition, localBrokerId))
}

// 返回每个分区写入的消息结果
(topicPartition, LogAppendResult(info))
} catch {
// ... 省略异常处理
}
}
}
}

上述方法最终调用了 Partition#appendRecordsToLeader 方法将消息数据追加到指定 topic 分区的 leader 副本中。

ReplicaManager 定义了 ReplicaManager#fetchMessages 方法,用于处理来自消费者或 follower 副本读取消息数据的 FetchRequest 请求。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
def fetchMessages(timeout: Long,
replicaId: Int,
fetchMinBytes: Int,
fetchMaxBytes: Int,
hardMaxBytesLimit: Boolean,
fetchInfos: Seq[(TopicPartition, PartitionData)],
quota: ReplicaQuota = UnboundedQuota,
responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit) {
// 标记是否是来自 follower 的 fetch 请求
val isFromFollower = replicaId >= 0
// 是否只读 leader 副本的消息,一般 debug 模式下可以读 follower 副本的数据
val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
// 是否只读已完成提交的消息(即 HW 之前的消息),如果是来自消费者的请求则该参数是 true,如果是 follower 则该参数是 false
val fetchOnlyCommitted: Boolean = !Request.isValidBrokerId(replicaId)

// 读取指定位置和大小的消息数据
val logReadResults = this.readFromLocalLog(
replicaId = replicaId,
fetchOnlyFromLeader = fetchOnlyFromLeader,
readOnlyCommitted = fetchOnlyCommitted,
fetchMaxBytes = fetchMaxBytes,
hardMaxBytesLimit = hardMaxBytesLimit,
readPartitionInfo = fetchInfos,
quota = quota)

// 如果当前是来自 follower 的同步消息数据请求,则更新 follower 副本的状态,
// 并尝试扩张 ISR 集合,同时尝试触发监听对应 topic 分区的 DelayedProduce 延时任务
if (Request.isValidBrokerId(replicaId))
this.updateFollowerLogReadResults(replicaId, logReadResults)

val logReadResultValues = logReadResults.map { case (_, v) => v }
val bytesReadable = logReadResultValues.map(_.info.records.sizeInBytes).sum
val errorReadingData = logReadResultValues.foldLeft(false)(
(errorIncurred, readResult) => errorIncurred || (readResult.error != Errors.NONE))

if (timeout <= 0 // 请求希望立即响应
|| fetchInfos.isEmpty // 请求不期望有响应数据
|| bytesReadable >= fetchMinBytes // 已经有足够的数据可以响应
|| errorReadingData) { // 读取数据出现错误
val fetchPartitionData = logReadResults.map { case (tp, result) =>
tp -> FetchPartitionData(result.error, result.hw, result.info.records)
}
// 立即响应
responseCallback(fetchPartitionData)
} else {
// 构造 DelayedFetch 延时任务
val fetchPartitionStatus = logReadResults.map { case (topicPartition, result) =>
val fetchInfo = fetchInfos.collectFirst {
case (tp, v) if tp == topicPartition => v
}.getOrElse(sys.error(s"Partition $topicPartition not found in fetchInfos"))
(topicPartition, FetchPartitionStatus(result.info.fetchOffsetMetadata, fetchInfo))
}
val fetchMetadata = FetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit,
fetchOnlyFromLeader, fetchOnlyCommitted, isFromFollower, replicaId, fetchPartitionStatus)
val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, responseCallback)

// 构造延时任务关注的 key,即相应的 topic 分区对象
val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => new TopicPartitionOperationKey(tp) }

// 交由炼狱管理
delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
}
}

从指定 topic 分区 leader 副本拉取消息的整体执行流程如下:

  1. 从本地副本读取指定位置和大小的消息数据;
  2. 如果是来自 follower 副本的请求,则更新对应的 follower 副本的状态信息,并尝试扩张对应 topic 分区的 ISR 集合,同时尝试执行监听该分区的 DelayedProduce 延时任务;
  3. 判定是否需要对请求方进行立即响应,如果需要则立即触发响应回调函数;
  4. 否则,构造 DelayedFetch 延时任务,监听对应的 topic 分区对象,并交由炼狱管理。

下面对上述各个步骤逐一进行分析,首先来看 步骤 1 ,对应 ReplicaManager#readFromLocalLog 方法,实现了从本地读取指定 topic 分区相应位置和大小的消息数据的功能,具体的消息数据读操作由 Log#read 方法实现。方法 ReplicaManager#readFromLocalLog 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def readFromLocalLog(replicaId: Int, // 请求的 follower 副本 ID
fetchOnlyFromLeader: Boolean, // 是否只读 leader 副本的消息,一般 debug 模式下可以读 follower 副本的数据
readOnlyCommitted: Boolean, // 是否只读已完成提交的消息(即 HW 之前的消息),如果是来自消费者的请求则该参数是 true,如果是 follower 则该参数是 false
fetchMaxBytes: Int, // 最大 fetch 字节数
hardMaxBytesLimit: Boolean,
readPartitionInfo: Seq[(TopicPartition, PartitionData)], // 每个分区读取的起始 offset 和最大字节数
quota: ReplicaQuota): Seq[(TopicPartition, LogReadResult)] = {

def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
val offset = fetchInfo.offset
val partitionFetchSize = fetchInfo.maxBytes
try {
// 获取待读取消息的副本对象,一般都是从本地副本读取(debug 模式除外)
val localReplica = if (fetchOnlyFromLeader) getLeaderReplicaIfLocal(tp) else getReplicaOrException(tp)
// 计算读取消息的 offset 上界,如果是来自消费者的请求,则上界为 HW,如果是来自 follower 的请求,则上界为 LEO
val maxOffsetOpt = if (readOnlyCommitted) Some(localReplica.highWatermark.messageOffset) else None
val initialLogEndOffset = localReplica.logEndOffset.messageOffset // LEO
val initialHighWatermark = localReplica.highWatermark.messageOffset // HW
val fetchTimeMs = time.milliseconds
val logReadInfo = localReplica.log match {
case Some(log) =>
val adjustedFetchSize = math.min(partitionFetchSize, limitBytes)
// 从 Log 中读取消息数据
val fetch = log.read(offset, adjustedFetchSize, maxOffsetOpt, minOneMessage)

// 限流检测
if (shouldLeaderThrottle(quota, tp, replicaId)) FetchDataInfo(fetch.fetchOffsetMetadata, MemoryRecords.EMPTY)
// For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
// progress in such cases and don't need to report a `RecordTooLargeException`
else if (!hardMaxBytesLimit && fetch.firstEntryIncomplete) FetchDataInfo(fetch.fetchOffsetMetadata, MemoryRecords.EMPTY)
else fetch

// 对应副本的 Log 对象不存在
case None =>
error(s"Leader for partition $tp does not have a local log")
FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY)
}

// 封装结果返回
LogReadResult(info = logReadInfo,
hw = initialHighWatermark,
leaderLogEndOffset = initialLogEndOffset,
fetchTimeMs = fetchTimeMs,
readSize = partitionFetchSize,
exception = None)
} catch {
// ... 省略异常处理
}
} // ~ end of read

var limitBytes = fetchMaxBytes
val result = new mutable.ArrayBuffer[(TopicPartition, LogReadResult)]
var minOneMessage = !hardMaxBytesLimit
// 遍历读取每个 topic 分区的消息数据
readPartitionInfo.foreach {
case (tp, fetchInfo) =>
val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
val messageSetSize = readResult.info.records.sizeInBytes
if (messageSetSize > 0) minOneMessage = false
limitBytes = math.max(0, limitBytes - messageSetSize)
result += (tp -> readResult)
}
result
}

如果本次请求是由 follower 副本发起,则会执行 ReplicaManager#updateFollowerLogReadResults 方法( 步骤 2 ),该方法主要做了以下 4 件事情:

  1. 更新指定 follower 副本的状态信息(包括 LEO 值、最近一次成功从 leader 拉取消息的时间戳等);
  2. 尝试扩张副本所属分区的 ISR 集合,因为 follower 的 LEO 值递增,可能已经符合加入 ISR 集合的条件;
  3. 因为有新的消息被成功追加,尝试后移对应 leader 副本的 HW 值;
  4. 尝试执行监听对应 topic 分区的 DelayedProduce 延时任务。

方法 ReplicaManager#updateFollowerLogReadResults 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
private def updateFollowerLogReadResults(replicaId: Int, readResults: Seq[(TopicPartition, LogReadResult)]) {
debug("Recording follower broker %d log read results: %s ".format(replicaId, readResults))
// 遍历处理对应 topic 分区的日志数据读取结果
readResults.foreach {
case (topicPartition, readResult) =>
getPartition(topicPartition) match {
case Some(partition) =>
// 更新指定 follower 副本的状态,并尝试扩张对应分区的 ISR 集合,以及后移 leader 副本的 HW 值
partition.updateReplicaLogReadResult(replicaId, readResult)
// 尝试执行 DelayedProduce 延时任务,因为此时对应 topic 分区下已经有新的消息成功写入
this.tryCompleteDelayedProduce(new TopicPartitionOperationKey(topicPartition))
case None =>
warn("While recording the replica LEO, the partition %s hasn't been created.".format(topicPartition))
}
}
}

步骤 3 会判定是否需要立即响应当前拉取消息的 FetchRequest 请求,如果满足以下条件之一则执行回调函数,立即响应请求:

  1. 请求指定期望立即响应。
  2. 请求不期望有响应数据。
  3. 当前已经有足够的响应数据。
  4. 读取日志数据期间出错。

如果满足以上条件之一,则会立即触发执行回调函数(即 KafkaApis#handleFetchRequest 方法中定义的 sendResponseCallback 方法)响应请求,该函数已经在前面分析过,不再重复撰述。否则会构造 DelayedFetch 延时任务,并交由相应的炼狱进行管理( 步骤 4 )。

集群分区状态管理

Kafka 的所有 broker 节点在本地均使用 MetadataCache 缓存整个集群上所有 topic 分区的状态信息,并由 kafka controller 通过 UpdateMetadataRequest 请求进行维护。MetadataCache 的字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private[server] class MetadataCache(brokerId: Int) extends Logging {

/** 缓存每个分区的状态信息 */
private val cache = mutable.Map[String, mutable.Map[Int, PartitionStateInfo]]() // [topic, [分区 ID, 分区状态信息]]
/** kafka controller leader 的 ID */
private var controllerId: Option[Int] = None
/** 记录当前可用的 broker 信息 */
private val aliveBrokers = mutable.Map[Int, Broker]()
/** 记录当前可用的 broker 节点信息 */
private val aliveNodes = mutable.Map[Int, collection.Map[ListenerName, Node]]()

// ... 省略方法定义

}

ReplicaManager 提供了 ReplicaManager#maybeUpdateMetadataCache 方法用于处理 UpdateMetadataRequest 请求,该方法首先会校验请求中 kafka controller 的年代信息,以避免处理来自已经过期的 kafka controller 的请求,对于合法的请求则会调用 MetadataCache#updateCache 方法更新本地缓存的整个集群的 topic 分区状态信息。前面我们已经分析了 ReplicaManager#maybeUpdateMetadataCache 方法,但对于其中调用的 MetadataCache#updateCache 方法未展开分析,这里我们继续分析一下该方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def updateCache(correlationId: Int, updateMetadataRequest: UpdateMetadataRequest): Seq[TopicPartition] = {
inWriteLock(partitionMetadataLock) {
// 更新本地缓存的 kafka controller 的 ID
controllerId = updateMetadataRequest.controllerId match {
case id if id < 0 => None
case id => Some(id)
}

// 清除本地缓存的集群可用的 broker 节点信息,并由 UpdateMetadataRequest 请求重新构建
aliveNodes.clear()
aliveBrokers.clear()
updateMetadataRequest.liveBrokers.asScala.foreach { broker =>
// aliveNodes 是一个请求热点,所以这里使用 java.util.HashMap 来提升性能,如果是 scala 2.10 之后可以使用 AnyRefMap 代替
val nodes = new java.util.HashMap[ListenerName, Node]
val endPoints = new mutable.ArrayBuffer[EndPoint]
broker.endPoints.asScala.foreach { ep =>
endPoints += EndPoint(ep.host, ep.port, ep.listenerName, ep.securityProtocol)
nodes.put(ep.listenerName, new Node(broker.id, ep.host, ep.port))
}
aliveBrokers(broker.id) = Broker(broker.id, endPoints, Option(broker.rack))
aliveNodes(broker.id) = nodes.asScala
}

// 基于 UpdateMetadataRequest 请求更新每个分区的状态信息,并返回需要被移除的分区集合
val deletedPartitions = new mutable.ArrayBuffer[TopicPartition]
updateMetadataRequest.partitionStates.asScala.foreach {
case (tp, info) =>
val controllerId = updateMetadataRequest.controllerId
val controllerEpoch = updateMetadataRequest.controllerEpoch
// 如果请求标记对应的 topic 分区需要被删除
if (info.leader == LeaderAndIsr.LeaderDuringDelete) {
// 删除本地缓存的对应 topic 分区的状态信息
this.removePartitionInfo(tp.topic, tp.partition)
deletedPartitions += tp
} else {
// PartitionState -> PartitionStateInfo
val partitionInfo = this.partitionStateToPartitionStateInfo(info)
// 更新本地缓存的对应 topic 分区的状态信息
this.addOrUpdatePartitionInfo(tp.topic, tp.partition, partitionInfo)
}
}
deletedPartitions
}
}

MetadataCache 使用 MetadataCache#aliveBrokersMetadataCache#aliveNodes 字段记录整个集群中可用的 broker 节点信息,当收到来自 kafka controller 的 UpdateMetadataRequest 请求时,MetadataCache 会清空本地缓存,并由请求信息重新构建新的可用的 broker 节点信息。此外还会依据 UpdateMetadataRequest 请求更新本地缓存的整个集群 topic 分区的状态信息(对应 MetadataCache#cache 字段)。

MetadataCache 提供了 MetadataCache#getTopicMetadata 方法用于获取本地缓存的指定 topic 的元数据信息,包括是否是内部 topic,以及对应 topic 下所有分区的元数据信息。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def getTopicMetadata(topics: Set[String],
listenerName: ListenerName,
errorUnavailableEndpoints: Boolean = false): Seq[MetadataResponse.TopicMetadata] = {
inReadLock(partitionMetadataLock) {
topics.toSeq.flatMap { topic =>
// 获取指定 topic 下分区元数据信息,并与 topic 一起构造 topic 元数据对象返回
this.getPartitionMetadata(topic, listenerName, errorUnavailableEndpoints).map { partitionMetadata =>
new MetadataResponse.TopicMetadata(Errors.NONE, topic, Topic.isInternal(topic), partitionMetadata.toBuffer.asJava)
}
}
}
}

private def getPartitionMetadata(
topic: String,
listenerName: ListenerName,
errorUnavailableEndpoints: Boolean): Option[Iterable[MetadataResponse.PartitionMetadata]] = {
// 遍历每个 topic 对应的分区集合
cache.get(topic).map { partitions =>
partitions.map { case (partitionId, partitionState) =>
val topicPartition = TopicAndPartition(topic, partitionId)

// 获取分区对应的 LeaderAndIsr 对象,其中封装了对应分区的 leader 副本 ID 和 ISR 集合等信息
val leaderAndIsr = partitionState.leaderIsrAndControllerEpoch.leaderAndIsr
// 获取 leader 副本所在的节点信息
val maybeLeader = this.getAliveEndpoint(leaderAndIsr.leader, listenerName)
// 获取分区的 AR 集合
val replicas = partitionState.allReplicas
// 获取 AR 集合中可用的副本对应的节点信息
val replicaInfo = this.getEndpoints(replicas, listenerName, errorUnavailableEndpoints)

maybeLeader match {
// 分区 leader 副本不可用
case None =>
new MetadataResponse.PartitionMetadata(Errors.LEADER_NOT_AVAILABLE,
partitionId, Node.noNode(), replicaInfo.asJava, java.util.Collections.emptyList())
case Some(leader) =>
// 获取分区的 ISR 集合
val isr = leaderAndIsr.isr
// 获取 ISR 集合中可用的副本对应的节点信息
val isrInfo = this.getEndpoints(isr, listenerName, errorUnavailableEndpoints)
if (replicaInfo.size < replicas.size) {
// 如果 AR 集合中存在不可用的副本,则返回 REPLICA_NOT_AVAILABLE 错误
new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader, replicaInfo.asJava, isrInfo.asJava)
} else if (isrInfo.size < isr.size) {
// 如果 ISR 集合中存在不可用的的副本,则返回 REPLICA_NOT_AVAILABLE 错误
new MetadataResponse.PartitionMetadata(Errors.REPLICA_NOT_AVAILABLE, partitionId, leader, replicaInfo.asJava, isrInfo.asJava)
} else {
// AR 集合和 ISR 集合中的副本都是可用的
new MetadataResponse.PartitionMetadata(Errors.NONE, partitionId, leader, replicaInfo.asJava, isrInfo.asJava)
}
}
}
}
}

方法 MetadataCache#getPartitionMetadata 会校验对应分区的 AR 集合和 ISR 集合中的副本是否可用,如果存在不可用的副本则会返回 REPLICA_NOT_AVAILABLE 错误,如果分区的副本均可用则会返回分区的元数据信息,包括分区 ID、leader 副本所在节点信息、AR 集合,以及 ISR 集合。

总结

本文我们分析了 Kafka 的分区副本实现机制,了解到 Kafka 会为每个 topic 分区设置多个副本,并基于 leader/follower 模式将这些副本分为一个 leader 角色和多个 follower 角色。在 topic 分区正常运行期间,由 leader 副本负责处理来自客户端的消息读写请求,而 follower 副本仅负责从 leader 副本同步消息数据。一旦 leader 副本失效,Kafka 会从位于 ISR 集合中的 follower 副本中选择一个成为新的 leader 副本,以保证对应的 topic 能够继续对外提供服务。

冗余策略在分布式计算和存储领域是一种简单且有效的可靠性保障措施,了解 Kafka 的分区副本实现机制能够指导我们更好的设计实现自己的分布式应用。