JStorm 源码解析:Supervisor 的启动和运行机制

Supervisor 节点可以理解为单机任务调度器,它负责监听 nimbus 节点的任务资源分配,启动相应的 worker 进程执行 nimbus 分配给当前节点的任务,同时监测 worker 的运行状态,一旦发现有 worker 运行异常,就会杀死该 worker 进程,并将原先分配给 worker 的任务交还给 nimbus 节点进行重新分配。

Supervisor 节点的启动过程位于 Supervisor 类中,main 方法的实现比较简单,主要就是创建了一个 Supervisor 类对象,并调用实例方法 Supervisor#run,该方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
public void run() {
try {
/*
* 解析配置文件:
* 1. 解析 default.yaml
* 2. 解析 storm.yaml
* 3. 解析 -Dstorm.options 指定的命令行参数
* 4. 替换所有配置项中的 JSTORM_HOME 占位符
*/
Map<Object, Object> conf = Utils.readStormConfig();

// 确保当前为集群运行模式
StormConfig.validate_distributed_mode(conf);

// 创建进程文件: ${storm.local.dir}/supervisor/pids/${pid}
this.createPid(conf);

// 创建并启动 supervisor
SupervisorManger supervisorManager = this.mkSupervisor(conf, null);

JStormUtils.redirectOutput("/dev/null");

// 注册 SupervisorManger,当 JVM 进程停止时执行 shutdown 逻辑
this.initShutdownHook(supervisorManager);

// 循环监测 shutdown 方法是否执行完毕
while (!supervisorManager.isFinishShutdown()) {
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {
}
}
}
// 省略 catch 代码块
}

整个方法的逻辑比较清晰(如代码注释),核心实现位于 Supervisor#mkSupervisor 方法中,该方法主要用于创建和启动 supervisor 节点,基本执行流程如下:

  1. 创建并清空本地存放临时文件的目录(这些文件是从 nimbus 节点下载而来);
  2. 创建 ZK 操作对象和 worker 运行错误数据上报器;
  3. 创建 LocalState 对象,并获取(或创建)当前 supervisor 节点对应的 ID;
  4. 启动心跳机制,同步节点信息到 ZK;
  5. 启动并定期执行 SyncSupervisorEvent#run() 方法(默认间隔 10 秒),从 nimbus 节点领取分配给当前节点的任务并启动执行;
  6. 启动轻量级 HTTP 服务,主要用于查看和下载当前节点的运行日志数据;
  7. 启动 supervisor 运行状况检查机制和 nimbus 节点配置同步策略。

Supervisor 节点在启动时首先会在本地创建并清空临时目录(路径:supervisor/tmp),Supervisor 从 nimbus 节点下载下来的文件会临时存放在这里,包括 stormcode.cer、stormconf.cer、stormjar.jar,以及 lib 目录下面的文件等,经过简单处理之后会将其复制到 stormdist/${topology_id} 本地目录中,supervisor 本地文件说明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
+ ${supervisor_local_dir}
| ---- + supervisor
| ---- | ---- + stormdist
| ---- | ---- | ---- + ${topology_id}
| ---- | ---- | ---- | ---- + resources: 指定 topology 程序包 resources 目录下面的所有文件
| ---- | ---- | ---- | ---- + stormjar.jar: 包含指定 topology 所有代码的 jar 文件
| ---- | ---- | ---- | ---- + stormcode.ser: 包含指定 topology 对象的序列化文件
| ---- | ---- | ---- | ---- + stormconf.ser: 包含指定 topology 的配置信息文件
| ---- | ---- + localstate: 本地状态信息
| ---- | ---- + tmp: 临时目录,从 nimbus 下载的文件的临时存储目录,简单处理之后复制到 stormdist/${topology_id}
| ---- | ---- | ---- + ${uuid}
| ---- | ---- | ---- | ---- + stormjar.jar: 从 nimbus 节点下载下来的 jar 文件
| ---- | ---- | ---- + ${topology_id}
| ---- | ---- | ---- | ---- + stormjar.jar: 包含指定 topology 所有代码的 jar 文件(从 inbox 目录复制过来)
| ---- | ---- | ---- | ---- + stormcode.ser: 包含指定 topology 对象的序列化文件
| ---- | ---- | ---- | ---- + stormconf.ser: 包含指定 topology 的配置信息文件

| ---- + workers
| ---- | ---- + ${worker_id}
| ---- | ---- | ---- + pids
| ---- | ---- | ---- | ---- + ${pid}: 指定 worker 进程 ID
| ---- | ---- | ---- + heartbeats
| ---- | ---- | ---- | ---- + ${worker_id}: 指定 worker 心跳信息(心跳时间、worker 的进程 ID)

接下来 supervisor 会创建 StormClusterState 对象,用于操作 ZK 集群,同时还会创建一个 WorkerReportError 类对象,用于上报 worker 的运行错误数据到 ZK,该类仅包含一个实例方法 report,用于执行上报逻辑。然后 supervisor 节点会创建一个 LocalState 对象用于存储节点的状态信息,这是一个简单、低效的键值存储数据库,每一次操作都会落盘,在这里对应的落盘目录是 supervisor/localstate。Supervisor 的 ID(UUID 字符串) 就存储在该数据库中,supervisor 启动时会先尝试从本地状态信息对象中获取 ID 值,如果不存在的话就会创建一个新的 UUID 字符串作为 ID。

Supervisor 节点在启动的过程中会初始化心跳机制,间隔指定时间将当前节点的相关信息上报给 ZK(路径:supervisors/${supervisor_id}),包含当前 supervisor 节点的主机名、ID、最近一次上报时间、截止上次上报节点的运行时间,以及 worker 端口列表信息。相关信息的初始化在 Heartbeat 类对象实例化时进行设置,期间会依据当前机器 CPU 核心数和物理内存大小计算允许的 worker 端口数目,并默认从 6800 端口号开始分配 worker 端口。Supervisor 节点会启动一个线程,默认每间隔 60 秒调用 Heartbeat#update 方法同步心跳信息到 ZK,该方法的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public void update() {
// 更新本次上报时间为当前时间(单位:秒)
supervisorInfo.setTimeSecs(TimeUtils.current_time_secs());
// 更新截止目前节点的运行时间(单位:秒)
supervisorInfo.setUptimeSecs(TimeUtils.current_time_secs() - startTime);

// 依据具体配置和资源占用,调整端口号列表
this.updateSupervisorInfo();

try {
// 将 supervisor 信息写入 ZK:supervisors/${supervisor_id}
stormClusterState.supervisor_heartbeat(supervisorId, supervisorInfo);
} catch (Exception e) {
LOG.error("Failed to update SupervisorInfo to ZK", e);
}
}

具体过程如代码注释,下面是一个实际的心跳信息示例:

1
2
3
4
5
6
7
8
9
{
"hostName": "10.38.164.192",
"supervisorId": "980bbcfd-5438-4e25-aee9-bf411304a446",
"timeSecs": 1533373753,
"uptimeSecs": 2879598,
"workerPorts": [
6912, 6900, 6901, 6902, 6903, 6904, 6905, 6906, 6907, 6908, 6909, 6910, 6911
]
}

下面来重点看一下 supervisor 节点领取分配给当前节点的任务并启动执行的过程。该过程的实现代码块如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/*
* 5. 启动并定期执行 SyncSupervisorEvent#run() 方法(默认间隔 10 秒),从 nimbus 节点领取分配给当前节点的任务并启动执行
*/
ConcurrentHashMap<String, String> workerThreadPids = new ConcurrentHashMap<>();
SyncProcessEvent syncProcessEvent = new SyncProcessEvent(
supervisorId, conf, localState, workerThreadPids, sharedContext, workerReportError, stormClusterState);

EventManagerImp syncSupEventManager = new EventManagerImp();
AsyncLoopThread syncSupEventThread = new AsyncLoopThread(syncSupEventManager);
threads.add(syncSupEventThread);

SyncSupervisorEvent syncSupervisorEvent = new SyncSupervisorEvent(
supervisorId, conf, syncSupEventManager, stormClusterState, localState, syncProcessEvent, hb);

// ${supervisor.monitor.frequency.secs},默认为 10 秒
int syncFrequency = JStormUtils.parseInt(conf.get(Config.SUPERVISOR_MONITOR_FREQUENCY_SECS));
EventManagerPusher syncSupervisorPusher = new EventManagerPusher(syncSupEventManager, syncSupervisorEvent, syncFrequency);
/*
* 每间隔一段时间(默认为 10 秒)调用 EventManagerPusher#run(),
* 本质上是调用 EventManagerImp#add(RunnableCallback) 将 syncSupervisorEvent 记录到自己的阻塞队列中,
* 同时 EventManagerImp 也会循环消费阻塞队列,取出其中的 syncSupervisorEvent,并应用其 run 方法:SyncSupervisorEvent#run()
*/
AsyncLoopThread syncSupervisorThread = new AsyncLoopThread(syncSupervisorPusher);
threads.add(syncSupervisorThread);

要理解该过程的运行机制,我们应该倒着来看相应的源码实现,首先看一下代码块的倒数第二行:

1
AsyncLoopThread syncSupervisorThread = new AsyncLoopThread(syncSupervisorPusher);

由前面我们对 Storm 基本线程模型的分析可以知道,这行代码会启动一个线程去循环执行入参回调的 run 方法,这里也就是 EventManagerPusher#run 方法,该方法的实现比较简单:

1
2
3
4
@Override
public void run() {
eventManager.add(event);
}

也就是不断的调用 EventManager#add 方法(默认间隔时间为 10 秒),继续往前看我们知道这里的 EventManager 类实际实现是 EventManagerImp,而不断的调用其 add 方法添加的 event 本质上就是一个 SyncSupervisorEvent 实例对象。EventManagerImp 维护了一个阻塞队列来不断记录加入的 event,它本身也是一个回调,再往前看我们就可以看到它在实例化时也被 AsyncLoopThread 启动,EventManagerImp#run 方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
public void run() {
try {
RunnableCallback r = queue.take();
if (r == null) {
return;
}
r.run();
e = r.error();
this.processInc();
} catch (InterruptedException e) {
LOG.info("Interrupted when processing event.");
}
}

该方法就是不断的从阻塞队列中取出相应的回调并应用其 run 方法,也就是不断应用 SyncSupervisorEvent#run 方法。

以上就是步骤五的整体逻辑,简单描述就是定期的往阻塞队列中添加 SyncSupervisorEvent 事件,而线程会循环的消费队列,取出事件并应用事件的 run 方法。下面来深入分析一下 SyncSupervisorEvent 的 run 方法,该方法所做的工作也就是 supervisor 的核心逻辑,主要可以概括为 3 点:

  1. 从 ZK 上下载任务分配信息,并更新到本地;
  2. 从 nimbus 节点上下载 topology 对应的 jar 和配置文件;
  3. 启动 worker 执行分配给当前 supervisor 的 topology 任务。

SyncSupervisorEvent#run 方法的实现比较长,下面按照执行步骤逐步拆分进行分析,首先来看一下从 ZK 上下载任务分配信息,并更新到本地的过程,相应实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
/*
* 1.1. 同步所有 topology 的任务分配信息及其版本信息到本地
*/
if (healthStatus.isMoreSeriousThan(HealthStatus.ERROR)) {
// 检查当前 supervisor 的状态信息,如果是 PANIC 或 ERROR,则清除所有本地的任务分配相关信息
assignmentVersion.clear();
assignments.clear();
LOG.warn("Supervisor machine check status: " + healthStatus + ", killing all workers.");
} else {
// 同步所有 topology 的任务分配信息及其版本(即更新 assignmentVersion 和 assignments 参数)
this.getAllAssignments(assignmentVersion, assignments, syncCallback);
}
LOG.debug("Get all assignments " + assignments);

/*
* 1.2. 从 supervisor 本地(supervisor/stormdist/)获取已经下载的所有的 topologyId
*/
List<String> downloadedTopologyIds = StormConfig.get_supervisor_toplogy_list(conf);
LOG.debug("Downloaded storm ids: " + downloadedTopologyIds);

/*
* 1.3. 获取分配给当前 supervisor 的任务信息:<port, LocalAssignments>
*/
Map<Integer, LocalAssignment> zkAssignment = this.getLocalAssign(stormClusterState, supervisorId, assignments);

/*
* 1.4. 更新 supervisor 本地的任务分配信息
*/
Map<Integer, LocalAssignment> localAssignment;
try {
LOG.debug("Writing local assignment " + zkAssignment);
localAssignment = (Map<Integer, LocalAssignment>) localState.get(Common.LS_LOCAL_ASSIGNMENTS); // local-assignments
if (localAssignment == null) {
localAssignment = new HashMap<>();
}
localState.put(Common.LS_LOCAL_ASSIGNMENTS, zkAssignment);
} catch (IOException e) {
LOG.error("put LS_LOCAL_ASSIGNMENTS " + zkAssignment + " to localState failed");
throw e;
}

Supervisor 节点在本地会缓存任务分配信息,同时会定期从 ZK 同步最新的任务分配信息到本地,从 ZK 上获取任务分配信息的逻辑位于 SyncSupervisorEvent#getAllAssignments 方法中,方法会从 ZK 的 assignments 路径下获取所有的 topologyId,并与本地比较对应 topology 的任务分配信息版本,如果版本有更新则更新本地缓存的任务分配信息。

接下来 supervisor 会计算所有需要下载的 topology,包括需要更新的、需要重新下载的(之前下载有失败),以及在当前节点进行灰度的,并从 nimbus 节点下载各个 topology 对应的文件,包括 stormjar.jar、stormcode.ser、stormconf.ser,以及 lib 目录下面的依赖文件(如果存在的话),最后从本地删除那些之前下载过但是本次未分配给当前 supervisor 节点的 topology 文件,相应实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
/*
* 2.1. 获取所有需要执行下载操作的 topology_id 集合(包括需要更新的、需要重新下载,以及在当前节点灰度的)
*/
Set<String> updateTopologies = this.getUpdateTopologies(localAssignment, zkAssignment, assignments);
Set<String> reDownloadTopologies = this.getNeedReDownloadTopologies(localAssignment);
if (reDownloadTopologies != null) {
updateTopologies.addAll(reDownloadTopologies);
}
// 获取灰度发布且指定在当前 supervisor 的 topology:[topology_id, Pair(host, port)]
Map<String, Set<Pair<String, Integer>>> upgradeTopologyPorts =
this.getUpgradeTopologies(stormClusterState, localAssignment, zkAssignment);
if (upgradeTopologyPorts.size() > 0) {
LOG.info("upgrade topology ports:{}", upgradeTopologyPorts);
updateTopologies.addAll(upgradeTopologyPorts.keySet());
}

/*
* 2.2. 从 nimbus 下载对应的 topology 任务代码
*/
// 从 ZK 上获取分配给当前 supervisor 的 [topologyId, master-code-dir] 信息
Map<String, String> topologyCodes = getTopologyCodeLocations(assignments, supervisorId);
// downloadFailedTopologyIds which can't finished download binary from nimbus
Set<String> downloadFailedTopologyIds = new HashSet<>(); // 记录所有下载失败的 topologyId
// 从 nimbus 下载相应的 topology jar 文件到 supervisor 本地
this.downloadTopology(topologyCodes, downloadedTopologyIds, updateTopologies, assignments, downloadFailedTopologyIds);

/*
* 2.3. 删除无用的 topology 相关文件(之前下载过,但是本次未分配给当前 supervisor)
*/
this.removeUselessTopology(topologyCodes, downloadedTopologyIds);

文件下载的逻辑位于 SyncSupervisorEvent#downloadTopology 方法中,文件下载的过程可以概括为以下 5 个步骤:

  1. 从 nimbus 上下载 topology 相关文件到 supervisor 的临时目录:${storm.local.dir}/supervisor/tmp/${uuid}
  2. 抽取 stormjar.jar 的 resources 文件;
  3. 将临时目录下的文件移动到 ${storm.local.dir}/supervisor/stormdist/${topology_id} 目录;
  4. 清空临时目录;
  5. 添加对应的时间戳文件:${storm.local.dir}/supervisor/stormdist/${topology_id}/timestamp

最后 supervisor 节点会调用 SyncProcessEvent#run 方法杀死状态异常的 worker,同时启动新的 worker 执行分配的任务:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
/*
* 3. kill bad workers, start new workers
*/
syncProcesses.run(zkAssignment, downloadFailedTopologyIds, upgradeTopologyPorts);

// SyncProcessEvent#run
public void run(Map<Integer, LocalAssignment> localAssignments,
Set<String> downloadFailedTopologyIds,
Map<String, Set<Pair<String, Integer>>> upgradeTopologyPorts) {

LOG.debug("Syncing processes, interval (sec): " + TimeUtils.time_delta(lastTime));
lastTime = TimeUtils.current_time_secs();
try {
if (localAssignments == null) {
localAssignments = new HashMap<>();
}
LOG.debug("Assigned tasks: " + localAssignments);

/*
* 3.1 获取本地所有 worker 的状态信息:Map<worker_id [WorkerHeartbeat, state]>
*/
Map<String, StateHeartbeat> localWorkerStats;
try {
// Map[workerId, [worker heartbeat, state]]
localWorkerStats = this.getLocalWorkerStats(conf, localState, localAssignments);
} catch (Exception e) {
LOG.error("Failed to get local worker stats");
throw e;
}
LOG.debug("Allocated: " + localWorkerStats);

/*
* 3.2 杀死无用的 worker,并从 localWorkerStats 中移除
*/
Map<String, Integer> taskCleanupTimeoutMap;
Set<Integer> keepPorts = null;
try {
// [topology_id, cleanup_second]
taskCleanupTimeoutMap = (Map<String, Integer>) localState.get(Common.LS_TASK_CLEANUP_TIMEOUT); // task-cleanup-timeout
// 对于一些状态为 disallowed/timedOut 的 worker 进行 kill,并清空相应的数据,同时返可用的 worker port
keepPorts = this.killUselessWorkers(localWorkerStats, localAssignments, taskCleanupTimeoutMap);
localState.put(Common.LS_TASK_CLEANUP_TIMEOUT, taskCleanupTimeoutMap);
} catch (IOException e) {
LOG.error("Failed to kill workers", e);
}

// 3.3 检测 worker 是否正在启动中,清空处于运行态和启动失败 worker 的相应数据(workerIdToStartTimeAndPort 和 portToWorkerId)
this.checkNewWorkers(conf);

// 3.4 标记需要重新下载的 topology(没有启动成功,同时下载时间已经超过 2 分钟)
this.checkNeedUpdateTopologies(localWorkerStats, localAssignments);

// 3.5 启动新的 worker 执行 topology 任务
this.startNewWorkers(keepPorts, localAssignments, downloadFailedTopologyIds);

// 3.6 启动相应的 worker 执行在当前节点 灰度的 topology 任务
this.restartUpgradingWorkers(localAssignments, localWorkerStats, upgradeTopologyPorts);

} catch (Exception e) {
LOG.error("Failed to init SyncProcessEvent", e);
}
}

无论是新任务分配,还是灰度更新,启动 worker 的过程都是调用了 SyncProcessEvent#startWorkers 方法,该方法为每个新的 worker 基于 UUID 创建一个 workerId,以及进程目录 ${storm.local.dir}/workers/${worker_id}/pids,并调用 SyncProcessEvent#doLaunchWorker 方法启动 worker,同时更新 worker 在本地的相应数据。Worker 进程的启动和运行机制将在下一篇中进行详细说明。

在分析 nimbus 节点启动过程中有一步会启动一个 HTTP 服务,用于接收查询 nimbus 节点本地日志和配置等数据的需求,supervisor 节点的启动过程也同样包含这样一个过程。Supervisor 的 HTTP 服务默认会监听在 7622 端口,用于接收来自 UI 的请求。

最后对于集群模式,如果配置了 supervisor.enable.check=true 则 supervisor 节点在启动时会创建一个线程用于定期检查 supervisor 的运行状况,另外还会启动一个线程用于同步 nimbus 的配置信息到本地节点。最后会创建并返回一个 SupervisorManger 类对象,用于对于当前 supervisor 节点进行管理。

到此,supervisor 节点基本启动完成了,supervisor 会定期基于 ZK 从 nimbus 节点领取任务,然后启动 worker 去执行任务,而启动 worker 的过程我们将在下一篇中进行详细分析。