详解java CountDownLatch和CyclicBarrier在内部实现和场景上的区别

前言

CountDownLatch和CyclicBarrier两个同为java并发编程的重要工具类,它们在诸多多线程并发或并行场景中得到了广泛的应用。但两者就其内部实现和使用场景而言是各有所侧重的。

内部实现差异

前者更多依赖经典的AQS机制和CAS机制来控制器内部状态的更迭和计数器本身的变化,而后者更多依靠可重入Lock等机制来控制其内部并发安全性和一致性。

 public class {
   //Synchronization control For CountDownLatch.
   //Uses AQS state to represent count.
  private static final class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 4982264981922014374L;

    Sync(int count) {
      setState(count);
    }

    int getCount() {
      return getState();
    }

    protected int tryAcquireShared(int acquires) {
      return (getState() == 0) ? 1 : -1;
    }

    protected boolean tryReleaseShared(int releases) {
      // Decrement count; signal when transition to zero
      for (;;) {
        int c = getState();
        if (c == 0)
          return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
          return nextc == 0;
      }
    }
  }

  private final Sync sync;
  ... ...//
 }
 public class CyclicBarrier {
  /**
   * Each use of the barrier is represented as a generation instance.
   * The generation changes whenever the barrier is tripped, or
   * is reset. There can be many generations associated with threads
   * using the barrier - due to the non-deterministic way the lock
   * may be allocated to waiting threads - but only one of these
   * can be active at a time (the one to which {@code count} applies)
   * and all the rest are either broken or tripped.
   * There need not be an active generation if there has been a break
   * but no subsequent reset.
   */
  private static class Generation {
    boolean broken = false;
  }

  /** The lock for guarding barrier entry */
  private final ReentrantLock lock = new ReentrantLock();
  /** Condition to wait on until tripped */
  private final Condition trip = lock.newCondition();
  /** The number of parties */
  private final int parties;
  /* The command to run when tripped */
  private final Runnable barrierCommand;
  /** The current generation */
  private Generation generation = new Generation();

  /**
   * Number of parties still waiting. Counts down from parties to 0
   * on each generation. It is reset to parties on each new
   * generation or when broken.
   */
  private int count;

  /**
   * Updates state on barrier trip and wakes up everyone.
   * Called only while holding lock.
   */
  private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
  }

  /**
   * Sets current barrier generation as broken and wakes up everyone.
   * Called only while holding lock.
   */
  private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
  }

  /**
   * Main barrier code, covering the various policies.
   */
  private int dowait(boolean timed, long nanos)
    throws InterruptedException, BrokenBarrierException,
        TimeoutException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
      final Generation g = generation;

      if (g.broken)
        throw new BrokenBarrierException();

      if (Thread.interrupted()) {
        breakBarrier();
        throw new InterruptedException();
      }

      int index = --count;
      if (index == 0) { // tripped
        boolean ranAction = false;
        try {
          final Runnable command = barrierCommand;
          if (command != null)
            command.run();
          ranAction = true;
          nextGeneration();
          return 0;
        } finally {
          if (!ranAction)
            breakBarrier();
        }
      }

      // loop until tripped, broken, interrupted, or timed out
      for (;;) {
        try {
          if (!timed)
            trip.await();
          else if (nanos > 0L)
            nanos = trip.awaitNanos(nanos);
        } catch (InterruptedException ie) {
          if (g == generation && ! g.broken) {
            breakBarrier();
            throw ie;
          } else {
            // We're about to finish waiting even if we had not
            // been interrupted, so this interrupt is deemed to
            // "belong" to subsequent execution.
            Thread.currentThread().interrupt();
          }
        }

        if (g.broken)
          throw new BrokenBarrierException();

        if (g != generation)
          return index;

        if (timed && nanos <= 0L) {
          breakBarrier();
          throw new TimeoutException();
        }
      }
    } finally {
      lock.unlock();
    }
  }
  ... ... //
 }

实战 - 展示各自的使用场景

/**
 *类说明:共5个初始化子线程,6个闭锁扣除点,扣除完毕后,主线程和业务线程才能继续执行
 */
public class UseCountDownLatch {

  static CountDownLatch latch = new CountDownLatch(6);

  /*初始化线程*/
  private static class InitThread implements Runnable{

    public void run() {
      System.out.println("Thread_"+Thread.currentThread().getId()
         +" ready init work......");
      latch.countDown();
      for(int i =0;i<2;i++) {
        System.out.println("Thread_"+Thread.currentThread().getId()
           +" ........continue do its work");
      }
    }
  }

  /*业务线程等待latch的计数器为0完成*/
  private static class BusiThread implements Runnable{

    public void run() {
      try {
        latch.await();
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
      for(int i =0;i<3;i++) {
        System.out.println("BusiThread_"+Thread.currentThread().getId()
           +" do business-----");
      }
    }
  }

  public static void main(String[] args) throws InterruptedException {
    new Thread(new Runnable() {
      public void run() {
        SleepTools.ms(1);
        System.out.println("Thread_"+Thread.currentThread().getId()
           +" ready init work step 1st......");
        latch.countDown();
        System.out.println("begin step 2nd.......");
        SleepTools.ms(1);
        System.out.println("Thread_"+Thread.currentThread().getId()
           +" ready init work step 2nd......");
        latch.countDown();
      }
    }).start();
    new Thread(new BusiThread()).start();
    for(int i=0;i<=3;i++){
      Thread thread = new Thread(new InitThread());
      thread.start();
    }
    latch.await();
    System.out.println("Main do ites work........");
  }
}
/**
 *类说明:共4个子线程,他们全部完成工作后,交出自己结果,
 *再被统一释放去做自己的事情,而交出的结果被另外的线程拿来拼接字符串
 */
class UseCyclicBarrier {
  private static CyclicBarrier barrier
      = new CyclicBarrier(4,new CollectThread());

  //存放子线程工作结果的容器
  private static ConcurrentHashMap<String,Long> resultMap
      = new ConcurrentHashMap<String,Long>();

  public static void main(String[] args) {
    for(int i=0;i<4;i++){
      Thread thread = new Thread(new SubThread());
      thread.start();
    }

  }

  /*汇总的任务*/
  private static class CollectThread implements Runnable{

    @Override
    public void run() {
      StringBuilder result = new StringBuilder();
      for(Map.Entry<String,Long> workResult:resultMap.entrySet()){
        result.append("["+workResult.getValue()+"]");
      }
      System.out.println(" the result = "+ result);
      System.out.println("do other business........");
    }
  }

  /*相互等待的子线程*/
  private static class SubThread implements Runnable{
    @Override
    public void run() {
      long id = Thread.currentThread().getId();
      resultMap.put(Thread.currentThread().getId()+"",id);
      try {
          Thread.sleep(1000+id);
          System.out.println("Thread_"+id+" ....do something ");
        barrier.await();
        Thread.sleep(1000+id);
        System.out.println("Thread_"+id+" ....do its business ");
        barrier.await();
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }
}

 两者总结

1. Cyclicbarrier结果汇总的Runable线程可以重复被执行,通过多次触发await()方法,countdownlatch可以调用await()方法多次;cyclicbarrier若没有结果汇总,则调用一次await()就够了;

2. New cyclicbarrier(threadCount)的线程数必须与实际的用户线程数一致;

3. 协调线程同时运行:countDownLatch协调工作线程执行,是由外面线程协调;cyclicbarrier是由工作线程之间相互协调运行;

4. 从构造函数上看出:countDownlatch控制运行的计数器数量和线程数没有关系;cyclicbarrier构造中传入的线程数等于实际执行线程数;

5. countDownLatch在不能基于执行子线程的运行结果做处理,而cyclicbarrier可以;

6. 就使用场景而言,countdownlatch 更适用于框架加载前的一系列初始化工作等场景; cyclicbarrier更适用于需要多个用户线程执行后,将运行结果汇总再计算等典型场景;

到此这篇关于详解java CountDownLatch和CyclicBarrier在内部实现和场景上的区别的文章就介绍到这了,更多相关java CountDownLatch和CyclicBarrier区别内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

时间: 2020-05-20

Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解

Java并发编程:CountDownLatch与CyclicBarrier和Semaphore的实例详解 在java 1.5中,提供了一些非常有用的辅助类来帮助我们进行并发编程,比如CountDownLatch,CyclicBarrier和Semaphore,今天我们就来学习一下这三个辅助类的用法. 以下是本文目录大纲: 一.CountDownLatch用法 二.CyclicBarrier用法 三.Semaphore用法 若有不正之处请多多谅解,并欢迎批评指正. 一.CountDownLatch

Java并发编程包中atomic的实现原理示例详解

线程安全: 当多个线程访问某个类时,不管运行时环境采用何种调度方式或者这些进程将如何交替执行,并且在主调代码中不需要任何额外的同步或协调,这个类都能表现出正确的行为,那么就称这个类时线程安全的. 线程安全主要体现在以下三个方面: 原子性:提供了互斥访问,同一时刻只能有一个线程对它进行操作 可见性:一个线程对主内存的修改可以及时的被其他线程观察到 有序性:一个线程观察其他线程中的指令执行顺序,由于指令重排序的存在,该观察结果一般杂乱无序 引子 在多线程的场景中,我们需要保证数据安全,就会考虑同步的

java动态添加外部jar包到classpath的实例详解

java动态添加外部jar包到classpath的实例详解 前言: 在项目开发过程中我们有时候需要动态的添加外部jar包,但是具体的业务需求还没有遇到过,因为如果动态添加外部jar包后,我们就需要修改业务代码,而修改代码就需要重新启动服务,那样好像就没有必要动态添加外部jar包了,怎么样才能不重新启动服务器就可以使用最新的代码我没有找到方法,如果各位知道的话给我点建议,回归主题,实现动态添加外部jar包到classpath的方法如下: String beanClassName = "com.dy

Java与Oracle实现事务(JDBC事务)实例详解

Java与Oracle实现事务(JDBC事务)实例详解 J2EE支持JDBC事务.JTA事务和容器事务事务,这里说一下怎样实现JDBC事务. JDBC事务是由Connection对象所控制的,它提供了两种事务模式:自己主动提交和手动提交,默认是自己主动提交. 自己主动提交就是:在JDBC中.在一个连接对象Connection中.默认把每一个SQL语句的运行都当做是一个事务(即每次运行完SQL语句都会马上将操作更新到数据库). 手动提交就是:当须要一次性运行多个SQL语句,将多个SQL语句组成一个

Java 中桥接模式——对象结构型模式的实例详解

Java  中桥接模式--对象结构型模式的实例详解 一.意图 将抽象部分与它的实现部分分离,使他们都可以独立的变化. 二.适用性 以下一些情况使用Bridge模式 你不希望在抽象和它的实现部分之间有一个固定的绑定关系.例如这种情况可能因为,在程序运行时刻实现部分应可以被选择或者切换. 类的抽象以及它的实现都应该可以通过生成子类的方法加以扩充.这时Bridge模式使你可以对不同的抽象接口和实现部分进行组合,并分别对他们进行扩充. 对一个抽象的实现部分的修改应对客户不产生影响,即客户代码不必重新编译

Java基于外观模式实现美食天下食谱功能实例详解

本文实例讲述了Java基于外观模式实现美食天下食谱功能.分享给大家供大家参考,具体如下: 一.模式定义 外观模式,是软件工程师常用的一种软件设计模式.它为子系统中的一组接口提供一个统一的高层接口,使子系统更容易使用.外观模式通过一个外观接口读/写子系统中的各接口的数据资源,而客户可以通过外观接口读取内部资源库,不与子系统产生交互. 二.模式举例 1. 模式分析 我们借用美食天下菜谱中制作糖醋排骨这一道菜来说明这一模式. 2. 外观模式静态类图 3. 代码示例 3.1 创建糖醋排骨接口一ISpar

java 加密之RSA算法加密与解密的实例详解

java 加密之RSA算法加解密与解密的实例详解 前言: RSA是第一个比较完善的公开密钥算法,它既能用于加密,也能用于数字签名.RSA以它的三个发明者Ron Rivest, Adi Shamir, Leonard Adleman的名字首字母命名,这个算法经受住了多年深入的密码分析,虽然密码分析者既不能证明也不能否定RSA的安全性,但这恰恰说明该算法有一定的可信性,目前它已经成为最流行的公开密钥算法. RSA的安全基于大数分解的难度.其公钥和私钥是一对大素数(100到200位十进制数或更大)的函

java数据库开发之JDBC基础使用方法及实例详解

1.什么是JDBC JDBC是一种用于执行SQL语句的Java API,可以为多种关系数据库提供统一访问,它由一组用Java语言编写的类和接口组成.JDBC提供了一种基准,据此可以构建更高级的工具和接口,使数据库开发人员能够编写数据库应用程序 JDBC 数据库访问规范 应用程序 <-> JDBC <-> MySQL驱动 <-> MySQL                  <-> Oracle驱动 <-> Oracle 导入jar包 加载驱动 C

Java并发编程之栅栏(CyclicBarrier)实例介绍

栅栏类似闭锁,但是它们是有区别的. 1.闭锁用来等待事件,而栅栏用于等待其他线程.什么意思呢?就是说闭锁用来等待的事件就是countDown事件,只有该countDown事件执行后所有之前在等待的线程才有可能继续执行;而栅栏没有类似countDown事件控制线程的执行,只有线程的await方法能控制等待的线程执行. 2.CyclicBarrier强调的是n个线程,大家相互等待,只要有一个没完成,所有人都得等着. 场景分析:10个人去春游,规定达到一个地点后才能继续前行.代码如下 复制代码 代码如

python并发编程 Process对象的其他属性方法join方法详解

一 Process对象的join方法 在主进程运行过程中如果想并发地执行其他的任务,我们可以开启子进程,此时主进程的任务与子进程的任务分两种情况 情况一: 在主进程的任务与子进程的任务彼此独立的情况下,主进程的任务先执行完毕后,主进程还需要等待子进程执行完毕,然后统一回收资源. 这种是没有join方法 情况二: 如果主进程的任务在执行到某一个阶段时,需要等待子进程执行完毕后才能继续执行, 就需要有一种机制能够让主进程检测子进程是否运行完毕,在子进程执行完毕后才继续执行,否则一直在原地阻塞,这就是