Java并发中的Fork/Join 框架机制详解

什么是 Fork/Join 框架

Fork/Join 框架是一种在 JDk 7 引入的线程池,用于并行执行把一个大任务拆成多个小任务并行执行,最终汇总每个小任务结果得到大任务结果的特殊任务。通过其命名也很容易看出框架主要分为 Fork 和 Join 两个阶段,第一阶段 Fork 是把一个大任务拆分为多个子任务并行的执行,第二阶段 Join 是合并这些子任务的所有执行结果,最后得到大任务的结果。

这里不难发现其执行主要流程:首先判断一个任务是否足够小,如果任务足够小,则直接计算,否则,就拆分成几个更小的小任务分别计算,这个过程可以反复的拆分成一系列小任务。Fork/Join 框架是一种基于 分治 的算法,通过拆分大任务成多个独立的小任务,然后并行执行这些小任务,最后合并小任务的结果得到大任务的最终结果,通过并行计算以提高效率。。

Fork/Join 框架使用示例

下面通过一个计算列表中所有元素的总和的示例来看看 Fork/Join 框架是如何使用的,总的思路是:将这个列表分成许多子列表,然后对每个子列表的元素进行求和,然后,我们再计算所有这些值的总和就得到原始列表的和了。Fork/Join 框架中定义了 ForkJoinTask 来表示一个 Fork/Join 任务,其提供了 fork()、join() 等操作,通常情况下,我们并不需要直接继承这个 ForkJoinTask 类,而是使用框架提供的两个 ForkJoinTask 的子类:

  • RecursiveAction 用于表示没有返回结果的 Fork/Join 任务。
  • RecursiveTask 用于表示有返回结果的 Fork/Join 任务。

很显然,在这个示例中是需要返回结果的,可以定义 SumAction 类继承自 RecursiveTask,代码入下:

/**
 * @author mghio
 * @since 2021-07-25
 */
public class SumTask extends RecursiveTask<Long> {

  private static final int SEQUENTIAL_THRESHOLD = 50;

  private final List<Long> data;

  public SumTask(List<Long> data) {
    this.data = data;
  }

  @Override
  protected Long compute() {
    if (data.size() <= SEQUENTIAL_THRESHOLD) {
      long sum = computeSumDirectly();
      System.out.format("Sum of %s: %d\n", data.toString(), sum);
      return sum;
    } else {
      int mid = data.size() / 2;
      SumTask firstSubtask = new SumTask(data.subList(0, mid));
      SumTask secondSubtask = new SumTask(data.subList(mid, data.size()));
      // 执行子任务
      firstSubtask.fork();
      secondSubtask.fork();
      // 等待子任务执行完成,并获取结果
      long firstSubTaskResult = firstSubtask.join();
      long secondSubTaskResult = secondSubtask.join();
      return firstSubTaskResult + secondSubTaskResult;
    }
  }

  private long computeSumDirectly() {
    long sum = 0;
    for (Long l : data) {
      sum += l;
    }
    return sum;
  }

  public static void main(String[] args) {
    Random random = new Random();

    List<Long> data = random
        .longs(1_000, 1, 100)
        .boxed()
        .collect(Collectors.toList());

    ForkJoinPool pool = new ForkJoinPool();
    SumTask task = new SumTask(data);
    pool.invoke(task);

    System.out.println("Sum: " + pool.invoke(task));
  }
}

这里当列表大小小于 SEQUENTIAL_THRESHOLD 变量的值(阈值)时视为小任务,直接计算求和列表元素结果,否则再次拆分为小任务,运行结果如下:

通过这个示例代码可以发现,Fork/Join 框架 中 ForkJoinTask 任务与平常的一般任务的主要不同点在于:ForkJoinTask 需要实现抽象方法 compute() 来定义计算逻辑,在这个方法里一般通用的实现模板是,首先先判断当前任务是否是小任务,如果是,就执行执行任务,如果不是小任务,则再次拆分为两个子任务,然后当每个子任务调用 fork() 方法时,会再次进入到 compute() 方法中,检查当前任务是否需要再拆分为子任务,如果已经是小任务,则执行当前任务并返回结果,否则继续分割,最后调用 join() 方法等待所有子任务执行完成并获得执行结果。伪代码如下:

if (problem is small) {
  directly solve problem.
} else {
  Step 1. split problem into independent parts.
  Step 2. fork new subtasks to solve each part.
  Step 3. join all subtasks.
  Step 4. compose result from subresults.
}

Fork/Join 框架设计

Fork/Join 框架核心思想是把一个大任务拆分成若干个小任务,然后汇总每个小任务的结果最终得到大任务的结果,如果让你设计一个这样的框架,你会如何实现呢?(建议思考一下),Fork/Join 框架的整个流程正如其名所示,分为两个步骤:

  • 大任务分割 需要有这么一个的类,用来将大任务拆分为子任务,可能一次拆分后的子任务还是比较大,需要多次拆分,直到拆分出来的子任务符合我们定义的小任务才结束。
  • 执行任务并合并任务结果 第一步拆分出来的子任务分别存放在一个个 双端队列 里面(P.S. 这里为什么要使用双端队列请看下文),然后每个队列启动一个线程从队列中获取任务执行。这些子任务的执行结果都会放到一个统一的队列中,然后再启动一个线程从这个队列中拿数据,最后合并这些数据返回。

Fork/Join 框架使用了如下两个类来完成以上两个步骤:

  • ForkJoinTask 类 在上文的实例中也有提到,表示 ForkJoin 任务,在使用框架时首先必须先定义任务,通常只需要继承自 ForkJoinTask 类的子类 RecursiveAction(无返回结果) 或者 RecursiveTask(有返回结果)即可。
  • ForkJoinPool 从名字也可以猜到一二了,就是用来执行 ForkJoinTask 的线程池。大任务拆分出的子任务会添加到当前线程的双端队列的头部。

喜欢思考的你,心中想必会想到这么一种场景,当我们需要完成一个大任务时,会先把这个大任务拆分为多个独立的子任务,这些子任务会放到独立的队列中,并为每个队列都创建一个单独的线程去执行队列里的任务,即这里线程和队列时一对一的关系,那么当有的线程可能会先把自己队列的任务执行完成了,而有的线程则没有执行完成,这就导致一些先执行完任务的线程干等了,这是个好问题。

既然是做并发的,肯定要最大程度压榨计算机的性能,对于这种场景并发大师 Doug Lea 使用了工作窃取算法处理,使用工作窃取算法后,先完成自己队列中任务的线程会去其它线程的队列中”窃取“一个任务来执行,哈哈,一方有难,八方支援。但是此时这个线程和队列的持有线程会同时访问同一个队列,所以为了减少窃取任务的线程和被窃取任务的线程之间的竞争,ForkJoin 选择了双端队列这种数据结构,这样就可以按照这种规则执行任务了:被窃取任务的线程始终从队列头部获取任务并执行,窃取任务的线程使用从队列尾部获取任务执行。这个算法在绝大部分情况下都可以充分利用多线程进行并行计算,但是在双端队列里只有一个任务等极端情况下还是会存在一定程度的竞争。

Fork/Join 框架实现原理

Fork/Join 框架的实现核心是 ForkJoinPool 类,该类的重要组成部分为 ForkJoinTask 数组和 ForkJoinWorkerThread 数组,其中 ForkJoinTask 数组用来存放框架使用者给提交给 ForkJoinPool 的任务,ForkJoinWorkerThread 数组则负责执行这些任务。任务有如下四种状态:

NORMAL 已完成

CANCELLED 被取消

SIGNAL 信号

EXCEPTIONAL 发生异常

下面来看看这两个类的核心方法实现原理,首先来看 ForkJoinTask 的 fork() 方法,源码如下:

方法对于 ForkJoinWorkerThread 类型的线程,首先会调用 ForkJoinWorkerThread 的 workQueue 的 push() 方法异步的去执行这个任务,然后马上返回结果。继续跟进 ForkJoinPool 的 push() 方法,源码如下:

方法将当前任务添加到 ForkJoinTask 任务队列数组中,然后再调用 ForkJoinPool 的 signalWork 方法创建或者唤醒一个工作线程来执行该任务。然后再来看看 ForkJoinTask 的 join() 方法,方法源码如下:

方法首先调用了 doJoin() 方法,该方法返回当前任务的状态,根据返回的任务状态做不同的处理:

  • 已完成状态则直接返回结果
  • 被取消状态则直接抛出异常(CancellationException)
  • 发生异常状态则直接抛出对应的异常

继续跟进 doJoin() 方法,方法源码如下:

方法首先判断当前任务状态是否已经执行完成,如果执行完成则直接返回任务状态。如果没有执行完成,则从任务数组中(workQueue)取出任务并执行,任务执行完成则设置任务状态为 NORMAL,如果出现异常则记录异常并设置任务状态为 EXCEPTIONAL(在 doExec() 方法中)。

总结

本文主要介绍了 Java 并发框架中的 Fork/Join 框架的基本原理和其使用的工作窃取算法(work-stealing)、设计方式和部分实现源码。Fork/Join 框架在 JDK 的官方标准库中也有应用。比如 JDK 1.8+ 标准库提供的 Arrays.parallelSort(array) 可以进行并行排序,它的原理就是内部通过 Fork/Join 框架对大数组分拆进行并行排序,可以提高排序的速度,还有集合中的 Collection.parallelStream() 方法底层也是基于 Fork/Join 框架实现的,最后就是定义小任务的阈值往往是需要通过测试验证才能合理给出,并且保证程序可以达到最好的性能。

到此这篇关于Java 并发中的Fork/Join 框架机制详解的文章就介绍到这了,更多相关Java Fork/Join 框架内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • java中的forkjoin框架的使用

    fork join框架是java 7中引入框架,这个框架的引入主要是为了提升并行计算的能力. fork join主要有两个步骤,第一就是fork,将一个大任务分成很多个小任务,第二就是join,将第一个任务的结果join起来,生成最后的结果.如果第一步中并没有任何返回值,join将会等到所有的小任务都结束. 还记得之前的文章我们讲到了thread pool的基本结构吗? ExecutorService - ForkJoinPool 用来调用任务执行. workerThread - ForkJoi

  • Java Fork/Join框架

    Fork/Join框架是ExecutorService接口的一个实现,通过它我们可以实现多进程.Fork/Join可以用来将一个大任务递归的拆分为多个小任务,目标是充分利用所有的资源尽可能增强应用的性能. 和任何ExecutorService接口的实现一样,Fork/Join也会使用线程池来分布式的管理工作线程.Fork/Join框架的独特之处在于它使用了work-stealing(工作窃取)算法.通过这个算法,工作线程在无事可做时可以窃取其它正在繁忙的线程的任务来执行. Fork/Join框架

  • Java ForkJoin框架的原理及用法

    这篇文章主要介绍了Java ForkJoin框架的原理及用法,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 ForkJoin分析 一.ForkJoin ForkJoin是由JDK1.7后提供多线并发处理框架.ForkJoin的框架的基本思想是分而治之.什么是分而治之?分而治之就是将一个复杂的计算,按照设定的阈值进行分解成多个计算,然后将各个计算结果进行汇总.相应的ForkJoin将复杂的计算当做一个任务.而分解的多个计算则是当做一个子任务. 二

  • Java并发编程之Fork/Join框架的理解

    一.Fork/Join框架的理解 ForkJoinTask类属于java.util.concurrent 包下: ForkJoinTask类下有2个子类,分别为RecursiveTask和RecursiveAction类:(lz示例中使用RecursiveTask类进行重写compute()方法进行实现数值的累加计算) ForkJoinTask类 将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出. 二.Fork/Join框架使用示例 示例场景:对数值进

  • java8中forkjoin和optional框架使用

    并行流与串行流 并行流就是把一个内容分成多个数据块,并用不同的线程分别处理每个数据块的流. java 8 中将并行进行了优化,我们可以很容易的对数据进行并行操作.Stream API 可以声明性地通过 parallel()与 sequential()在并行流与顺序流之间进行切换. 了解 Fork/Join 框架 Fork/Join 框架:就是在必要的情况下,将一个大任务,进形拆分(fork)成若干个小任务(拆到不可再拆时),再将一个个的小任务运行的结果进行join汇总. Fork/Join 框架

  • 轻轻松松吃透Java并发fork/join框架

    目录 一.概述 二.说一说 RecursiveTask 三. Fork/Join框架基本使用 四.工作顺序图 1.ForkJoinPool构造函数 2.fork方法和join方法 五.使用Fork/Join解决实际问题 1.使用归并算法解决排序问题 2.使用Fork/Join运行归并算法 Fork / Join 是一个工具框架 , 其核心思想在于将一个大运算切成多个小份 , 最大效率的利用资源 , 其主要涉及到三个类 : ForkJoinPool / ForkJoinTask / Recursi

  • 浅谈Java Fork/Join并行框架

    初步了解Fork/Join框架 Fork/Join 框架是java7中加入的一个并行任务框架,可以将任务分割成足够小的小任务,然后让不同的线程来做这些分割出来的小事情,然后完成之后再进行join,将小任务的结果组装成大任务的结果.下面的图片展示了这种框架的工作模型: 使用Fork/Join并行框架的前提是我们的任务可以拆分成足够小的任务,而且可以根据小任务的结果来组装出大任务的结果,一个最简单的例子是使用Fork/Join框架来求一个数组中的最大/最小值,这个任务就可以拆成很多小任务,大任务就是

  • Java并发中的Fork/Join 框架机制详解

    什么是 Fork/Join 框架 Fork/Join 框架是一种在 JDk 7 引入的线程池,用于并行执行把一个大任务拆成多个小任务并行执行,最终汇总每个小任务结果得到大任务结果的特殊任务.通过其命名也很容易看出框架主要分为 Fork 和 Join 两个阶段,第一阶段 Fork 是把一个大任务拆分为多个子任务并行的执行,第二阶段 Join 是合并这些子任务的所有执行结果,最后得到大任务的结果. 这里不难发现其执行主要流程:首先判断一个任务是否足够小,如果任务足够小,则直接计算,否则,就拆分成几个

  • Java多线程高并发中的Fork/Join框架机制详解

    1.Fork/Join框架简介 Fork/Join 它可以将一个大的任务拆分成多个子任务进行并行处理,最后将子任务结果合并成最后的计算结果,并进行输出.Fork/Join 框架要完成两件事情: Fork:把一个复杂任务进行分拆,大事化小 :把一个复杂任务进行分拆,大事化小 Join:把分拆任务的结果进行合并 在 Java 的 Fork/Join 框架中,使用两个类完成上述操作: ForkJoinTask: 我们要使用 Fork/Join 框架,首先需要创建一个 ForkJoin 任务.该类提供了

  • java 并发中的原子性与可视性实例详解

    java 并发中的原子性与可视性实例详解 并发其实是一种解耦合的策略,它帮助我们把做什么(目标)和什么时候做(时机)分开.这样做可以明显改进应用程序的吞吐量(获得更多的CPU调度时间)和结构(程序有多个部分在协同工作).做过java Web开发的人都知道,Java Web中的Servlet程序在Servlet容器的支持下采用单实例多线程的工作模式,Servlet容器为你处理了并发问题. 原子性 原子是世界上的最小单位,具有不可分割性.比如 a=0:(a非long和double类型) 这个操作是不

  • Java集合中的fail-fast(快速失败)机制详解

    简介 我们知道Java中Collection接口下的很多集合都是线程不安全的, 比如 java.util.ArrayList不是线程安全的, 因此如果在使用迭代器的过程中有其他线程修改了list,那么将抛出ConcurrentModificationException,这就是所谓fail-fast策略. 这一策略在源码中的实现是通过 modCount 域,modCount 顾名思义就是修改次数,对ArrayList 内容的修改都将增加这个值,那么在迭代器初始化过程中会将这个值赋给迭代器的 exp

  • Java并发编程之显式锁机制详解

    我们之前介绍过synchronized关键字实现程序的原子性操作,它的内部也是一种加锁和解锁机制,是一种声明式的编程方式,我们只需要对方法或者代码块进行声明,Java内部帮我们在调用方法之前和结束时加锁和解锁.而我们本篇将要介绍的显式锁是一种手动式的实现方式,程序员控制锁的具体实现,虽然现在越来越趋向于使用synchronized直接实现原子操作,但是了解了Lock接口的具体实现机制将有助于我们对synchronized的使用.本文主要涉及以下一些内容: 接口Lock的基本组成成员 可重入锁Re

  • java中类加载与双亲委派机制详解

    目录 类加载是什么 类加载器 双亲委派机制 BootStrapClassLoader ExtClassLoader AppClassLoader 为什么使用双亲委派机制 全盘负责委托机制 自定义类加载器 打破双亲委派机制 类加载是什么 把磁盘中的java文件加载到内存中的过程叫做类加载 当我们用java命令运行某个类的main函数启动程序时,首先需要通过类加载器把主类加载到JVM. 有如下 User 类 package dc.dccmmtop; public Class User { publi

  • Java开发中synchronized的定义及用法详解

    概念 是利用锁的机制来实现同步的. 互斥性:即在同一时间只允许一个线程持有某个对象锁,通过这种特性来实现多线程中的协调机制,这样在同一时间只有一个线程对需同步的代码块(复合操作)进行访问.互斥性我们也往往称为操作的原子性. 可见性:必须确保在锁被释放之前,对共享变量所做的修改,对于随后获得该锁的另一个线程是可见的(即在获得锁时应获得最新共享变量的值),否则另一个线程可能是在本地缓存的某个副本上继续操作从而引起不一致. 用法 修饰静态方法: //同步静态方法 public synchronized

  • Java并发编程之Semaphore(信号量)详解及实例

    Java并发编程之Semaphore(信号量)详解及实例 概述 通常情况下,可能有多个线程同时访问数目很少的资源,如客户端建立了若干个线程同时访问同一数据库,这势必会造成服务端资源被耗尽的地步,那么怎样能够有效的来控制不可预知的接入量呢?及在同一时刻只能获得指定数目的数据库连接,在JDK1.5 java.util.concurrent 包中引入了Semaphore(信号量),信号量是在简单上锁的基础上实现的,相当于能令线程安全执行,并初始化为可用资源个数的计数器,通常用于限制可以访问某些资源(物

  • java并发学习之BlockingQueue实现生产者消费者详解

    1.介绍 阻塞队列 (BlockingQueue)是Java util.concurrent包下重要的数据结构,BlockingQueue提供了线程安全的队列访问方式:当阻塞队列进行插入数据时,如果队列已满,线程将会阻塞等待直到队列非满:从阻塞队列取数据时,如果队列已空,线程将会阻塞等待直到队列非空.并发包下很多高级同步类的实现都是基于BlockingQueue实现的. JDK7提供了以下7个阻塞队列: ArrayBlockingQueue :由数组结构组成的有界阻塞队列. LinkedBloc

随机推荐