Kafka 源码解析:集群协同运行控制器

Kafka 集群由一系列的 broker 节点构成,在这些 broker 节点中会选举一个节点成为所有 broker 节点的 leader(称之为 kafka controller),其余的 broker 节点均为 follower 角色。Kafka Controller 负责管理集群中所有 topic 分区和副本的状态,协调集群中所有 broker 节点的运行,同时也负责 Kafka 与 ZK 之间的交互,下文中如果不特殊说明,Kafka Controller 均指代 leader 角色。

KafkaController 组件的定义与启动

Kafka 定义了 KafkaController 类来描述 Kafka Controller,KafkaController 类的字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class KafkaController(val config: KafkaConfig, // 配置信息
zkUtils: ZkUtils, // ZK 交互工具类
val brokerState: BrokerState, // 描述 broker 节点的状态
time: Time, // 时间戳工具类
metrics: Metrics,
threadNamePrefix: Option[String] = None) extends Logging with KafkaMetricsGroup {

/** 标识是否启动 */
private var isRunning = true
/** 维护上下文信息,缓存 ZK 中记录的整个集群的元数据信息 */
val controllerContext = new ControllerContext(zkUtils)
/** 管理集群中所有分区状态的状态机 */
val partitionStateMachine = new PartitionStateMachine(this)
/** 管理集群中所有副本状态的状态机 */
val replicaStateMachine = new ReplicaStateMachine(this)
/** 用于故障转移,选举 leader */
private val controllerElector = new ZookeeperLeaderElector(
controllerContext, ZkUtils.ControllerPath, onControllerFailover, onControllerResignation, config.brokerId, time)
/** 分区再平衡定时任务调度器 */
private val autoRebalanceScheduler = new KafkaScheduler(1)
/** 用于对指定的 topic 执行删除操作 */
var deleteTopicManager: TopicDeletionManager = _
/** Leader 副本选举策略 */
val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
/** 用于批量向 broker 节点发送请求 */
private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
/** ZK 监听器 */
private val partitionReassignedListener = new PartitionsReassignedListener(this)
private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
private val isrChangeNotificationListener = new IsrChangeNotificationListener(this)

// ... 省略方法定义

}

在 Kafka 服务启动时,每个 broker 节点都会创建对应的 Kafka Controller 实例,并调用 KafkaController#startup 方法启动运行:

1
2
3
4
5
6
7
8
9
10
11
12
def startup(): Unit = {
inLock(controllerContext.controllerLock) {
info("Controller starting up")
// 注册 SessionExpirationListener 监听器,监听 Controller 与 ZK 的连接状态
this.registerSessionExpirationListener()
// 标识当前 controller 已经启动,现在还是 follower 角色
isRunning = true
// 启动故障转移机制
controllerElector.startup
info("Controller startup complete")
}
}

启动过程中会注册 SessionExpirationListener 监听器监听 Kafka Controller 与 ZK 之间的连接状态,并启动故障转移机制,在初始启动时可以借助该机制为集群选择 leader 角色。这里先了解一下启动的流程,关于 ZK 监听机制和故障转移机制,留到下面的小节中针对性分析。

上下文信息管理

ControllerContext 类用于管理 Kafka Controller 的上下文信息,并提供与集群中所有 broker 之间建立连接并通信的功能。ControllerContext 类的字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
class ControllerContext(val zkUtils: ZkUtils) {

/** 管理 controller 与集群中其它 broker 之间的连接 */
var controllerChannelManager: ControllerChannelManager = _
/** 记录正在关闭的 brokerId 集合 */
var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty
/** controller 的年代信息,初始为 0,每次重新选举之后值加 1 */
var epoch: Int = KafkaController.InitialControllerEpoch - 1
/** 年代信息对应的 ZK 版本,初始为 0 */
var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion - 1
/** 集群中全部的 topic 集合 */
var allTopics: Set[String] = Set.empty
/** 记录每个分区对应的 AR 集合 */
var partitionReplicaAssignment: mutable.Map[TopicAndPartition, Seq[Int]] = mutable.Map.empty
/** 记录每个分区的 leader 副本所在的 brokerId、ISR 集合,以及 controller 年代信息 */
var partitionLeadershipInfo: mutable.Map[TopicAndPartition, LeaderIsrAndControllerEpoch] = mutable.Map.empty
/** 记录正在重新分配副本的分区 */
val partitionsBeingReassigned: mutable.Map[TopicAndPartition, ReassignedPartitionsContext] = new mutable.HashMap
/** 记录了正在进行优先副本选举的分区 */
val partitionsUndergoingPreferredReplicaElection: mutable.Set[TopicAndPartition] = new mutable.HashSet
/** 记录了当前可用的 broker 集合 */
private var liveBrokersUnderlying: Set[Broker] = Set.empty
/** 记录了当前可用的 brokerId 集合 */
private var liveBrokerIdsUnderlying: Set[Int] = Set.empty

// ... 省略方法定义

}

ControllerContext 类提供了对这些字段管理的方法,实现比较简单,不展开分析。

下面我们重点看一下 ControllerChannelManager 类定义,该类用于建立到集群中所有 broker 节点的连接,并与之通信。ControllerChannelManager 类定义了 ControllerChannelManager#brokerStateInfo 字段,用于记录到对应 broker 节点的通讯相关信息。ControllerChannelManager 类在被实例化时会调用 ControllerChannelManager#addNewBroker 方法初始化 ControllerChannelManager#brokerStateInfo 字段,为每个可用的 broker 节点构造一个 ControllerBrokerStateInfo 对象,其中封装了目标 broker 节点信息、网络客户端对象、缓存请求的队列,以及请求发送线程对象,ControllerBrokerStateInfo 样例类定义如下:

1
2
3
4
case class ControllerBrokerStateInfo(networkClient: NetworkClient,
brokerNode: Node,
messageQueue: BlockingQueue[QueueItem],
requestSendThread: RequestSendThread)

方法 ControllerChannelManager#addNewBroker 的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
private def addNewBroker(broker: Broker) {
// 创建消息队列,用于存放发往指定 broker 节点的请求
val messageQueue = new LinkedBlockingQueue[QueueItem]
debug("Controller %d trying to connect to broker %d".format(config.brokerId, broker.id))
val brokerEndPoint = broker.getBrokerEndPoint(config.interBrokerListenerName)
val brokerNode = new Node(broker.id, brokerEndPoint.host, brokerEndPoint.port)
// 创建网络连接客户端
val networkClient = {
val channelBuilder = ChannelBuilders.clientChannelBuilder(
config.interBrokerSecurityProtocol,
LoginType.SERVER,
config.values,
config.saslMechanismInterBrokerProtocol,
config.saslInterBrokerHandshakeRequestEnable
)
val selector = new Selector(
NetworkReceive.UNLIMITED,
Selector.NO_IDLE_TIMEOUT_MS,
metrics,
time,
"controller-channel",
Map("broker-id" -> broker.id.toString).asJava,
false,
channelBuilder
)
new NetworkClient(
selector,
new ManualMetadataUpdater(Seq(brokerNode).asJava),
config.brokerId.toString,
1,
0,
Selectable.USE_DEFAULT_BUFFER_SIZE,
Selectable.USE_DEFAULT_BUFFER_SIZE,
config.requestTimeoutMs,
time,
false
)
}

// 创建请求发送线程对象
val threadName = threadNamePrefix match {
case None => "Controller-%d-to-broker-%d-send-thread".format(config.brokerId, broker.id)
case Some(name) => "%s:Controller-%d-to-broker-%d-send-thread".format(name, config.brokerId, broker.id)
}
val requestThread = new RequestSendThread(
config.brokerId, controllerContext, messageQueue, networkClient, brokerNode, config, time, threadName)
requestThread.setDaemon(false)

// 记录到 brokerStateInfo 集合
brokerStateInfo.put(broker.id, ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue, requestThread))
}

RequestSendThread 类继承自 ShutdownableThread 抽象类,所以我们重点来看一下 RequestSendThread#doWork 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
override def doWork(): Unit = {

def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(100))

// 获取缓冲队列中的 QueueItem 对象,封装了请求类型、请求对象,以及响应回调函数
val QueueItem(apiKey, requestBuilder, callback) = queue.take()

import NetworkClientBlockingOps._ // 阻塞模式

var clientResponse: ClientResponse = null
try {
lock synchronized {
var isSendSuccessful = false // 标识请求是否发送成功
while (isRunning.get() && !isSendSuccessful) {
// 当 broker 节点宕机后,会触发 ZK 的监听器调用 removeBroker 方法停止当前线程,在停止前会一直尝试重试
try {
// 阻塞等待指定 broker 节点是否允许接收请求
if (!brokerReady()) {
isSendSuccessful = false
backoff() // 等待 100 毫秒
} else {
// 创建并发送请求,同时阻塞等待响应
val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder, time.milliseconds(), true)
clientResponse = networkClient.blockingSendAndReceive(clientRequest)(time)
isSendSuccessful = true
}
} catch {
// ... 省略异常处理,如果发送失败,则断开连接,并等待一段时间后重试
}
}

// 解析响应
if (clientResponse != null) {
// 解析请求类型
val api = ApiKeys.forId(clientResponse.requestHeader.apiKey)
// 只允许 LEADER_AND_ISR、STOP_REPLICA,以及 UPDATE_METADATA_KEY 这 3 种请求类型
if (api != ApiKeys.LEADER_AND_ISR && api != ApiKeys.STOP_REPLICA && api != ApiKeys.UPDATE_METADATA_KEY)
throw new KafkaException(s"Unexpected apiKey received: $apiKey")

// 执行响应回调函数
val response = clientResponse.responseBody
if (callback != null) {
callback(response)
}
}
}
} catch {
// ... 省略异常处理
}
}

RequestSendThread 线程在运行期间会循环消费存放请求的阻塞队列,队列元素的类型为 QueueItem,其中封装了具体的请求类型、请求对象,以及响应回调函数。如果存在待发送的请求对象则会向目标 broker 节点发送请求,并阻塞等待响应,当拿到响应对象之后调用响应函数进行处理。ControllerChannelManager 的发送请求函数 ControllerChannelManager#sendRequest 本质上就是将请求信息封装成 QueueItem 对象,并记录到请求阻塞队列中,然后交由 RequestSendThread 线程异步处理。

批量发送请求

ControllerBrokerRequestBatch 类实现了向所有可用 broker 节点批量发送 LeaderAndIsrRequest、StopReplicaRequest 和 UpdateMetadataRequest 请求的功能,它定义了 3 个集合分别缓存这 3 类请求的相关信息,并提供了相关方法用于添加待发送的请求信息,以及批量发送请求。ControllerBrokerRequestBatch 类的字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
class ControllerBrokerRequestBatch(controller: KafkaController) extends Logging {

/** Controller 上下文信息 */
val controllerContext: ControllerContext = controller.controllerContext
/** Controller 节点 ID */
val controllerId: Int = controller.config.brokerId
/** 记录发往指定 broker 节点的 LeaderAndIsrRequest 请求所需的信息 */
val leaderAndIsrRequestMap = mutable.Map.empty[Int, mutable.Map[TopicPartition, PartitionStateInfo]]
/** 记录发往指定 broker 节点的 StopReplicaRequest 请求所需的信息 */
val stopReplicaRequestMap = mutable.Map.empty[Int, Seq[StopReplicaRequestInfo]]
/** UpdateMetadataRequest 请求对应的目标 brokerId 集合 */
val updateMetadataRequestBrokerSet = mutable.Set.empty[Int]
/** UpdateMetadataRequest 请求对应的请求信息 */
val updateMetadataRequestPartitionInfoMap = mutable.Map.empty[TopicPartition, PartitionStateInfo]

// ... 省略方法定义

}

ControllerBrokerRequestBatch 提供了 ControllerBrokerRequestBatch#newBatch 方法用于校验缓存相应请求信息的 3 个集合是否为空,如果不为空则抛出异常,表示此时还存在未发送完成的请求,不允许添加新的待发送请求对象。同时,也提供了 ControllerBrokerRequestBatch#clear 方法用于清空这 3 个集合。

下面列出的 3 个方法分别用于往对应集合中添加相应的请求信息:

  1. ControllerBrokerRequestBatch#addLeaderAndIsrRequestForBrokers:用于往 leaderAndIsrRequestMap 集合中添加待发送的 LeaderAndIsrRequest 请求所需的数据,并构造发往所有可用 broker 节点的 UpdateMetadataRequest 请求,缓存到 updateMetadataRequestPartitionInfoMap 集合中等待发送。
  2. ControllerBrokerRequestBatch#addStopReplicaRequestForBrokers:用于往 stopReplicaRequestMap 集合中添加待发送的 StopReplicaRequest 请求所需的数据。
  3. ControllerBrokerRequestBatch#addUpdateMetadataRequestForBrokers:用于往 updateMetadataRequestPartitionInfoMap 集合中添加 UpdateMetadataRequest 请求所需的数据。

方法 ControllerBrokerRequestBatch#sendRequestsToBrokers 会遍历处理这 3 个集合,并调用 KafkaController#sendRequest 方法发送请求,底层还是依赖于前面分析过的 RequestSendThread 线程执行异步请求操作。

ControllerBrokerRequestBatch 类中定义的方法在实现上都比较直观,这里不展开分析。

分区状态管理

PartitionStateMachine 定义了 Kafka Controller 的分区状态机,用于管理集群中分区的状态信息,每个 Kafka Controller 都定义了自己的分区状态机,但只有在当前 Controller 实例成为 leader 角色时才会启动运行名下的状态机。分区状态机使用 PartitionState 特质定义分区的状态,同时提供了多个样例对象实现,分别表示不同的分区状态。分区状态样例对象说明:

  1. NewPartition :新创建出来的分区对应的状态,此时分区可能已经被分配了 AR 集合,但是还没有分配 ISR 集合和 leader 副本。
  2. OnlinePartition :当一个分区选举出 leader 副本之后,该分区即处于此状态。
  3. OfflinePartition :如果一个分区的 leader 副本失效,则切换成此状态。
  4. NonExistentPartition :描述一个不存在的分区,或者之前存在但是现在已经被删除了。

分区状态转换图如下:

image

PartitionStateMachine 的字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
class PartitionStateMachine(controller: KafkaController) extends Logging {

/** Controller 的上下文信息 */
private val controllerContext = controller.controllerContext
/** Controller 节点的 ID */
private val controllerId = controller.config.brokerId
/** ZK 工具类 */
private val zkUtils = controllerContext.zkUtils
/** 记录每个分区对应的分区状态信息 */
private val partitionState: mutable.Map[TopicAndPartition, PartitionState] = mutable.Map.empty
/** 用于向指定的 broker 批量发送请求 */
private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
/** 表示分区状态机是否已经启动 */
private val hasStarted = new AtomicBoolean(false)
/** 默认 leader 副本选举器 */
private val noOpPartitionLeaderSelector = new NoOpLeaderSelector(controllerContext)
/** 用于监听 topic 变化的 ZK 监听器 */
private val topicChangeListener = new TopicChangeListener(controller)
/** 用于监听 topic 删除的 ZK 监听器 */
private val deleteTopicsListener = new DeleteTopicsListener(controller)
/** 记录监听对应 topic 分区变化的监听器集合 */
private val partitionModificationsListeners: mutable.Map[String, PartitionModificationsListener] = mutable.Map.empty

// ... 省略方法定义

}

当 Kafka Controller 实例从 follower 角色选举成为 leader 角色时,会调用 PartitionStateMachine#startup 方法启动对应的分区状态机,该方法实现如下:

1
2
3
4
5
6
7
8
def startup() {
// 初始化本地记录的所有分区状态
this.initializePartitionState()
// 标识分区状态机已经启动
hasStarted.set(true)
// 尝试将集群中所有 OfflinePartition 或 NewPartition 状态的可用分区切换成 OnlinePartition 状态
this.triggerOnlinePartitionStateChange()
}

分区状态机使用 PartitionStateMachine#partitionState 字段记录集群中所有可用分区的状态,在启动时会初始化该字段,即初始化每个 topic 分区对应的状态信息,尝试将所有 OfflinePartition 或 NewPartition 状态的可用分区切换成 OnlinePartition 状态。

方法 PartitionStateMachine#initializePartitionState 会遍历集群中所有的 topic 分区,并尝试获取分区对应的 leader 副本和 ISR 集合等信息,如果这些信息不存在则将对应分区初始化为 NewPartition 状态。否则,校验分区 leader 副本所在的 broker 节点是否可用,如果可用则将对应分区初始化为 OnlinePartition 状态,如果不可用则将对应分区初始化为 OfflinePartition 状态。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private def initializePartitionState() {
// 遍历集群中的所有分区
for (topicPartition <- controllerContext.partitionReplicaAssignment.keys) {
// 获取对应分区 leader 副本所在的 brokerId、ISR 集合,以及 controller 年代信息
controllerContext.partitionLeadershipInfo.get(topicPartition) match {
// 存在 leader 副本和 ISR 集合
case Some(currentLeaderIsrAndEpoch) =>
// 分区 leader 副本所在的 broker 可用,初始化分区为 OnlinePartition 状态
if (controllerContext.liveBrokerIds.contains(currentLeaderIsrAndEpoch.leaderAndIsr.leader))
partitionState.put(topicPartition, OnlinePartition)
// 分区 leader 副本所在的 broker 不可用,初始化为 OfflinePartition 状态
else
partitionState.put(topicPartition, OfflinePartition)
// 如果不存在,则说明是一个新创建的分区,设置分区状态为 NewPartition
case None =>
partitionState.put(topicPartition, NewPartition)
}
}
}

方法 PartitionStateMachine#triggerOnlinePartitionStateChange 会遍历所有可用的分区(不包含那些待删除的 topic 名下的分区),并尝试对状态为 OfflinePartition 或 NewPartition 的分区执行状态切换,切换成 OnlinePartition 状态。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def triggerOnlinePartitionStateChange() {
try {
// 校验待发送的请求集合,确保历史的请求已经全部发送完毕
brokerRequestBatch.newBatch()
// 遍历处理集群中所有的分区,不包含正在等待被删除的 topic 的分区,尝试切换分区状态为 OnlinePartition
for ((topicAndPartition, partitionState) <- partitionState
if !controller.deleteTopicManager.isTopicQueuedUpForDeletion(topicAndPartition.topic)) {
// 对于 OfflinePartition 或 NewPartition 状态的分区,尝试将对应分区状态修改为 OnlinePartition 状态
if (partitionState.equals(OfflinePartition) || partitionState.equals(NewPartition))
this.handleStateChange(
topicAndPartition.topic,
topicAndPartition.partition,
OnlinePartition,
controller.offlinePartitionSelector,
(new CallbackBuilder).build)
}
// 发送请求
brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
} catch {
// ... 省略异常处理
}
}

而具体执行分区状态切换的操作则交由 PartitionStateMachine#handleStateChange 方法完成,关于该方法的实现将在接下来的小节中进行分析。

分区状态切换

分区状态机定义了 PartitionStateMachine#handleStateChanges 方法用于将指定的 topic 分区集合中的分区状态切换成指定的目标状态,方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def handleStateChanges(partitions: Set[TopicAndPartition], // 待处理的 topic 分区集合
targetState: PartitionState, // 目标分区状态
leaderSelector: PartitionLeaderSelector = noOpPartitionLeaderSelector, // 分区 leader 副本选择器
callbacks: Callbacks = (new CallbackBuilder).build) {
info("Invoking state change to %s for partitions %s".format(targetState, partitions.mkString(",")))
try {
// 校验待发送的请求集合,确保历史的请求已经全部发送完毕
brokerRequestBatch.newBatch()
// 遍历待处理的 topic 分区集合,执行分区状态切换
partitions.foreach { topicAndPartition =>
this.handleStateChange(topicAndPartition.topic, topicAndPartition.partition, targetState, leaderSelector, callbacks)
}
// 发送请求
brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
} catch {
// ... 省略异常处理
}
}

其中核心实现在于 PartitionStateMachine#handleStateChange 方法,前面分析分区状态机启动过程时也提到了该方法,下面一起来看一下该方法的具体实现(省略了日志打点):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
private def handleStateChange(topic: String, partition: Int,
targetState: PartitionState,
leaderSelector: PartitionLeaderSelector, // 执行 leader 选举的选择器
callbacks: Callbacks) {
val topicAndPartition = TopicAndPartition(topic, partition)

// 检测当前分区状态机是否已经启动,只有 kafka controller leader 的分区状态机才需要启动,如果没有启动则抛出异常
if (!hasStarted.get)
throw new StateChangeFailedException(("Controller %d epoch %d initiated state change for partition %s to %s failed because " +
"the partition state machine has not started").format(controllerId, controller.epoch, topicAndPartition, targetState))

// 获取分区的当前状态,没有则初始化为 NonExistentPartition 状态
val currState = partitionState.getOrElseUpdate(topicAndPartition, NonExistentPartition)
try {
// 在转换开始之前,会依据于 targetState 检查分区的前置状态是否合法
targetState match {
case NewPartition =>
// 如果目标状态为 NewPartition,则前置状态必须是 NonExistentPartition
this.assertValidPreviousStates(topicAndPartition, List(NonExistentPartition), NewPartition)
// 切换分区状态为 NewPartition
partitionState.put(topicAndPartition, NewPartition)
case OnlinePartition =>
// 如果目标状态为 OnlinePartition,则前置状态必须是 NewPartition, OnlinePartition, OfflinePartition 中的一个
this.assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OnlinePartition)
partitionState(topicAndPartition) match {
case NewPartition =>
// 如果前置状态是 NewPartition,则需要为分区分配 leader 副本和 ISR 集合
this.initializeLeaderAndIsrForPartition(topicAndPartition)
case OfflinePartition =>
// 如果前置状态是 OfflinePartition,则需要为分区选举新的 leader 副本
this.electLeaderForPartition(topic, partition, leaderSelector)
case OnlinePartition => // invoked when the leader needs to be re-elected
// 如果前置状态为 OnlinePartition,则需要为分区重新选举新的 leader 副本
this.electLeaderForPartition(topic, partition, leaderSelector)
case _ => // should never come here since illegal previous states are checked above
}
// 设置分区状态为 OnlinePartition
partitionState.put(topicAndPartition, OnlinePartition)
case OfflinePartition =>
// 如果目标状态为 OfflinePartition,则前置状态必须是 NewPartition, OnlinePartition, OfflinePartition 中的一个
this.assertValidPreviousStates(topicAndPartition, List(NewPartition, OnlinePartition, OfflinePartition), OfflinePartition)
// 设置分区状态为 OfflinePartition
partitionState.put(topicAndPartition, OfflinePartition)
case NonExistentPartition =>
// 如果目标状态为 NonExistentPartition,则前置状态必须是 OfflinePartition
this.assertValidPreviousStates(topicAndPartition, List(OfflinePartition), NonExistentPartition)
// 设置分区状态为 NonExistentPartition
partitionState.put(topicAndPartition, NonExistentPartition)
}
} catch {
// ... 省略异常处理
}
}

分区状态切换的整体实现思路是依据切换的目标状态对当前分区状态执行校验,保证当前分区状态属于合法的目标切换状态的前置状态。方法 PartitionStateMachine#assertValidPreviousStates 实现了前置状态的校验,如果前置状态不合法则会抛出异常。

如果目标切换状态是 NewPartition、OfflinePartition 和 NonExistentPartition 中的一个,则切换的过程比较简单。下面主要来看一下目标状态为 OnlinePartition 的分区状态切换,按照前置状态分为 3 种场景:

  1. 如果前置分区状态为 NewPartition,则需要为对应 topic 分区分配 leader 副本和 ISR 集合。
  2. 如果前置分区状态为 OfflinePartition,则需要为对应 topic 分区选举新的 leader 副本。
  3. 如果前置分区状态为 OnlinePartition,则需要为对应 topic 分区重新选举新的 leader 副本。

场景 2 和 3 具体由 PartitionStateMachine#electLeaderForPartition 方法实现,我们将在稍后分析分区 leader 副本选举机制时介绍该方法,这里先来看一下场景 1,具体由 PartitionStateMachine#initializeLeaderAndIsrForPartition 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private def initializeLeaderAndIsrForPartition(topicAndPartition: TopicAndPartition) {
// 获取分区的 AR 集合
val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
// 获取 AR 集合中可用的副本集合
val liveAssignedReplicas = replicaAssignment.filter(r => controllerContext.liveBrokerIds.contains(r))
liveAssignedReplicas.size match {
// 没有可用的副本,抛出异常
case 0 =>
val failMsg = "encountered error during state change of partition %s from New to Online, assigned replicas are [%s], live brokers are [%s]. No assigned replica is alive."
.format(topicAndPartition, replicaAssignment.mkString(","), controllerContext.liveBrokerIds)
stateChangeLogger.error("Controller %d epoch %d ".format(controllerId, controller.epoch) + failMsg)
throw new StateChangeFailedException(failMsg)
case _ =>
debug("Live assigned replicas for partition %s are: [%s]".format(topicAndPartition, liveAssignedReplicas))
// 将可用的 AR 集合中的第一个副本选为 leader 副本
val leader = liveAssignedReplicas.head
// 创建 LeaderIsrAndControllerEpoch 对象,其中的 ISR 集合是可用的 AR 集合
val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(new LeaderAndIsr(leader, liveAssignedReplicas.toList), controller.epoch)
debug("Initializing leader and isr for partition %s to %s".format(topicAndPartition, leaderIsrAndControllerEpoch))
try {
// 将分区 leader 副本和 ISR 集合等信息写入 ZK,路径:/brokers/topics/{topic_name}/partitions/{partitionId}/state
zkUtils.createPersistentPath(
getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
zkUtils.leaderAndIsrZkData(leaderIsrAndControllerEpoch.leaderAndIsr, controller.epoch))
// 更新本地缓存的指定 topic 分区的相关信息
controllerContext.partitionLeadershipInfo.put(topicAndPartition, leaderIsrAndControllerEpoch)
// 添加 LeaderAndIsrRequest 请求,待发送
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
liveAssignedReplicas, topicAndPartition.topic, topicAndPartition.partition, leaderIsrAndControllerEpoch, replicaAssignment)
} catch {
// ... 省略异常处理
}
}
}

对于 NewPartition 状态的 topic 分区而言,会从该分区可用的副本中选举第 1 个副本作为 leader 副本,并将所有可用的副本添加到 ISR 集合中,然后将这些信息记录到 ZK 中,同时向对应 broker 节点发送 LeaderAndIsrRequest 请求,以执行分区副本角色切换。

分区 leader 副本选举

分区状态机定义了 PartitionStateMachine#electLeaderForPartition 方法,基于给定的分区 leader 副本选择器对指定 topic 分区执行 leader 副本选择操作。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def electLeaderForPartition(topic: String, partition: Int, leaderSelector: PartitionLeaderSelector) {
val topicAndPartition = TopicAndPartition(topic, partition)
try {
var zookeeperPathUpdateSucceeded: Boolean = false
var newLeaderAndIsr: LeaderAndIsr = null
var replicasForThisPartition: Seq[Int] = Seq.empty[Int]
while (!zookeeperPathUpdateSucceeded) {
// 从 ZK 获取分区当前的 leader 副本、ISR 集合、zkVersion 等信息,如果不存在则抛出异常
val currentLeaderIsrAndEpoch = this.getLeaderIsrAndEpochOrThrowException(topic, partition)
val currentLeaderAndIsr = currentLeaderIsrAndEpoch.leaderAndIsr
val controllerEpoch = currentLeaderIsrAndEpoch.controllerEpoch
// 检测 controller 的年代信息,如果当前年代信息小于 ZK 中记录的年代信息,则说明存在新的 controller,需要放弃本次选举操作
if (controllerEpoch > controller.epoch) {
// ... 抛出 StateChangeFailedException 异常,省略
}
// 使用指定的 leader 副本选举器选择新的 leader 副本和 ISR 集合
val (leaderAndIsr, replicas) = leaderSelector.selectLeader(topicAndPartition, currentLeaderAndIsr)
// 将新的 leader 副本和 ISR 集合信息转换成 JSON 格式记录到 ZK,路径:/brokers/topics/{topic_name}/partitions/{partitionId}/state
val (updateSucceeded, newVersion) = ReplicationUtils.updateLeaderAndIsr(
zkUtils, topic, partition, leaderAndIsr, controller.epoch, currentLeaderAndIsr.zkVersion)
newLeaderAndIsr = leaderAndIsr
newLeaderAndIsr.zkVersion = newVersion
zookeeperPathUpdateSucceeded = updateSucceeded
replicasForThisPartition = replicas
}
val newLeaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(newLeaderAndIsr, controller.epoch)
// 更新本地缓存的指定 topic 分区的相关信息
controllerContext.partitionLeadershipInfo.put(TopicAndPartition(topic, partition), newLeaderIsrAndControllerEpoch)
// 获取指定分区的 AR 集合
val replicas = controllerContext.partitionReplicaAssignment(TopicAndPartition(topic, partition))
// 添加 LeaderAndIsrRequest 请求,待发送
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(replicasForThisPartition, topic, partition, newLeaderIsrAndControllerEpoch, replicas)
} catch {
// ... 省略异常处理
}
debug("After leader election, leader cache is updated to %s".format(controllerContext.partitionLeadershipInfo.map(l => (l._1, l._2))))
}

在执行分区 leader 副本选举时会基于给定的分区 leader 副本选择器为对应 topic 分区选择新的 leader 副本,并返回新的 ISR 集合,因为对应分区的状态信息发生了变更,所以需要将更新后的分区状态更新到 ZK,并向集群中所有可用的 broker 节点发送 LeaderAndIsrRequest 请求,通知对应 broker 节点执行分区副本角色切换,并更新本地缓存的集群元数据信息。

PartitionLeaderSelector 特质抽象定义了分区 leader 副本选择器,定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
trait PartitionLeaderSelector {

/**
* 选举分区 leader 副本
*
* @param topicAndPartition 需要执行 leader 副本选举的分区
* @param currentLeaderAndIsr 目标分区当前 leader 副本、ISR 集合
* @return 选举后的新的 leader 副本和新 ISR 集合信息,以及需要接收 LeaderAndIsrRequest 的 brokerId 集合
* @throws NoReplicaOnlineException 如果 AR 集合中不存在可用的副本,则抛出异常
*/
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int])

}

Kafka 目前围绕 PartitionLeaderSelector 特质定义了 5 种分区 leader 副本选择策略:

  1. NoOpLeaderSelector
  2. OfflinePartitionLeaderSelector
  3. ReassignedPartitionLeaderSelector
  4. PreferredReplicaPartitionLeaderSelector
  5. ControlledShutdownLeaderSelector

其中 NoOpLeaderSelector 在实现上最简单,它实际上并没有做什么事情,只是将参数传递的目标分区当前 leader 副本和 ISR 集合作为结果直接返回,下面来具体分析一下剩余 4 种分区 leader 选择策略。

OfflinePartitionLeaderSelector

OfflinePartitionLeaderSelector 分区 leader 副本选择器会尝试从 ISR 集合中选择新的 leader 副本,如果 ISR 集合中不存在可用的副本,则在配置允许的情况下尝试从 AR 集合中选择新的 leader 副本。策略实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
controllerContext.partitionReplicaAssignment.get(topicAndPartition) match {
// 处理分区的 AR 集合
case Some(assignedReplicas) =>
// 获取分区 AR 集合中可用的副本
val liveAssignedReplicas = assignedReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
// 获取 ISR 集合中可用的副本
val liveBrokersInIsr = currentLeaderAndIsr.isr.filter(r => controllerContext.liveBrokerIds.contains(r))
val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion

// 依据 ISR 集合中是否有可用的副本决定是从 ISR 集合中选举新的 leader 副本,还是从 AR 集合中选举新的 leader 副本
val newLeaderAndIsr =
if (liveBrokersInIsr.isEmpty) { // 如果 ISR 集合中不存在可用的副本
// 依据配置(unclean.leader.election.enable)决定是否从 AR 集合中选择 leader 副本,如果不允许则抛出异常
if (!LogConfig.fromProps(config.originals, AdminUtils.fetchEntityConfig(
controllerContext.zkUtils, ConfigType.Topic, topicAndPartition.topic)).uncleanLeaderElectionEnable) {
// ... 抛出 NoReplicaOnlineException 异常,省略
}
debug("No broker in ISR is alive for %s. Pick the leader from the alive assigned replicas: %s".format(topicAndPartition, liveAssignedReplicas.mkString(",")))
if (liveAssignedReplicas.isEmpty) {
// AR 集合中没有可用副本,直接抛出异常
// ... 抛出 NoReplicaOnlineException 异常,省略
} else {
// 从 AR 集合可用的副本中选取第一个副本作为新的 leader 副本,新的 ISR 集合中只有新的 leader 副本自己
ControllerStats.uncleanLeaderElectionRate.mark()
val newLeader = liveAssignedReplicas.head
warn("No broker in ISR is alive for %s. Elect leader %d from live brokers %s. There's potential data loss.".format(topicAndPartition, newLeader, liveAssignedReplicas.mkString(",")))
new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, List(newLeader), currentLeaderIsrZkPathVersion + 1)
}
} else { // ISR 集合中存在可用的副本
val liveReplicasInIsr = liveAssignedReplicas.filter(r => liveBrokersInIsr.contains(r))
// 从 ISR 集合中选择第一个副本作为新的 leader 副本,以 ISR 集合中可用的副本集合作为新的 ISR 集合
val newLeader = liveReplicasInIsr.head
debug("Some broker in ISR is alive for %s. Select %d from ISR %s to be the leader.".format(topicAndPartition, newLeader, liveBrokersInIsr.mkString(",")))
// 构造 LeaderAndIsr 对象并返回
new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, liveBrokersInIsr, currentLeaderIsrZkPathVersion + 1)
}
info("Selected new leader and ISR %s for offline partition %s".format(newLeaderAndIsr.toString(), topicAndPartition))

// 需要向 AR 集合中所有的可用副本所在 broker 节点发送 LeaderAndIsrRequest 请求
(newLeaderAndIsr, liveAssignedReplicas)
// 当前 topic 分区没有可用的副本
case None =>
throw new NoReplicaOnlineException("Partition %s doesn't have replicas assigned to it".format(topicAndPartition))
}
}

如果是从 ISR 集合中选择新的 leader 副本,则以 ISR 集合中所有可用的副本作为新的 ISR 集合。如果是从 AR 集合中选择新的 leader 副本,则需要配置 unclean.leader.election.enable=true,这种情况下 ISR 集合只包含 leader 副本。

ReassignedPartitionLeaderSelector

ReassignedPartitionLeaderSelector 分区 leader 副本选择器在副本重新分配的前提下会选择既位于新分配的 AR 集合中,同时又位于 ISR 集合中的副本作为新的 leader 副本,并以之前的 ISR 集合作为新的 ISR 集合。策略实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
// 获取新分配的 AR 集合
val reassignedInSyncReplicas = controllerContext.partitionsBeingReassigned(topicAndPartition).newReplicas
val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
// 新选择的 leader 副本必须在新分配的 AR 集合和 ISR 集合中
val aliveReassignedInSyncReplicas = reassignedInSyncReplicas
.filter(r => controllerContext.liveBrokerIds.contains(r) && currentLeaderAndIsr.isr.contains(r))
val newLeaderOpt = aliveReassignedInSyncReplicas.headOption
newLeaderOpt match {
// 存在满足条件的 leader 副本,以之前的 ISR 集合作为新的 ISR 集合
case Some(newLeader) =>
(new LeaderAndIsr(newLeader, currentLeaderEpoch + 1, currentLeaderAndIsr.isr, currentLeaderIsrZkPathVersion + 1), reassignedInSyncReplicas)
// 不存在满足条件的 leader 副本,抛出异常
case None =>
reassignedInSyncReplicas.size match {
case 0 =>
throw new NoReplicaOnlineException("List of reassigned replicas for partition %s is empty. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
case _ =>
throw new NoReplicaOnlineException("None of the reassigned replicas for partition %s are in-sync with the leader. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
}
}
}

由上述实现可以看出,如果不存在满足条件的副本则会抛出异常。

PreferredReplicaPartitionLeaderSelector

PreferredReplicaPartitionLeaderSelector 分区 leader 副本选择器尝试选择优先副本(AR 集合中的第一个副本)作为 leader 副本,前提是该副本必须位于 ISR 集合中,并且以当前 ISR 集合作为新的 ISR 集合。策略实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
// 获取分区 AR 集合
val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
// 以分区 AR 集合的第一个副本作为优先副本
val preferredReplica = assignedReplicas.head
val currentLeader = controllerContext.partitionLeadershipInfo(topicAndPartition).leaderAndIsr.leader
if (currentLeader == preferredReplica) {
// 优先副本已经是 leader 副本
throw new LeaderElectionNotNeededException("Preferred replica %d is already the current leader for partition %s".format(preferredReplica, topicAndPartition))
} else {
info("Current leader %d for partition %s is not the preferred replica.".format(currentLeader, topicAndPartition) + " Triggering preferred replica leader election")
// 如果优先副本所在的 broker 节点可用,且位于 ISR 集合中,则选择成为新的 leader 副本
if (controllerContext.liveBrokerIds.contains(preferredReplica) && currentLeaderAndIsr.isr.contains(preferredReplica)) {
(new LeaderAndIsr(preferredReplica, currentLeaderAndIsr.leaderEpoch + 1, currentLeaderAndIsr.isr, currentLeaderAndIsr.zkVersion + 1), assignedReplicas)
} else {
throw new StateChangeFailedException("Preferred replica %d for partition ".format(preferredReplica) + "%s is either not alive or not in the isr. Current leader and ISR: [%s]".format(topicAndPartition, currentLeaderAndIsr))
}
}
}

由上述实现可以看出,如果优先副本所在 broker 节点不可用,或者优先副本不位于 ISR 集合中,则会抛出异常。

ControlledShutdownLeaderSelector

ControlledShutdownLeaderSelector 分区 leader 副本选择器会尝试将副本所在 broker 节点正在关闭的副本从 ISR 集合中移除,并将 ISR 集合中剩下的副本作为新的 ISR 集合,同时从中选择一个副本作为新的 leader 副本。策略实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def selectLeader(topicAndPartition: TopicAndPartition, currentLeaderAndIsr: LeaderAndIsr): (LeaderAndIsr, Seq[Int]) = {
val currentLeaderEpoch = currentLeaderAndIsr.leaderEpoch
val currentLeaderIsrZkPathVersion = currentLeaderAndIsr.zkVersion
val currentLeader = currentLeaderAndIsr.leader
val assignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
// 获取当前可用的 brokerId 集合
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
val liveAssignedReplicas = assignedReplicas.filter(r => liveOrShuttingDownBrokerIds.contains(r))
// 从当前 ISR 集合中移除副本所在 broker 节点正在关闭的副本
val newIsr = currentLeaderAndIsr.isr.filter(brokerId => !controllerContext.shuttingDownBrokerIds.contains(brokerId))
// 从可用的 AR 集合中选择位于新的 ISR 集合中的副本作为 leader 副本
liveAssignedReplicas.find(newIsr.contains) match {
case Some(newLeader) =>
debug("Partition %s : current leader = %d, new leader = %d".format(topicAndPartition, currentLeader, newLeader))
(LeaderAndIsr(newLeader, currentLeaderEpoch + 1, newIsr, currentLeaderIsrZkPathVersion + 1), liveAssignedReplicas)
case None =>
throw new StateChangeFailedException("No other replicas in ISR %s for %s besides shutting down brokers %s".format(currentLeaderAndIsr.isr.mkString(","), topicAndPartition, controllerContext.shuttingDownBrokerIds.mkString(",")))
}
}

副本状态管理

ReplicaStateMachine 定义了 Kafka Controller 的副本状态机,用于管理集群中副本的状态信息,每个 Kafka Controller 都定义了自己的副本状态机,但是只有在当前 Controller 实例成为 leader 角色时才会启动运行名下的状态机。副本状态机使用 ReplicaState 特质定义副本的状态,同时提供了多个样例对象实现,分别表示不同的副本状态。副本状态样例对象说明:

  1. NewReplica :新创建出来的副本对应的状态,处于该状态的副本只能是 follower 副本。
  2. OnlineReplica :当副本成为 AR 集合中的一员即位于该状态,此时副本既可以是 leader 角色,也可以是 follower 角色。
  3. OfflineReplica :当副本所在的 broker 节点宕机后,副本所对应的状态。
  4. ReplicaDeletionStarted :当开始删除副本时,会先将副本切换成该状态,然后开始执行删除操作。
  5. ReplicaDeletionSuccessful :当副本被成功删除后对应的状态。
  6. ReplicaDeletionIneligible :当副本删除失败后对应的状态。
  7. NonExistentReplica :一个被成功删除的副本,最终将切换成该状态。

image

ReplicaStateMachine 的字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
class ReplicaStateMachine(controller: KafkaController) extends Logging {

/** Controller 上下文信息 */
private val controllerContext = controller.controllerContext
/** Controller 的 ID */
private val controllerId = controller.config.brokerId
/** ZK 工具类 */
private val zkUtils = controllerContext.zkUtils
/** 记录每个副本对应的状态 */
private val replicaState: mutable.Map[PartitionAndReplica, ReplicaState] = mutable.Map.empty
/** ZK 监听器,用于监听 broker 节点的上下线 */
private val brokerChangeListener = new BrokerChangeListener(controller)
/** 用于向 broker 节点批量发送请求 */
private val brokerRequestBatch = new ControllerBrokerRequestBatch(controller)
/** 标识当前副本状态机是否成功启动 */
private val hasStarted = new AtomicBoolean(false)

// ... 省略方法定义

}

当 Kafka Controller 实例从 follower 角色选举成为 leader 角色时,会调用 ReplicaStateMachine#startup 方法启动对应的副本状态机,该方法实现如下:

1
2
3
4
5
6
7
8
def startup() {
// 初始化每个 topic 分区 AR 集合中的副本状态
this.initializeReplicaState()
// 标识当前副本状态机启动成功
hasStarted.set(true)
// 尝试将所有可用副本切换成 OnlineReplica 状态
this.handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
}

副本状态机使用 ReplicaStateMachine#replicaState 字段记录集群中所有副本的状态,在启动时会初始化该字段,即初始化每个副本的状态信息,尝试将所有可用的副本状态切换成 OnlineReplica 状态,而将所有不可用的副本切换成 ReplicaDeletionIneligible 状态。

方法 ReplicaStateMachine#initializeReplicaState 会遍历处理每个 topic 分区的 AR 集合,并依据副本所在 broker 节点是否可用对本地记录的副本状态执行初始化操作,方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private def initializeReplicaState() {
for ((topicPartition, assignedReplicas) <- controllerContext.partitionReplicaAssignment) {
val topic = topicPartition.topic
val partition = topicPartition.partition
// 遍历每个分区的 AR 集合,初始化对应分区的状态
assignedReplicas.foreach { replicaId =>
val partitionAndReplica = PartitionAndReplica(topic, partition, replicaId)
// 将可用的副本初始化为 OnlineReplica 状态
if (controllerContext.liveBrokerIds.contains(replicaId)) replicaState.put(partitionAndReplica, OnlineReplica)
// 将不可用的副本初始化为 ReplicaDeletionIneligible 状态
else replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
}
}
}

方法 ReplicaStateMachine#handleStateChanges 会遍历所有可用的副本,并尝试将对应副本的状态切换成 OnlineReplica 状态,方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
def handleStateChanges(replicas: Set[PartitionAndReplica],
targetState: ReplicaState,
callbacks: Callbacks = (new CallbackBuilder).build) {
if (replicas.nonEmpty) {
info("Invoking state change to %s for replicas %s".format(targetState, replicas.mkString(",")))
try {
// 校验待发送的请求集合,确保历史的请求已经全部发送完毕
brokerRequestBatch.newBatch()
// 遍历切换每个副本的状态
replicas.foreach(r => this.handleStateChange(r, targetState, callbacks))
// 发送请求
brokerRequestBatch.sendRequestsToBrokers(controller.epoch)
} catch {
case e: Throwable => error("Error while moving some replicas to %s state".format(targetState), e)
}
}
}

具体的状态切换交由 ReplicaStateMachine#handleStateChange 方法实现,这也是 ReplicaStateMachine 中定义的最核心的方法,实现如下(省略了部分日志打点):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def handleStateChange(partitionAndReplica: PartitionAndReplica, targetState: ReplicaState, callbacks: Callbacks) {
val topic = partitionAndReplica.topic
val partition = partitionAndReplica.partition
val replicaId = partitionAndReplica.replica
val topicAndPartition = TopicAndPartition(topic, partition)
// 检测副本状态机是否已经启动
if (!hasStarted.get)
throw new StateChangeFailedException(
"Controller %d epoch %d initiated state change of replica %d for partition %s to %s failed because replica state machine has not started"
.format(controllerId, controller.epoch, replicaId, topicAndPartition, targetState))

// 获取指定副本当前的状态,如果不存在则初始化为 NonExistentReplica 状态
val currState = replicaState.getOrElseUpdate(partitionAndReplica, NonExistentReplica)
try {
// 获取分区的 AR 集合
val replicaAssignment = controllerContext.partitionReplicaAssignment(topicAndPartition)
// 在转换开始之前,会依据于目标状态检查副本的前置状态是否合法
targetState match {
case NewReplica =>
// 如果目标状态为 NewReplica,则前置状态必须是 NonExistentReplica
this.assertValidPreviousStates(partitionAndReplica, List(NonExistentReplica), targetState)
// 从 ZK 获取分区的 leader 副本和 ISR 集合等信息
val leaderIsrAndControllerEpochOpt = ReplicationUtils.getLeaderIsrAndEpochForPartition(zkUtils, topic, partition)
leaderIsrAndControllerEpochOpt match {
case Some(leaderIsrAndControllerEpoch) =>
// NewReplica 状态的副本不能是 leader 角色
if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId)
throw new StateChangeFailedException("Replica %d for partition %s cannot be moved to NewReplica"
.format(replicaId, topicAndPartition) + "state as it is being requested to become leader")
// 向该副本所在 broker 节点发送 LeaderAndIsrRequest 请求,并发送 UpdateMetadataRequest 给所有可用的 broker 节点
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
List(replicaId), topic, partition, leaderIsrAndControllerEpoch, replicaAssignment)
case None => // new leader request will be sent to this replica when one gets elected
}
// 更新当前副本状态为 NewReplica
replicaState.put(partitionAndReplica, NewReplica)
case ReplicaDeletionStarted =>
// 如果目标状态为 ReplicaDeletionStarted,则前置状态必须是 OfflineReplica
this.assertValidPreviousStates(partitionAndReplica, List(OfflineReplica), targetState)
// 更新当前副本状态为 ReplicaDeletionStarted
replicaState.put(partitionAndReplica, ReplicaDeletionStarted)
// 向该副本所在 broker 节点发送 StopReplicaRequest 请求
brokerRequestBatch.addStopReplicaRequestForBrokers(
List(replicaId), topic, partition, deletePartition = true, callbacks.stopReplicaResponseCallback)
case ReplicaDeletionIneligible =>
// 如果目标状态为 ReplicaDeletionIneligible,则前置状态必须是 ReplicaDeletionStarted
this.assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
// 更新当前副本状态为 ReplicaDeletionIneligible
replicaState.put(partitionAndReplica, ReplicaDeletionIneligible)
case ReplicaDeletionSuccessful =>
// 如果目标状态为 ReplicaDeletionSuccessful,则前置状态必须是 ReplicaDeletionStarted
this.assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionStarted), targetState)
// 更新当前副本状态为 ReplicaDeletionSuccessful
replicaState.put(partitionAndReplica, ReplicaDeletionSuccessful)
case NonExistentReplica =>
// 如果目标状态为 NonExistentReplica,则前置状态必须是 ReplicaDeletionSuccessful
this.assertValidPreviousStates(partitionAndReplica, List(ReplicaDeletionSuccessful), targetState)
// 从对应 topic 分区的 AR 集合中移除当前副本
val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas.filterNot(_ == replicaId))
// 移除副本缓存在本地的状态
replicaState.remove(partitionAndReplica)
case OnlineReplica =>
// 如果目标状态为 OnlineReplica,则前置状态必须是 NewReplica、OnlineReplica、OfflineReplica 和 ReplicaDeletionIneligible 中的一个
this.assertValidPreviousStates(partitionAndReplica,
List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
replicaState(partitionAndReplica) match {
case NewReplica =>
// 如果前置状态是 NewReplica,则尝试将当前副本添加到对应分区的 AR 集合中
val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
if (!currentAssignedReplicas.contains(replicaId))
controllerContext.partitionReplicaAssignment.put(topicAndPartition, currentAssignedReplicas :+ replicaId)
case _ =>
// 检测是否存在 leader 副本
controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
// 如果存在 leader 副本,则向该副本所在 broker 节点发送 LeaderAndIsrRequest 请求,并发送 UpdateMetadataRequest 给所有可用的 broker 节点
case Some(leaderIsrAndControllerEpoch) =>
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(
List(replicaId), topic, partition, leaderIsrAndControllerEpoch, replicaAssignment)
replicaState.put(partitionAndReplica, OnlineReplica)
// 不存在 leader 副本
case None => // that means the partition was never in OnlinePartition state, this means the broker never
// started a log for that partition and does not have a high watermark value for this partition
}
}
// 更新当前副本状态为 OnlineReplica
replicaState.put(partitionAndReplica, OnlineReplica)
case OfflineReplica =>
// 如果目标状态为 OfflineReplica,则前置状态必须是 NewReplica、OnlineReplica、OfflineReplica 和 ReplicaDeletionIneligible 中的一个
this.assertValidPreviousStates(partitionAndReplica,
List(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible), targetState)
// 向副本所在 broker 节点发送 StopReplicaRequest 请求
brokerRequestBatch.addStopReplicaRequestForBrokers(List(replicaId), topic, partition, deletePartition = false)
// As an optimization, the controller removes dead replicas from the ISR
val leaderAndIsrIsEmpty: Boolean =
controllerContext.partitionLeadershipInfo.get(topicAndPartition) match {
case Some(_) =>
// 将当前副本从所在分区的 ISR 集合中移除
controller.removeReplicaFromIsr(topic, partition, replicaId) match {
case Some(updatedLeaderIsrAndControllerEpoch) =>
// send the shrunk ISR state change request to all the remaining alive replicas of the partition.
val currentAssignedReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition)
if (!controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition)) {
// 向除当前副本以外可用的 AR 集合副本发送 LeaderAndIsrRequest 请求,并向集群中所有可用的 broker 发送 UpdateMetadataRequest 请求
brokerRequestBatch.addLeaderAndIsrRequestForBrokers(currentAssignedReplicas.filterNot(_ == replicaId),
topic, partition, updatedLeaderIsrAndControllerEpoch, replicaAssignment)
}
// 更新当前副本状态为 OfflineReplica
replicaState.put(partitionAndReplica, OfflineReplica)
false
case None =>
true
}
case None =>
true
}
if (leaderAndIsrIsEmpty && !controller.deleteTopicManager.isPartitionToBeDeleted(topicAndPartition))
throw new StateChangeFailedException(
"Failed to change state of replica %d for partition %s since the leader and isr path in zookeeper is empty".format(replicaId, topicAndPartition))
}
}
catch {
// ... 省略异常处理
}
}

同前面分析过的分区状态切换 PartitionStateMachine#handleStateChange 方法类似,副本状态的整体实现思路同样是依据切换的目标状态对当前副本状态执行校验,保证当前副本状态属于合法的目标切换状态的前置状态。方法 ReplicaStateMachine#assertValidPreviousStates 实现了前置状态的校验,如果前置状态不合法则会抛出异常。具体的副本状态切换逻辑如上述方法中的代码注释,思想上类似前面介绍的分区状态切换的实现,这里不再展开分析。

Topic 删除机制

TopicDeletionManager 负责对管理员指定的 topic 执行删除操作,它定义了 DeleteTopicsThread 线程,采用异步的方式删除待删除的 topic 集合。TopicDeletionManager 的字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
class TopicDeletionManager(controller: KafkaController,
initialTopicsToBeDeleted: Set[String] = Set.empty,
initialTopicsIneligibleForDeletion: Set[String] = Set.empty) extends Logging {

/** Controller 上下文 */
val controllerContext: ControllerContext = controller.controllerContext
/** 管理分区状态的状态机 */
val partitionStateMachine: PartitionStateMachine = controller.partitionStateMachine
/** 管理副本状态的状态机 */
val replicaStateMachine: ReplicaStateMachine = controller.replicaStateMachine
/** 条件对象,用于同步其他线程与 deleteTopicsThread 线程 */
val deleteTopicsCond: Condition = deleteLock.newCondition()
/** 标识 topic 删除操作是否开始 */
val deleteTopicStateChanged: AtomicBoolean = new AtomicBoolean(false)
/** 执行 topic 删除操作的线程 */
var deleteTopicsThread: DeleteTopicsThread = _
/** 是否允许删除 topic,对应 delete.topic.enable 配置 */
val isDeleteTopicEnabled: lang.Boolean = controller.config.deleteTopicEnable
/** 记录待删除的 topic 集合 */
val topicsToBeDeleted: mutable.Set[String] = if (isDeleteTopicEnabled) {
mutable.Set.empty[String] ++ initialTopicsToBeDeleted
} else {
// 如果配置不允许删除,则将对应的 topic 从 ZK 对应节点(/admin/delete_topics)下移除
val zkUtils = controllerContext.zkUtils
for (topic <- initialTopicsToBeDeleted) {
val deleteTopicPath = getDeleteTopicPath(topic)
info("Removing " + deleteTopicPath + " since delete topic is disabled")
zkUtils.zkClient.delete(deleteTopicPath)
}
mutable.Set.empty[String]
}
/** 记录不可删除的 topic 集合 */
val topicsIneligibleForDeletion: mutable.Set[String] = mutable.Set.empty[String] ++ (initialTopicsIneligibleForDeletion & topicsToBeDeleted)
/** 记录待删除的分区集合 */
val partitionsToBeDeleted: mutable.Set[TopicAndPartition] = topicsToBeDeleted.flatMap(controllerContext.partitionsForTopic)

// ... 省略方法定义

}

Kafka 提供了 delete.topic.enable 配置项,用于配置是否启用 topic 删除机制,如果未开启则不会真正执行删除操作,而是将指定的待删除 topic 信息从 /admin/delete_topics 节点下移除。另外,由上面的字段定义可以看到 TopicDeletionManager 还管理了不可删除的 topic 集合,当一个 topic 满足以下条件之一时,我们认为暂时不能对其执行删除操作:

  1. Topic 的某个分区正在执行副本重新分配。
  2. Topic 的某个分区正在执行优先副本选举。
  3. Topic 的某个副本不可用,即所在的 broker 节点宕机。

当 Kafka Controller 实例成为 leader 角色时会调用 TopicDeletionManager#start 方法启动 topic 删除机制,该方法主要用于启动后台删除线程 DeleteTopicsThread:

1
2
3
4
5
6
7
8
9
def start() {
if (isDeleteTopicEnabled) {
deleteTopicsThread = new DeleteTopicsThread()
// 表示 topic 删除操作开始运行
if (topicsToBeDeleted.nonEmpty) deleteTopicStateChanged.set(true)
// 启动后台删除线程
deleteTopicsThread.start()
}
}

DeleteTopicsThread 继承自 ShutdownableThread 抽象类,其 DeleteTopicsThread#doWork 方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
override def doWork() {
// 等待线程被唤醒
awaitTopicDeletionNotification()

if (!isRunning.get) return

inLock(controllerContext.controllerLock) {
// 获取并处理待删除的 topic 集合
val topicsQueuedForDeletion = Set.empty[String] ++ topicsToBeDeleted
topicsQueuedForDeletion.foreach { topic =>
// 如果当前 topic 的所有副本都已经被成功删除
if (controller.replicaStateMachine.areAllReplicasForTopicDeleted(topic)) {
// 变更 topic 及其分区和副本的状态,并从 ZK 和 Controller 上下文中移除 topic 相关的信息
completeDeleteTopic(topic)
info("Deletion of topic %s successfully completed".format(topic))
}
// 如果当前 topic 存在未完成删除的副本
else {
// 如果任一副本处于 ReplicaDeletionStarted 状态,则等待
if (controller.replicaStateMachine.isAtLeastOneReplicaInDeletionStartedState(topic)) {
// ... 暂时先跳过该 topic,这里省略日志打点
}
// 否则说明 topic 还未达到执行删除的条件,或者存在某个副本删除失败(对应 ReplicaDeletionIneligible 状态)
else {
// 如果任一副本处于 ReplicaDeletionIneligible 状态,则将对应状态重置为 OfflineReplica 状态后重试
if (controller.replicaStateMachine.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
markTopicForDeletionRetry(topic)
}
}
}

// 检测当前 topic 是否可以删除
if (isTopicEligibleForDeletion(topic)) {
info("Deletion of topic %s (re)started".format(topic))
// 开始执行 topic 删除操作
onTopicDeletion(Set(topic))
} else if (isTopicIneligibleForDeletion(topic)) {
info("Not retrying deletion of topic %s at this time since it is marked ineligible for deletion".format(topic))
}
}
}
}

Topic 删除的执行流程可以概括为:

  1. 获取待删除的 topic 集合;
  2. 如果 topic 的所有副本都已经成功被删除,则变更 topic 及其分区和副本的状态,并从 ZK 和 Controller 上下文中移除 topic 相关信息;
  3. 否则,如果 topic 存在任一副本处于删除准备的状态(ReplicaDeletionStarted),则跳过当前 topic 继续处理其它 topic;
  4. 否则,如果 topic 存在任一副本处于删除失败的状态(ReplicaDeletionIneligible),则尝试将对应副本状态重置为 OfflineReplica,等待后续删除重试;
  5. 检测当前 topic 是否可以被删除,如果可以则开始执行删除操作。

下面来重点分析一下步骤 2 和 5。 步骤 2 用于对已经成功被删除的 topic 执行一些后置清理工作,包括注销 ZK 监听器、切换分区和副本的状态,以及从 ZK 和 Controller 上下文中清除 topic 相关的数据等,具体实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private def completeDeleteTopic(topic: String) {
// 1. 注销 PartitionModificationsListener 监听器
partitionStateMachine.deregisterPartitionChangeListener(topic)

// 2. 将指定 topic 下被成功删除的副本状态切换成 NonExistentReplica
val replicasForDeletedTopic = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
replicaStateMachine.handleStateChanges(replicasForDeletedTopic, NonExistentReplica)

// 3. 将指定 topic 下所有的 topic 分区状态切换成 NonExistentPartition
val partitionsForDeletedTopic = controllerContext.partitionsForTopic(topic)
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, OfflinePartition)
partitionStateMachine.handleStateChanges(partitionsForDeletedTopic, NonExistentPartition)

// 4. 将 topic 及其分区从待删除集合中移除
topicsToBeDeleted -= topic
partitionsToBeDeleted.retain(_.topic != topic)

// 5. 清除 ZK 上与该 topic 相关的数据
val zkUtils = controllerContext.zkUtils
zkUtils.zkClient.deleteRecursive(getTopicPath(topic))
zkUtils.zkClient.deleteRecursive(getEntityConfigPath(ConfigType.Topic, topic))
zkUtils.zkClient.delete(getDeleteTopicPath(topic))

// 6. 清空 Controller 上下文中与该 topic 相关的数据
controllerContext.removeTopic(topic)
}

步骤 5 负责向所有可用的 broker 节点发送 UpdateMetadataRequest 请求,通知这些节点相关 topic 需要被删除,并对 topic 名下分区的 AR 集合执行删除操作。一个 topic 可以执行删除需要满足以下 2 个条件:

  1. Topic 待删除,且还未开始进行删除操作。
  2. Topic 未标记为不可删除。

对于同时满足上述条件的 topic 会调用 TopicDeletionManager#onTopicDeletion 方法执行删除操作:

1
2
3
4
5
6
7
8
9
10
11
12
private def onTopicDeletion(topics: Set[String]) {
info("Topic deletion callback for %s".format(topics.mkString(",")))
val partitions = topics.flatMap(controllerContext.partitionsForTopic)
// 向所有可用的 broker 节点发送 UpdateMetadataRequest 请求,通知当前 topic 需要被删除
controller.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
// 按照 topic 进行分组
val partitionReplicaAssignmentByTopic = controllerContext.partitionReplicaAssignment.groupBy(p => p._1.topic)
// 开始执行分区的删除操作
topics.foreach { topic =>
this.onPartitionDeletion(partitionReplicaAssignmentByTopic(topic).keySet)
}
}

方法 TopicDeletionManager#onPartitionDeletion 只是简单获取了对应 topic 分区的 AR 集合,并调用 TopicDeletionManager#startReplicaDeletion 方法对这些副本执行删除操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
private def startReplicaDeletion(replicasForTopicsToBeDeleted: Set[PartitionAndReplica]) {
replicasForTopicsToBeDeleted.groupBy(_.topic).keys.foreach { topic =>
// 获取 topic 名下所有可用的副本集合
val aliveReplicasForTopic = controllerContext.allLiveReplicas().filter(p => p.topic == topic)
// 获取 topic 名下中所有的不可用副本集合
val deadReplicasForTopic = replicasForTopicsToBeDeleted -- aliveReplicasForTopic
// 获取 topic 名下所有已完成删除的副本集合
val successfullyDeletedReplicas = controller.replicaStateMachine.replicasInState(topic, ReplicaDeletionSuccessful)
// 获取 topic 名下未完成删除的 topic 集合
val replicasForDeletionRetry = aliveReplicasForTopic -- successfullyDeletedReplicas
// 将不可用副本状态变更为 ReplicaDeletionIneligible
replicaStateMachine.handleStateChanges(deadReplicasForTopic, ReplicaDeletionIneligible)
// 将待删除的副本状态变更为 OfflineReplica,用于关闭 follower 对于 leader 的 fetch 请求
replicaStateMachine.handleStateChanges(replicasForDeletionRetry, OfflineReplica)
debug("Deletion started for replicas %s".format(replicasForDeletionRetry.mkString(",")))
// 将待删除的副本状态变更为 ReplicaDeletionStarted,标记当前副本准备好开始删除
controller.replicaStateMachine.handleStateChanges(replicasForDeletionRetry, ReplicaDeletionStarted,
new Callbacks.CallbackBuilder().stopReplicaCallback(deleteTopicStopReplicaCallback).build)
// 如果 topic 存在不可用的副本,标记该 topic 不可删除
if (deadReplicasForTopic.nonEmpty) {
debug("Dead Replicas (%s) found for topic %s".format(deadReplicasForTopic.mkString(","), topic))
markTopicIneligibleForDeletion(Set(topic))
}
}
}

private def deleteTopicStopReplicaCallback(stopReplicaResponseObj: AbstractResponse, replicaId: Int) {
val stopReplicaResponse = stopReplicaResponseObj.asInstanceOf[StopReplicaResponse]
debug("Delete topic callback invoked for %s".format(stopReplicaResponse))
val responseMap = stopReplicaResponse.responses.asScala
// 获取删除失败的分区集合
val partitionsInError =
if (stopReplicaResponse.errorCode != Errors.NONE.code) responseMap.keySet
else responseMap.filter { case (_, error) => error != Errors.NONE.code }.keySet
// 获取删除失败的副本集合
val replicasInError = partitionsInError.map(p => PartitionAndReplica(p.topic, p.partition, replicaId))
inLock(controllerContext.controllerLock) {
// 将删除失败的副本状态切换成 ReplicaDeletionIneligible,并唤醒删除线程再次尝试删除
this.failReplicaDeletion(replicasInError)
// 存在某些副本被成功删除,将这些副本状态切换成 ReplicaDeletionSuccessful
if (replicasInError.size != responseMap.size) {
val deletedReplicas = responseMap.keySet -- partitionsInError // 已经成功删除的副本
this.completeReplicaDeletion(deletedReplicas.map(p => PartitionAndReplica(p.topic, p.partition, replicaId)))
}
}
}

对于删除失败的副本会将其状态切换成 ReplicaDeletionIneligible,并唤醒删除线程再次尝试删除;对于删除成功的副本则将其状态置为 ReplicaDeletionSuccessful。如果一个待删除 topic 所有的副本状态均为 ReplicaDeletionSuccessful,则 DeleteTopicsThread 线程会对该 topic 执行后置清理工作,即我们前面分析的步骤 2。

副本再分配机制

Kafka Controller 提供了分区副本再分配机制,用于为指定的 topic 分区重新分配副本。当一个 Kafka Controller 实例竞选成为 leader 角色,或者管理员手动指定需要为某些 topic 分区重新分配副本时会触发该机制。这里我们以 Kafka Controller 实例竞选成为 leader 角色触发分区副本再分配的场景为例进行说明,关于管理员手动触发的场景留到后面分析 ZK 监听机制时再进行分析,实际上二者只是入口不同,具体的执行流程还是一样的。

我们从 KafkaController#maybeTriggerPartitionReassignment 方法开始说起,当 Kafka Controller 实例竞选成为 leader 角色时会触发执行该方法。在 Controller 的上下文中定义了 ControllerContext#partitionsBeingReassigned 字段,用于记录需要和正在执行副本再分配操作的 topic 分区。而 KafkaController#maybeTriggerPartitionReassignment 方法只是简单了遍历了该字段,并调用 KafkaController#initiateReassignReplicasForTopicPartition 方法为每个需要执行副本再分配的 topic 分区执行再分配操作。

在开始分析相关实现之前,我们需要明确 2 个概念:RAR 和 OAR,其中 RAR 表示分区新分配的 AR 集合,OAR 表示分区之前的 AR 集合。下面开始分析方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
// 获取新分配的 AR 集合:RAR
val newReplicas = reassignedPartitionContext.newReplicas
val topic = topicAndPartition.topic
val partition = topicAndPartition.partition
try {
// 获取指定 topic 分区之前的 AR 集合:OAR
val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
assignedReplicasOpt match {
case Some(assignedReplicas) =>
// 如果新旧副本未发生变化(OAR == RAR),则无需再分配
if (assignedReplicas == newReplicas) {
throw new KafkaException("Partition %s to be reassigned is already assigned to replicas".format(topicAndPartition) + " %s. Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
} else {
info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
// 为分区注册一个 ReassignedPartitionsIsrChangeListener 监听器
this.watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
// 标记 topic 为不可删除,因为需要执行副本再分配操作
deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
// 执行副本再分配操作
this.onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
}
case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist".format(topicAndPartition))
}
} catch {
// ... 省略异常处理
}
}

如果新分配的副本集合较当前的 AR 集合有变更,则会触发执行再分配操作,在开始操作之前,需要为对应 topic 分区注册一个 ReassignedPartitionsIsrChangeListener 监听器,并标记分区所属的 topic 不可被删除。ReassignedPartitionsIsrChangeListener 用于监听当前分区 ISR 集合的变化,具体实现我们留到后面的小节中针对性分析,下面重点来看一下 KafkaController#onPartitionReassignment 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
// 获取新分配的副本集合 RAR
val reassignedReplicas = reassignedPartitionContext.newReplicas
// 如果新分配的副本集合存在一个或多个副本不位于 ISR 集合中
if (!this.areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas)) {
info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) + "reassigned not yet caught up with the leader")
// 获取新添加的副本集合:RAR - OAR
val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
// 获取全部的副本集合:RAR + OAR
val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
// 1. 将 topic 的全部副本集合(OAR + RAR)更新到 Controller 上下文和 ZK
this.updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
// 2. 向 topic 的全部副本集合(OAR + RAR)所在的 broker 节点发送 LeaderAndIsrRequest 请求
this.updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition), newAndOldReplicas.toSeq)
// 3. 将新增的副本(RAR - OAR)状态切换成 NewReplica
this.startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) + "reassigned to catch up with the leader")
}
// 如果新分配的副本全部位于 ISR 集合中
else {
// 1. 获取旧的副本集合(OAR - RAR)
val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
// 2. 将新分配的副本集合(RAR)中所有副本的状态切换成 OnlineReplica
reassignedReplicas.foreach { replica =>
replicaStateMachine.handleStateChanges(Set(PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, replica)), OnlineReplica)
}
// 3. 更新本地记录的分区 AR 集合(OAR -> RAR),同时按需选择新的 leader 副本,并通知到集群中相关节点
this.moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
// 4. 将旧的副本状态切换成 NonExistentReplica
this.stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
// 5. 更新 ZK 中记录对应分区的 AR 集合(RAR)
this.updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
// 6. 将对应 topic 分区的副本再分配信息从 ZK 和上下文中移除
this.removePartitionFromReassignedPartitions(topicAndPartition)
info("Removed partition %s from the list of reassigned partitions in zookeeper".format(topicAndPartition))
controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
// 7. 向所有可用的 broker 节点发送 UpdateMetadataRequest 请求,通知副本再分配后的集群状态信息
this.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
// 8. 取消相关 topic 的不可删除标记,并唤醒 DeleteTopicsThread 线程
deleteTopicManager.resumeDeletionForTopics(Set(topicAndPartition.topic))
}
}

如果 RAR 中存在一个或多个副本不在 ISR 集合中,则再分配的执行流程如下:

  1. 以 OAR + RAR 中全部副本集合作为对应 topic 分区的 AR 集合,更新到 ZK 和 Controller 上下文中;
  2. 向 OAR + RAR 中全部副本所在的 broker 节点发送 LeaderAndIsrRequest 请求,更新对应节点缓存的分区 leader 副本的年代信息;
  3. 切换新增的副本(RAR - OAR)状态为 NewReplica。

这一场景下再分配操作并未执行完成,实际上大部分再分配操作新分配的 RAR 集合中都包含一个或多个不存在于 ISR 集合中的副本,所以上面的执行流程可以看做是副本再分配操作的前置流程。当这些新增的副本在运行一段时间之后,与 leader 副本进行同步,并逐一加入到 ISR 集合之后会触发 ReassignedPartitionsIsrChangeListener 监听器,回调执行后续的流程(即 RAR 中的副本全部存在于 ISR 集合中)。

如果 RAR 中的副本均包含在对应分区的 ISR 集合中,则再分配的执行流程如下:

  1. 切换 RAR 中所有副本的状态为 OnlineReplica;
  2. 使用 RAR 集合更新对应 topic 分区的 AR 集合,并在 leader 副本不在 RAR 集合中或所在的 broker 节点失效的情况下,基于 ReassignedPartitionLeaderSelector 分区 leader 副本选择器重新选择新的 leader 副本;
  3. 切换旧的副本(OAR - RAR)状态为 NonExistentReplica;
  4. 更新 ZK 中记录的对应分区的 AR 集合;
  5. 从 ZK 和 Controller 上下文中移除对应 topic 分区的副本再分配信息;
  6. 向所有可用的 broker 节点发送 UpdateMetadataRequest 请求,更新副本再分配后的集群状态信息;
  7. 取消对应 topic 的不可删除标记(前面有标记为不可删除),并唤醒 DeleteTopicsThread 线程。

各步骤的方法实现都比较简单,这里不再继续深入。

ZK 监听机制

Kafka 与 ZK 的交互依赖于 zkclient 客户端,zkclient 定义了 3 种类型的监听器接口实现:IZkDataListener、IZkChildListener 和 IZkStateListener。其中 IZkDataListener 用于监听指定节点数据的变化,IZkChildListener 用于监听指定节点下子节点的变化,IZkStateListener 则用于监听 ZK 连接状态的变化。本小节我们重点关注与 Kafka Controller 相关的 ZK 监听器实现。

ZK 连接状态监听器

SessionExpirationListener 实现了 IZkStateListener 接口,用于监听 Kafka Controller 与 ZK 之间的连接状态。SessionExpirationListener 提供了 SessionExpirationListener#handleNewSession 方法实现,当与 ZK 建立新的连接会话时会触发回调该方法,尝试选举新的 leader 角色。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
def handleNewSession() {
info("ZK expired; shut down all controller components and try to re-elect")
// 如果 ZK 上记录的 controller leader 不是当前 broker 节点
if (controllerElector.getControllerID != config.brokerId) {
// 尝试清理 controller 之前的状态
onControllerResignation()
inLock(controllerContext.controllerLock) {
// 尝试竞选成为新的 controller leader
controllerElector.elect
}
} else {
info("ZK expired, but the current controller id %d is the same as this broker id, skip re-elect".format(config.brokerId))
}
}

当 broker 与 ZK 建立新的会话时,上述方法会检查当前 ZK 上记录的 leader 节点是否是当前实例所在的节点,如果不是的话则需要调用 KafkaController#onControllerResignation 执行一些状态清理工作(因为当前节点之前可能是 leader 角色),然后调用 ZookeeperLeaderElector#elect 方法基于 ZK 的临时节点机制尝试竞选成为新的 leader。关于 ZookeeperLeaderElector#elect 方法,我们在后面会专门分析,这里先来看一下 KafkaController#onControllerResignation 方法实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
def onControllerResignation() {
debug("Controller resigning, broker id %d".format(config.brokerId))

// 取消 ZK 上的监听器
this.deregisterIsrChangeNotificationListener()
this.deregisterReassignedPartitionsListener()
this.deregisterPreferredReplicaElectionListener()

// 关闭 topic 删除机制
if (deleteTopicManager != null) deleteTopicManager.shutdown()

// 关闭 partition-rebalance 分区再平衡定时任务
if (config.autoLeaderRebalanceEnable) autoRebalanceScheduler.shutdown()

inLock(controllerContext.controllerLock) {
// 取消所有的 ReassignedPartitionsIsrChangeListener 监听器
this.deregisterReassignedPartitionsIsrChangeListeners()
// 关闭分区状态机
partitionStateMachine.shutdown()
// 关闭副本状态机
replicaStateMachine.shutdown()
// 关闭 ControllerChannelManager,断开与集群中其他 broker 节点之间的连接
if (controllerContext.controllerChannelManager != null) {
controllerContext.controllerChannelManager.shutdown()
controllerContext.controllerChannelManager = null
}
// 清除 controller 年代信息
controllerContext.epoch = 0
controllerContext.epochZkVersion = 0
// 切换 broker 节点的状态
brokerState.newState(RunningAsBroker)

info("Broker %d resigned as the controller".format(config.brokerId))
}
}

上述方法会在 Kafka Controller 由 leader 角色降级为 follower 角色时被触发,具体的执行逻辑如代码注释。

ZK 节点状态监听器

TopicChangeListener

TopicChangeListener 实现了 IZkChildListener 接口,用于监听 /brokers/topics 节点,当有新的 topic 创建或者删除已有 topic 时,会触发执行相应的回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
def doHandleChildChange(parentPath: String, children: Seq[String]) {
inLock(controllerContext.controllerLock) {
if (hasStarted.get) {
try {
// 获取 /brokers/topics 路径下的子节点,即当前有效的 topic 集合
val currentChildren = {
debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
children.toSet
}
// 获取新添加的 topic 集合
val newTopics = currentChildren -- controllerContext.allTopics
// 获取已删除的 topic 集合
val deletedTopics = controllerContext.allTopics -- currentChildren
// 更新本地记录的所有 topic 集合
controllerContext.allTopics = currentChildren

// 从 ZK 读取新增分区的 AR 集合,路径:/brokers/topics/{topic_name}
val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
// 更新上下文中记录的每个分区对应的 AR 集合
controllerContext.partitionReplicaAssignment =
controllerContext.partitionReplicaAssignment.filter(p => !deletedTopics.contains(p._1.topic))
controllerContext.partitionReplicaAssignment ++= addedPartitionReplicaAssignment
info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics, deletedTopics, addedPartitionReplicaAssignment))
// 处理新增的 topic,及其新增的分区
if (newTopics.nonEmpty)
controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet)
} catch {
case e: Throwable => error("Error while handling new topic", e)
}
}
}
}

上述方法基于 ZK 感知当前新增和已删除的 topic 集合,并更新本地记录的可用的 topic 集合,及其分区的 AR 集合信息。对于新增的 topic 集合,Kafka Controller 会调用 KafkaController#onNewTopicCreation 方法为每个 topic 注册一个 PartitionModificationsListener 监听器,同时切换对应 topic 新增分区及其副本的状态,使其能够上线运行。相关实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
info("New topic creation callback for %s".format(newPartitions.mkString(",")))
// 为每个新增的 topic 注册一个 PartitionModificationsListener 监听器
topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
// 切换新增分区及其副本状态
this.onNewPartitionCreation(newPartitions)
}

def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
info("New partition creation callback for %s".format(newPartitions.mkString(",")))
// 将所有新增的分区状态转换为 NewPartition
partitionStateMachine.handleStateChanges(newPartitions, NewPartition)
// 将新增分区的所有副本都转换为 NewReplica,向副本所在 broker 节点发送 LeaderAndIsrRequest 请求,并发送 UpdateMetadataRequest 给所有可用的 broker 节点
replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
// 为所有新增的分区分配 leader 副本和 ISR 集合,并将分区状态转换为 OnlinePartition
partitionStateMachine.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
// 将新增分区的所有副本状态转换为 OnlineReplica,并尝试将副本添加到 AR 集合中
replicaStateMachine.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
}

由上面的实现可以看出,分区和副本的状态并非是一次性切换成 Online 状态的,而是先切换成 New 状态,再切换成 Online 状态,期间会为分区分配 leader 副本和 ISR 集合,并尝试将新的副本添加到对应分区的 AR 集合中。

DeleteTopicsListener

DeleteTopicsListener 实现了 IZkChildListener 接口,用于监听 /admin/delete_topics 节点,当管理员指定要删除一些 topic 时,对应的 topic 会被写入到该 ZK 节点下,然后触发执行 DeleteTopicsListener 的回调方法 DeleteTopicsListener#doHandleChildChange,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
def doHandleChildChange(parentPath: String, children: Seq[String]) {
inLock(controllerContext.controllerLock) {
// 从 ZK 获取待删除的 topic 集合
var topicsToBeDeleted = children.toSet
debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
// 检查 topic 是否存在,对于不存在的 topic 直接将其从 /admin/delete_topics 路径下删除
val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
if (nonExistentTopics.nonEmpty) {
warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
}
topicsToBeDeleted --= nonExistentTopics

// 如果允许删除 topic,对应 delete.topic.enable 配置
if (controller.config.deleteTopicEnable) {
if (topicsToBeDeleted.nonEmpty) {
info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
// 检查待删除的 topic 是否处于不可删除的情况
topicsToBeDeleted.foreach { topic =>
// 1. 检测待删除的 topic 是否有分区正在进行优先副本选举
val preferredReplicaElectionInProgress =
controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
// 2. 检测待删除的 topic 是否有分区正在进行副本再分配
val partitionReassignmentInProgress =
controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
// 如果满足上述 2 个条件之一,则将 topic 标记为不可删除
if (preferredReplicaElectionInProgress || partitionReassignmentInProgress)
controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
}
// 将可删除的 topic 提交给 TopicDeletionManager 执行删除操作
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
}
} else {
// 如果配置不允许删除 topic,则从 ZK 上删除对应的节点(/admin/delete_topics)
for (topic <- topicsToBeDeleted) {
info("Removing " + getDeleteTopicPath(topic) + " since delete topic is disabled")
zkUtils.zkClient.delete(getDeleteTopicPath(topic))
}
}
}
}

当检测到有新的 topic 需要被删除时,上述方法会获取需要被删除的 topic 集合,并判定对应的 topic 是否是有效的(是否是真实存在的),如果无效则直接将相关删除信息从 ZK 节点下移除,对于有效的 topic 集合,在配置(对应 delete.topic.enable 配置)允许的情况下会检查待删除的 topic 分区是否满足以下 2 个条件:

  1. 存在正在进行优先副本选举的分区。
  2. 存在正在进行副本重新分配的分区。

如果待删除分区满足上述 2 个条件之一则将其标记为不可删除,否则将对应的 topic 提交给 TopicDeletionManager 执行删除操作。TopicDeletionManager 会将待删除的 topic 及其分区集合添加到 TopicDeletionManager 定义的待删除集合中,并唤醒 DeleteTopicsThread 线程执行删除操作。关于 TopicDeletionManager 的运行机制可以参考前面小节的分析。

BrokerChangeListener

BrokerChangeListener 实现了IZkChildListener 接口,用于监听 /broker/ids 节点,当有 broker 节点上线或者下线时,会触发执行相应的回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def doHandleChildChange(parentPath: String, currentBrokerList: Seq[String]) {
info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.sorted.mkString(",")))
inLock(controllerContext.controllerLock) {
if (hasStarted.get) {
ControllerStats.leaderElectionTimer.time {
try {
// 从 ZK 获取 broker 节点列表
val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
val curBrokerIds = curBrokers.map(_.id)
val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
// 筛选新增的 broker 节点列表
val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
// 筛选故障的 broker 节点列表
val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
// 更新 Controller 上下文信息
controllerContext.liveBrokers = curBrokers
val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
info("Newly added brokers: %s, deleted brokers: %s, all live brokers: %s"
.format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
// 创建 controller 到新增的 broker 节点之间的网络连接,并启动请求发送线程
newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
// 关闭 controller 到故障的 broker 节点之间的网络连接
deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
// 如果存在新增的 broker 节点,通知集群中的其它 broker 节点,并上线新增的分区副本等
if (newBrokerIds.nonEmpty) controller.onBrokerStartup(newBrokerIdsSorted)
// 如果存在故障的 broker 节点,则下线故障 broker 节点上的分区和副本,并通知到集群中的其它 broker 节点
if (deadBrokerIds.nonEmpty) controller.onBrokerFailure(deadBrokerIdsSorted)
} catch {
case e: Throwable => error("Error while handling broker changes", e)
}
}
}
}
}

对于新上线的 broker 节点会触发 Kafka Controller 创建到这些节点的网络连接,并通知集群中所有可用的 broker 节点有新的 broker 节点上线,同时切换新增 broker 节点上的分区副本状态,以上线对外提供服务。相关实现位于 KafkaController#onBrokerStartup 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
def onBrokerStartup(newBrokers: Seq[Int]) {
info("New broker startup callback for %s".format(newBrokers.mkString(",")))
val newBrokersSet = newBrokers.toSet

// 1. 向集群中所有可用的 broker 节点发送 UpdateMetadataRequest 请求,发送的是所有的分区信息,通知节点有新的 broker 加入
this.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)

// 2. 将新增的 broker 节点上的副本状态设置为 OnlineReplica,以上线对外提供服务
val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
replicaStateMachine.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)

// 3. 尝试将状态为 OfflinePartition 和 NewPartition 的分区设置为 OnlinePartition,以触发失效分区的 leader 副本选举
partitionStateMachine.triggerOnlinePartitionStateChange()

// 4. 检查正在重新分配副本的分区是否需要重新分配副本
val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
case (_, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains)
}
partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))

// 5. 如果新增 broker 上有待删除的 topic 的副本,则唤醒 DeleteTopicsThread 线程进行删除,因为对应 topic 之前可能因为该副本失效而被标记为不可删除
val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
if (replicasForTopicsToBeDeleted.nonEmpty) {
info("Some replicas %s for topics scheduled for deletion %s are on the newly restarted brokers %s. Signaling restart of topic deletion for these topics"
.format(replicasForTopicsToBeDeleted.mkString(","), deleteTopicManager.topicsToBeDeleted.mkString(","), newBrokers.mkString(",")))
deleteTopicManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
}
}

对于已经下线的 broker 节点会触发 Kafka Controller 关闭到这些节点的网络连接,并将分配给故障节点的副本置为 OfflineReplica 状态。如果某些分区的 leader 副本正好位于故障 broker 节点上,则需要将这些分区置为 OfflinePartition 状态,并通知到集群中所有可用的 broker 节点。相关实现位于 KafkaController#onBrokerFailure 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
def onBrokerFailure(deadBrokers: Seq[Int]) {
info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
// 移除正在关闭的 broker 节点
val deadBrokersThatWereShuttingDown = deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown))
val deadBrokersSet = deadBrokers.toSet
// 1. 如果分区 leader 副本在故障 broker 节点上,将分区状态设置为 OfflinePartition
val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader) &&
!deleteTopicManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet
partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)

// 2. 尝试将 OfflinePartition 状态的分区切换成 OnlinePartition 状态
partitionStateMachine.triggerOnlinePartitionStateChange()

// 3. 获取分配给故障 broker 节点的副本集合,将这些副本设置为 OfflineReplica 状态
val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet)
val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
replicaStateMachine.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)

// 4. 检查故障 broker 节点上是否有待删除的 topic 副本,如果存在则将其状态转换成 ReplicaDeletionIneligible 状态,并标记 topic 不可删除
val replicasForTopicsToBeDeleted = allReplicasOnDeadBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
if (replicasForTopicsToBeDeleted.nonEmpty) {
deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
}

// 5. 向所有可用 broker 节点发送 UpdateMetadataRequest 请求,通知部分分区已经失效
if (partitionsWithoutLeader.isEmpty) {
this.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
}
}
IsrChangeNotificationListener

IsrChangeNotificationListener 实现了 IZkChildListener 接口,用于监听 /isr_change_notification 节点,当监听到某些分区的 ISR 集合发生变化时,会触发执行相应的回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def doHandleChildChange(parentPath: String, currentChildren: Seq[String]): Unit = {
inLock(controller.controllerContext.controllerLock) {
debug("ISR change notification listener fired")
try {
// 从 ZK 上读取 ISR 集合发生变更的 topic 分区集合
val topicAndPartitions = currentChildren.flatMap(getTopicAndPartition).toSet
if (topicAndPartitions.nonEmpty) {
// 从 ZK 读取指定分区的 leader 副本、ISR 集合等信息,更新 Controller 上下文
controller.updateLeaderAndIsrCache(topicAndPartitions)
// 向集群所有可用的 broker 节点发送 UpdateMetadataRequest 请求,更新对应 broker 节点缓存的集群元数据信息
processUpdateNotifications(topicAndPartitions)
}
} finally {
// 删除 /isr_change_notification/partitions 路径下已经处理的信息
currentChildren.map(x => controller.controllerContext.zkUtils.deletePath(ZkUtils.IsrChangeNotificationPath + "/" + x))
}
}
}

private def processUpdateNotifications(topicAndPartitions: immutable.Set[TopicAndPartition]) {
val liveBrokers: Seq[Int] = controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq
debug("Sending MetadataRequest to Brokers:" + liveBrokers + " for TopicAndPartitions:" + topicAndPartitions)
controller.sendUpdateMetadataRequest(liveBrokers, topicAndPartitions)
}

相关逻辑如代码注释,比较简单。

ZK 数据状态监听器

LeaderChangeListener

LeaderChangeListener 实现了 IZkDataListener 接口,用于监听 /controller 节点,当节点数据发生变更或被删除时,会触发执行相应的回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def handleDataChange(dataPath: String, data: Object) {
val shouldResign = inLock(controllerContext.controllerLock) {
val amILeaderBeforeDataChange = amILeader
// 更新本地记录的新的 controller leader 的 ID
leaderId = KafkaController.parseControllerId(data.toString)
info("New leader is %d".format(leaderId))
// 之前是 leader,但是现在切换成了 follower 角色
amILeaderBeforeDataChange && !amILeader
}
// 如果当前 broker 由 leader 变为 follower,则需要执行相应的清理工作
if (shouldResign) onResigningAsLeader()
}

def handleDataDeleted(dataPath: String) {
val shouldResign = inLock(controllerContext.controllerLock) {
debug("%s leader change listener fired for path %s to handle data deleted: trying to elect as a leader".format(brokerId, dataPath))
amILeader
}

// 如果 ZK 上记录的 leader 节点被删除,且当前节点之前是 leader,则需要执行相应的清理工作
if (shouldResign) onResigningAsLeader()

// 尝试竞选成为新的 leader
inLock(controllerContext.controllerLock) {
elect
}
}

具体的回调逻辑如代码注释,其中 ZookeeperLeaderElector#elect 方法的实现将留到后面的小节中进行分析,另外回调方法 ZookeeperLeaderElector#onResigningAsLeader 实际上就是 KafkaController#onControllerResignation 方法,这个在前面已经分析过,不再重复撰述。

PartitionModificationsListener

PartitionModificationsListener 实现了 IZkDataListener 接口,用于监听 /brokers/topics/{topic_name} 节点,当某个 topic 的分区发生变化时(即增加分区,因为分区数目只增不减),会触发执行相应的回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
def doHandleDataChange(dataPath: String, data: AnyRef) {
inLock(controllerContext.controllerLock) {
try {
info(s"Partition modification triggered $data for path $dataPath")
// 从 ZK 获取 topic 的分区和副本信息
val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic))
// 筛选新增的分区和副本信息
val partitionsToBeAdded = partitionReplicaAssignment
.filter(p => !controllerContext.partitionReplicaAssignment.contains(p._1))
// 如果 topic 待删除
if (controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic))
error("Skipping adding partitions %s for topic %s since it is currently being deleted".format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
else {
// 对于正常运行的 topic 的新增分区,更新分区的 AR 集合
if (partitionsToBeAdded.nonEmpty) {
info("New partitions to be added %s".format(partitionsToBeAdded))
// 将新增的分区信息添加到 controller 上下文中
controllerContext.partitionReplicaAssignment ++= partitionsToBeAdded
// 切换新增分区及其副本的状态,使其上线对外提供服务
controller.onNewPartitionCreation(partitionsToBeAdded.keySet)
}
}
} catch {
case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e)
}
}
}

上述回调方法针对正常运行的 topic,如果有新增分区则会切换这些新增分区和副本的状态,使这些分区和副本能够上线对外提供服务。其中 KafkaController#onNewTopicCreation 方法已在前面分析过,这里不再重复撰述。

PreferredReplicaElectionListener

PreferredReplicaElectionListener 实现了 IZkDataListener 接口,用于监听 /admin/preferred_replica_election 节点,为指定的 topic 分区选举优先副本作为 leader 副本,以保证集群中 leader 副本的均衡分布。相关回调方法实现如下(省略了日志打点):

1
2
3
4
5
6
7
8
9
10
11
12
def doHandleDataChange(dataPath: String, data: AnyRef) {
inLock(controllerContext.controllerLock) {
// 获取需要进行优先副本选举的 topic 分区集合
val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
// 过滤已经处于优先副本选举的分区
val partitions = partitionsForPreferredReplicaElection -- controllerContext.partitionsUndergoingPreferredReplicaElection
// 过滤掉待删除的 topic 分区
val partitionsForTopicsToBeDeleted = partitions.filter(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
// 对剩余的分区执行优先副本选举
controller.onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
}
}

当管理员手动指定某些 topic 分区需要执行优先副本选举时,相应的信息会被写入到 /admin/preferred_replica_election 节点下,然后触发执行上述回调方法。对于这些指定需要执行优先副本选举,且对应 topic 正常运行的分区,最终会调用 KafkaController#onPreferredReplicaElection 方法基于 PreferredReplicaPartitionLeaderSelector 分区 leader 副本选择器选举 leader 副本。方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = false) {
info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
try {
// 将分区添加到参与优先副本选举的分区集合中
controllerContext.partitionsUndergoingPreferredReplicaElection ++= partitions
// 设置对应的 topic 为不可删除
deleteTopicManager.markTopicIneligibleForDeletion(partitions.map(_.topic))
// 设置分区为 OnlinePartition 状态,并使用优先副本选择器重选 leader 副本,同时更新 ZK 和发送 LeaderAndIsrRequest 和 UpdateMetadataRequest 请求
partitionStateMachine.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
} catch {
case e: Throwable => error("Error completing preferred replica leader election for partitions %s".format(partitions.mkString(",")), e)
} finally {
// 清理 ZK 和上下文中记录的相关数据
this.removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
// 将 topic 恢复为可删除,并唤醒 DeleteTopicsThread 线程
deleteTopicManager.resumeDeletionForTopics(partitions.map(_.topic))
}
}

关于 PreferredReplicaPartitionLeaderSelector 的实现,在前面已经分析过,这里不再重复撰述。

PartitionsReassignedListener

PartitionsReassignedListener 实现了 IZkDataListener 接口,用于监听 /admin/reassign_partitions 节点,当管理员指定需要为某些 topic 重新分配副本时,相关信息会写入到该节点下,并触发执行相应的回调:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
def doHandleDataChange(dataPath: String, data: AnyRef) {
debug("Partitions reassigned listener fired for path %s. Record partitions to be reassigned %s".format(dataPath, data))
// 从 ZK 读取分区副本的再分配信息
val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
// 过滤掉正在进行再分配的分区集合
val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
}
partitionsToBeReassigned.foreach { partitionToBeReassigned =>
inLock(controllerContext.controllerLock) {
// 检测 topic 是否为待删除的 topic,如果是的话则放弃对名下分区的再分配操作
if (controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
error("Skipping reassignment of partition %s for topic %s since it is currently being deleted".format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
} else {
val context = ReassignedPartitionsContext(partitionToBeReassigned._2)
// 为副本再分配做一些前期准备工作
controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
}
}
}
}

相关执行逻辑比较简单,如代码注释,其中 KafkaController#initiateReassignReplicasForTopicPartition 方法已经在前面分析过,不再重复撰述。

ReassignedPartitionsIsrChangeListener

ReassignedPartitionsIsrChangeListener 实现了 IZkDataListener 接口,用于监听指定 topic 分区的状态变更(关注 ISR 集合的变更),相关回调实现如下(省略部分日志打点):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
def doHandleDataChange(dataPath: String, data: AnyRef) {
inLock(controllerContext.controllerLock) {
debug("Reassigned partitions isr change listener fired for path %s with children %s".format(dataPath, data))
val topicAndPartition = TopicAndPartition(topic, partition)
try {
controllerContext.partitionsBeingReassigned.get(topicAndPartition) match {
// 对应的 topic 分区正在执行副本再分配操作
case Some(reassignedPartitionContext) =>
// 从 ZK 上获取对应 topic 分区的 leader 副本和 ISR 集合
val newLeaderAndIsrOpt = zkUtils.getLeaderAndIsrForPartition(topic, partition)
newLeaderAndIsrOpt match {
case Some(leaderAndIsr) => // check if new replicas have joined ISR
val caughtUpReplicas = reassignedReplicas & leaderAndIsr.isr.toSet
// 如果 RAR 中的副本已经全部进入 ISR 集合中
if (caughtUpReplicas == reassignedReplicas) {
// 执行副本再分配的后续操作
controller.onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
} else {
// 啥也不干,等待下一次回调
}
case None => error("Error handling reassignment of partition %s to replicas %s as it was never created".format(topicAndPartition, reassignedReplicas.mkString(",")))
}
// 对应的 topic 分区已经完成副本再分配操作
case None =>
}
} catch {
case e: Throwable => error("Error while handling partition reassignment", e)
}
}
}

上述回调主要的逻辑就是判断对应 topic 分区重新分配的 RAR 集合中的副本是否都已经进入 ISR 集合,如果是的话则触发执行 KafkaController#onPartitionReassignment 方法的后续操作,否则什么也不做,继续等待下一次回调。建议将该监听器与前面第 7 小节结合起来看,能够更好的梳理整个副本再分配的执行流程。

故障转移机制

一个 Kafka 集群包含多个 broker 节点,每个 broker 节点上都会运行一个 Kafka Controller 实例,但是这些实例中只有一个是 leader 角色,其余均为 follower 角色,这些 follower 会在 leader 节点宕机时竞选成为新的 leader,以保证集群的可用性。

Kafka 定义了 ZookeeperLeaderElector 类来处理故障转移,用于在 leader 节点宕机时从 follower 节点中选举新的 leader。ZookeeperLeaderElector 的字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class ZookeeperLeaderElector(controllerContext: ControllerContext, // Controller 上下文对象
electionPath: String, // /controller
onBecomingLeader: () => Unit, // KafkaController#onControllerFailover
onResigningAsLeader: () => Unit, // KafkaController#onControllerResignation
brokerId: Int, // broker 节点 ID
time: Time // 时间戳工具类
) extends LeaderElector with Logging {

/** 当前 controller leader 的 ID */
var leaderId: Int = -1
/** 监听 ZK 的 /controller 节点的数据变化 */
val leaderChangeListener = new LeaderChangeListener

// ... 省略方法定义

}

ZookeeperLeaderElector 的 ZookeeperLeaderElector#startup 方法会在 Kafka Controller 启动时被调用,以启动故障转移机制。该方法会在 /controller 节点上注册 LeaderChangeListener 监听器,并尝试竞选成为新的 leader。方法实现如下:

1
2
3
4
5
6
7
8
def startup {
inLock(controllerContext.controllerLock) {
// 注册 ZK 监听器,监听 /controller 节点下的数据变更
controllerContext.zkUtils.zkClient.subscribeDataChanges(electionPath, leaderChangeListener)
// 执行 leader 选举
elect
}
}

方法 ZookeeperLeaderElector#elect 已在前面多次提及过,用于执行 leader 选举,该方法主要在以下 3 种场景下被触发:

  1. Kafka Controller 实例启动时。
  2. ZK 节点 /controller 下的数据被清除时。
  3. Broker 节点与 ZK 重新建立会话时。

下面来看一下 ZookeeperLeaderElector#elect 方法的实现,该方法基于 ZK 的临时节点机制竞选 leader 角色,并返回当前节点是不是新的 leader 角色:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
def elect: Boolean = {
val timestamp = time.milliseconds.toString
val electString = Json.encode(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp))

// 获取 ZK 中记录的 controller leader 的 ID
leaderId = this.getControllerID
// 已经存在 controller leader,放弃选举
if (leaderId != -1) {
debug("Broker %d has been elected as leader, so stopping the election process.".format(leaderId))
return amILeader
}

try {
// 尝试创建 ZK 临时节点,如果临时节点已经存在,则抛出异常
val zkCheckedEphemeral = new ZKCheckedEphemeral(electionPath,
electString,
controllerContext.zkUtils.zkConnection.getZookeeper,
JaasUtils.isZkSecurityEnabled)
zkCheckedEphemeral.create()
info(brokerId + " successfully elected as leader")
// 创建成功,更新 leader 节点 ID
leaderId = brokerId
// 回调
onBecomingLeader()
} catch {
// leader 已经存在
case _: ZkNodeExistsException =>
// If someone else has written the path, then
leaderId = getControllerID
case e2: Throwable =>
error("Error while electing or becoming leader on broker %d".format(brokerId), e2)
// 重置 leaderId,并删除 /controller 节点
resign()
}
// 检测当前 broker 节点是否成为 leader
amILeader
}

基于 ZK 的临时节点机制实施 leader 选举是一个比较典型且成熟的方案,在很多分布式系统中均有应用。ZK 临时节点的特性就在于当 broker 节点与 ZK 断开连接时,之前创建的临时节点会被删除,如果对应的临时节点已经存在,则其它节点再次尝试创建时会抛出 ZkNodeExistsException 异常。如果当前 broker 节点成功竞选成为新的 leader,则会回调 ZookeeperLeaderElector#onBecomingLeader 方法,对应 KafkaController#onControllerFailover 实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def onControllerFailover() {
if (isRunning) {
info("Broker %d starting become controller state transition".format(config.brokerId))
// 1. 从 ZK 读取历史 controller 年代信息,并更新 Controller 上下文
this.readControllerEpochFromZookeeper()
// 2. 递增 controller 年代信息,并更新到 ZK
this.incrementControllerEpoch(zkUtils.zkClient)

// 3. 注册 ZK 监听器

this.registerReassignedPartitionsListener() // 注册 PartitionsReassignedListener
this.registerIsrChangeNotificationListener() // 注册 IsrChangeNotificationListener
this.registerPreferredReplicaElectionListener() // 注册 PreferredReplicaElectionListener
partitionStateMachine.registerListeners() // 注册 TopicChangeListener 和 DeleteTopicsListener
replicaStateMachine.registerListeners() // 注册 BrokerChangeListener

// 4. 初始化 Controller 上下文信息
this.initializeControllerContext()

// 5. 向集群中所有可用的 broker 发送 UpdateMetadataRequest 请求,更新对应节点缓存的集群元数据
this.sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)

// 6. 启动副本状态机,并初始化各个副本的状态
replicaStateMachine.startup()

// 7. 启动分区状态机,并初始化各个分区的状态
partitionStateMachine.startup()

// 8. 为所有的 topic 注册 PartitionModificationsListener 监听器
controllerContext.allTopics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))

// 9. 处理副本需要再分配的分区
this.maybeTriggerPartitionReassignment()

// 10. 处理需要进行优先副本选举的分区
this.maybeTriggerPreferredReplicaElection()

// 11. 依据配置决定是否开启分区自动均衡的功能
if (config.autoLeaderRebalanceEnable) {
info("starting the partition rebalance scheduler")
// 启用定时任务,周期性检测
autoRebalanceScheduler.startup()
autoRebalanceScheduler.schedule(
"partition-rebalance-thread",
checkAndTriggerPartitionRebalance,
5,
config.leaderImbalanceCheckIntervalSeconds.toLong,
TimeUnit.SECONDS)
}
// 12. 启动 TopicDeletionManager,用于对指定的 topic 执行删除操作
deleteTopicManager.start()
} else
info("Controller has been shut down, aborting startup/failover")
}

上述方法实现了一个 broker 节点竞选成为新的 leader 之后所需要执行的一些初始化操作,其中一些步骤已经在前面分析过了,例如状态机的启动和初始化过程、分区副本再分配机制等,下面重点来看一下步骤 4 和 11。

初始化上下文信息

前面我们分析了管理 Kafka Controller 上下文的类 ControllerContext, 步骤 4 实现了当一个 broker 节点由 follower 角色切换成 leader 角色时对上下文执行初始化的操作。相关实现位于 KafkaController#initializeControllerContext 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
private def initializeControllerContext() {
// 读取 /brokers/ids 节点,初始化可用的 broker 集合
controllerContext.liveBrokers = zkUtils.getAllBrokersInCluster.toSet
// 读取 /brokers/topics 节点,初始化集群中全部的 topic 集合
controllerContext.allTopics = zkUtils.getAllTopics.toSet
// 读取 /brokers/topics/{topic_name}/partitions 节点,初始化每个分区的 AR 集合
controllerContext.partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(controllerContext.allTopics.toSeq)
controllerContext.partitionLeadershipInfo = new mutable.HashMap[TopicAndPartition, LeaderIsrAndControllerEpoch]
controllerContext.shuttingDownBrokerIds = mutable.Set.empty[Int]
// 读取 /brokers/topics/{topic_name}/partition/{partitionId}/stat 节点,初始化每个 topic 分区的 leader 副本和 ISR 集合等信息
this.updateLeaderAndIsrCache()
// 启动 ControllerChannelManager,用于建立到集群中所有 broker 节点的连接,并与之通信
this.startChannelManager()
// 读取 /admin/preferred_replica_election 节点,初始化需要执行优先副本选举的分区
this.initializePreferredReplicaElection()
// 读取 /admin/reassign_partitions 节点,初始化需要进行副本重新分配的分区
this.initializePartitionReassignment()
// 启动 TopicDeletionManager,用于管理待删除的 topic 和不可删除的 topic 集合
this.initializeTopicDeletion()
}

分区再平衡机制

本小节介绍的分区再平衡机制与前面分析消费者和 GroupCoordinator 组件时提到的分区再分配机制不同,分区再分配的目的在于为一个 group 名下的消费者分配分区,而分区再平衡的目的在于将 topic 分区的 leader 副本尽量均匀分散在不同的 broker 节点上,以保证各个 broker 节点的负载均衡。

步骤 11 依据配置 auto.leader.rebalance.enable 决定是否启动 partition-rebalance-thread 定时任务,以对集群中的 topic 分区执行再平衡策略,从而保证各个 broker 节点的负载均衡。当一个 topic 被新建时,topic 名下的分区和分区对应的 leader 副本会尽可能均衡分散到集群中的 broker 节点上,但是随着服务的运行可能存在一些 boker 节点的失效,从而逐渐让各个 broker 节点上运行的分区 leader 副本数目失衡,造成某些 broker 节点负载较高,最终影响 Kafka 的性能。定时任务 partition-rebalance-thread 的作用在于主动发现负载较高的 broker 节点,并执行分区 leader 副本再平衡操作。

相关逻辑位于 KafkaController#checkAndTriggerPartitionRebalance 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
private def checkAndTriggerPartitionRebalance(): Unit = {
if (isActive) {
trace("checking need to trigger partition rebalance")
// 获取所有可用的副本集合,key 是优先副本所在的 broker 节点 ID
var preferredReplicasForTopicsByBrokers: Map[Int, Map[TopicAndPartition, Seq[Int]]] = null
inLock(controllerContext.controllerLock) {
// 获取优先副本所在的 broker 节点 ID 与分区的对应关系
preferredReplicasForTopicsByBrokers =
controllerContext.partitionReplicaAssignment
// 过滤掉待删除的 topic
.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p._1.topic))
// 按照第一个副本(优先副本) ID 进行分组
.groupBy { case (_, assignedReplicas) => assignedReplicas.head }
}
debug("preferred replicas by broker " + preferredReplicasForTopicsByBrokers)

// 计算每个 broker 节点的副本不均衡比率(imbalanceRatio)
preferredReplicasForTopicsByBrokers.foreach { case (leaderBroker, topicAndPartitionsForBroker) =>
var imbalanceRatio: Double = 0
// 计算存在 leader 副本,但不是以优先副本作为 leader 副本的分区副本集合
var topicsNotInPreferredReplica: Map[TopicAndPartition, Seq[Int]] = null
inLock(controllerContext.controllerLock) {
topicsNotInPreferredReplica = topicAndPartitionsForBroker.filter { case (topicPartition, _) =>
controllerContext.partitionLeadershipInfo.contains(topicPartition) && // 存在 leader 副本
controllerContext.partitionLeadershipInfo(topicPartition).leaderAndIsr.leader != leaderBroker // 且 leader 副本不是优先副本
}
debug("topics not in preferred replica " + topicsNotInPreferredReplica)
val totalTopicPartitionsForBroker = topicAndPartitionsForBroker.size
val totalTopicPartitionsNotLedByBroker = topicsNotInPreferredReplica.size
// 计算当前 broker 节点的 imbalance 比率:(不是以优先副本作为 leader 副本的分区数 / 所在 broker 节点的总分区数)
imbalanceRatio = totalTopicPartitionsNotLedByBroker.toDouble / totalTopicPartitionsForBroker
trace("leader imbalance ratio for broker %d is %f".format(leaderBroker, imbalanceRatio))
}

// 当 broker 节点上的不均衡比率大于阈值(对应 leader.imbalance.per.broker.percentage 配置)时,触发优先副本选举机制
if (imbalanceRatio > (config.leaderImbalancePerBrokerPercentage.toDouble / 100)) {
topicsNotInPreferredReplica.keys.foreach { topicPartition =>
inLock(controllerContext.controllerLock) {
// 对满足条件的 topic 分区,执行优先副本选举
if (controllerContext.liveBrokerIds.contains(leaderBroker) && // 对应的 broker 节点是有效的
controllerContext.partitionsBeingReassigned.isEmpty && // 没有分区正在执行副本再分配
controllerContext.partitionsUndergoingPreferredReplicaElection.isEmpty && // 没有分区正在执行优先副本选举
!deleteTopicManager.isTopicQueuedUpForDeletion(topicPartition.topic) && // 分区所属 topic 正常运行
controllerContext.allTopics.contains(topicPartition.topic)) { // 分区所属 topic 是有效的
// 执行优先副本选举
onPreferredReplicaElection(Set(topicPartition), isTriggeredByAutoRebalance = true)
}
}
}
}
}
}
}

分区 leader 副本再平衡的执行流程可以概括如下:

  1. 获取所有分区及其副本集合,按照优先副本 ID 进行组织;
  2. 计算各个 broker 节点的不均衡比率;
  3. 当某个 broker 节点的不均衡比率超过阈值时,按条件执行优先副本选举。

一个 broker 节点不均衡比率在计算上等于 节点上不是以优先副本作为 leader 副本的分区数除以 broker 节点上运行的分区总数 ,当不均衡比率超过 leader.imbalance.per.broker.percentage 配置时,如果对应 topic 分区同时满足以下条件则触发优先副本选举,保证 broker 节点的负载均衡:

  1. 对应的 broker 节点是有效的。
  2. 没有分区正在执行副本再分配。
  3. 没有分区正在执行优先副本选举。
  4. 分区所属 topic 是有效且正常运行的。

Controlled Shutdown 机制

前面分析过 BrokerChangeListener 监听器,用于处理 boker 节点上下线的逻辑,这里的下线预示着对应的 broker 节点已经失效,而实际运维中还存在另外一种下线的场景,即由管理员主动触发下线(例如迁移机房、升级软件,修改 Kafka 配置等)。这一场景下对应的 broker 节点是正常运行的,如果我们需要下线这一类 broker 节点,Kafka 提供了更加温柔的方式,即 Controlled Shutdown 机制。相对于 broker 节点的宕机,Controlled Shutdown 关停 broker 节点的优势在于:

  1. 可以让日志数据全部落盘,避免重新上线后的日志恢复操作。
  2. 可以对 leader 副本位于待下线 broker 节点上的分区进行迁移,保证分区的可用性。

当管理员希望对目标 broker 节点执行 Controlled Shutdown 操作时,可以使用命令行工具向 Kafka Controller 发送 ControlledShutdownRequest 请求,相应的处理逻辑位于 KafkaController#shutdownBroker 方法中,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
def shutdownBroker(id: Int): Set[TopicAndPartition] = {
// 保证当前 Controller 是 leader 角色
if (!isActive) {
throw new ControllerMovedException("Controller moved to another broker. Aborting controlled shutdown")
}

controllerContext.brokerShutdownLock synchronized {
info("Shutting down broker " + id)
inLock(controllerContext.controllerLock) {
// 校验目标 broker 节点是否处于运行中,对于不存在或已经关闭的 broker 节点不需要执行关闭操作
if (!controllerContext.liveOrShuttingDownBrokerIds.contains(id)) throw new BrokerNotAvailableException("Broker id %d does not exist.".format(id))
// 记录正在关闭的 broker 节点 ID
controllerContext.shuttingDownBrokerIds.add(id)
}

// 获取待关闭 broker 节点上所有的分区和副本信息
val allPartitionsAndReplicationFactorOnBroker: Set[(TopicAndPartition, Int)] =
inLock(controllerContext.controllerLock) {
controllerContext.partitionsOnBroker(id).map(topicAndPartition =>
(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition).size))
}

// 遍历处理待关闭 broker 节点上的分区和副本
allPartitionsAndReplicationFactorOnBroker.foreach {
case (topicAndPartition, replicationFactor) =>
// Move leadership serially to relinquish lock.
inLock(controllerContext.controllerLock) {
controllerContext.partitionLeadershipInfo.get(topicAndPartition).foreach { currLeaderIsrAndControllerEpoch =>
// 如果开启了副本机制
if (replicationFactor > 1) {
// 如果分区 leader 副本位于待关闭的 broker 节点上
if (currLeaderIsrAndControllerEpoch.leaderAndIsr.leader == id) {
// 使用 ControlledShutdownLeaderSelector 选择器重新为分区选择新的 leader 和 ISR 集合,
// 并将结果写入 ZK,然后发送 LeaderAndIsrRequest 和 UpdateMetadataRequest 请求给集群中相应 broker 节点
partitionStateMachine.handleStateChanges(
Set(topicAndPartition), OnlinePartition, controlledShutdownPartitionLeaderSelector)
}
// 如果分区 leader 副本不位于待关闭的 broker 节点上
else {
try {
// 发送 StopReplicaRequest 请求给待关闭的 broker 节点,关闭分区位于该节点上的副本(不删除副本)
brokerRequestBatch.newBatch()
brokerRequestBatch.addStopReplicaRequestForBrokers(
Seq(id), topicAndPartition.topic, topicAndPartition.partition, deletePartition = false)
brokerRequestBatch.sendRequestsToBrokers(epoch)
} catch {
// ... 省略异常处理
}
// 将副本状态切换成 OfflineReplica,并尝试从 ISR 集合中移除,同时通知到集群中相应 broker 节点
replicaStateMachine.handleStateChanges(
Set(PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition, id)), OfflineReplica)
}
}
}
}
}

// 统计 leader 副本依然处于待关闭 broker 节点上的分区数目
def replicatedPartitionsBrokerLeads(): Iterable[TopicAndPartition] = inLock(controllerContext.controllerLock) {
trace("All leaders = " + controllerContext.partitionLeadershipInfo.mkString(","))
controllerContext.partitionLeadershipInfo.filter {
case (topicAndPartition, leaderIsrAndControllerEpoch) =>
leaderIsrAndControllerEpoch.leaderAndIsr.leader == id &&
controllerContext.partitionReplicaAssignment(topicAndPartition).size > 1
}.keys
}

replicatedPartitionsBrokerLeads().toSet
}
}

对于位于待关停的 broker 节点上的分区,如果启用了副本机制则需要判断分区 leader 副本是否位于待关停的 broker 节点上,如果是的话则需要使用 ControlledShutdownLeaderSelector 分区 leader 副本选择器为当前分区重新分配新的 leader 副本和 ISR 集合,并将结果通知给集群中相应的 broker 节点;如果分区 leader 副本不位于待关停 broker 节点上则直接向该节点发送 StopReplicaRequest 请求,关闭节点上的副本即可,这里可能涉及到分区 ISR 集合的变更,需要将变更的结果通知给集群中相应的 broker 节点。

总结

本文介绍了 Kafka Controller 组件的功能与实现,在一个 Kafka 集群中运行着多个 broker 节点,这些节点在启动时彼此是相互独立的,但是依托于 Kafka Controller 组件可以协调这些 broker 节点的运行,以集群的身份统一对外提供服务。Kafka Controller 提供了对集群中所有分区和副本的状态管理、集群上下文信息管理、副本再分配、分区再平衡、Controlled Shutdown 机制、故障转移机制,以及与 ZK 交互等功能,可以看做是 Kafka 集群的中央控制器。