JStorm 源码解析:Worker 的启动和运行机制

上一篇我们分析了 supervisor 节点的启动和运行过程,提及到 supervisor 的核心工作就是基于 ZK 从 nimbus 节点领取分配给它的任务,并启动 worker 执行。一个 worker 就是一个 JVM 进程,运行在 supervisor 节点上,多个 task 可以同时运行在一个 worker 进程之中,每个 task 都对应一个线程。

Worker 进程的启动位于 Worker 类中,前面我们在分析 supervisor 节点的启动过程时提及到了对于 Worker 类 main 函数的触发,supervisor 在启动相应 worker 进程时会指定 topologyId、supervisorId、workerPort、workerId,以及 classpath 等参数,worker 在拿到这些参数之后会先获取当前机器上端口对应的老进程,并逐一 kill 掉,然后调用 Worker#mk_worker 方法创建并启动对应的 worker 实例,该方法的核心实现如下:

1
2
Worker w = new Worker(conf, context, topologyId, supervisorId, port, workerId, jarPath);
return w.execute();

Worker 类仅包含一个实例属性 WorkerData,它封装了所有与 worker 运行相关的属性,实例化 Worker 对象的过程也是初始化 WorkerData 属性的过程,该过程主要包含以下工作:

  1. 初始化基本属性(包括运行基本配置项、消息上下文对象等),同时设置初始状态;
  2. 检查当前运行模式,对于集群模式会在本地创建相应的工作目录 workers/${worker_id}/pids
  3. 从 ZK 获取当前集群的运行状态;
  4. 加载 topology 配置信息,并注册相应的配置动态更新监听器;
  5. 注册一系列 mertics 监控项,用于打点 worker 的运行状态;
  6. 加载当前 topology 对应的序列化对象;
  7. 创建并初始化相关的消息传输队列。

初始化完成之后会调用 Worker#execute 方法创建并启动 worker 进程,该方法主要的执行流程可以概括如下:

  1. 为当前 worker 创建并启动一个 socket 连接,用于接收消息并分发给名下的 task 线程;
  2. 启动一个线程用于维护当前 worker 状态变更时,更新与其它 worker 之间的连接关系;
  3. 启动一个线程用于定期获取当前 topology 在 ZK 上的基本信息,当 topology 状态发生变更时触发本地相应操作;
  4. 启动一个线程循环消费当前 worker 的 tuple 队列发送给对应的下游 task 线程;
  5. 启动一个线程用于定期更新本地的 worker 心跳信息;
  6. 创建并启动当前 worker 下所有的 task 任务。

方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
public WorkerShutdown execute() throws Exception {
List<AsyncLoopThread> threads = new ArrayList<>();

// 1. 为 worker 创建一个 socket 连接,接收和分发消息给对应的 task
AsyncLoopThread controlRvThread = this.startDispatchThread();
threads.add(controlRvThread);

// 2. 创建线程用于在 worker 关闭或者新启动时更新与其他 worker 之间的连接信息
RefreshConnections refreshConn = this.makeRefreshConnections();
AsyncLoopThread refreshConnLoopThread = new AsyncLoopThread(refreshConn, false, Thread.MIN_PRIORITY, true);
threads.add(refreshConnLoopThread);

// 3. 获取 topology 在 ZK 上的状态,当状态发生变更时更新本地 task 状态
RefreshActive refreshZkActive = new RefreshActive(workerData);
AsyncLoopThread refreshZk = new AsyncLoopThread(refreshZkActive, false, Thread.MIN_PRIORITY, true);
threads.add(refreshZk);

// 4. 创建一个线程循环消费 tuple 队列发送给对应的下游 task
DrainerCtrlRunnable drainerCtrlRunnable = new DrainerCtrlRunnable(workerData, MetricDef.SEND_THREAD);
AsyncLoopThread controlSendThread = new AsyncLoopThread(drainerCtrlRunnable, false, Thread.MAX_PRIORITY, true);
threads.add(controlSendThread);

// Sync heartbeat to Apsara Container
AsyncLoopThread syncContainerHbThread = SyncContainerHb.mkWorkerInstance(workerData.getStormConf());
if (syncContainerHbThread != null) {
threads.add(syncContainerHbThread);
}

JStormMetricsReporter metricReporter = new JStormMetricsReporter(workerData);
metricReporter.init();
workerData.setMetricsReporter(metricReporter);

// 5. 更新本地心跳信息
RunnableCallback heartbeatFn = new WorkerHeartbeatRunnable(workerData);
AsyncLoopThread hb = new AsyncLoopThread(heartbeatFn, false, null, Thread.NORM_PRIORITY, true);
threads.add(hb);

// 6. 创建并启动当前 worker 下所有的 task
List<TaskShutdownDaemon> shutdownTasks = this.createTasks();
workerData.setShutdownTasks(shutdownTasks);

List<AsyncLoopThread> serializeThreads = workerData.setSerializeThreads();
threads.addAll(serializeThreads);
List<AsyncLoopThread> deserializeThreads = workerData.setDeserializeThreads();
threads.addAll(deserializeThreads);

return new WorkerShutdown(workerData, threads);
}

消息接收与分发

Storm 会为 worker 基于 Netty 创建并返回一个 socket 连接用于接收消息,同时 worker 与名下所有 task 之间会维持一个传输队列,并启动一个线程循环消费接收到的消息投递给对应 task 的传输队列中。该过程位于 Worker#startDispatchThread 方法中,该方法实现如下(去掉了一些非关键代码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private AsyncLoopThread startDispatchThread() {
IContext context = workerData.getContext(); // 获取消息上下文:NettyContext
String topologyId = workerData.getTopologyId();

// 1. 创建一个接收消息的消息队列(disruptor)
Map stormConf = workerData.getStormConf();
long timeout = JStormUtils.parseLong(stormConf.get(Config.TOPOLOGY_DISRUPTOR_WAIT_TIMEOUT), 10); // 默认 10ms
WaitStrategy waitStrategy = new TimeoutBlockingWaitStrategy(timeout, TimeUnit.MILLISECONDS); // 10ms
int queueSize = JStormUtils.parseInt(stormConf.get(Config.TOPOLOGY_CTRL_BUFFER_SIZE), 256);
DisruptorQueue recvControlQueue = DisruptorQueue.mkInstance("Dispatch-control", ProducerType.MULTI, queueSize, waitStrategy, false, 0, 0);

// 2. 为当前 worker 基于 Netty 创建并返回一个 Socket 连接用于接收消息
IConnection recvConnection = context.bind(
topologyId, workerData.getPort(), workerData.getDeserializeQueues(), recvControlQueue, false, workerData.getTaskIds());
workerData.setRecvConnection(recvConnection);

// 3. 启动一个线程循环消费 worker 接收到的消息,并应用 DisruptorRunnable.onEvent 方法,
// 最终调用的是 VirtualPortCtrlDispatch.handleEvent 方法,将消息投递给指定 task 的消息队列
RunnableCallback recvControlDispatcher = new VirtualPortCtrlDispatch(
workerData, recvConnection, recvControlQueue, MetricDef.RECV_THREAD);
return new AsyncLoopThread(recvControlDispatcher, false, Thread.MAX_PRIORITY, true);
}

这里的消息队列底层都依赖于 disruptor 实现,最终对于接收到的消息都会调用 VirtualPortCtrlDispatch#handleEvent 方法进行处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public void handleEvent(Object event, boolean endOfBatch) throws Exception {
TaskMessage message = (TaskMessage) event;
int task = message.task(); // 获取当前消息对应的 taskId

// 消息反序列化
Object tuple = null;
try {
// there might be errors when calling update_topology
tuple = this.deserialize(message.message(), task);
} catch (Throwable e) {
if (Utils.exceptionCauseIsInstanceOf(KryoException.class, e)) {
throw new RuntimeException(e);
}
LOG.warn("serialize msg error", e);
}

// 获取 taskId 对应的消息通道
DisruptorQueue queue = controlQueues.get(task);
if (queue == null) {
LOG.warn("Received invalid control message for task-{}, Dropping...{} ", task, tuple);
return;
}
if (tuple != null) {
// 将消息投递给对应的 task 传输队列
queue.publish(tuple);
}
}

创建并启动用于维护 worker 之间连接关系的线程

在这一步会创建一个 RefreshConnections 对象,它继承了 RunnableCallback 类,所以同样是被异步循环线程模型接管(按照指定间隔循环调用其 RefreshConnections#run 方法),Storm 会定期检测 ZK 上的 topology 任务分配信息是否有更新,如果有比本地更新的任务分配(依赖于任务分配时间戳进行判定)则会判断新任务分配的类型来相应的更新本地的信息。

如果当前的任务分配类型仅仅是更新集群上已有的 topology,则 Storm 会遍历通知各个 task 执行相应的更新操作,同时会回调已注册的所有更新监听器以更新配置信息,实现如下:

1
2
3
4
5
6
7
8
9
10
11
// 当前任务分配已经更新且是更新 topology 操作,则通知所有的 task
List<TaskShutdownDaemon> taskShutdowns = workerData.getShutdownTasks();
Map newConf = StormConfig.read_supervisor_topology_conf(conf, topologyId);
workerData.getStormConf().putAll(newConf);
for (TaskShutdownDaemon taskSD : taskShutdowns) {
// 通知所有的 task
taskSD.update(newConf);
}
// disable/enable metrics on the fly
workerData.getUpdateListener().update(newConf); // 回调更新监听器,更新配置
workerData.setAssignmentType(AssignmentType.UpdateTopology);

如果当前是更新以外的任务分配类型(Assign、ScaleTopology),则 Storm 会从新的任务分配信息中分别获取新增的、待删除的,以及需要更新的 taskId 列表,并执行相应的创建、删除,以及更新 task 操作,同时会更新 worker 上所有 task 的下游 task 列表信息。部分代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// 获取新增的 taskId 列表
Set<Integer> addedTasks = this.getAddedTasks(assignment);
// 获取待删除的 taskId 列表
Set<Integer> removedTasks = this.getRemovedTasks(assignment);
// 获取待更新的 taskId 列表
Set<Integer> updatedTasks = this.getUpdatedTasks(assignment);

// 基于新任务分配信息更新 workerData
workerData.updateWorkerData(assignment);
workerData.updateKryoSerializer();

// 关闭需要移除的 task
this.shutdownTasks(removedTasks);
// 创建新增的 task
this.createTasks(addedTasks);
// 更新已有需要被更新的 task
this.updateTasks(updatedTasks);

// 更新当前 worker 上所有 task 的下游 task 列表信息
Set<Integer> tmpOutboundTasks = Worker.worker_output_tasks(workerData);
if (!outboundTasks.equals(tmpOutboundTasks)) {
for (int taskId : tmpOutboundTasks) {
if (!outboundTasks.contains(taskId)) {
workerData.addOutboundTaskStatusIfAbsent(taskId);
}
}
for (int taskId : workerData.getOutboundTaskStatus().keySet()) {
if (!tmpOutboundTasks.contains(taskId)) {
workerData.removeOutboundTaskStatus(taskId);
}
}
workerData.setOutboundTasks(tmpOutboundTasks);
outboundTasks = tmpOutboundTasks;
}
workerData.setAssignmentType(AssignmentType.Assign);

创建并启动定期获取 topology 基本信息的线程

在这一步会创建一个 RefreshActive 对象,它同样继承了 RunnableCallback 类,所以同样也是被异步循环线程模型接管(按照指定间隔循环调用其 RefreshActive#run 方法),Storm 会定期获取当前 topology 在 ZK 上的基本信息,当 topology 状态发生变更时触发本地执行相应的操作。

如果 topology 状态信息变为 active、upgrading,或者 rollback 时,Storm 会依次将本地 task 的状态设置为 TaskStatus.RUN,如果当前 task 对应的组件是 spout,则会触发 ISpout#activate 方法。如果当前 topology 状态不为 inactive 时,Storm 会依次将本地的 task 状态设置为 TaskStatus.PAUSE,如果当前 task 对应的组件是 spout,则会触发 ISpout#deactivate 方法。最后更新本地记录的 topology 状态。相关实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
if (newTopologyStatus.equals(StatusType.active) // 激活
|| newTopologyStatus.equals(StatusType.upgrading) // 灰度
|| newTopologyStatus.equals(StatusType.rollback)) { // 回滚
for (TaskShutdownDaemon task : tasks) {
if (task.getTask().getTaskStatus().isInit()) {
task.getTask().getTaskStatus().setStatus(TaskStatus.RUN);
} else {
task.active();
}
}
} else if (oldTopologyStatus == null || !oldTopologyStatus.equals(StatusType.inactive)) {
for (TaskShutdownDaemon task : tasks) {
if (task.getTask().getTaskStatus().isInit()) {
task.getTask().getTaskStatus().setStatus(TaskStatus.PAUSE);
} else {
task.deactive();
}
}
}
workerData.setTopologyStatus(newTopologyStatus);

创建并启动循环消费 worker tuple 队列的线程

在这一步会创建一个 DrainerCtrlRunnable 对象,它同样继承了 RunnableCallback 类,所以同样也是被异步循环线程模型接管(按照指定间隔循环调用其 DrainerCtrlRunnable#run 方法),Storm 会循环消费当前 worker 的 tuple 队列 transferCtrlQueue,并最终调用 DrainerCtrlRunnable#handleEvent 方法对拿到的消息进行处理,该方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public void handleEvent(Object event, boolean endOfBatch) throws Exception {
if (event == null) {
return;
}
ITupleExt tuple = (ITupleExt) event;
int targetTask = tuple.getTargetTaskId();

// 获取与下游 task 的连接
IConnection conn = this.getConnection(targetTask);
if (conn != null) {
byte[] tupleMessage = null;
try {
// there might be errors when calling update_topology
tupleMessage = this.serialize(tuple); // 序列化数据
} catch (Throwable e) {
// 省略异常处理
}
// 基于 netty 发送数据
TaskMessage message = new TaskMessage(TaskMessage.CONTROL_MESSAGE, targetTask, tupleMessage);
conn.sendDirect(message);
}
}

方法的逻辑比较简单,拿到当前 tuple 对应的下游 taskId,然后与之建立连接(netty)并将 tuple 发送给它。

创建并启动当前 worker 下所有的 task 线程

方法 Worker#createTasks 用于为当前 worker 下的所有 task 任务创建一个 Task 对象,并为每个 task 启动一个线程执行,同时为每个 task 任务创建一个 TaskShutdownDaemon 对象用于管理对应的 task 线程,方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private List<TaskShutdownDaemon> createTasks() throws Exception {
List<TaskShutdownDaemon> shutdownTasks = new ArrayList<>();

// 获取当前 worker 下所有的 taskId
Set<Integer> taskIds = workerData.getTaskIds();

Set<Thread> threads = new HashSet<>();
List<Task> taskArrayList = new ArrayList<>();
for (int taskId : taskIds) {
// 创建并启动 task
Task task = new Task(workerData, taskId);
Thread thread = new Thread(task);
threads.add(thread);
taskArrayList.add(task);
thread.start(); // 启动 task
}
for (Thread thread : threads) {
thread.join();
}
for (Task t : taskArrayList) {
shutdownTasks.add(t.getTaskShutdownDameon());
}
return shutdownTasks;
}

Task 类实现了 Runnable 接口,其 run 方法中简单调用了 Task#execute 方法,该方法首先会向系统 bolt 发送一条“startup”消息,然后依据当前的组件类型创建对应的任务执行器,创建的过程位于 Task#mkExecutor 方法中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public BaseExecutors mkExecutor() {
BaseExecutors baseExecutor = null;

if (taskObj instanceof IBolt) {
if (taskId == topologyContext.getTopologyMasterId()) {
baseExecutor = new TopologyMasterBoltExecutors(this);
} else {
baseExecutor = new BoltExecutors(this);
}
} else if (taskObj instanceof ISpout) {
if (this.isSingleThread(stormConf)) {
baseExecutor = new SingleThreadSpoutExecutors(this);
} else {
baseExecutor = new MultipleThreadSpoutExecutors(this);
}
}

return baseExecutor;
}

BaseExecutors 类是一个 RunnableCallback 类,所以其 run 方法会被异步循环调用。继承自 BaseExecutors 类有 5 个(如下),而 Task#mkExecutor 方法基于组件类型分别选择了相应的实现类进行实例化。

  • BoltExecutors
  • TopologyMasterBoltExecutors
  • SpoutExecutors
  • SingleThreadSpoutExecutors
  • MultipleThreadSpoutExecutors

先来看一下 BoltExecutors 和 TopologyMasterBoltExecutors,这是 bolt 组件的任务执行器,其中 TopologyMasterBoltExecutors 继承自 BoltExecutors,所以接下来我们主要来看一下 BoltExecutors 的实现。BoltExecutors 类的 run 方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public void run() {
if (!isFinishInit) {
// 执行初始化操作,主要是调用了 IBolt.prepare 方法
this.initWrapper();
}
while (!taskStatus.isShutdown()) {
try {
// 循环消费当前 task 的消息队列
this.consumeExecuteQueue();
} catch (Throwable e) {
// 省略异常处理逻辑
}
}
}

方法首先会判定是否完成了初始化操作,如果未完成则会调用 BaseExecutors#initWrapper 执行初始化,这期间主要是调用了 IBolt#prepare 方法,这也是我们在实现一个 bolt 时执行初始化的方法。如果当前 task 线程没有被销毁,则会一直循环调用 BoltExecutors#consumeExecuteQueue 消费当前 task 的消息队列。前面的分析我们知道 worker 会对接收到的消息按照 taskId 投递给对应 task 的消息队列,而消息队列的消费过程就在这里发生。针对接收到消息会逐条进行处理,这里最终调用的是 BoltExecutors#onEvent 方法,处理的消息就是我们熟悉的 Tuple 对象,而该方法的核心就是调用 IBolt#execute 方法,也就是调用用户自定义的策略对收到的 tuple 进行处理。

再来看一下 SingleThreadSpoutExecutors 和 MultipleThreadSpoutExecutors,这两类都继承自 SpoutExecutors 类,区别仅在于对于消息的附加处理和正常的业务逻辑是否位于同一个线程中,而核心逻辑都是调用 ISpout#nextTuple 方法,也就是执行用户自定义的业务逻辑。

针对 worker 的运行机制就分析到这里,但是 Storm 对于消息的处理并没有结束,下一篇我们将一起探寻 ack 机制,看看 Storm 如何保证消息至少被执行一次(at least once)。