Java并行处理的实现

目录
  • 1. 背景
  • 2.知识
  • 3. Java 中的并行处理
  • 4. 扩展
    • 线程池方式实现并行处理
    • 使用 fork/join框架
  • 5.参考:

1. 背景

本文是一个短文章,介绍Java 中的并行处理。
说明:10多分钟读完的文章我称之为短文章,适合快速阅读。

2.知识

并行计算(parallel computing)一般是指许多指令得以同时进行的计算模式。在同时进行的前提下,可以将计算的过程分解成小部分,之后以并发方式来加以解决。

也就是分解为几个过程:

1、将一个大任务拆分成多个子任务,子任务还可以继续拆分。
2、各个子任务同时进行运算执行。
3、在执行完毕后,可能会有个 " 归纳 " 的任务,比如 求和,求平均等。

再简化一点的理解就是: 先拆分  -->  在同时进行计算  --> 最后“归纳”
为什么要“并行”,优点呢?

1、为了获得 “节省时间”,“快”。适合用于大规模运算的场景。从理论上讲,在 n 个并行处理的执行速度可能会是在单一处理机上执行的速度的 n 倍。
2、以前的计算机是单核的,现代的计算机Cpu都是多核的,服务器甚至都是多Cpu的,并行计算可以充分利用硬件的性能。

3. Java 中的并行处理

JDK 8 新增的Stream API(java.util.stream)将生成环境的函数式编程引入了Java库中,可以方便开发者能够写出更加有效、更加简洁的代码。

steam 的另一个价值是创造性地支持并行处理(parallel processing)。示例:

final Collection< Task > tasks = Arrays.asList(
    new Task( Status.OPEN, 5 ),
    new Task( Status.OPEN, 13 ),
    new Task( Status.CLOSED, 8 )
);

// 并行执行多个任务,并 求和
final double totalPoints = tasks
   .stream()
   .parallel()
   .map( task -> task.getPoints() ) // or map( Task::getPoints )
   .reduce( 0, Integer::sum );

System.out.println( "Total points (all tasks): " + totalPoints );

对于上面的tasks集合,上面的代码计算所有任务的点数之和。
它使用 parallel 方法并行处理所有的task,并使用 reduce 方法计算最终的结果。

4. 扩展

线程池方式实现并行处理

jdk1.5引入了并发包,其中包括了ThreadPoolExecutor,相关代码如下:

public class ExecutorServiceTest {

    public static final int THRESHOLD = 10_000;
    public static long[] numbers;

    public static void main(String[] args) throws Exception {
        numbers = LongStream.rangeClosed(1, 10_000_000).toArray();
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() + 1);
        CompletionService<Long> completionService = new ExecutorCompletionService<Long>(executor);
        int taskSize = (int) (numbers.length / THRESHOLD);
        for (int i = 1; i <= taskSize; i++) {
            final int key = i;
            completionService.submit(new Callable<Long>() {

                @Override
                public Long call() throws Exception {
                    return sum((key - 1) * THRESHOLD, key * THRESHOLD);
                }
            });
        }
        long sumValue = 0;
        for (int i = 0; i < taskSize; i++) {
            sumValue += completionService.take().get();
        }
        // 所有任务已经完成,关闭线程池
        System.out.println("sumValue = " + sumValue);
        executor.shutdown();
    }

    private static long sum(int start, int end) {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
}

使用 fork/join框架

分支/合并框架的目的是以递归的方式将可以并行的认为拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果;相关代码如下:

public class ForkJoinTest extends java.util.concurrent.RecursiveTask<Long> {

    private static final long serialVersionUID = 1L;
    private final long[] numbers;
    private final int start;
    private final int end;
    public static final long THRESHOLD = 10_000;

    public ForkJoinTest(long[] numbers) {
        this(numbers, 0, numbers.length);
    }

    private ForkJoinTest(long[] numbers, int start, int end) {
        this.numbers = numbers;
        this.start = start;
        this.end = end;
    }

    @Override
    protected Long compute() {
        int length = end - start;
        if (length <= THRESHOLD) {
            return computeSequentially();
        }
        ForkJoinTest leftTask = new ForkJoinTest(numbers, start, start + length / 2);
        leftTask.fork();
        ForkJoinTest rightTask = new ForkJoinTest(numbers, start + length / 2, end);
        Long rightResult = rightTask.compute();
        // 注:join方法会阻塞,因此有必要在两个子任务的计算都开始之后才执行join方法
        Long leftResult = leftTask.join();
        return leftResult + rightResult;
    }

    private long computeSequentially() {
        long sum = 0;
        for (int i = start; i < end; i++) {
            sum += numbers[i];
        }
        return sum;
    }

    public static void main(String[] args) {
        System.out.println(forkJoinSum(10_000_000));
    }

    public static long forkJoinSum(long n) {
        long[] numbers = LongStream.rangeClosed(1, n).toArray();
        ForkJoinTask<Long> task = new ForkJoinTest(numbers);
        return new ForkJoinPool().invoke(task);
    }
}

上面的代码实现了 递归方式拆分子任务,并放入到线程池中执行。

5.参考:

https://zh.wikipedia.org/wiki/%E5%B9%B6%E8%A1%8C%E8%AE%A1%E7%AE%97

到此这篇关于Java并行处理的实现的文章就介绍到这了,更多相关Java并行处理内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Java8并行流中自定义线程池操作示例

    本文实例讲述了Java8并行流中自定义线程池操作.分享给大家供大家参考,具体如下: 1.概览 java8引入了流的概念,流是作为一种对数据执行大量操作的有效方式.并行流可以被包含于支持并发的环境中.这些流可以提高执行性能-以牺牲多线程的开销为代价 在这篇短文中,我们将看一下 Stream API的最大限制,同时看一下如何让并行流和线程池实例(ThreadPool instance)一起工作. 2.并行流Parallel Stream 我们先以一个简单的例子来开始-在任一个Collection类型

  • Java大文本并行计算实现过程解析

    简单提高文本读取效率,使用BufferedReader是个不错的选择.速度最快的方法是MappedByteBuffer,但是,相比BufferedReader而言,效果不是非常明显.也就是说,后者虽然快,但也快的有限(不要抱有性能提升几倍的幻想). 对于大文本的读取,性能瓶颈主要在IO,read占时间多是正常的,硬盘本身就不快,读入内存后还要转成对象,都比较耗时间. 想要提速应当用并行的办法,用多线程同时读取和处理数据,但Java写多线程程序很麻烦,并行分段读同一个文件时还要考虑调整边界,也比较

  • Java 并行数据处理和性能分析

    并行流 并行流是一个把元素分成多个块的流,每个块用不同的线程处理.可以自动分区,让所有的处理器都忙起来. 假设要写一个方法,接受一个数量n做参数,计算1-n的和.可以这样实现: public long sequentialSum(long n) { return Stream.iterate(1L, i -> i + 1) .limit(n) .reduce(0L, Long::sum); } 也许可以使用parallel方法,简单地使用并行计算,提高程序性能: public long sequ

  • java利用CountDownLatch实现并行计算

    本文实例为大家分享了利用CountDownLatch实现并行计算的具体代码,供大家参考,具体内容如下 import java.util.concurrent.CountDownLatch; /** * @Author pipi * @Date 2018/10/15 13:56 **/ public class ParallelComputing { private int[] nums; private String[] info; private CountDownLatch countDow

  • Java并行处理的实现

    目录 1. 背景 2.知识 3. Java 中的并行处理 4. 扩展 线程池方式实现并行处理 使用 fork/join框架 5.参考: 1. 背景 本文是一个短文章,介绍Java 中的并行处理. 说明:10多分钟读完的文章我称之为短文章,适合快速阅读. 2.知识 并行计算(parallel computing)一般是指许多指令得以同时进行的计算模式.在同时进行的前提下,可以将计算的过程分解成小部分,之后以并发方式来加以解决. 也就是分解为几个过程: 1.将一个大任务拆分成多个子任务,子任务还可以

  • 最好的8个Java RESTful框架

    过去的每一year,涌现出越来越多的Java框架.就像JavaScript,每个人都认为他们知道一个好的框架的功能应该是怎么样的.连我的老祖母现在也使用 一个我从来没有听说过而且可能永远不会使用的框架.玩笑归玩笑,可以做几乎任何事的臃肿的框架市场已经饱和,,但是如何评判呢?这篇文章旨在提供目前最好的 Java RESTfulful 框架.我只介绍轻量级的产品, 略过了那些臃肿的过度设计的框架.同时,我只想要他们稳定和成熟,提供简单,轻量级的特点.我只在介绍Play frameworks时打破这条

  • Hadoop多Job并行处理的实例详解

    Hadoop多Job并行处理的实例详解 有关Hadoop多Job任务并行处理,经过测试,配置如下: 首先做如下配置: 1.修改mapred-site.xml添加调度器配置: <property> <name>mapred.jobtracker.taskScheduler</name> <value>org.apache.hadoop.mapred.FairScheduler</value> </property> 2.添加jar文件地

  • 论Java Web应用中调优线程池的重要性

    不论你是否关注,Java Web应用都或多或少的使用了线程池来处理请求.线程池的实现细节可能会被忽视,但是有关于线程池的使用和调优迟早是需要了解的.本文主要介绍Java线程池的使用和如何正确的配置线程池. 单线程 我们先从基础开始.无论使用哪种应用服务器或者框架(如Tomcat.Jetty等),他们都有类似的基础实现.Web服务的基础是套接字(socket),套接字负责监听端口,等待TCP连接,并接受TCP连接.一旦TCP连接被接受,即可从新创建的TCP连接中读取和发送数据. 为了能够理解上述流

  • Java 8 新特性终极版指南详解

    前言: Java 8已经公布有一段时间了,种种迹象表明Java 8是一个有重大改变的发行版.在Java Code Geeks上已经有很多介绍Java 8新特性的文章,例如Playing with Java 8 – Lambdas and Concurrency.Java 8 Date Time API Tutorial : LocalDateTime和Abstract Class Versus Interface in the JDK 8 Era.本文还参考了一些其他资料,例如:15 Must

  • Java 多线程并发编程_动力节点Java学院整理

    一.多线程 1.操作系统有两个容易混淆的概念,进程和线程. 进程:一个计算机程序的运行实例,包含了需要执行的指令:有自己的独立地址空间,包含程序内容和数据:不同进程的地址空间是互相隔离的:进程拥有各种资源和状态信息,包括打开的文件.子进程和信号处理. 线程:表示程序的执行流程,是CPU调度执行的基本单位:线程有自己的程序计数器.寄存器.堆栈和帧.同一进程中的线程共用相同的地址空间,同时共享进进程锁拥有的内存和其他资源. 2.Java标准库提供了进程和线程相关的API,进程主要包括表示进程的jav

  • 图文详解java内存回收机制

    在Java中,它的内存管理包括两方面:内存分配(创建Java对象的时候)和内存回收,这两方面工作都是由JVM自动完成的,降低了Java程序员的学习难度,避免了像C/C++直接操作内存的危险.但是,也正因为内存管理完全由JVM负责,所以也使Java很多程序员不再关心内存分配,导致很多程序低效,耗内存.因此就有了Java程序员到最后应该去了解JVM,才能写出更高效,充分利用有限的内存的程序. 1.Java在内存中的状态  首先我们先写一个代码为例子: Person.java package test

  • Java8新特性之lambda的作用_动力节点Java学院整理

    我们期待了很久lambda为java带来闭包的概念,但是如果我们不在集合中使用它的话,就损失了很大价值.现有接口迁移成为lambda风格的问题已经通过default methods解决了,在这篇文章将深入解析Java集合里面的批量数据操作(bulk operation),解开lambda最强作用的神秘面纱. 1.关于JSR335 JSR是Java Specification Requests的缩写,意思是Java 规范请求,Java 8 版本的主要改进是 Lambda 项目(JSR 335),其

  • Java中Lambda表达式并行与组合行为

    从串行到并行 串行指一个步骤一个步骤地处理,也就是通常情况下,代码一行一行地执行. 如果将我们常用的迭代器式的循环展开的话,就是串行执行了循环体内所定义的操作: sum += arr.get(0); sum += arr.get(1); sum += arr.get(2); //... 在书的一开始,就提到Java需要支持集合的并行计算(而Lambda为这个需求提供了可能). 这些功能将全部被实现于库代码中,对于我们使用者,实现并行的复杂性被大大降低(最低程度上只需要调用相关方法). 另外,关于

  • Java 线程池详解及实例代码

    线程池的技术背景 在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或者其它更多资源.在Java中更是如此,虚拟机将试图跟踪每一个对象,以便能够在对象销毁后进行垃圾回收. 所以提高服务程序效率的一个手段就是尽可能减少创建和销毁对象的次数,特别是一些很耗资源的对象创建和销毁.如何利用已有对象来服务就是一个需要解决的关键问题,其实这就是一些"池化资源"技术产生的原因. 例如Android中常见到的很多通用组件一般都离不开"池"的概念,如各种图片

随机推荐