深入理解AQS之CountDownLatch
创始人
2024-01-20 09:32:31
0

一. 简介

CountDownLatch(闭锁)是一个同步协助类,允许一个或多个线程等待,直到其他线程完成操作集。
CountDownLatch使用给定的计数值(count)初始化。await方法会阻塞直到当前的计数值(count)由于countDown方法的调用达到0,count为0之后所有等待的线程都会被释放,并且随后对await方法的调用都会立即返回。这是一个一次性现象 —— count不会被重置。如果你需要一个重置count的版本,那么请考虑使用CyclicBarrier。

二. 使用

构造器

 public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}

常用方法

 // 调用 await() 方法的线程会被挂起,它会等待直到 count 值为 0 才继续执行
public void await() throws InterruptedException { };  
// 和 await() 类似,若等待 timeout 时长后,count 值还是没有变为 0,不再等待,继续执行
public boolean await(long timeout, TimeUnit unit) throws InterruptedException { };  
// 会将 count 减 1,直至为 0
public void countDown() {sync.releaseShared(1);}

三. 应用场景

CountDownLatch一般用作多线程倒计时计数器,强制它们等待其他一组(CountDownLatch的初始化决定)任务执行完成。
CountDownLatch的两种使用场景:

  • 让多个线程等待
  • 让单个线程等待。

场景1 让多个线程等待:模拟并发,让并发线程一起执行

 public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(1);for (int i = 0; i < 5; i++) {new Thread(() -> {try {//等待countDownLatch.await();String parter = "【" + Thread.currentThread().getName() + "】";System.out.println(parter + "开始执行……");} catch (InterruptedException e) {e.printStackTrace();}}).start();}Thread.sleep(2000);countDownLatch.countDown();}

for循环中等待阻塞,直到执行countdown方法。

场景2 让单个线程等待:多个线程(任务)完成后,进行汇总合并。

很多时候,我们的并发任务,存在前后依赖关系;比如数据详情页需要同时调用多个接口获取数据,并发请求获取到数据后、需要进行结果合并;或者多个数据操作完成后,需要数据check;这其实都是:在多个线程(任务)完成后,进行汇总合并的场景。

   public static void main(String[] args) throws Exception {CountDownLatch countDownLatch = new CountDownLatch(5);for (int i = 0; i < 5; i++) {final int index = i;new Thread(() -> {try {Thread.sleep(1000 +ThreadLocalRandom.current().nextInt(1000));System.out.println(Thread.currentThread().getName()+ " finish task" + index);countDownLatch.countDown();} catch (InterruptedException e) {e.printStackTrace();}}).start();}// 主线程在阻塞,当计数器==0,就唤醒主线程往下执行。countDownLatch.await();System.out.println("主线程:在所有任务运行完成后,进行结果汇总");}

四. 底层原理

底层基于 AbstractQueuedSynchronizer 实现,CountDownLatch 构造函数中指定的count直接赋给AQS的state;每次countDown()则都是release(1)减1,最后减到0时unpark线程;这一步是由最后一个执行countdown方法的线程执行的。
而调用await()方法时,当前线程就会判断state属性是否为0,如果为0,则继续往下执行,如果不为0,则使当前线程进入等待状态,直到某个线程将state属性置为0,其就会唤醒在await()方法中等待的线程。

构造方法

 public CountDownLatch(int count) {if (count < 0) throw new IllegalArgumentException("count < 0");this.sync = new Sync(count);}Sync(int count) {setState(count);}protected final void setState(int newState) {state = newState;}

阻塞

public void await() throws InterruptedException {sync.acquireSharedInterruptibly(1);}public final void acquireSharedInterruptibly(int arg)throws InterruptedException {if (Thread.interrupted())throw new InterruptedException();//arg为1,不为0 ,返回-1,这里小于0if (tryAcquireShared(arg) < 0)//入队阻塞doAcquireSharedInterruptibly(arg);}protected int tryAcquireShared(int acquires) {return (getState() == 0) ? 1 : -1;}

入队阻塞

private void doAcquireSharedInterruptibly(int arg)throws InterruptedException {//入队,创建节点 使用共享模式final Node node = addWaiter(Node.SHARED);boolean failed = true;try {for (;;) {//获取当前节点的前躯节点final Node p = node.predecessor();//如果节点为head节点if (p == head) {//阻塞动作比较重,通常会再尝试获取资源,没有获取到返回负数int r = tryAcquireShared(arg);if (r >= 0) {setHeadAndPropagate(node, r);p.next = null; // help GCfailed = false;return;}}//判断是否可以阻塞if (shouldParkAfterFailedAcquire(p, node) &&parkAndCheckInterrupt())throw new InterruptedException();}} finally {if (failed)cancelAcquire(node);}}

countDown方法减一

  public void countDown() {sync.releaseShared(1);}public final boolean releaseShared(int arg) {if (tryReleaseShared(arg)) {doReleaseShared();return true;}return false;}
//尝试释放共享锁
protected boolean tryReleaseShared(int releases) {// Decrement count; signal when transition to zerofor (;;) {int c = getState();if (c == 0)return false;int nextc = c-1;if (compareAndSetState(c, nextc))//减到0的时候返回true,进行唤醒return nextc == 0;}}

唤醒逻辑

 private void doReleaseShared() {for (;;) {Node h = head;if (h != null && h != tail) {int ws = h.waitStatus;if (ws == Node.SIGNAL) {//wa为-1时,将其状态设置为0,并且唤醒if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))continue;          unparkSuccessor(h);}else if (ws == 0 &&!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))continue;                // loop on failed CAS}if (h == head)                   // loop if head changedbreak;}}private void unparkSuccessor(Node node) {int ws = node.waitStatus;if (ws < 0)//ws状态小于0就将其设置为0compareAndSetWaitStatus(node, ws, 0);Node s = node.next;if (s == null || s.waitStatus > 0) {s = null;for (Node t = tail; t != null && t != node; t = t.prev)if (t.waitStatus <= 0)s = t;}if (s != null)//s不为空就调用unparkLockSupport.unpark(s.thread);}

五. CountDownLatch与Thread.join的区别

  • CountDownLatch的作用就是允许一个或多个线程等待其他线程完成操作,看起来有点类似join() 方法,但其提供了比 join() 更加灵活的API。
  • CountDownLatch可以手动控制在n个线程里调用n次countDown()方法使计数器进行减一操作,也可以在一个线程里调用n次执行减一操作。
  • 而 join() 的实现原理是不停检查join线程是否存活,如果 join 线程存活则让当前线程永远等待。所以两者之间相对来说还是CountDownLatch使用起来较为灵活。

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
一帆风顺二龙腾飞三阳开泰祝福语... 本篇文章极速百科给大家谈谈一帆风顺二龙腾飞三阳开泰祝福语,以及一帆风顺二龙腾飞三阳开泰祝福语结婚对应...
美团联名卡审核成功待激活(美团... 今天百科达人给各位分享美团联名卡审核成功待激活的知识,其中也会对美团联名卡审核未通过进行解释,如果能...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...