Java多线程并发执行demo代码实例

主类:MultiThread,执行并发类

package java8test;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * @param <H> 为被处理的数据类型
 * @param <T>返回数据类型
 * 知识点1:X,T为泛型,为什么要用泛型,泛型和Object的区别请看:https://www.cnblogs.com/xiaoxiong2015/p/12705815.html
 */
public abstract class MultiThread<X, T> {

  public static int i = 0;

  // 知识点2:线程池:https://www.cnblogs.com/xiaoxiong2015/p/12706153.html
  private final ExecutorService exec; // 线程池

  // 知识点3:@author Doung Lea 队列:https://www.cnblogs.com/xiaoxiong2015/p/12825636.html
  private final BlockingQueue<Future<T>> queue = new LinkedBlockingQueue<>();

  // 知识点4:计数器,还是并发包大神 @author Doug Lea 编写。是一个原子安全的计数器,可以利用它实现发令枪
  private final CountDownLatch startLock = new CountDownLatch(1); // 启动门,当所有线程就绪时调用countDown

  private final CountDownLatch endLock; // 结束门

  private final List<X> listData;// 被处理的数据

  /**
   * @param list list.size()为多少个线程处理,list里面的H为被处理的数据
   */
  public MultiThread(List<X> list) {
    if (list != null && list.size() > 0) {
      this.listData = list;
      exec = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); // 创建线程池,线程池共有nThread个线程
      endLock = new CountDownLatch(list.size()); // 设置结束门计数器,当一个线程结束时调用countDown
    } else {
      listData = null;
      exec = null;
      endLock = null;
    }
  }

  /**
   *
   * @return 获取每个线程处理结速的数组
   * @throws InterruptedException
   * @throws ExecutionException
   */
  public List<T> getResult() throws InterruptedException, ExecutionException {

    List<T> resultList = new ArrayList<>();
    if (listData != null && listData.size() > 0) {
      int nThread = listData.size(); // 线程数量
      for (int i = 0; i < nThread; i++) {
        X data = listData.get(i);
        Future<T> future = exec.submit(new Task(i, data) {

          @Override
          public T execute(int currentThread, X data) {

            return outExecute(currentThread, data);
          }
        }); // 将任务提交到线程池
        queue.add(future); // 将Future实例添加至队列
      }
      startLock.countDown(); // 所有任务添加完毕,启动门计数器减1,这时计数器为0,所有添加的任务开始执行
      endLock.await(); // 主线程阻塞,直到所有线程执行完成
      for (Future<T> future : queue) {
        resultList.add(future.get());
      }
      exec.shutdown(); // 关闭线程池
    }
    return resultList;
  }

  /**
   * 每一个线程执行的功能,需要调用者来实现
   * @param currentThread 线程号
   * @param data 每个线程被处理的数据
   * @return T返回对象
   */
  public abstract T outExecute(int currentThread, X data);

  /**
   * 线程类
   */
  private abstract class Task implements Callable<T> {

    private int currentThread;// 当前线程号

    private X data;

    public Task(int currentThread, X data) {
      this.currentThread = currentThread;
      this.data = data;
    }

    @Override
    public T call() throws Exception {

      // startLock.await(); // 线程启动后调用await,当前线程阻塞,只有启动门计数器为0时当前线程才会往下执行
      T t = null;
      try {
        t = execute(currentThread, data);
      } finally {
        endLock.countDown(); // 线程执行完毕,结束门计数器减1
      }
      return t;
    }

    /**
     * 每一个线程执行的功能
     * @param currentThread 线程号
     * @param data 每个线程被处理的数据
     * @return T返回对象
     */
    public abstract T execute(int currentThread, X data);
  }
}

结果类:ResultVO,保存返回结果,根据实际情况替换成自己的

package java8test;
public class ResultVo {
  int i;
  public ResultVo(int i) {
    this.i = i;
  }
  public ResultVo() {
    // TODO Auto-generated constructor stub
  }
}

参数类:ParamVO,传入参数类,根据实际情况替换成自己的

package java8test;

public class ParamVo {

  private int i;

  ParamVo(int i) {
    this.i = i;
  }

  public int getI() {

    return i;
  }

  @Override
  public String toString() {

    return String.valueOf(i) + " " + hashCode();
  }
}

测试类:new两个MultiThread,可以看到MultiThread这个类不存在线程安全问题。

package java8test;

import java.util.ArrayList;
import java.util.List;

public class Test {

  public static void main(String[] args) {

    try {
      List<ParamVo> splitList = new ArrayList<ParamVo>();
      for (int i = 0; i < 100; i++) {
        splitList.add(new ParamVo(i));
      }
      List<ParamVo> splitList1 = new ArrayList<ParamVo>();
      for (int i = 200; i < 300; i++) {
        splitList1.add(new ParamVo(i));
      }
      MultiThread<ParamVo, ResultVo> multiThread = new MultiThread<ParamVo, ResultVo>(splitList) {

        @Override
        public ResultVo outExecute(int currentThread, ParamVo data) {

          System.out.println("当前线程名称:" + Thread.currentThread().getName() + "当前线程号=" + currentThread
              + " data=" + data);
          i--;
          return new ResultVo(data.getI());
        }
      };

      MultiThread<ParamVo, ResultVo> multiThread1 = new MultiThread<ParamVo, ResultVo>(splitList1) {

        @Override
        public ResultVo outExecute(int currentThread, ParamVo data) {

          System.out.println("当前线程名称:" + Thread.currentThread().getName() + "当前线程号=" + currentThread
              + " data=" + data);
          i--;
          return new ResultVo(data.getI());
        }
      };
      List<ResultVo> list = multiThread.getResult();
      List<ResultVo> list1 = multiThread1.getResult();
      // 获取每一批次处理结果
      System.out.println("获取处理结果........................");
      for (ResultVo vo : list) {
        System.out.println(vo.i);
      }
      System.out.println("获取1处理结果........................");
      for (ResultVo vo : list1) {
        System.out.println(vo.i);
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

这个类也用在了生产当中,用来并发插入数据。但是事务不能被管控,需要自己保证最终事务一致。需要注意。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持我们。

时间: 2020-06-27

使用java的HttpClient实现多线程并发

说明:以下的代码基于httpclient4.5.2实现. 我们要使用java的HttpClient实现get请求抓取网页是一件比较容易实现的工作: public static String get(String url) { CloseableHttpResponseresponse = null; BufferedReader in = null; String result = ""; try { CloseableHttpClienthttpclient = HttpClient

Java多线程编程中的两种常用并发容器讲解

ConcurrentHashMap并发容器 ConcurrentHashMap可以做到读取数据不加锁,并且其内部的结构可以让其在进行写操作的时候能够将锁的粒度保持地尽量地小,不用对整个ConcurrentHashMap加锁. ConcurrentHashMap的内部结构 ConcurrentHashMap为了提高本身的并发能力,在内部采用了一个叫做Segment的结构,一个Segment其实就是一个类Hash Table的结构,Segment内部维护了一个链表数组,我们用下面这一幅图来看下Con

java编程多线程并发处理实例解析

本文主要是通过一个银行用户取钱的实例,演示java编程多线程并发处理场景,具体如下. 从一个例子入手:实现一个银行账户取钱场景的实例代码. 第一个类:Account.java 账户类: package cn.edu.byr.test; public class Account { private String accountNo; private double balance; public Account(){ } public Account(String accountNo,double

Java常见面试题之多线程和高并发详解

volatile 对 volatile的理解 volatile 是一种轻量级的同步机制. 保证数据可见性 不保证原子性 禁止指令重排序 JMM JMM(Java 内存模型)是一种抽象的概念,描述了一组规则或规范,定义了程序中各个变量的访问方式. JVM运行程序的实体是线程,每个线程创建时 JVM 都会为其创建一个工作内存,是线程的私有数据区域.JMM中规定所有变量都存储在主内存,主内存是共享内存.线程对变量的操作在工作内存中进行,首先将变量从主内存拷贝到工作内存,操作完成后写会主内存.不同线程间

Java多线程并发编程 并发三大要素

一.原子性 原子,一个不可再被分割的颗粒.原子性,指的是一个或多个不能再被分割的操作. int i = 1; // 原子操作 i++; // 非原子操作,从主内存读取 i 到线程工作内存,进行 +1,再把 i 写到朱内存. 虽然读取和写入都是原子操作,但合起来就不属于原子操作,我们又叫这种为"复合操作". 我们可以用synchronized 或 Lock 来把这个复合操作"变成"原子操作. 例子: private synchronized void increase

简单谈谈RxJava和多线程并发

前言 相信对于RxJava,大家应该都很熟悉,他最核心的两个字就是异步,诚然,它对异步的处理非常的出色,但是异步绝对不等于并发,更不等于线程安全,如果把这几个概念搞混了,错误的使用RxJava,是会来带非常多的问题的. RxJava与并发 首先让我们来看一段RxJava协议的原文: Observables must issue notifications to observers serially (not in parallel). They may issue these notificat

JAVA多线程并发下的单例模式应用

单例模式应该是设计模式中比较简单的一个,也是非常常见的,但是在多线程并发的环境下使用却是不那么简单了,今天给大家分享一个我在开发过程中遇到的单例模式的应用. 首先我们先来看一下单例模式的定义: 一个类有且仅有一个实例,并且自行实例化向整个系统提供. 单例模式的要素: 1.私有的静态的实例对象 2.私有的构造函数(保证在该类外部,无法通过new的方式来创建对象实例) 3.公有的.静态的.访问该实例对象的方法 单例模式分为懒汉形和饿汉式 懒汉式: 应用刚启动的时候,并不创建实例,当外部调用该类的实例

Java 高并发二:多线程基础详细介绍

本系列基于炼数成金课程,为了更好的学习,做了系列的记录. 本文主要介绍 1.什么是线程 2.线程的基本操作 3.守护线程 4.线程优先级 5.基本的线程同步操作 1. 什么是线程 线程是进程内的执行单元 某个进程当中都有若干个线程. 线程是进程内的执行单元. 使用线程的原因是,进程的切换是非常重量级的操作,非常消耗资源.如果使用多进程,那么并发数相对来说不会很高.而线程是更细小的调度单元,更加轻量级,所以线程会较为广泛的用于并发设计. 在Java当中线程的概念和操作系统级别线程的概念是类似的.事

Java 高并发五:JDK并发包1详细介绍

在[高并发Java 二] 多线程基础中,我们已经初步提到了基本的线程同步操作.这次要提到的是在并发包中的同步控制工具. 1. 各种同步控制工具的使用 1.1 ReentrantLock ReentrantLock感觉上是synchronized的增强版,synchronized的特点是使用简单,一切交给JVM去处理,但是功能上是比较薄弱的.在JDK1.5之前,ReentrantLock的性能要好于synchronized,由于对JVM进行了优化,现在的JDK版本中,两者性能是不相上下的.如果是简

java 注解的基础详细介绍

java 注解的基础详细介绍 前言 注解是Java引入的一项非常受欢迎的补充,它提供了一种结构化的,并且具有类型检查能力的新途径,从而使得程序员能够为代码加入元数据,而不会导致代码杂乱且难以阅读.使用注解能够帮助我们避免编写累赘的部署描述文件,以及其他生成的文件. 注解的语法比较简单,除了@符号的使用之外,它基本与java固有的语法一致.但由于java源码中提供的内置注解很少,所以大部分同学对注解都不是很了解,虽然我们都接触过,比如java内置的几种注解: @Override,表示当前的方法定义

Java中的同步与异步详细介绍

进程同步用来实现程序并发执行时候的可再现性. 一.进程同步及异步的概念 1.进程同步:就是在发出一个功能调用时,在没有得到结果之前,该调用就不返回.也就是必须一件一件事做,等前一件做完了才能做下一件事.就像早上起床后,先洗涮,然后才能吃饭,不能在洗涮没有完成时,就开始吃饭.按照这个定义,其实绝大多数函数都是同步调用(例如sin,isdigit等).但是一般而言,我们在说同步.异步的时候,特指那些需要其他部件协作或者需要一定时间完成的任务.最常见的例子就是 sendmessage.该函数发送一个消

java 高并发中volatile的实现原理

java 高并发中volatile的实现原理 摘要: 在多线程并发编程中synchronized和Volatile都扮演着重要的角色,Volatile是轻量级的synchronized,它在多处理器开发中保证了共享变量的"可见性".可见性的意思是当一个线程修改一个共享变量时,另外一个线程能读到这个修改的值.它在某些情况下比synchronized的开销更小 1. 定义: java编程语言允许线程访问共享变量,为了确保共享变量能被准确和一致的更新,线程应该确保通过排他锁单独获得这个变量.

Java中的main函数的详细介绍

Java中的main函数的详细介绍 JAVA中的主函数是我们再熟悉不过的了,相信每个学习过JAVA语言的人都能够熟练地写出这个程序的入口函数,但对于主函数为什么这么写,其中的每个关键字分别是什么意思,可能就不是所有人都能轻松地答出来的了.我也是在学习中碰到了这个问题,通过在网上搜索资料,并加上自己的实践终于有了一点心得,不敢保留,写出来与大家分享. 主函数的一般写法如下: public static void main(String[] args){-} 下面分别解释这些关键字的作用: (1)p

Java中批处理框架spring batch详细介绍

spring batch简介 spring batch是spring提供的一个数据处理框架.企业域中的许多应用程序需要批量处理才能在关键任务环境中执行业务操作. 这些业务运营包括: 无需用户交互即可最有效地处理大量信息的自动化,复杂处理. 这些操作通常包括基于时间的事件(例如月末计算,通知或通信). 在非常大的数据集中重复处理复杂业务规则的定期应用(例如,保险利益确定或费率调整). 集成从内部和外部系统接收的信息,这些信息通常需要以事务方式格式化,验证和处理到记录系统中. 批处理用于每天为企业处

Java 高并发四:无锁详细介绍

在[高并发Java 一] 前言中已经提到了无锁的概念,由于在jdk源码中有大量的无锁应用,所以在这里介绍下无锁. 1 无锁类的原理详解 1.1 CAS CAS算法的过程是这样:它包含3个参数CAS(V,E,N).V表示要更新的变量,E表示预期值,N表示新值.仅当V 值等于E值时,才会将V的值设为N,如果V值和E值不同,则说明已经有其他线程做了更新,则当前线程什么 都不做.最后,CAS返回当前V的真实值.CAS操作是抱着乐观的态度进行的,它总是认为自己可以成功完成 操作.当多个线程同时使用CAS操

Java多线程ThreadAPI详细介绍

1.Thread的构造方法 package threadAPI; public class CreateThread { public static void main(String[] args) { Thread t1 = new Thread(); Thread t2 = new Thread(); t1.start(); t2.start(); System.out.println(t1.getName()); System.out.println(t2.getName()); } }

Java 高并发十: JDK8对并发的新支持详解

1. LongAdder 和AtomicLong类似的使用方式,但是性能比AtomicLong更好. LongAdder与AtomicLong都是使用了原子操作来提高性能.但是LongAdder在AtomicLong的基础上进行了热点分离,热点分离类似于有锁操作中的减小锁粒度,将一个锁分离成若干个锁来提高性能.在无锁中,也可以用类似的方式来增加CAS的成功率,从而提高性能. LongAdder原理图: AtomicLong的实现方式是内部有个value 变量,当多线程并发自增,自减时,均通过CA