详解Java 信号量Semaphore

  Semaphore也是一个同步器,和前面两篇说的CountDownLatch和CyclicBarrier不同,这是递增的,初始化的时候可以指定一个值,但是不需要知道需要同步的线程个数,只需要在同步的地方调用acquire方法时指定需要同步的线程个数;

一.简单使用

  同步两个子线程,只有其中两个子线程执行完毕,主线程才会执行:

package com.example.demo.study;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class Study0217 {
  //创建一个信号量的实例,信号量初始值为0
  static Semaphore semaphore = new Semaphore(0);

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    pool.submit(()->{
      System.out.println("Thread1---start");
      //信号量加一
      semaphore.release();
    });

    pool.submit(()->{
      System.out.println("Thread2---start");
      //信号量加一
      semaphore.release();
    });
    pool.submit(()->{
      System.out.println("Thread3---start");
      //信号量加一
      semaphore.release();
    });
    //等待两个子线程执行完毕就放过,必须要信号量等于2才放过
    semaphore.acquire(2);
    System.out.println("两个子线程执行完毕");

    //关闭线程池,正在执行的任务继续执行
    pool.shutdown();

  }

}

这个信号量也可以复用,类似CyclicBarrier:

package com.example.demo.study;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

public class Study0217 {
  //创建一个信号量的实例,信号量初始值为0
  static Semaphore semaphore = new Semaphore(0);

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    pool.submit(()->{
      System.out.println("Thread1---start");
      //信号量加一
      semaphore.release();
    });

    pool.submit(()->{
      System.out.println("Thread2---start");
      //信号量加一
      semaphore.release();
    });

    //等待两个子线程执行完毕就放过,必须要信号量等于2才放过
    semaphore.acquire(2);
    System.out.println("子线程1,2执行完毕");

    pool.submit(()->{
      System.out.println("Thread3---start");
      //信号量加一
      semaphore.release();
    });
    pool.submit(()->{
      System.out.println("Thread4---start");
      //信号量加一
      semaphore.release();
    });

    semaphore.acquire(2);
    System.out.println("子线程3,4执行完毕");

    //关闭线程池,正在执行的任务继续执行
    pool.shutdown();

  }

}

二.信号量原理 

  看看下面这个图,可以知道信号量Semaphore还是根据AQS实现的,内部有个Sync工具类操作AQS,还分为公平策略和非公平策略;

构造器:

//默认是非公平策略
public Semaphore(int permits) {
  sync = new NonfairSync(permits);
}
//可以根据第二个参数选择是公平策略还是非公平策略
public Semaphore(int permits, boolean fair) {
  sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}

acquire(int permits)方法:

public void acquire(int permits) throws InterruptedException {
  if (permits < 0) throw new IllegalArgumentException();
  sync.acquireSharedInterruptibly(permits);
}

//AQS中的方法
public final void acquireSharedInterruptibly(int arg)
    throws InterruptedException {
  if (Thread.interrupted()) throw new InterruptedException();
  //这里根据子类是公平策略还是非公平策略
  if (tryAcquireShared(arg) < 0)
    //获取失败会进入这里,将线程放入阻塞队列,然后再尝试,还是失败的话就调用park方法挂起当前线程
    doAcquireSharedInterruptibly(arg);
}
//非公平策略
protected int tryAcquireShared(int acquires) {
  return nonfairTryAcquireShared(acquires);
}
final int nonfairTryAcquireShared(int acquires) {
  //一个无限循环,获取state剩余的信号量,因为每调用一次release()方法的话,信号量就会加一,这里将
  //最新的信号量减去传进来的参数比较,比如有两个线程,其中一个线程已经调用了release方法,然后调用acquire(2)方法,那么
  //这里remaining的值就是-1,返回-1,然后当前线程就会被丢到阻塞队列中去了;如果另外一个线程也调用了release方法,
  //那么此时的remaining==0,所以在这里的if中会调用CAS将0设置到state
  //
  for (;;) {
    int available = getState();
    int remaining = available - acquires;
    if (remaining < 0 || compareAndSetState(available, remaining))
      return remaining;
  }
}
//公平策略
//和上面非公平差不多,只不过这里会查看阻塞队列中当前节点前面有没有前驱节点,有的话直接返回-1,
//就会把当前线程丢到阻塞队列中阻塞去了,没有前驱节点的话,就跟非公平模式一样的了
protected int tryAcquireShared(int acquires) {
  for (;;) {
    if (hasQueuedPredecessors())
      return -1;
    int available = getState();
    int remaining = available - acquires;
    if (remaining < 0 ||compareAndSetState(available, remaining))
      return remaining;
  }
}

再看看release(int permits)方法:

//这个方法的作用就是将信号量加一
public void release(int permits) {
  if (permits < 0) throw new IllegalArgumentException();
  sync.releaseShared(permits);
}
//AQS中方法
public final boolean releaseShared(int arg) {
  //tryReleaseShared尝试释放资源
  if (tryReleaseShared(arg)) {
    //释放资源成功就调用park方法唤醒唤醒AQS队列中最前面的节点中的线程
    doReleaseShared();
    return true;
  }
  return false;
}

protected final boolean tryReleaseShared(int releases) {
  //一个无限循环,获取state,然后加上传进去的参数,如果新的state的值小于旧的state,说明已经超过了state的最大值,溢出了
  //没有溢出的话,就用CAS更新state的值
  for (;;) {
    int current = getState();
    int next = current + releases;
    if (next < current) // overflow
      throw new Error("Maximum permit count exceeded");
    if (compareAndSetState(current, next))
      return true;
  }
}

private void doReleaseShared() {

  for (;;) {
    Node h = head;
    if (h != null && h != tail) {
      int ws = h.waitStatus;
      //ws==Node.SIGNAL表示节点中线程需要被唤醒
      if (ws == Node.SIGNAL) {
        if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
          continue;      // loop to recheck cases
        //调用阻塞队列中线程的unpark方法唤醒线程
        unparkSuccessor(h);
      }
      //ws == 0表示节点中线程是初始状态
      else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
        continue;        // loop on failed CAS
    }

    if (h == head)          // loop if head changed
      break;
  }
}

  以最上面的例子简单说一下,其实不是很难,首先线程1和线程2分别去调用release方法,这个方法里面会将AQS中的state加一,但是在执行这个操作之前,主线程肯定会先到acquire(2),在这个方法里面,假如默认使用非公平策略,首先获取当前的信号量state(state的初始值是0),用当前信号量减去2,如果小于0,那么当前主线程就会丢到AQS队列中阻塞;

  这个时候线程1的release方法执行了,于是就把信号量state加一(此时state==1),CAS更新state为一,成功的话,就调用doReleaseShared()方法唤醒AQS阻塞队列中最先挂起的线程(这里就是因为调用acquire方法而阻塞的主线程),主线程唤醒之后又会去获取最新的信号量,与2比较,发现还是小于0,于是又会阻塞;

  线程2此时的release方法执行完成,重复线程一的操作,主线程唤醒之后(此时state==2),又去获取最新的信号量发现是2,减去acquire方法的参数2等于0,于是就用CAS更新state的值,然后acquire方法也就执行完毕,主线程继续执行后面的代码;

  其实信号量还是很有意思的,记得在项目里,有人利用信号量实现了一个故障隔离,什么时候我可以把整理之后的代码贴出来分享一下,还是很有意思的,就跟springcloud的熔断机制差不多,场景是:比如你在service的一个方法调用第三方的接口,你不知道调不调得通,而且你不希望每次前端过来都会去调用,比如当调用失败的次数超过100次,那么五分钟之后才会再去实际调用这个第三方服务!这五分钟内前调用这个服务,就会触发我们这个故障隔离的机制,向前端返回一个特定的错误码和错误信息!

以上就是详解Java 信号量Semaphore的详细内容,更多关于Java 信号量Semaphore的资料请关注我们其它相关文章!

时间: 2020-09-14

JAVA 多线程之信号量(Semaphore)实例详解

java Semaphore 简介 信号量(Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施, 它负责协调各个线程, 以保证它们能够正确.合理的使用公共资源. 一个计数信号量.从概念上讲,信号量维护了一个许可集.如有必要,在许可可用前会阻塞每一个 acquire(),然后再获取该许可.每个 release() 添加一个许可,从而可能释放一个正在阻塞的获取者.但是,不使用实际的许可对象,Semaphore 只对可用许可的号码进行计数,并采取相应的行动.拿到信号量的线程可以进入

Java中Semaphore(信号量)的使用方法

Semaphore的作用: 在java中,使用了synchronized关键字和Lock锁实现了资源的并发访问控制,在同一时间只允许唯一了线程进入临界区访问资源(读锁除外),这样子控制的主要目的是为了解决多个线程并发同一资源造成的数据不一致的问题.在另外一种场景下,一个资源有多个副本可供同时使用,比如打印机房有多个打印机.厕所有多个坑可供同时使用,这种情况下,Java提供了另外的并发访问控制--资源的多副本的并发访问控制,今天学习的信号量Semaphore即是其中的一种. Semaphore实现

Java并发编程Semaphore计数信号量详解

Semaphore 是一个计数信号量,它的本质是一个共享锁.信号量维护了一个信号量许可集.线程可以通过调用acquire()来获取信号量的许可:当信号量中有可用的许可时,线程能获取该许可:否则线程必须等待,直到有可用的许可为止. 线程可以通过release()来释放它所持有的信号量许可(用完信号量之后必须释放,不然其他线程可能会无法获取信号量). 简单示例: package me.socketthread; import java.util.concurrent.ExecutorService;

Java 信号量Semaphore的实现

近日于LeetCode看题遇1114 按序打印,获悉一解法使用了Semaphore,顺势研究,记心得于此. 此解视Semaphore为锁,以保证同一时刻单线程的顺序执行.在此原题上,我作出如下更改. package test; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; public class

Java并发编程之Semaphore(信号量)详解及实例

Java并发编程之Semaphore(信号量)详解及实例 概述 通常情况下,可能有多个线程同时访问数目很少的资源,如客户端建立了若干个线程同时访问同一数据库,这势必会造成服务端资源被耗尽的地步,那么怎样能够有效的来控制不可预知的接入量呢?及在同一时刻只能获得指定数目的数据库连接,在JDK1.5 java.util.concurrent 包中引入了Semaphore(信号量),信号量是在简单上锁的基础上实现的,相当于能令线程安全执行,并初始化为可用资源个数的计数器,通常用于限制可以访问某些资源(物

Java Web编程之Servlet技术详解

Java Web编程之Servlet技术,知多少? 1.Servlet基础 针对Servlet技术开发,Sun公司提供了一些列接口和类,其中最重要的是javax.servlet.Servlet接口,两个重要的包是javax.servlet和javax.servlet.http,Servlet就是一种实现了Servlet接口的类,它由Web容器(Tomcat/Jetty等)负责调用并创建,用于接收和响应用户请求.Servlet接口中定义了5个抽象方法: Servlet顶层类结构如下所示: 2.第一

Java并发教程之volatile关键字详解

引言 说到多线程,我觉得我们最重要的是要理解一个临界区概念. 举个例子,一个班上1个女孩子(临界区),49个男孩子(线程),男孩子的目标就是这一个女孩子,就是会有竞争关系(线程安全问题).推广到实际场景,例如对一个数相加或者相减等等情形,因为操作对象就只有一个,在多线程环境下,就会产生线程安全问题.理解临界区概念,我们对多线程问题可以有一个好意识. Jav内存模型(JMM) 谈到多线程就应该了解一下Java内存模型(JMM)的抽象示意图.下图: 线程A和线程B执行的是时候,会去读取共享变量(临界

java多线程编程之Synchronized关键字详解

本文介绍JAVA多线程中的synchronized关键字作为对象锁的一些知识点. 所谓对象锁,就是就是synchronized 给某个对象 加锁.关于 对象锁 可参考:这篇文章  一.分析 synchronized可以修饰实例方法,如下形式: public class MyObject { synchronized public void methodA() { //do something.... } 这里,synchronized 关键字锁住的是当前对象.这也是称为对象锁的原因. 为啥锁住当

java并发编程之cas详解

CAS(Compare and swap)比较和替换是设计并发算法时用到的一种技术.简单来说,比较和替换是使用一个期望值和一个变量的当前值进行比较,如果当前变量的值与我们期望的值相等,就使用一个新值替换当前变量的值.这听起来可能有一点复杂但是实际上你理解之后发现很简单,接下来,让我们跟深入的了解一下这项技术. CAS的使用场景 在程序和算法中一个经常出现的模式就是"check and act"模式.先检查后操作模式发生在代码中首先检查一个变量的值,然后再基于这个值做一些操作.下面是一个

Java并发编程之Condition源码分析(推荐)

Condition介绍 上篇文章讲了ReentrantLock的加锁和释放锁的使用,这篇文章是对ReentrantLock的补充.ReentrantLock#newCondition()可以创建Condition,在ReentrantLock加锁过程中可以利用Condition阻塞当前线程并临时释放锁,待另外线程获取到锁并在逻辑后通知阻塞线程"激活".Condition常用在基于异步通信的同步机制实现中,比如dubbo中的请求和获取应答结果的实现. 常用方法 Condition中主要的

Java并发编程总结——慎用CAS详解

一.CAS和synchronized适用场景 1.对于资源竞争较少的情况,使用synchronized同步锁进行线程阻塞和唤醒切换以及用户态内核态间的切换操作额外浪费消耗cpu资源:而CAS基于硬件实现,不需要进入内核,不需要切换线程,操作自旋几率较少,因此可以获得更高的性能. 2.对于资源竞争严重的情况,CAS自旋的概率会比较大,从而浪费更多的CPU资源,效率低于synchronized.以java.util.concurrent.atomic包中AtomicInteger类为例,其getAn

Java并发编程预防死锁过程详解

这篇文章主要介绍了Java并发编程预防死锁过程详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 在java并发编程领域已经有技术大咖总结出了发生死锁的条件,只有四个条件都发生时才会出现死锁: 1.互斥,共享资源X和Y只能被一个线程占用 2.占有且等待,线程T1已经取得共享资源X,在等待共享资源Y的时候,不释放共享资源X 3.不可抢占,其他线程不能强行抢占线程T1占有的资源 4.循环等待,线程T1等待线程T2占有的资源,线程T2等待线程T1占有

浅谈Java并发编程之Lock锁和条件变量

简单使用Lock锁 Java 5中引入了新的锁机制--java.util.concurrent.locks中的显式的互斥锁:Lock接口,它提供了比synchronized更加广泛的锁定操作.Lock接口有3个实现它的类:ReentrantLock.ReetrantReadWriteLock.ReadLock和ReetrantReadWriteLock.WriteLock,即重入锁.读锁和写锁.lock必须被显式地创建.锁定和释放,为了可以使用更多的功能,一般用ReentrantLock为其实例

深入分析Java并发编程之CAS

在Java并发编程的世界里,synchronized 和 Lock 是控制多线程并发环境下对共享资源同步访问的两大手段.其中 Lock 是 JDK 层面的锁机制,是轻量级锁,底层使用大量的自旋+CAS操作实现的. 学习并发推荐<Java并发编程的艺术> 那什么是CAS呢?CAS,compare and swap,即比较并交换,什么是比较并交换呢?在Lock锁的理念中,采用的是一种乐观锁的形式,即多线程去修改共享资源时,不是在修改之前就加锁,而是乐观的认为没有别的线程和自己争锁,就是通过CAS的