Java实现FIFO任务调度队列策略

目录
  • 前言
  • FIFO任务调度器架构
  • 示例代码

前言

在工作中,很多高并发的场景中,我们会用到队列来实现大量的任务请求。当任务需要某些特殊资源的时候,我们还需要合理的分配资源,让队列中的任务高效且有序完成任务。熟悉分布式的话,应该了解yarn的任务调度算法。本文主要用java实现一个FIFO(先进先出调度器),这也是常见的一种调度方式。

FIFO任务调度器架构

主要实现的逻辑可以归纳为:

1、任务队列主要是单队列,所有任务按照顺序进入队列后,也会按照顺序执行。

2、如果任务无法获得资源,则将任务塞回队列原位置。

示例代码

Maven依赖如下:

      	<dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
                <dependency>
            <groupId>cn.hutool</groupId>
            <artifactId>hutool-all</artifactId>
            <version>5.5.2</version>
        </dependency>

具体的原理就不细说了,通过代码我们看看FIFO任务调度策略是什么玩的吧。下面的代码也可以作为参考。我们会使用到一个双向阻塞队列LinkedBlockingDeque。后面的代码说明会提到。

package ai.guiji.csdn.dispatch;

import cn.hutool.core.thread.ThreadUtil;
import lombok.Builder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;

import java.util.Random;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

/**
 * @Program: csdn @ClassName: FIFODemo @Author: 剑客阿良_ALiang @Date: 2021-12-24 21:21 @Description:
 * fifo队列 @Version: V1.0
 */
@Slf4j
public class FIFODemo {
  private static final LinkedBlockingDeque<Task> TASK_QUEUE = new LinkedBlockingDeque<>();
  private static final ConcurrentHashMap<Integer, LinkedBlockingQueue<Resource>> RESOURCE_MAP =
      new ConcurrentHashMap<>();
  private static final ExecutorService TASK_POOL =
      new ThreadPoolExecutor(
          8,
          16,
          0L,
          TimeUnit.MILLISECONDS,
          new LinkedBlockingQueue<>(),
          new CustomizableThreadFactory("TASK-THREAD-"),
          new ThreadPoolExecutor.AbortPolicy());
  private static final ScheduledExecutorService ENGINE_POOL =
      Executors.newSingleThreadScheduledExecutor(new CustomizableThreadFactory("ENGINE-"));
  private static final AtomicInteger CODE_BUILDER = new AtomicInteger(0);

  @Data
  @Builder
  private static class Resource {
    private Integer rId;
    private Type type;
  }

  @Data
  @Builder
  private static class Task implements Runnable {
    private Integer tId;
    private Runnable work;
    private Type type;
    private Resource resource;

    @Override
    public void run() {
      log.info("[{}]任务,使用资源编号:[{}]", tId, resource.getRId());
      try {
        work.run();
      } catch (Exception exception) {
        exception.printStackTrace();
      } finally {
        log.info("[{}]任务结束,回归资源", tId);
        returnResource(resource);
      }
    }
  }

  private enum Type {
    /** 资源类型 */
    A("A资源", 1),
    B("B资源", 2),
    C("C资源", 3);

    private final String desc;
    private final Integer code;

    Type(String desc, Integer code) {
      this.desc = desc;
      this.code = code;
    }

    public String getDesc() {
      return desc;
    }

    public Integer getCode() {
      return code;
    }
  }

  public static void initResource() {
    Random random = new Random();
    int aCount = random.nextInt(10) + 1;
    int bCount = random.nextInt(10) + 1;
    int cCount = random.nextInt(10) + 1;
    RESOURCE_MAP.put(Type.A.getCode(), new LinkedBlockingQueue<>());
    RESOURCE_MAP.put(Type.B.getCode(), new LinkedBlockingQueue<>());
    RESOURCE_MAP.put(Type.C.getCode(), new LinkedBlockingQueue<>());
    IntStream.rangeClosed(1, aCount)
        .forEach(
            a ->
                RESOURCE_MAP
                    .get(Type.A.getCode())
                    .add(Resource.builder().rId(a).type(Type.A).build()));
    IntStream.rangeClosed(1, bCount)
        .forEach(
            a ->
                RESOURCE_MAP
                    .get(Type.B.getCode())
                    .add(Resource.builder().rId(a).type(Type.B).build()));
    IntStream.rangeClosed(1, cCount)
        .forEach(
            a ->
                RESOURCE_MAP
                    .get(Type.C.getCode())
                    .add(Resource.builder().rId(a).type(Type.C).build()));
    log.info("初始化资源A数量:{},资源B数量:{},资源C数量:{}", aCount, bCount, cCount);
  }

  public static Resource extractResource(Type type) {
    return RESOURCE_MAP.get(type.getCode()).poll();
  }

  public static void returnResource(Resource resource) {
    log.info("开始归还资源,rId:{},资源类型:{}", resource.getRId(), resource.getType().getDesc());
    RESOURCE_MAP.get(resource.getType().code).add(resource);
    log.info("归还资源完成,rId:{},资源类型:{}", resource.getRId(), resource.getType().getDesc());
  }

  public static void enginDo() {
    ENGINE_POOL.scheduleAtFixedRate(
        () -> {
          Task task = TASK_QUEUE.poll();
          if (task == null) {
            log.info("任务队列为空,无需要执行的任务");
          } else {
            Resource resource = extractResource(task.getType());
            if (resource == null) {
              log.info("[{}]任务无法获取[{}],返回队列", task.getTId(), task.getType().getDesc());
              TASK_QUEUE.addFirst(task);
            } else {
              task.setResource(resource);
              TASK_POOL.submit(task);
            }
          }
        },
        0,
        1,
        TimeUnit.SECONDS);
  }

  public static void addTask(Runnable runnable, Type type) {
    Integer tId = CODE_BUILDER.incrementAndGet();
    Task task = Task.builder().tId(tId).type(type).work(runnable).build();
    log.info("提交任务[{}]到任务队列", tId);
    TASK_QUEUE.add(task);
  }

  public static void main(String[] args) {
    initResource();
    enginDo();
    Random random = new Random();
    ThreadUtil.sleep(5000);
    IntStream.range(0, 10)
        .forEach(
            a -> addTask(() -> ThreadUtil.sleep(random.nextInt(10) + 1, TimeUnit.SECONDS), Type.A));
    IntStream.range(0, 10)
        .forEach(
            a -> addTask(() -> ThreadUtil.sleep(random.nextInt(10) + 1, TimeUnit.SECONDS), Type.B));
    IntStream.range(0, 10)
        .forEach(
            a -> addTask(() -> ThreadUtil.sleep(random.nextInt(10) + 1, TimeUnit.SECONDS), Type.C));
  }
}

代码说明:

1、首先我们构造了任务队列,使用的是LinkedBlockingDeque,使用双向队列的原因是如果任务无法获取资源,还需要塞到队首,保证任务的有序性。

2、使用ConcurrentHashMap作为资源映射表,为了保证资源队列使用的均衡性,一旦使用完成的资源会塞到对应资源的队尾处。

3、其中实现了添加任务、提取资源、回归资源几个方法。

4、initResource方法可以初始化资源队列,这里面只是简单的随机了几个资源到A、B、C三种资源,塞入各类别队列。

5、任务私有类有自己的任务标识以及执行完后调用回归资源方法。

6、main方法中会分别提交需要3中资源的10个任务,看看调度情况。

执行结果

我们可以通过结果发现任务有序调度,使用完任务后回归队列。 

以上就是Java实现FIFO任务调度队列策略的详细内容,更多关于Java FIFO任务调度的资料请关注我们其它相关文章!

(0)

相关推荐

  • Java实现常用缓存淘汰算法:FIFO、LRU、LFU

    目录 缓存淘汰算法 FIFO LRU LFU 总结 缓存淘汰算法 在高并发.高性能的质量要求不断提高时,我们首先会想到的就是利用缓存予以应对. 第一次请求时把计算好的结果存放在缓存中,下次遇到同样的请求时,把之前保存在缓存中的数据直接拿来使用. 但是,缓存的空间一般都是有限,不可能把所有的结果全部保存下来.那么,当缓存空间全部被占满再有新的数据需要被保存,就要决定删除原来的哪些数据.如何做这样决定需要使用缓存淘汰算法. 常用的缓存淘汰算法有:FIFO.LRU.LFU,下面我们就逐一介绍一下. F

  • C语言实现页面置换算法(FIFO、LRU)

    目录 1.实现效果 2.实现源代码  1.实现效果 2.实现源代码  #include<iostream> #include<process.h> #include<stdlib.h> #include<ctime> #include<conio.h> #include<stdio.h> #include<string.h> using namespace std; #define Myprintf printf(&quo

  • JS 实现缓存算法的示例(FIFO/LRU)

    FIFO 最简单的一种缓存算法,设置缓存上限,当达到了缓存上限的时候,按照先进先出的策略进行淘汰,再增加进新的 k-v . 使用了一个对象作为缓存,一个数组配合着记录添加进对象时的顺序,判断是否到达上限,若到达上限取数组中的第一个元素key,对应删除对象中的键值. /** * FIFO队列算法实现缓存 * 需要一个对象和一个数组作为辅助 * 数组记录进入顺序 */ class FifoCache{ constructor(limit){ this.limit = limit || 10 this

  • 详解Java实现缓存(LRU,FIFO)

    现在软件或者网页的并发量越来越大了,大量请求直接操作数据库会对数据库造成很大的压力,处理大量连接和请求就会需要很长时间,但是实际中百分之80的数据是很少更改的,这样就可以引入缓存来进行读取,减少数据库的压力. 常用的缓存有Redis和memcached,但是有时候一些小场景就可以直接使用Java实现缓存,就可以满足这部分服务的需求. 缓存主要有LRU和FIFO,LRU是Least Recently Used的缩写,即最近最久未使用,FIFO就是先进先出,下面就使用Java来实现这两种缓存. LR

  • C语言实现页面置换 先进先出算法(FIFO)

    本文实例为大家分享了C语言实现页面置换算法的具体代码,供大家参考,具体内容如下 一.设计目的 加深对请求页式存储管理实现原理的理解,掌握页面置换算法中的先进先出算法. 二.设计内容 设计一个程序,有一个虚拟存储区和内存工作区,实现下述三种算法中的任意两种,计算访问命中率(命中率=1-页面失效次数/页地址流长度).附加要求:能够显示页面置换过程. 该系统页地址流长度为320,页面失效次数为每次访问相应指令时,该指令对应的页不在内存的次数.    程序首先用srand()和rand()函数分别进行初

  • Linux 进程通信之FIFO的实现

    FIFO通信(first in first out) FIFO 有名管道,实现无血缘关系进程通信. 创建一个管道的伪文件 a.mkfifo testfifo 命令创建 b.也可以使用函数int mkfifo(const char *pathname, mode_t mode); 内核会针对fifo文件开辟一个缓冲区,操作fifo文件,可以操作缓冲区,实现进程间通信–实际上就是文件读写 man 3 mkfifo #include <sys/types.h> #include <sys/st

  • Java实现FIFO任务调度队列策略

    目录 前言 FIFO任务调度器架构 示例代码 前言 在工作中,很多高并发的场景中,我们会用到队列来实现大量的任务请求.当任务需要某些特殊资源的时候,我们还需要合理的分配资源,让队列中的任务高效且有序完成任务.熟悉分布式的话,应该了解yarn的任务调度算法.本文主要用java实现一个FIFO(先进先出调度器),这也是常见的一种调度方式. FIFO任务调度器架构 主要实现的逻辑可以归纳为: 1.任务队列主要是单队列,所有任务按照顺序进入队列后,也会按照顺序执行. 2.如果任务无法获得资源,则将任务塞

  • 深入了解java内存分配和回收策略

    一.导论 java技术体系中所提到的内存自动化管理归根结底就是内存的分配与回收两个问题,之前已经和大家谈过java回收的相关知识,今天来和大家聊聊java对象的在内存中的分配.通俗的讲,对象的内存分配就是在堆上的分配,对象主要分配在新生代的Eden上(关于对象在内存上的分代在垃圾回收中会补上,想了解的也可以参考<深入理解java虚拟机>),如果启动了本地线程分配缓冲,讲按线程优先在TLAB上分配.少数情况下也是直接在老年代中分配. 二.经典的分配策略 1.对象优先在Eden上分配 一般情况下对

  • java线程池工作队列饱和策略代码示例

    线程池(Thread Pool) 是并行执行任务收集的实用工具.随着 CPU 引入适合于应用程序并行化的多核体系结构,线程池的作用正日益显现.通过 ThreadPoolExecutor类及其他辅助类,Java 5 引入了这一框架,作为新的并发支持部分. ThreadPoolExecutor框架灵活且功能强大,它支持特定于用户的配置并提供了相关的挂钩(hook)和饱和策略来处理满队列 Java线程池会将提交的任务先置于工作队列中,在从工作队列中获取(SynchronousQueue直接由生产者提交

  • Java通俗易懂系列设计模式之策略模式

    介绍 策略设计模式是行为设计模式之一.当我们为特定任务使用多个算法时,使用策略模式,客户端决定在运行时使用的实际实现. 策略模式的最佳示例之一是Collections.sort()采用Comparator参数的方法.基于Comparator接口的不同实现,对象将以不同的方式进行排序. 实例 对于我们的示例,我们将尝试实施一个简单的购物车,我们有两种付款策略 - 使用信用卡或使用PayPal. 首先,我们将为我们的策略模式示例创建接口,在我们的例子中,支付金额作为参数传递. 支付方式:Paymen

  • 解析Java内存分配和回收策略以及MinorGC、MajorGC、FullGC

    目录 对象内存分配与回收策略 对象何时进入新生代.老年代 三种GC介绍 MinorGC Major GC/Full GC: 图示GC过程 对象内存分配与回收策略 对象的内存分配,往大方向讲,就是在堆上分配[但也可能经过JIT编译后被拆散为标量类型并间接地栈上分配),对象主要分配在新生代的Eden区上,如果启动了本地线程分配缓冲,将按线程优先在TLAB上分配.少数情况下也可能会直接分配在老年代中. 对象优先分配在Eden区,当Eden区可用空间不够时会进行MinorGC 大对象直接进入老年代:大对

  • java 线程池如何执行策略又拒绝哪些策略

    目录 线程池执行流程 线程池拒绝策略 DiscardPolicy拒绝策略 AbortPolicy拒绝策略 自定义拒绝策略 总结 前言: 聊到线程池就一定会聊到线程池的执行流程,也就是当有一个任务进入线程池之后,线程池是如何执行的?我们今天就来聊聊这个话题.线程池是如何执行的?线程池的拒绝策略有哪些? 线程池执行流程 想要真正的了解线程池的执行流程,就得先从线程池的执行方法 execute() 说起,execute() 实现源码如下: public void execute(Runnable co

  • 解析Java多线程之常见锁策略与CAS中的ABA问题

    目录 1.常见的锁策略 1.1乐观锁与悲观锁 1.2读写锁与普通互斥锁 1.3重量级锁与轻量级锁 1.4挂起等待锁与自旋锁 1.5公平锁与非公平锁 1.6可重入锁与不可重入锁 1.7死锁问题 1.7.1常见死锁的情况 1.7.2哲学家就餐问题 2.CAS指令与ABA问题 2.1CAS指令 2.2ABA问题 本篇文章将介绍常见的锁策略以及CAS中的ABA问题,前面介绍使用synchronized关键字来保证线程的安全性,本质上就是对对象进行加锁操作,synchronized所加的锁到底是什么类型的

  • Java线程池使用AbortPolicy策略

    目录 线程池ThreadPoolExecutor的拒绝策略 AbortPolicy策略 线程池ThreadPoolExecutor的拒绝策略 线程池中的线程资源全部被占用时,对新添加的Task任务有不同的处理策略,在默认的情况下, ThreadPoolExecutor类中有4种不同的处理方式: AbortPolicy:当任务添加到线程池中被拒绝时,它将抛出RejectExecutionException异常. CallerRunsPolicy:当任务添加到线程池中被拒绝时,会使用调用线程池的Th

  • java ThreadPoolExecutor线程池拒绝策略避坑

    目录 1.场景 2. 原因分析 3.总结 4.思考 1.场景 线程池使用DiscardOldestPolicy拒绝策略,阻塞队列使用ArrayBlockingQueue,发现在某些情形下对于得到的Future,调用get()方法当前线程会一直阻塞. 为了便于理解,将实际情景抽象为下面的代码: ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor( 1, 1, 1, TimeUnit.SECONDS, new ArrayBlo

  • 深入了解Java行为型设计模式之策略模式

    目录 策略模式 应用场景 优缺点 主要角色 策略模式的基本使用 创建抽象策略角色 创建具体策略角色 创建上下文角色 客户端执行 策略模式实现支付方式的选择 创建抽象策略角色 创建具体策略角色 创建上下文角色 客户端执行 策略模式 策略模式(Strategy Pattern)也叫政策模式(Policy Pattern),属于行为型模式. 它是将定义的一系列算法.分别封装起来,让它们之间可以互相替换,从而让算法的变化不会影响到使用算法的用户. 策略模式能让你定义一系列算法, 并将每种算法分别放入独立

随机推荐