CountDownLatch与CyclicBarrier基本原理及区别

目录

一、CountDownLatch测试demo

二、CountDownLatch源码分析

三、cyclebarrier測試demo

四、CountDownLatch源码分析

兩者區別:


一、CountDownLatch测试demo

注:這裏調用countDownLatch.await()的綫程稱之爲主綫程

public class CountDownLatchTest {private static  final int NUM=3 ;public static void main(String[] args) throws InterruptedException {CountDownLatch countDownLatch = new CountDownLatch(NUM);for(int i = 1; i <= NUM; i++){new Thread(() -> {System.out.println(String.format("%s已经准备好了",Thread.currentThread().getName()));countDownLatch.countDown();System.out.println(String.format("还有%d位玩家未准备好",countDownLatch.getCount()));}, "玩家"+i).start();}System.out.println("主线程正在等待所有玩家准备好");countDownLatch.await();TimeUnit.SECONDS.sleep(1);System.out.println("主线程被唤醒,开始主线程逻辑");}

主线程正在等待所有玩家准备好
玩家2已经准备好了
玩家3已经准备好了
玩家1已经准备好了
还有2位玩家未准备好
还有1位玩家未准备好
还有0位玩家未准备好
主线程被唤醒,开始主线程逻辑

二、CountDownLatch源码分析

核心类AQS——抽象队列同步器AbstractQueuedSynchronizer

①主线程countDownLatch.await() ->doAcquireSharedInterruptibly死循环->第二次循环进入“标记1”的parkAndCheckInterrupt(),里面LockSupport.park(this),主线程进入等待状态;

②子线程countDownLatch.countDown(),每执行一次检查并修改AQS的state,state是否已countDown到0,如果到0则unparkSuccessor(h),里面调用LockSupport.unpark(s.thread),唤醒调用countDownLatch.await的主线程,主线程唤醒后继续上图的死循环,到“标记2”时tryAcquireShared会检查AQS的state并返回r=1退出死循环,然后执行主线程countDownLatch.await后面的逻辑

 

三、cyclebarrier測試demo

package com.example.demo;import java.io.IOException;
import java.util.concurrent.*;public class CyclicBarrierTest {public static void main(String[] args) throws IOException, InterruptedException {CyclicBarrier barrier = new CyclicBarrier(3, () -> {System.out.println("栅栏已达到开放条件");});ExecutorService executor = Executors.newFixedThreadPool(3);executor.submit(new Thread(new Runner(barrier, "1号选手")));executor.submit(new Thread(new Runner(barrier, "2号选手")));executor.submit(new Thread(new Runner(barrier, "3号选手")));executor.submit(new Thread(new Runner(barrier, "4号选手")));executor.submit(new Thread(new Runner(barrier, "5号选手")));executor.submit(new Thread(new Runner(barrier, "6号选手")));executor.submit(new Thread(new Runner(barrier, "7号选手")));System.out.println("主线程完毕");executor.shutdown();}
}class Runner implements Runnable {private CyclicBarrier barrier;private String name;public Runner(CyclicBarrier barrier, String name) {super();this.barrier = barrier;this.name = name;}@Overridepublic void run() {try {System.out.println(name + " 准备好了...");barrier.await(3, TimeUnit.SECONDS);} catch (Exception e) {e.printStackTrace();}System.out.println(name + " 起跑!");}
}

 

四、CountDownLatch源码分析

核心类——抽象队列同步器AbstractQueuedSynchronizer+Condition接口

①子綫程每次調用barrier.await,int index = --count計數器減一,如果index!=0,進入“標記1”trip.await()。trip為Condition接口類型,實現仍然是AQS,裏面是LockSupport.park(this);

如果index=0則進入“標記2”,這之前如果cyclebarrier如果傳入了barrierAction會在這裏執行【最后一个线程到达之后栅栏触发(但在释放所有线程之前),该命令只在每个屏障点运行一次】。“標記2”nextGeneration()方法裏會執行trip.signalAll()喚醒柵欄上所有綫程並重置計數器count。

 

兩者區別:

1.countDownLatch.countdown()只是在countdown到0时对"主"线程进行唤醒(如果多个线程调用await()方法,则都会被唤醒),子线程不会阻塞且会执行countDownLatch.countdown()后的逻辑,子线程会优先主线程结束;

2.circlebarrier阻塞的是barrier.await()的线程,计数器count递减到0时重置计数器,唤醒所有阻塞的线程,然后继续执行barrier.await()后的逻辑;

3.circlebarrier计数器可重置;

4.两者都是基于park/unpark等待唤醒机制实现,两者的await()都支持等待超时\中断,规避死锁

 

其它:任何一个await阻塞的线程被中断,那么栅栏就认为是打破了,所有阻塞的await将继续运行并抛出BrokenBarrierException;await超时同样如此

package com.smtc.cloud.msg.mqtt;import java.io.IOException;
import java.util.concurrent.*;public class CyclicBarrierTest {public static void main(String[] args) throws IOException, InterruptedException {CyclicBarrier barrier = new CyclicBarrier(3, () -> {System.out.println("栅栏已达到开放条件");});ExecutorService executor = Executors.newFixedThreadPool(3);executor.submit(new Thread(new Runner(barrier, "1号选手")));executor.submit(new Thread(new Runner(barrier, "2号选手")));System.out.println("主线程完毕");executor.shutdown();}
}class Runner implements Runnable {private CyclicBarrier barrier;private String name;public Runner(CyclicBarrier barrier, String name) {super();this.barrier = barrier;this.name = name;}@Overridepublic void run() {try {System.out.println(name + " 准备好了...");if (Thread.currentThread().getName().contains("2")){Thread.currentThread().interrupt();}barrier.await();} catch (Exception e) {e.printStackTrace();}System.out.println(name + " 起跑!");}
}

 

其它测试

public class ConcurrentTest  {// 请求总数public static int clientTotal = 5000;// 同时并发执行的线程数public static int threadTotal = 200;// public static int count=0;有线程安全问题,结果可能不为5000public static AtomicInteger count=new AtomicInteger(0);public static void main(String[] args) throws Exception {ExecutorService executorService = Executors.newCachedThreadPool();final Semaphore semaphore = new Semaphore(threadTotal);final CountDownLatch countDownLatch = new CountDownLatch(clientTotal);for (int i = 0; i < clientTotal ; i++) {executorService.execute(() -> {try {semaphore.acquire();//5000次循环完成,每次到这里获取令牌并-1当没有令牌就阻塞,一个线程执行完业务后释放令牌计数+1,阻塞的4800队列来抢这个信号执行Thread.sleep(1111);count.addAndGet(1);//count++;semaphore.release();} catch (Exception e) {}countDownLatch.countDown();//所有线程执行到这阻塞到countdown到0时一起执行System.out.println(countDownLatch.getCount());});}countDownLatch.await();//executorService.shutdown();System.out.println(count);}}

 


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部