目录
1 前言
2 常用方法
3 示例
4 解析
4.1 countDown()
4.2 await() 源码
countDownLatch( 门阀
、 计数器
)是多线程控制的一种工具 ,它用来协调各个线程之间的同步。
countDownLatch相当于一个计数器,能够使一个线程等待另外一些线程完成各自的工作后,再继续执行。这个计数器的初始值就是线程的数量,每当一个线程完成之后,计数器就进行减1,当计数器的值为0时,那么在countDownLatch上等待的线程就可以继续执行。
countDownLatch接收一个int类型的参数,表示要等待的工作线程个数
public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}
方法 | 说明 |
await() | 使当前线程进入同步队列进行等待,直到latch 的值被减到0 或者当前线程被中断,当前线程就会被唤醒。 |
await(long timeout, TimeUnit unit) | 带超时时间的await() 。 |
countDown() | 使latch 的值减1 ,如果减到了0 ,则会唤醒所有等待在这个latch 上的线程 |
getCount() | 获得latch 的数值 |
让5个子线程的任务执行完成之后再执行主线程的任务。
public class CountDownLatchDemo {private static final int THRED_NUM = 5;public static void main(String[] args) {//创建固定线城市数量的线程池ExecutorService pool = Executors.newFixedThreadPool(THRED_NUM);//如果有n个线程 就指定CountDownLatch的计数器为nCountDownLatch countDownLatch = new CountDownLatch(THRED_NUM);for (int i = 0; i < THRED_NUM; i++) {pool.execute(()->{try {System.out.println("子线程:"+Thread.currentThread().getName()+"开始执行");//模拟每个线程处理业务,耗时一秒钟Thread.sleep(1000);System.out.println("子线程:"+Thread.currentThread().getName()+"执行完成");//当前线程调用此方法,则计数减1countDownLatch.countDown();} catch (Exception e) {e.printStackTrace();}});}//阻塞当前线程,直到计数器的值为0,主线程才开始处理try {countDownLatch.await();System.out.println("等待子线程完成:,主线程"+Thread.currentThread().getName()+"开始执行,此时countDownLatch的计数器为0");} catch (Exception e) {e.printStackTrace();}//销毁线程池pool.shutdown();}}
运行结果:
注意:主线程的输出语句是在子线程结束之后再进行输出。
CountDownLatch
通过内部类Sync
来实现同步语义,而Sync又继承了AQS。
private static final class Sync extends AbstractQueuedSynchronizer {private static final long serialVersionUID = 4982264981922014374L;// 设置同步状态的值Sync(int count) {setState(count);}// 获取同步状态的值int getCount() {return getState();}// 尝试获取同步状态,只有同步状态的值为0的时候才成功protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}// 尝试释放同步状态,每次释放通过CAS将同步状态的值减1protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();// 如果同步状态的值已经是0了,不要再释放同步状态了,也不要减1了if (c == 0)return false;// 减1int nextc = c - 1;if (compareAndSetState(c, nextc))return nextc == 0;}}}
countDown()的源码如下:
public void countDown() {sync.releaseShared(1);}
调用的是AQS
的releaseShared(int arg)
方法:
public final boolean releaseShared(int arg) {// 尝试释放同步状态if (tryReleaseShared(arg)) {// 如果成功,进入自旋,尝试唤醒同步队列中头结点的后继节点doReleaseShared();return true;}return false;}
通过tryReleaseShared(arg)
尝试释放同步状态,具体的实现被Sync
重写了,源码:
protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;// 同步状态值减1int nextc = c - 1;if (compareAndSetState(c, nextc))return nextc == 0;}}
如果同步状态值减到0
,则释放成功,进入自旋,尝试唤醒同步队列中头结点的后继节点,调用的是AQS
的doReleaseShared()
函数:
private void doReleaseShared() {for (;;) {// 获取头结点Node h = head;if (h != null && h != tail) {// 获取头结点的状态int ws = h.waitStatus;// 如果是SIGNAL,尝试唤醒后继节点if (ws == Node.SIGNAL) {if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))continue; // loop to recheck cases// 唤醒头结点的后继节点unparkSuccessor(h);}else if (ws == 0 &&!h.compareAndSetWaitStatus(0, Node.PROPAGATE))continue; // loop on failed CAS}if (h == head) // loop if head changedbreak;}}
这里调用了unparkSuccessor(h)
去唤醒头结点的后继节点。
await()
源码如下:
public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}
调用的是AQS
的acquireSharedInterruptibly(int arg)
方法:
public final void acquireSharedInterruptibly(int arg)throws InterruptedException {// 如果被中断,抛出异常if (Thread.interrupted())throw new InterruptedException();// 尝试获取同步状态if (tryAcquireShared(arg) < 0)// 获取同步状态失败,自旋doAcquireSharedInterruptibly(arg);}
首先,通过tryAcquireShared(arg)
尝试获取同步状态,具体的实现被Sync
重写了,查看源码:
protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}
如果同步状态的值为0,获取成功。这就是CountDownLatch的机制,尝试获取latch的线程只有当latch的值减到0的时候,才能获取成功。
如果获取失败,则会调用AQS的doAcquireSharedInterruptibly(int arg)函数自旋,尝试挂起当前线程:
private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {// 将当前线程加入同步队列的尾部final Node node = addWaiter(Node.SHARED);try {// 自旋for (;;) {// 获取当前节点的前驱节点final Node p = node.predecessor();// 如果前驱节点是头结点,则尝试获取同步状态if (p == head) {// 当前节点尝试获取同步状态int r = tryAcquireShared(arg);if (r >= 0) {// 如果获取成功,则设置当前节点为头结点setHeadAndPropagate(node, r);p.next = null; // help GCreturn;}}// 如果当前节点的前驱不是头结点,尝试挂起当前线程if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} catch (Throwable t) {cancelAcquire(node);throw t;}}
这里,调用shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()
挂起当前线程。
参考文章:CountDownLatch详解_西瓜游侠的博客-CSDN博客_countdownlatch
上一篇:长龙航空电话24小时客服电话(长龙航空电话24小时客服电话怎么转人工) 娴欐睙闀块緳鑸┖瀹㈡湇浜哄伐鐢佃瘽 瑗垮畨闀块緳鑸┖瀹㈡湇浜哄伐鐢佃瘽
下一篇:修改微信头像显示系统维修是怎么回事(微信改头像说系统维护怎么办) 微信换头像说系统正在维修怎么办 更换微信头像显示系统维护怎么办