深入理解 JUC:SynchronousQueue

本文我们一起来分析一下 SynchronousQueue 的设计与实现。不同于前面介绍的一系列线程安全队列,SynchronousQueue 从真正意义上来说并不能算是一个队列,而将其理解为一个用于线程之间通信的组件更为恰当。SynchronousQueue 没有容量的概念,一个线程在执行完入队列操作之后,必须等待另外一个线程与之匹配完成出队列后方可继续再次入队列,反之亦然。此外,有别于我们通常理解的队列中的结点只承载元素,SynchronousQueue 中的结点还需要附着对应的操作线程,这些线程在对应的结点上等待被匹配(fulfill)。

SynchronousQueue 实现自 BlockingQueue 接口,底层基于 LockSupport 工具类 实现线程的阻塞和唤醒操作,并依赖 CAS 保证线程安全。在构造 SynchronousQueue 对象时,允许通过参数指定是否启用公平模式。SynchronousQueue 基于 Dual Stack 数据结构实现非公平的线程通信,基于 Dual Queue 数据结构实现公平的线程通信。SynchronousQueue 的公平模式因为减少了线程之间的冲突,在竞争频繁的场景下反而具备更高的性能,而非公平模式能够更好的维持线程局部性(thread locality),减少线程上下文切换的开销。

SynchronousQueue 示例

本小节我们以“生产者-消费者”示例演示 SynchronousQueue 的基本使用,在示例中我们设置了一个生产者和两个消费者,以展示 SynchronousQueue 公平性特征。示例实现如下(省略了异常处理):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
private static BlockingQueue<Integer> queue = new SynchronousQueue<>(true);

private static class Producer implements Runnable {

@Override
public void run() {
int count = 0;
while (count < 10) {
int val = count++;
System.out.println("Producer produce: " + val);
queue.put(val);
TimeUnit.SECONDS.sleep(1);
}
}
}

private static class Consumer implements Runnable {

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
System.out.println("Consumer " + Thread.currentThread().getName() + " consume: " + queue.take());
}
}
}

public static void main(String[] args) {
Thread producer = new Thread(new Producer());
Thread consumer1 = new Thread(new Consumer());
Thread consumer2 = new Thread(new Consumer());
consumer1.setName("A");
consumer2.setName("B");

producer.start();
consumer1.start();
consumer2.start();
}

运行输出如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Producer produce: 0
Consumer A consume: 0
Producer produce: 1
Consumer A consume: 1
Producer produce: 2
Consumer B consume: 2
Producer produce: 3
Consumer A consume: 3
Producer produce: 4
Consumer B consume: 4
Producer produce: 5
Consumer A consume: 5
Producer produce: 6
Consumer B consume: 6
Producer produce: 7
Consumer A consume: 7
Producer produce: 8
Consumer B consume: 8
Producer produce: 9
Consumer A consume: 9

可以看到,当生产者往 SynchronousQueue 中插入一个元素之后,生产者线程会等待消费者完成消费,而消费者线程在完成消费之后会等待生产者生产。SynchronousQueue 的公平性特性尽可能保证了消费者 A 和 B 能够交替执行消费操作。

在上述示例中,如果我们将 Producer 入队列的方法由 put 改为 offer,那么在 Consumer 入队列成功之前,Producer 始终不能入队列成功,这对于一般的队列而言显得有些奇怪。实际上,这里说的不能成功入队列不够准确,要知道 offer 是一类带有超时机制的方法,也就是说当 Producer 在将某个元素执行入队列之后,它希望有一个 Consumer 能够在自己期望的时间内与该元素进行匹配,否则就只能返回 false,从表象上来看就是没有入队列成功。实际应用中我们需要考虑此类表象是否符合自己的业务场景,如果不满足则可以考虑使用 put 方法执行入队列操作。

核心方法实现

SynchronousQueue 实现自 BlockingQueue 接口,但并未对接口中声明的方法全部支持,例如 SynchronousQueue 的 SynchronousQueue#peek 方法就始终返回 null,在使用时推荐先阅读 API 文档,避免影响程序的正确性。本文主要分析 SynchronousQueue 的实现机制,所以下面重点来看一下 SynchronousQueue 已实现的出队列和入队列操作。

前面我们提及到 SynchronousQueue 内部基于 Dual Stack 和 Dual Queue 数据结构实现,在 SynchronousQueue 中定义了一个 Transferer 抽象类,该类抽象了 Dual Stack 和 Dual Queue 数据结构的实现,定义如下:

1
2
3
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}

SynchronousQueue 的出队列和入队列操作均委托给 Transferer#transfer 方法执行(如下),该方法接收 3 个参数,其中参数 e 表示待添加到队列中的元素值,对于出队列操作来说,e 始终等于 null;参数 timed 用于设置当前操作是否具备超时策略,如果是则需要使用参数 nanos 参数指定超时时间。

  • SynchronousQueue#put(E e) -> transferer.transfer(e, false, 0)
  • SynchronousQueue#offer(E) -> transferer.transfer(e, true, 0)
  • SynchronousQueue#offer(E, long, TimeUnit) -> transferer.transfer(e, true, unit.toNanos(timeout))
  • SynchronousQueue#take -> transferer.transfer(null, false, 0)
  • SynchronousQueue#poll() -> transferer.transfer(null, true, 0)
  • SynchronousQueue#poll(long, TimeUnit) -> transferer.transfer(null, true, unit.toNanos(timeout))

针对 Dual Stack 和 Dual Queue 数据结构,SynchronousQueue 分别定义了 TransferStack 和 TransferQueue 实现类,下面的小节将针对这两个类的实现机制展开分析。

在开始之前,我们先对 匹配 一词在 SynchronousQueue 中的含义进行解释,在下面的章节中将多次提及匹配的概念。我们大致已经了解到 SynchronousQueue 在内部基于栈或队列实现线程间的交互,以“生产者-消费者”为例,如果使用的是栈结构(队列亦如此),当生产者往 SynchronousQueue 中插入一个元素时,该生产者线程在插入成功之后并不会立即返回,而是等待消费者前来消费。当消费者执行消费时发现栈上正好有生产者在等待,于是执行消费逻辑,也称为开始执行匹配(fulfill)进程,将当前消费者与生产者匹配成一对儿纷纷出栈。

Dual Stack

针对 Dual Stack 数据结构,SynchronousQueue 实现了 TransferStack 类。TransferStack 继承自 Transferer 抽象类,并定义了 SNode 类描述栈上的结点。针对结点的运行模式,TransferStack 定义了 3 个 int 类型的常量字段予以描述,如下:

  1. REQUEST:标识未匹配的消费者结点。
  2. DATA:标识未匹配的生产者结点。
  3. FULFILLING:标识结点正在执行匹配操作。

栈在运行期间要么为空,要么存放着一个或多个未匹配的消费者结点或生产者结点,对应的消费者或生产者线程依附在具体的结点上等待。一个栈上不可能同时共存未匹配的消费者结点和未匹配的生产者结点,也就是说同一时间栈上所有结点的运行模式(即 SNode#mode 字段值)都应该是一致的,除了栈顶结点可能会因为正在执行匹配进程而附加 FULFILLING 状态。

SNode 类的字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static final class SNode {
/** 后继指针 */
volatile SNode next; // next node in stack
/** 记录匹配的结点,如果当前结点被取消,则指向自己 */
volatile SNode match; // the node matched to this
/** 在当前结点上等待的线程对象 */
volatile Thread waiter; // to control park/unpark
/** 结点元素值,如果是消费者结点则为 null */
Object item; // data; or null for REQUESTs
/**
* 结点运行模式:
* - 0:代表消费者结点
* - 1:代表生产者结点
* - (2 | 0) or (2 | 1):代表结点正在或已被匹配
*/
int mode;

// ... 省略方法实现

}

各字段的含义如代码注释,我们将在下面分析 TransferStack#transfer 方法实现时一并分析 SNode 中定义的方法,并对各个字段的含义结合具体场景做进一步介绍。

前面在介绍 Transferer 抽象类时,我们知道该抽象类仅声明了一个方法,即 Transferer#transfer 方法,该方法也是整个 SynchronousQueue 中最核心的实现。在开始分析 TransferStack 之于该方法的实现之前,我们先从整体出发,感知一下 TransferStack 的运行流程。

以“生产者-消费者”为例,假设当前有 3 个生产者依次执行往 SynchronousQueue 中插入元素,执行的顺序为 1 -> 2 -> 3,则入栈之后得到的栈结构如下:

1
2
3
 3 -> 2 -> 1 -> null

head

入栈后的 3 个生产者线程将在栈对应结点上等待。如果来了一个消费者执行出队列操作,此时消费者将与 head 结点上的生产者进行匹配,匹配成功之后得到的栈结构如下:

1
2
3
 2 -> 1 -> null

head

此时剩下的生产者线程将继续等待,期间可以允许新的消费者出队列,也可以允许新的生产者入队列。

上述过程就是 TransferStack#transfer 方法的核心执行逻辑,对此有了一个大概的感知之后,下面来深入分析 TransferStack#transfer 方法的具体实现。实际上在 TransferStack#transfer 方法的开头,作者已经对整个方法的运行流程给出了直观的概括,摘录如下:

  1. If apparently empty or already containing nodes of same mode, try to push node on stack and wait for a match, returning it, or null if cancelled.

  2. If apparently containing node of complementary mode, try to push a fulfilling node on to stack, match with corresponding waiting node, pop both from stack, and return matched item. The matching or unlinking might not actually be necessary because of other threads performing action 3:

  3. If top of stack already holds another fulfilling node, help it out by doing its match and/or pop operations, and then continue. The code for helping is essentially the same as for fulfilling, except that it doesn’t return the item.

方法 TransferStack#transfer 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed

// 操作模式判定,如果为 null 则说明当前是出队列操作,否则说明是入队列操作
int mode = (e == null) ? REQUEST : DATA;

for (; ; ) {
SNode h = head;
// 1. 如果栈为空,或者包含相同模式的结点,将结点入栈等待匹配
if (h == null || h.mode == mode) { // empty or same-mode
// 如果设置超时且到期
if (timed && nanos <= 0) { // can't wait
// 如果 head 结点被取消,则后移 head 指针
if (h != null && h.isCancelled()) {
this.casHead(h, h.next); // pop cancelled node
} else {
// 否则返回 null
return null;
}
}
// 否则,说明当前线程需要在栈上等待,先创建一个结点入栈,之后对应的线程会在该结点上等待
else if (this.casHead(h, s = snode(s, e, h, mode))) {
// 等待结点被匹配或取消,返回的是与当前结点匹配的结点,或者结点自己(即结点被取消)
SNode m = this.awaitFulfill(s, timed, nanos);
// 如果返回的是结点自己,则说明是被取消了
if (m == s) { // wait was cancelled
// 清理无效结点
this.clean(s);
return null;
}

/* 当前结点被匹配了 */

// 与 s 匹配的结点就是 head 结点,将 s 和 m 出栈,这里只是一个优化,不影响程序执行的正确性
if ((h = head) != null && h.next == s) {
this.casHead(h, s.next); // help s's fulfiller
}

// 如果是出队列则返回匹配结点的元素值,如果是入队列则返回新添加的结点元素值
return (E) ((mode == REQUEST) ? m.item : s.item);
}
}
// 2. 栈中包含互补模式的结点,且 head 结点不处于 FULFILLING 状态,执行匹配操作
else if (!isFulfilling(h.mode)) { // try to fulfill
// 头结点已经被取消,则后移 head 指针后重试
if (h.isCancelled()) { // already cancelled
this.casHead(h, h.next); // pop and retry
}
// 入队一个带有 FULFILLING 标志的新结点 s,同一时间栈中最多只有一个带有 FULFILLING 标志的结点,且该结点一定是 head 结点
else if (this.casHead(h, s = snode(s, e, h, FULFILLING | mode))) {
for (; ; ) { // loop until matched or waiters disappear
// 获取本次与 s 结点执行匹配的结点,也就是 s 的 next 结点
SNode m = s.next; // m is s's match
// 如果待匹配的结点为 null,说明已经被其它线程取消
if (m == null) { // all waiters are gone
// 将结点 s 出队列,并退出循环
this.casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
// 如果待匹配的结点不为 null,则尝试执行匹配
SNode mn = m.next;
if (m.tryMatch(s)) { // 尝试将结点 m 的 match 指针指向结点 s
// 匹配成功,修改头结点为已匹配结点 m 的 next 结点
this.casHead(s, mn); // pop both s and m
// 如果是出队列则返回已匹配结点的元素值,如果是入队列则返回新添加的结点元素值
return (E) ((mode == REQUEST) ? m.item : s.item);
} else { // lost match
// 匹配失败,说明结点 m 被取消,继续尝试匹配 m 的 next 结点
s.casNext(m, mn); // help unlink
}
}
}
}
// 3. 栈中包含互补模式的结点,且 head 结点处于 FULFILLING 状态
else { // help a fulfiller
SNode m = h.next; // m is h's match
if (m == null) { // waiter is gone
this.casHead(h, null); // pop fulfilling node
} else {
SNode mn = m.next;
if (m.tryMatch(h)) { // help match
this.casHead(h, mn); // pop both h and m
} else { // lost match
h.casNext(m, mn); // help unlink
}
}
}
}
}

上述实现中 for 循环内部的 if ... else if ... else 控制结构分别对应作者给出的 3 段注释(已在代码中标出),其中场景 3 主要是对场景 2 的辅助,下面重点分析场景 1 和场景 2 的实现和执行流程。

首先来看一下 场景 1 ,此时栈为空,或者栈中等待的线程运行模式与当前线程的运行模式相同,此时需要将结点入栈,并让当前线程在结点上等待。执行流程可以概括为:

  1. 如果设置了超时且已经到期,则顺带判断 head 结点是否被取消,如果是则后移 head 指针并进入下一轮循环,否则返回 null;
  2. 否则新建一个包含待添加元素 e 的结点入栈,并执行 TransferStack#awaitFulfill 方法让当前线程在该结点上等待匹配(或被取消);
  3. 如果在等待期间被取消,则清理栈上的无效结点,并返回 null;
  4. 否则说明结点被成功匹配,如果当前线程是消费者线程则返回匹配结点的元素值,如果当前线程是生产者线程则返回刚刚添加的元素值。

下面利用图示演示上述执行流程。假设当前操作线程是一个生产者,期望将元素 3 插入到 SynchronousQueue 中,并且当前栈中已经包含两个处于等待状态的生产者(如下图 1 所示)。因为当前线程与栈中等待的线程模式相同(均为 DATA),所以新建一个元素值为 3 的结点入栈(如下图 2 所示),并让当前线程在结点上等待。

image

继续分析让线程等待的 TransferStack#awaitFulfill 方法,线程会阻塞(或自旋)在该方法上等待被匹配,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
// 如果设置了超时,则计算到期时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 计算自旋的次数
int spins = (this.shouldSpin(s) ? (timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (; ; ) {
// 从阻塞中醒来,先检查期间是否被中断
if (w.isInterrupted()) {
// 如果被中断,则将结点的 match 指针指向自己,表示结点被取消
s.tryCancel();
}

// 获取与当前结点匹配的结点,要么是结点自己,要么就是某个与之匹配的结点,只要不为 null 就返回
SNode m = s.match;
if (m != null) {
return m;
}

/* 结点未被匹配或取消 */

// 如果设置了超时且已经到期,则取消结点
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel();
continue;
}
}

// 在阻塞之前先自旋几次,如果 Producer 和 Consumer 之间交互频繁,自旋相对于阻塞性能更高
if (spins > 0) {
spins = this.shouldSpin(s) ? (spins - 1) : 0;
}
// 如果结点的 waiter 为空,则设置为当前线程对象
else if (s.waiter == null) {
s.waiter = w; // establish waiter so can park next iter
}
// 未设置超时,则无限期等待
else if (!timed) {
LockSupport.park(this);
}
// 设置了超时,则超时等待
else if (nanos > spinForTimeoutThreshold) {
LockSupport.parkNanos(this, nanos);
}
}
}

上述方法首先会依据是否设置超时来计算剩余的到期时间和自旋次数,然后执行:

  1. 判断等待期间是否被中断,如果是则取消当前结点,即将结点的 match 指针指向自己;
  2. 判断结点的 match 指针是否指向 null,只要不为 null 就说明当前结点被成功匹配或取消(此时 match 指针指向结点自己),返回 match 指针指向的结点;
  3. 否则,说明结点未被匹配或取消,如果设置了超时且已经到期,则取消当前结点,并在下一轮循环中返回;
  4. 在进入阻塞之前,先尝试自旋几次;
  5. 如果自旋几次之后仍然未完成匹配则阻塞等待,依据是否设置超时来决定是无限期等待还是超时等待,并在等待之前判断当前结点上是否有绑定线程,如果未绑定则将当前线程绑定到该结点上。

由上述实现可以看到,等待的线程并没有立即阻塞,而是先尝试自旋了几次,这主要是考虑生产者和消费者频繁交互的情况。这类场景下当生产者执行入队列操作之后马上会有消费者前来执行出队列,此时生产者线程无需被阻塞,只需要自旋几次即被匹配成功,从而避免线程阻塞和唤醒所带来的性能开销。如果生产者和消费者交互并不频繁,因为自旋的次数并不多,所以不会造成太多的 CPU 开销,几乎可以忽略。

如果结点在等待期间被取消,则上述方法会将结点的 match 指针指向自己,后续流程会基于该特征识别被取消的结点,并调用 TransferStack#clean 方法执行清理工作,该方法实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
void clean(SNode s) {
s.item = null; // forget item
s.waiter = null; // forget thread

// 寻找 s 的最近一个后继有效(未被取消)结点,作为本次遍历的哨兵(sentinel)结点
SNode past = s.next;
if (past != null && past.isCancelled()) {
past = past.next;
}

// 从头开始遍历,将 head 指针指向第一个有效(未被取消)结点
SNode p;
while ((p = head) != null && p != past && p.isCancelled()) {
this.casHead(p, p.next);
}

// 从当前有效的头结点开始遍历,直到遇到哨兵结点,移除期间遇到的无效结点
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled()) {
p.casNext(n, n.next);
} else {
p = n;
}
}
}

清理的过程首先会确立一个哨兵(sentinel)结点,该结点是位于结点 s 之后最近一个有效(未被取消)的结点,然后从栈顶开始遍历清除那些已经被取消的结点。至于为什么需要设置一个哨兵结点,考虑在并发场景下结点 s 可能已经被其它线程移除,设置哨兵结点能够避免对整个栈进行遍历。

接着来看一下 场景 2 ,此时栈中正在等待的线程运行模式与当前线程互补(可以简单理解为栈中等待的线程都是生产者,而当前线程是消费者),并且此时没有线程正在执行匹配操作,所以进入匹配进程。本次与当前线程匹配的是 head 结点上的线程,所以首先需要从上至下在栈上找到第一个有效(未被取消)的 head 结点,然后执行:

  1. 创建一个结点元素为 e,附加 FULFILLING 标志的结点 s,并将结点入栈;
  2. 获取本次待与 s 匹配的结点 m,如果 m 为 null 则说明栈上已经没有处于等待的结点,需要退出匹配进程并继续判定接下去进入哪个场景;
  3. 否则,调用 SNode#tryMatch 方法执行匹配操作;
  4. 如果匹配成功则后移 head 指针,并返回(如果当前线程是消费者线程则返回匹配结点的元素值,如果当前线程是生产者线程则返回刚刚添加的元素值);
  5. 如果匹配失败,说明结点 m 已经被取消,尝试继续匹配 m 的后继结点。

下面利用图示演示上述执行流程。如下图 1 所示,假设当前操作线程是一个消费者(图中黄色结点),期望对 SynchronousQueue 执行出队列操作,并且当前栈中已经包含两个处于等待状态的生产者(图中青色结点)。因为当前线程与栈中等待的线程模式互补,所以新建一个元素值为 null 的结点入栈(如下图 2 所示),并附加 FULFILLING 标志(图中红色结点)。

image

然后开始执行匹配进程,设置 m 和 mn 指针,如上图 3 所示。在成功执行完 SNode#tryMatch 方法之后会将结点 m 的 match 指针指向结点 s,表示结点 m 和 s 匹配成功,如上图 4 所示。

继续来分析一下执行匹配进程的 SNode#tryMatch 方法,实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
boolean tryMatch(SNode s) {
// 基于 CAS 将当前结点的 match 字段设置为 s 结点
if (match == null && UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
// 唤醒阻塞在当前结点上的线程
LockSupport.unpark(w);
}
return true;
}
return match == s;
}

匹配的过程核心在于将待匹配结点的 match 指针指向当前操作线程对应的结点。

关于 Dual Stack 的运行机制就介绍这么多,受栈 FILO 特性的约束,基于 Dual Stack 的 SynchronousQueue 始终在栈顶执行入队列和出队列操作,后入队的线程会先被匹配,这也解释了为什么基于 Dual Stack 的 SynchronousQueue 是非公平的。基于 Dual Stack 的 SynchronousQueue 潜在的一个问题是可能导致先入队的线程长期得不到匹配而饥饿,而优点在于能够更好的维持线程局部性(thread locality),减少线程上下文切换的开销。

Dual Queue

针对 Dual Queue 数据结构,SynchronousQueue 实现了 TransferQueue 类,TransferQueue 同样继承自 Transferer 抽象类,并定义了 QNode 类描述队列上的结点。TransferQueue 定义了 TransferQueue#headTransferQueue#tail 指针字段,分别指向队列的头结点和尾结点。

QNode 类的字段定义如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
static final class QNode {
/** 后继指针 */
volatile QNode next; // next node in queue
/** 结点元素值,如果等于结点自己则说明被取消 */
volatile Object item; // CAS'ed to or from null
/** 在当前结点上等待的线程对象 */
volatile Thread waiter; // to control park/unpark
/** 标识当前是消费者结点,还是生产者结点 */
final boolean isData;

// ... 省略方法定义

}

各字段的含义如代码注释,其中 QNode#isData 字段用于标识对应结点是生产者结点还是消费者结点。不同于 TransferStack 的 SNode 需要使用 SNode#mode 字段描述结点是未匹配的生产者、未匹配的消费者,或者是正在匹配中等状态,TransferQueue 因为出队列和入队列分别在 head 和 tail 结点上执行,所以无需定义专门的字段描述结点的运行模式。我们将在下面分析 TransferQueue#transfer 方法实现时一并分析 QNode 中定义的方法,并对各个字段的含义结合具体场景做进一步介绍。

在开始分析 TransferQueue 之于 Transferer#transfer 方法的实现之前,我们还是先从整体出发,感知一下 TransferQueue 的运行流程。同样以“生产者-消费者”为例,假设当前有 3 个生产者依次执行往 SynchronousQueue 中插入元素,执行的顺序为 1 -> 2 -> 3,则入队列之后得到的队列结构如下:

1
2
3
 1 -> 2 -> 3 -> null
↓ ↓
head tail

入队列后的 3 个生产者线程将在队列对应结点上等待。如果来了一个消费者执行出队列操作,此时消费者将与 head 结点上的生产者进行匹配,匹配成功之后得到的队列结构如下:

1
2
3
 2 -> 3 -> null
↓ ↓
head tail

此时剩下的生产者线程将继续等待,期间可以允许新的消费者出队列,也可以允许新的生产者入队列。

上述过程就是 TransferQueue#transfer 方法的核心执行逻辑,对此有了一个大概的感知之后,下面来深入分析 TransferQueue#transfer 方法的具体实现。实际上在 TransferQueue#transfer 方法的开头,作者同样已经对整个方法的运行流程给出了直观的概括,摘录如下:

  1. If queue apparently empty or holding same-mode nodes, try to add node to queue of waiters, wait to be fulfilled (or cancelled) and return matching item.

  2. If queue apparently contains waiting items, and this call is of complementary mode, try to fulfill by CAS’ing item field of waiting node and dequeuing it, and then returning matching item.

方法 TransferQueue#transfer 实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
// 标识当前是生产模式还是消费模式
boolean isData = (e != null);

for (; ; ) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) { // saw uninitialized value
continue; // spin
}

// 1. 队列为空,或者包含相同模式的结点,将结点入队列等待匹配
if (h == t || t.isData == isData) { // empty or same-mode
QNode tn = t.next;
// 期间有其它线程入队列,进入下一轮循环重新获取 tail
if (t != tail) { // inconsistent read
continue;
}
// t 不是队列尾结点,尝试后移 tail 指针
if (tn != null) { // lagging tail
this.advanceTail(t, tn);
continue;
}
// 设置超时且已到期,则返回 null
if (timed && nanos <= 0) { // can't wait
return null;
}
if (s == null) {
s = new QNode(e, isData);
}
// 基于 CAS 将结点 s 添加到队列末端
if (!t.casNext(null, s)) { // failed to link in
// 添加失败则重试
continue;
}

/* 将 s 结点入队列成功 */

// 后移 tail 指针
this.advanceTail(t, s); // swing tail and wait
// 等待结点被匹配或取消,返回已匹配的结点元素值,或者结点自己(表示已取消)
Object x = this.awaitFulfill(s, e, timed, nanos);
// 结点已经被取消
if (x == s) { // wait was cancelled
// 执行清理工作
this.clean(t, s);
return null;
}

/* 结点被匹配 */

// 结点 s 的 next 指针未指向自己,表示结点 s 未出队列
if (!s.isOffList()) { // not already unlinked
// t 当前是 s 的前驱结点,也是当前的 head 结点,
// 因为 s 已经匹配,说明 s 前面的结点都已经被匹配
this.advanceHead(t, s); // unlink if head
// 将 s 的 item 指向自己,说明结点被取消
if (x != null) { // and forget fields
s.item = s;
}
s.waiter = null;
}
// 如果是出队列则返回匹配结点的元素值,如果是入队列则返回新添加的结点元素值
return (x != null) ? (E) x : e;
}
// 2. 队列中存在互补模式的结点
else { // complementary-mode
QNode m = h.next; // node to fulfill
// 期间有入队或出队操作,或者待匹配的结点 m 为 null,则进入下一轮循环
if (t != tail || m == null || h != head) {
continue; // inconsistent read
}

// 获取结点 m 的元素值
Object x = m.item;
// 如果 isData = true,则说明当前操作线程为生产者,期望 m 为消费者,即 x == null
// 如果 isData = false,则说明当前操作线程为消费者,期望 m 为生产者,即 x != null
if (isData == (x != null) // m already fulfilled
// 结点 m 已经被取消
|| x == m // m cancelled
// 尝试修改结点 m 的元素值为 e 失败
|| !m.casItem(x, e)) { // lost CAS
// 结点 m 已经被匹配,或被取消,或已经被其它线程匹配,则后移 head 指针继续
this.advanceHead(h, m); // dequeue and retry
continue;
}

// 匹配成功,后移 head 指针
this.advanceHead(h, m); // successfully fulfilled
// 唤醒阻塞在匹配结点 m 上的线程
LockSupport.unpark(m.waiter);
// 如果是出队列则返回匹配结点的元素值,如果是入队列则返回新添加的结点元素值
return (x != null) ? (E) x : e;
}
}
}

上述实现中 for 循环内部的 if ... else 控制结构分别对应作者给出的 2 段注释(已在代码中标出),在 for 循环的一开始会判断 head 或 tail 指针是否为 null,但是在 SynchronousQueue 运行期间正常是不会出现 head 或 tail 指针为 null 的情况,作者在注释中给出的解释如下:

The loop starts off with a null check guarding against seeing uninitialized head or tail values. This never happens in current SynchronousQueue, but could if callers held non-volatile/final ref to the transferer. The check is here anyway because it places null checks at top of loop, which is usually faster than having them implicitly interspersed.

下面展开分析场景 1 和场景 2 的实现和执行流程。首先来看一下 场景 1 ,此时队列为空,或者队列中等待的线程运行模式与当前线程的运行模式相同,此时需要将结点入队列,并让当前线程在结点上等待。执行流程可以概括为:

  1. 因为要入队列操作,所以要保证 tail 指向队列真正的尾结点;
  2. 如果设置了超时且已到期,则返回 null;
  3. 否则新建一个包含待添加元素 e 的结点入队列,如果失败进入下一轮循环重试,否则后移 tail 指针并调用 TransferQueue#awaitFulfill 方法让当前线程在该结点上等待匹配(或被取消);
  4. 如果在等待期间被取消,则清理队列上的无效结点,并返回 null;
  5. 否则说明结点被成功匹配,更新 head 指针,如果当前线程是消费者线程则返回匹配结点的元素值,如果当前线程是生产者线程则返回刚刚添加的元素值。

下面利用图示演示上述执行流程。假设当前操作线程是一个生产者,期望将元素 3 插入到 SynchronousQueue 中,并且当前栈中已经包含了两个处于等待状态的生产者(如下图 1 所示)。因为当前线程与队列中等待的线程模式相同(即 isData=true),所以新建一个元素值为 3 的结点入队列(如下图 2 所示),并让当前线程在结点上等待。

image

TransferQueue 实现的让线程等待的方法 TransferQueue#awaitFulfill 与 TransferStack 中实现的 TransferStack#awaitFulfill 方法在设计和实现思路上相同,这里不再重复介绍。下面来分析一下执行清理工作的 TransferQueue#clean 方法,实现如下(其中 s 是待清理的结点,pred 是 s 的前驱结点):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
void clean(QNode pred, QNode s) {
s.waiter = null; // forget thread
while (pred.next == s) { // Return early if already unlinked
QNode h = head;
QNode hn = h.next; // Absorb cancelled first node as head
// 从头开始,将 head 指针指向有效(未被取消)的头结点
if (hn != null && hn.isCancelled()) {
this.advanceHead(h, hn);
continue;
}
QNode t = tail; // Ensure consistent read for tail
// 队列已经为空,返回
if (t == h) {
return;
}
QNode tn = t.next;
// t 不是最新的 tail 结点
if (t != tail) {
continue;
}
// tail 指针未指向最新的尾结点
if (tn != null) {
this.advanceTail(t, tn);
continue;
}
// 如果待删除的 s 结点不是 tail 结点,直接清理
if (s != t) { // If not tail, try to unsplice
QNode sn = s.next;
if (sn == s || pred.casNext(s, sn)) {
return;
}
}

/* 当前待删除的 s 结点是 tail 结点 */

QNode dp = cleanMe;
// cleanMe 非空,此时该 cleanMe 的 next 指针一定不是 tail 结点,清理 cleanMe.next
if (dp != null) { // Try unlinking previous cancelled node
QNode d = dp.next; // d 是需要清理的结点
QNode dn; // d 的 next 结点
if (d == null // d is gone or
|| d == dp // d is off list or
|| !d.isCancelled() // d not cancelled or
|| (d != t // d not tail and
&& (dn = d.next) != null // has successor
&& dn != d // that is on list
&& dp.casNext(d, dn))) { // d unspliced
// 将 cleanMe 设置为 null
this.casCleanMe(dp, null);
}
if (dp == pred) {
return; // s is already saved node
}
}
// cleanMe 为空,需要将 s 结点的 pred 标记为 cleanMe,以后再清理 s 结点
else if (this.casCleanMe(null, pred)) {
return; // Postpone cleaning s
}
}
}

如果待删除结点 s 不是 tail 结点,则只需要简单移除 s 即可,否则暂时不能移除 s 结点,会导致 tail 为 null,影响后续入队列操作。针对这种场景,作者设计了一个 cleanMe 结点,该结点的 next 指针指向需要被移除的 s 结点(此时 s 为 tail 结点),当结点 s 后续不再是 tail 结点时,延后删除。

接着来看一下 场景 2 ,此时队列中正在等待的线程运行模式与当前线程互补,所以进入匹配进程。本次与当前线程匹配的是 head 结点的后继结点上的线程,所以首先需要从前往后在队列上找到第一个有效(未被取消)的 head 后继结点,然后执行:

  1. 获取 head 结点的后继结点 m;
  2. 如果结点 m 已经被匹配,或被取消,则后移 head 指针后进入下一轮循环重试;
  3. 否则,基于 CAS 尝试将结点 m 的元素值替换为 e,如果失败则说明结点 m 已经被其它线程匹配,继续后移 head 指针后进入下一轮循环重试;
  4. 否则,说明匹配成功,后移 head 指针,并唤醒在匹配结点 m 上等待的线程,如果当前线程是消费者线程则返回匹配结点的元素值,如果当前线程是生产者线程则返回刚刚添加的元素值。

下面利用图示演示上述执行流程。如下图 1 所示,假设当前操作线程是一个消费者(图中黄色结点),期望对 SynchronousQueue 执行出队列操作,并且当前队列中已经包含两个处于等待状态的生产者(图中青色结点)。因为当前线程与队列中等待的线程模式互补,所以获取 head 结点的 next 结点 m 作为待匹配结点(如下图 2 所示)。基于 CAS 尝试将结点 m 的元素值修改为 null,如下图 3 所示,然后后移 head 指针指向 m 结点,并唤醒在结点 m 上等待的线程,如下图 4 所示。

image

关于 Dual Queue 的运行机制就介绍这么多,受队列 FIFO 特性的约束,基于 Dual Queue 的 SynchronousQueue 在队头执行出队列操作,并在队尾执行入队列操作,先入队的线程通常会先被匹配,这也解释了为什么基于 Dual Queue 的 SynchronousQueue 是公平的。基于 Dual Queue 的 SynchronousQueue 因为入队和出队的冲突相对较小,所以在竞争频繁的场景下相对于非公平模式反而具有更好的性能。

总结

本文我们分析了 SynchronousQueue 的设计与实现,相对于之前文章中介绍的一系列线程安全队列而言,SynchronousQueue 在实现和使用上有其特别之处。SynchronousQueue 没有容量的概念,入队列的线程在完成入队列操作之后会在队列上等待出队列的线程前来执行出队列操作,反之亦然。SynchronousQueue 中的结点除了承载结点元素之外,还附着着相应的操作线程,这些线程在对应的结点上等待被匹配。此外,SynchronousQueue 区分公平和非公平模式,其中公平模式基于 Dual Queue 数据结构实现,非公平模式基于 Dual Stack 数据结构实现。理解 SynchronousQueue 的核心在于理解 Dual Stack 和 Dual Queue 的设计思想。

参考

  1. JDK 1.8 源码
  2. Nonblocking Concurrent Data Structureswith Condition Synchronization
  3. SynchronousQueue 的一些理解