前言
CountDownLatch是一个倒数的同步器,和其他同步器不同的是,state为0时表示获取锁成功。常用来让一个线程等待其他N个线程执行完成再继续向下执行,比如主线程等待多个请求返回结果之后再进行汇总处理。
基本使用
public class Test {
// 定义CountDownLatch
private static CountDownLatch latch = new CountDownLatch(2);
public static void main(String[] args) {
new Thread(() -> {
try {
// do something...
latch.countDown(); // 计数减一
} catch (InterruptException e) {
// 处理异常
}
}, "t1").start();
new Thread(() -> {
try {
// do something...
latch.countDown(); // 计数减一
} catch (InterruptException e) {
// 处理异常
}
}, "t2").start();
}
latch.await(); // 主线程进行阻塞,等待两个子线程执行完
System.out.println("子线程都结束了...");
}
上面的示例演示了CountDownLatch的一般使用流程:
首先定义一个CountDownLatch对象,并指定计数的初始值为2;
创建两个子线程分别去处理任务,完成之后调用latch.countDownLatch()对计数进行减1;
主线程调用latch.await()方法进行等待,计数编程1之后会唤醒主线程继续向下执行。
await
尝试获取锁
// CountDownLatch
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// AQS
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
// CountDownLatch.Sync
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
CountDownLatch的await调用的是AQS的acquireSharedInterruptibly方法,根据名称可以推断是共享模式并且可以中断,所以首先判断是否已经产生了中断,是的话就抛出异常。
没有中断就通过tryAcquireShared方法尝试去获取锁,如果state的值是0表示获取锁成功,返回1,失败就返回-1。
获取锁失败
private void doAcquireSharedInterruptibly(int arg)vthrows InterruptedException {
final Node node = addWaiter(Node.SHARED); // 添加节点到队列
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor(); // 获取后继节点
if (p == head) {
int r = tryAcquireShared(arg); // 尝试获取锁
if (r >= 0) {
setHeadAndPropagate(node, r); // 更新头节点并唤醒后继节点
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt()) // 是否可以阻塞,可以就阻塞
throw new InterruptedException(); // 阻塞被中断抛出异常
}
} finally {
if (failed)
cancelAcquire(node);
}
}
获取锁失败会执行doAcquireSharedInterruptibly方法,主要流程为:
1.当前线程封装成Node
节点添加到队列中;
2.如果前驱节点是头节点,就尝试去获取锁:
- 获取锁成功,更新头节点并唤醒后继节点;
- 获取锁失败,进入第
3
步阻塞流程;
3.判断是否可以阻塞:
- 可以阻塞,就通过
park
进行阻塞; - 不可以阻塞,重新回到第
2
步
添加节点
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode); // 创建Node
Node pred = tail; // 尾节点
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) { // 更新尾节点
pred.next = node;
return node;
}
}
enq(node);
return node;
}
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 初始化队列
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) { // 更新尾节点
t.next = node;
return t;
}
}
}
}
首先根据当前线程和等待模式创建一个Node节点对象,如果队列有尾节点的话,就直接把当前节点添加到尾节点的后面,成为新的尾节点;如果没有尾节点就先初始化队列,然后再把当前节点添加到尾节点后面。
更新头结点并唤醒后继节点
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
如果前驱节点是头节点head,表明当前节点的位置有资格去获取锁,于是调用tryAcquireShared方法尝试获取锁,如果成功了会返回1,并作为参数propagate传递给setHeadAndPropagate方法。首先把当前节点更新为头节点head,然后如果后面有等待的共享节点,就尝试唤醒后继节点。
阻塞等待
// 判断是否可以阻塞
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
// 阻塞等待
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
没有资格获取锁或者获取锁失败,就会进入阻塞流程。因为阻塞的节点是通过前驱节点来唤醒的,所以需要找到一个waitStatus为Node.SIGNAL的前驱节点,才能进入阻塞,如果没有符合条件的前驱节点就重新尝试去获取锁,不进行阻塞,一直重试直到获取锁成功为止。如果找到了符合的前驱节点,就通过LockSupport.park(this)阻塞当前线程。
countDown方法
public void countDown() {
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
因为CountDownLatch允许多个线程同时持有锁,所以是属于共享模式,通过countDown()方法释放锁时调用的是releaseShared方法,执行流程为:
- 尝试释放锁
- 释放锁成功,唤醒后继节点
释放锁
protected boolean tryReleaseShared(int releases) {
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
释放锁的流程很简单,如果state已经为0了说明已经没有锁了,释放锁就失败了;否则就让state减1表示减少一次锁,然后更新锁数量。这里和其他同步器不同的是,只有锁数量为0了在阻塞的线程才能去获取锁,所以返回的是nextc == 0。
释放锁成功
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) { // 队列中有等待的节点
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 头节点可以唤醒后继节点
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) // 防止多个线程同时唤醒
continue;
unparkSuccessor(h); // 执行唤醒操作
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
释放锁成功后会执行doReleaseShared方法,如果队列中有等待的节点,并且头节点标记为可以唤醒,那就尝试去唤醒后继节点,被唤醒的节点如果获取锁成功了会成为新的头节点,这里的h==head条件为false,因为阻塞的节点是共享模式,多个节点都可以去获取锁,所以就继续尝试去唤醒后继节点。
唤醒后继节点
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}
如果头节点的后继节点不可用了,就不能通过这个节点继续向后查找,所以这里采用的是倒序查找的方法,最终找到一个离头节点head最近的可以被唤醒的节点,然后调用LockSupport.unpark(s.thread)唤醒该节点。
以上就是Java并发编程之CountDownLatch的使用的详细内容,更多关于Java CountDownLatch的资料请关注编程网其它相关文章!