java并发编程之同步器代码示例

同步器是一些使线程能够等待另一个线程的对象,允许它们协调动作。最常用的同步器是CountDownLatch和Semaphore,不常用的是Barrier和Exchanger

队列同步器AbstractQueuedSynchronizer是用来构建锁或者其他同步组件的基础框架,它内部使用了一个volatiole修饰的int类型的成员变量state来表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。

同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行修改,这时就需要使用同步器来提供的3个方法(getState()、setState(intnewState)/和compareAndSetState(intexpect,intupdate))来进行操作,因为他们能够保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取个释放的方法来供自定义同步组件使用,同步器既可以独占式的获取同步状态,也可以支持共享式的获取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、ReadWriteLock、和CountDownLatch等)。

同步器是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。他们二者直接的关系就是:锁是面向使用者的,它定义了使用者与锁交互的接口,隐藏了实现的细节;同步器则是面向锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好的隔离了使用者与实现者所需关注的领域。

同步器的设计是基于模版方法模式实现的,使用者需要继承同步器并重写这顶的方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模版方法,而这些模版方法将会调用使用者重写的方法。

同步器提供的模版方法基本上分为3类:独占式获取锁与释放同步状态、共享式获取与释放同步状态和查询同步队列中的等待线程情况。自定义同步组件将使用同步器提供的模版方法来实现自己的同步语义。倒计数器锁存器是一次性障碍,允许一个或者多个线程等待一个或者多个其它线程来做某些事情。CountDownLatch的唯一构造器带一个int类型的参数,这个int参数是指允许所有在等待线程被处理之前,必须在锁存器上调用countDown方法的次数。

EG:

package hb.java.thread;
import java.util.concurrent.CountDownLatch;
/**
 *
 * @author hb
 *     CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,如果没有到达0
 *     ,就只有阻塞等待了。 *JAVA同步器之
 *     CountDownLatch(不能循环使用,如果需要循环使用可以考虑使用CyclicBarrier) 两种比较常规用法: 1:new
 *     CountDownLatch(1);所有的线程在开始工作前需要做一些准备工作,当所有的线程都准备到位后再统一执行时有用 2:new
 *     CountDownLatch(THREAD_COUNT);当所有的线程都执行完毕后,等待这些线程的其他线程才开始继续执行时有用
 */
public class CountDownLatchTest {
	private static final int THREAD_COUNT = 10;
	// 在调用startSingal.countDown()之前调用了startSingal.await()的线程一律等待,直到startSingal.countDown()的调用
	private static final CountDownLatch startSingal = new CountDownLatch(1);
	// 在finishedSingal的初始化记数量通过调用finishedSingal.countDown()减少为0时调用了finishedSingal.await()的线程一直阻塞
	private static final CountDownLatch finishedSingal = new CountDownLatch(
				THREAD_COUNT);
	public static void main(String[] args) throws InterruptedException {
		for (int i = 0; i < THREAD_COUNT; i++) {
			new Thread("Task " + i) {
				public void run() {
					System.out.println(Thread.currentThread().getName()
												+ " prepared!!");
					try {
						startSingal.await();
					}
					catch (InterruptedException e) {
						e.printStackTrace();
					}
					System.out.println(Thread.currentThread().getName()
												+ " finished!!");
					finishedSingal.countDown();
				}
				;
			}
			.start();
		}
		Thread.sleep(1000);
		startSingal.countDown();
		// 所有的线程被唤醒,同时开始工作.countDown 方法的线程等到计数到达零时才继续
		finishedSingal.await();
		// 等待所有的线程完成!!
		System.out.println("All task are finished!!");
	}
}
package hb.java.thread;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
 *
 * JAVA同步器之Barrier(能够循环使用,当计数器增加到Barrier的初始化计数器之后马上会被置为0为下一次循环使用做准备)
 * Barrier能够为指定的一个或多个(一般为多个)线程设置一道屏障,只有当所有的线程都到达该屏障后才能一起冲过该屏障继续其他任务 一般可以new
 * CyclicBarrier(ThreadCount)来进行初始化,也可以new
 * CyclicBarrier(ThreadCount,RunableAction)当初始化数量的线程都调用
 * 了await()方法后触发RunableAction线程,也可以通过初始化一个new
 * CyclicBarrier(ThreadCount+1)的Barrier在前置线程未执行完成时一直阻塞一个或多个
 * 后续线程,这一点类似于CountDownLatch
 */
public class BarrierTest {
	private static final int THREAD_COUNT = 10;
	private static final CyclicBarrier barrier = new CyclicBarrier(
	      THREAD_COUNT + 1, new Runnable() {
		@Override
		        public void run() {
			System.out.println("All task are prepared or finished!!");
		}
	}
	);
	public static void main(String[] args) throws InterruptedException,
	      BrokenBarrierException {
		for (int i = 0; i < THREAD_COUNT; i++) {
			new Thread("Task " + i) {
				public void run() {
					try {
						System.out.println(Thread.currentThread().getName()
						                + " prepared!!");
						barrier.await();
					}
					catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					catch (BrokenBarrierException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					// do something
					System.out.println(Thread.currentThread().getName()
					              + " finished!!");
				}
				;
			}
			.start();
		}
		barrier.await();
		// --------------开始准备循环使用--------------
		for (int i = 0; i < THREAD_COUNT; i++) {
			new Thread("Task " + i) {
				public void run() {
					// do something
					System.out.println(Thread.currentThread().getName()
					              + " finished!!");
					try {
						barrier.await();
					}
					catch (InterruptedException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
					catch (BrokenBarrierException e) {
						// TODO Auto-generated catch block
						e.printStackTrace();
					}
				}
				;
			}
			.start();
		}
		barrier.await();
	}
}
package hb.java.thread;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;
public class ExchangerTest {
	final static Exchanger<List<String>> exchanger = new Exchanger<List<String>>();
	public static void main(String[] args) {
		new Producer("Producer", exchanger).start();
		new Consumer("Consumer", exchanger).start();
	}
	static class Producer extends Thread {
		private Exchanger<List<String>> exchanger;
		/**
     *
     */
		public Producer(String threadName, Exchanger<List<String>> exchanger) {
			super(threadName);
			this.exchanger = exchanger;
		}
		/*
     * (non-Javadoc)
     *
     * @see java.lang.Thread#run()
     */
		@Override
		    public void run() {
			List<String> products = new ArrayList<String>();
			for (int i = 0; i < 10; i++) {
				products.add("product " + i);
			}
			try {
				List<String> results = exchanger.exchange(products);
				System.out.println("get results from consumer");
				for (String s : results) {
					System.out.println(s);
				}
			}
			catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
	static class Consumer extends Thread {
		private Exchanger<List<String>> exchanger;
		/**
     *
     */
		public Consumer(String threadName, Exchanger<List<String>> exchanger) {
			super(threadName);
			this.exchanger = exchanger;
		}
		/*
     * (non-Javadoc)
     *
     * @see java.lang.Thread#run()
     */
		@Override
		    public void run() {
			List<String> products = new ArrayList<String>();
			for (int i = 0; i < 10; i++) {
				products.add("consumed " + i);
			}
			try {
				List<String> results = exchanger.exchange(products);
				System.out.println("got products from produces");
				for (String s : results) {
					System.out.println(s);
				}
			}
			catch (InterruptedException e) {
				// TODO Auto-generated catch block
				e.printStackTrace();
			}
		}
	}
}

总结

以上就是本文关于java并发编程之同步器代码示例的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:

深入分析java并发编程中volatile的实现原理

Javaweb应用使用限流处理大量的并发请求详解

java并发学习之BlockingQueue实现生产者消费者详解

如有不足之处,欢迎留言指出。

时间: 2017-11-20

Java从同步容器到并发容器的操作过程

引言 容器是Java基础类库中使用频率最高的一部分,Java集合包中提供了大量的容器类来帮组我们简化开发,我前面的文章中对Java集合包中的关键容器进行过一个系列的分析,但这些集合类都是非线程安全的,即在多线程的环境下,都需要其他额外的手段来保证数据的正确性,最简单的就是通过synchronized关键字将所有使用到非线程安全的容器代码全部同步执行.这种方式虽然可以达到线程安全的目的,但存在几个明显的问题:首先编码上存在一定的复杂性,相关的代码段都需要添加锁.其次这种一刀切的做法在高并发情况下性

详解Java利用同步块synchronized()保证并发安全

本文实例为大家分享了Java利用同步块synchronized()保证并发安全的具体代码,供大家参考,具体内容如下 package day10; /** * 同步块 * 有效地缩小同步范围 * 可以在保证并发安全的同时尽可能提高并发效率 * * 实例:模拟两个人同时进店买衣服,为提高效率 * 只在试衣服阶段进行同步排队过程,其他阶段无需排队. * @author kaixu * */ public class SyncDemo2 { public static void main(String[

Java中同步与并发用法分析

本文较为详细的分析了Java中同步与并发的用法.分享给大家供大家参考.具体分析如下: 1.同步容器类包括两部分:vector和hashtable 另一类是同步包装类,由Collections.synchronizedXXX创建.同步容器对容器的所有状态进行串行访问,从而实现线程安全. 它们存在如下问题: a) 对于符合操作,需要额外的锁保护.比如迭代,缺少则添加等条件运算. b) toString,hashCode,equals都会间接的调用迭代,都需要注意并发.   2.java5.0中的并发

Java同步容器和并发容器详解

同步容器 在 Java 中,同步容器主要包括 2 类: Vector.Stack.HashTableCollections 类中提供的静态工厂方法创建的类(由 Collections.synchronizedXxxx 等方法) Collections类中提供的静态工厂方法创建的类 Vector 实现了 List 接口,Vector 实际上就是一个数组,和 ArrayList 类似,但是Vector 中的方法都是 synchronized 方法,即进行了同步措施. Stack 也是一个同步容器,它

Java并发之传统线程同步通信技术代码详解

本文研究的主要是Java并发之传统线程同步通信技术的相关代码示例,具体介绍如下. 先看一个问题: 有两个线程,子线程先执行10次,然后主线程执行5次,然后再切换到子线程执行10,再主线程执行5次--如此往返执行50次. 看完这个问题,很明显要用到线程间的通信了, 先分析一下思路:首先肯定要有两个线程,然后每个线程中肯定有个50次的循环,因为每个线程都要往返执行任务50次,主线程的任务是执行5次,子线程的任务是执行10次.线程间通信技术主要用到wait()方法和notify()方法.wait()方

浅谈Java并发 J.U.C之AQS:CLH同步队列

CLH同步队列是一个FIFO双向队列,AQS依赖它来完成同步状态的管理,当前线程如果获取同步状态失败时,AQS则会将当前线程已经等待状态等信息构造成一个节点(Node)并将其加入到CLH同步队列,同时会阻塞当前线程,当同步状态释放时,会把首节点唤醒(公平锁),使其再次尝试获取同步状态. 在CLH同步队列中,一个节点表示一个线程,它保存着线程的引用(thread).状态(waitStatus).前驱节点(prev).后继节点(next),其定义如下: static final class Node

浅谈Java并发编程之Lock锁和条件变量

简单使用Lock锁 Java 5中引入了新的锁机制--java.util.concurrent.locks中的显式的互斥锁:Lock接口,它提供了比synchronized更加广泛的锁定操作.Lock接口有3个实现它的类:ReentrantLock.ReetrantReadWriteLock.ReadLock和ReetrantReadWriteLock.WriteLock,即重入锁.读锁和写锁.lock必须被显式地创建.锁定和释放,为了可以使用更多的功能,一般用ReentrantLock为其实例

浅谈Java并发编程基础知识

进程和线程 在并行程序中进程和线程是两个基本的运行单元,在Java并发编程中,并发主要核心在于线程 1. 进程 一个进程有其专属的运行环境,一个进程通常有一套完整.私有的运行时资源:尤其是每个进程都有其专属的内存空间. 通常情况下,进程等同于运行的程序或者应用,然而很多情况下用户看到的一个应用实际上可能是多个进程协作的.为了达到进程通信的目的,主要的操作系统都实现了Inter Process Communication(IPC)资源,例如pipe和sockets,IPC不仅能支持同一个系统中的进

浅谈Java 并发的底层实现

并发编程的目的是让程序运行更快,但是使用并发并不定会使得程序运行更快,只有当程序的并发数量达到一定的量级的时候才能体现并发编程的优势.所以谈并发编程在高并发量的时候才有意义.虽然目前还没有开发过高并发量的程序,但是学习并发是为了更好理解一些分布式架构.那么当程序的并发量不高,比如是单线程的程序,单线程的执行效率反而比多线程更高.这又是为什么呢?熟悉操作系统的应该知道,CPU是通过给每个线程分配时间片的方式实现多线程的.这样,当CPU从一个任务切换到另一个任务的时候,会保存上一个任务的状态,当执行

浅谈Java并发中的内存模型

什么是JavaMemoryModel(JMM)? JMM通过构建一个统一的内存模型来屏蔽掉不同硬件平台和不同操作系统之间的差异,让Java开发者无需关注不同平台之间的差异,达到一次编译,随处运行的目的,这也正是Java的设计目的之一. CPU和内存 在讲JMM之前,我想先和大家聊聊硬件层面的东西.大家应该都知道执行运算操作的CPU本身是不具备存储能力的,它只负责根据指令对传递进来的数据做相应的运算,而数据存储这一任务则交给内存去完成.虽然内存的运行速度虽然比起硬盘快非常多,但是和3GHZ,4GH

浅谈Java多线程处理中Future的妙用(附源码)

java 中Future是一个未来对象,里面保存这线程处理结果,它像一个提货凭证,拿着它你可以随时去提取结果.在两种情况下,离开Future几乎很难办.一种情况是拆分订单,比如你的应用收到一个批量订单,此时如果要求最快的处理订单,那么需要并发处理,并发的结果如果收集,这个问题如果自己去编程将非常繁琐,此时可以使用CompletionService解决这个问题.CompletionService将Future收集到一个队列里,可以按结果处理完成的先后顺序进队.另外一种情况是,如果你需要并发去查询一

浅谈Java线程间通信之wait/notify

Java中的wait/notify/notifyAll可用来实现线程间通信,是Object类的方法,这三个方法都是native方法,是平台相关的,常用来实现生产者/消费者模式.先来我们来看下相关定义: wait() :调用该方法的线程进入WATTING状态,只有等待另外线程的通知或中断才会返回,调用wait()方法后,会释放对象的锁. wait(long):超时等待最多long毫秒,如果没有通知就超时返回. notify() :通知一个在对象上等待的线程,使其从wait()方法返回,而返回的前提

浅谈java Collection中的排序问题

这里讨论list.set.map的排序,包括按照map的value进行排序. 1)list排序 list排序可以直接采用Collections的sort方法,也可以使用Arrays的sort方法,归根结底Collections就是调用Arrays的sort方法. public static <T> void sort(List<T> list, Comparator<? super T> c) { Object[] a = list.toArray(); Arrays.

浅谈java中的一维数组、二维数组、三维数组、多维数组

这个数组可以看做新手学习,从一维数组 到 多维 数组 循环渐进,其实看起也很简单,一看便知,众所周知,一维.二维或许经常用到,用到二维以上应该就很少了. public class test { public static void main(String[] args) { /*一维数组*/ int num[] = {0,1,2}; /*下面输出 3 行数据,0 ~ 2*/ for (int i = 0; i < num.length; i++) { System.out.println("

浅谈java指令重排序的问题

指令重排序是个比较复杂.觉得有些不可思议的问题,同样是先以例子开头(建议大家跑下例子,这是实实在在可以重现的,重排序的概率还是挺高的),有个感性的认识 /** * 一个简单的展示Happen-Before的例子. * 这里有两个共享变量:a和flag,初始值分别为0和false.在ThreadA中先给 a=1,然后flag=true. * 如果按照有序的话,那么在ThreadB中如果if(flag)成功的话,则应该a=1,而a=a*1之后a仍然为1,下方的if(a==0)应该永远不会为 * 真,