Reactor 多任务并发执行且结果按顺序返回第一个

目录
  • 1 场景
  • 2 创建 service
    • 2.1 创建基本接口和实体类
    • 2.2 创建 service 实现
  • 3 主体方法
  • 4 实现异步
    • 4.1 subcribeOn 实现异步
    • 4.2 CompletableFuture 实现异步

1 场景

调用多个平级服务,按照服务优先级返回第一个有效数据。

具体case:一个页面可能有很多的弹窗,弹窗之间又有优先级。每次只需要返回第一个有数据的弹窗。但是又希望所有弹窗之间的数据获取是异步的。这种场景使用 Reactor 怎么实现呢?

2 创建 service

2.1 创建基本接口和实体类

public interface TestServiceI {
    Mono request();
}

提供一个 request 方法,返回一个 Mono 对象。

@Data
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class TestUser {
    private String name;
}

2.2 创建 service 实现

@Slf4j
public class TestServiceImpl1 implements TestServiceI {
    @Override
    public Mono request() {
        log.info("execute.test.service1");
        return Mono.fromSupplier(() -> {
                    try {
                        System.out.println("service1.threadName=" + Thread.currentThread().getName());
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return "";
                })
                .map(name -> {
                    return new TestUser(name);
                });
    }
}

第一个 service 执行耗时 500ms。返回空对象;

创建第二个 service 执行耗时 1000ms。返回空对象;代码如上,改一下sleep时间即可。

继续创建第三个 service 执行耗时 1000ms。返回 name3。代码如上,改一下 sleep 时间,以及返回为 name3。

3 主体方法

public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        TestServiceI testServiceImpl4 = new TestServiceImpl4();
        TestServiceI testServiceImpl5 = new TestServiceImpl5();
        TestServiceI testServiceImpl6 = new TestServiceImpl6();
        List<TestServiceI> serviceIList = new ArrayList<>();
        serviceIList.add(testServiceImpl4);
        serviceIList.add(testServiceImpl5);
        serviceIList.add(testServiceImpl6);

    // 执行 service 列表,这样有多少个 service 都可以
        Flux<Mono<TestUser>> monoFlux = Flux.fromIterable(serviceIList)
                .map(service -> {
                    return service.request();
                });
    // flatMap(或者flatMapSequential) + map 实现异常继续下一个执行
        Flux flux = monoFlux.flatMapSequential(mono -> {
            return mono.map(user -> {
                        TestUser testUser = JsonUtil.parseJson(JsonUtil.toJson(user), TestUser.class);
                        if (Objects.nonNull(testUser) && StringUtils.isNotBlank(testUser.getName())) {
                            return testUser;
                        }
            // null 在 reactor 中是异常数据。
                        return null;
                    })
                    .onErrorContinue((err, i) -> {
                        log.info("onErrorContinue={}", i);
                    });
        });
        Mono mono = flux.elementAt(0, Mono.just(""));
        Object block = mono.block();
        System.out.println(block + "blockFirst 执行耗时ms:" + (System.currentTimeMillis() - startTime));
    }
  • 1、Flux.fromIterable 执行 service 列表,可以随意增删 service 服务。
  • 2、flatMap(或者flatMapSequential) + map + onErrorContinue 实现异常继续下一个执行。具体参考:Reactor中的onErrorContinue 和 onErrorResume
  • 3、Mono mono = flux.elementAt(0, Mono.just("")); 返回第一个正常数据。

执行输出:

20:54:26.512 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
20:54:26.553 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
service1.threadName=main
20:54:27.237 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
20:54:27.237 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
service5.threadName=main
20:54:28.246 [main] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
20:54:28.246 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
service6.threadName=main
TestUser(name=name3)blockFirst 执行耗时ms:2895

  • 1、service1 和 service2 因为返回空,所以继续下一个,最终返回 name3。
  • 2、查看总耗时:2895ms。service1 耗时 500,service2 耗时1000,service3 耗时 1000。发现耗时基本上等于 service1 + service2 + service3 。这是怎么回事呢?查看返回执行的线程,都是 main。

总结:这样实现按照顺序返回第一个正常数据。但是执行并没有异步。下一步:如何实现异步呢?

4 实现异步

4.1 subcribeOn 实现异步

修改 service 实现。增加 .subscribeOn(Schedulers.boundedElastic())

如下:

@Slf4j
public class TestServiceImpl1 implements TestServiceI {
    @Override
    public Mono request() {
        log.info("execute.test.service1");
        return Mono.fromSupplier(() -> {
                    try {
                        System.out.println("service1.threadName=" + Thread.currentThread().getName());
                        Thread.sleep(500);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return "";
                })
                //增加subscribeOn
                .subscribeOn(Schedulers.boundedElastic())
                .map(name -> {
                    return new TestUser(name);
                });
    }
}

再次执行输出如下:

21:02:04.213 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
21:02:04.265 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1
service4.threadName=boundedElastic-1
21:02:04.300 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
21:02:04.302 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
service2.threadName=boundedElastic-2
service3.threadName=boundedElastic-3
21:02:04.987 [boundedElastic-1] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
21:02:05.307 [boundedElastic-2] INFO com.geniu.reactor.TestReactorOrderV2 - onErrorContinue=TestUser(name=)
TestUser(name=name6)blockFirst 执行耗时ms:1242

  • 1、发现具体实现 sleep 的线程都不是 main 线程,而是 boundedElastic
  • 2、最终执行耗时 1242ms,只比执行时间最长的 service2 和 service3 耗时 1000ms,多一些。证明是异步了。

4.2 CompletableFuture 实现异步

修改 service 实现,使用 CompletableFuture 执行耗时操作(这里是sleep,具体到项目中可能是外部接口调用,DB 操作等);然后使用 Mono.fromFuture 返回 Mono 对象。

@Slf4j
public class TestServiceImpl1 implements TestServiceI{
    @Override
    public Mono request() {
        log.info("execute.test.service1");
        CompletableFuture<String> uCompletableFuture = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("service1.threadName=" + Thread.currentThread().getName());
                Thread.sleep(500);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            return "testname1";
        });

        return Mono.fromFuture(uCompletableFuture).map(name -> {
            return new TestUser(name);
        });
    }
}

执行返回如下:

21:09:59.465 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
21:09:59.510 [main] INFO com.geniu.reactor.TestServiceImpl2 - execute.test.service2
service2.threadName=ForkJoinPool.commonPool-worker-1
21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl3 - execute.test.service3
service3.threadName=ForkJoinPool.commonPool-worker-2
21:09:59.526 [main] INFO com.geniu.reactor.TestServiceImpl1 - execute.test.service1 
service1.threadName=ForkJoinPool.commonPool-worker-3
21:10:00.526 [ForkJoinPool.commonPool-worker-1] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)
21:10:00.538 [ForkJoinPool.commonPool-worker-2] INFO com.geniu.reactor.TestReactorOrder - onErrorContinue=TestUser(name=)
TestUser(name=testname1)blockFirst 执行耗时ms:1238

  • 1、耗时操作都是使用 ForkJoinPool 线程池中的线程执行。
  • 2、最终耗时和方法1基本差不多。

到此这篇关于Reactor 多任务并发执行且结果按顺序返回第一个的文章就介绍到这了,更多相关Reactor 多任务执行内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

时间: 2022-09-22

Reactor反应器的实现方法详解

大多数应用都会使用ACE_Reactor::instance()提供的默认反应器实例.但是你也可以选择自己的反应器,这是因为ACE使用了Bridge模式(使用两个不同的类:一个是编程接口,另一个是实现,第一个类会把各个操作传给第二个类).例如使用线程池反应器实现:ACE_TP_Reactor* tp_reactor = new ACE_TP_Reactor;ACE_Reactor* my_reactor = new ACE_Reactor(tp_reactor, 1);//1表示my_react

Project Reactor源码解析publishOn使用示例

目录 功能分析 代码示例 prefetch delayError 源码分析 Flux#publishOn() Flux#subscribe() FluxPublishOn#subscribeOrReturn() FluxPublishOn#onSubscribe() 非融合 FluxPublishOn#onNext() FluxPublishOn#trySchedule() FluxPublishOn#run() FluxPublishOn#runAsync() FluxPublishOn#ch

Reactor中的onErrorContinue 和 onErrorResume

目录 前言 1 基础功能 2 只有 onErrorResume () 3 只有 onErrorContinue() 4 onErrorResume() 然后 onErrorContinue() 5 使用 onErrorResume() 模拟 onErrorContinue() 6 使用 onErrorResume() 和下游的 onErrorContinue() 模拟 onErrorContinue() 前言 这似乎是 Reactor 的热门搜索之一,至少当我在谷歌中输入 onErrorCont

Java中多线程Reactor模式的实现

目录 1. 主服务器 2.IO请求handler+线程池 3.客户端 多线程Reactor模式旨在分配多个reactor每一个reactor独立拥有一个selector,在网络通信中大体设计为负责连接的主Reactor,其中在主Reactor的run函数中若selector检测到了连接事件的发生则dispatch该事件. 让负责管理连接的Handler处理连接,其中在这个负责连接的Handler处理器中创建子Handler用以处理IO请求.这样一来连接请求与IO请求分开执行提高通道的并发量.同时

如何使用Reactor完成类似Flink的操作

一.背景 Flink在处理流式任务的时候有很大的优势,其中windows等操作符可以很方便的完成聚合任务,但是Flink是一套独立的服务,业务流程中如果想使用需要将数据发到kafka,用Flink处理完再发到kafka,然后再做业务处理,流程很繁琐. 比如在业务代码中想要实现类似Flink的window按时间批量聚合功能,如果纯手动写代码比较繁琐,使用Flink又太重,这种场景下使用响应式编程RxJava.Reactor等的window.buffer操作符可以很方便的实现. 响应式编程框架也早已

PHP封装类似thinkphp连贯操作数据库Db类与简单应用示例

本文实例讲述了PHP封装类似thinkphp连贯操作数据库Db类与简单应用.分享给大家供大家参考,具体如下: <?php header("Content-Type:text/html;charset=utf-8"); /** *php操作mysql的工具类 */ class Db{ private $_db = null;//数据库连接句柄 private $_table = null;//表名 private $_where = null;//where条件 private $

PostgreSQL实现批量插入、更新与合并操作的方法

前言 就在 2019 年 1 月份微软收购了 PostgreSQL 数据库的初创公司 CitusData, 在云数据库方面可以增强与 AWS 的竟争.AWS 的 RDS 两大开源数据库就是 MySQL(Aurora 和 MariaDB 是它的变种) 和 PostgreSQL. 而 PostgreSQL 跳出了普通关系型数据库的类型约束,它灵活的支持 JSON, JSONB, XML, 数组等类型.比如说字段类型可以是各种形式的数组,一维或多维. create table t1( address

Java操作Jenkins操作凭证(Credential)信息方式

jenkins-client包的api中大部分的操作是支持的,但有一些特殊操作,比如需要操作jenkins的Credential凭据信息,或是希望使用java修改Jenkins的系统配置,类似这样的操作在api中是没有的. 但依然可以通过rest的方式完成这些操作 当新增一条凭据信息时可以通过浏览器的网络监控看到他的请求内容 可以看到操作凭证的请求地址.请求参数列表,这里的参数列表的参数非常的多,但实际上在源码中读取的只有红框中的json的这一段.用postman来模拟调用一下 调用后会发现是失

C++ set的使用方法详解

C++ set的使用方法详解 set也是STL中比较常见的容器.set集合容器实现了红黑树的平衡二叉检索树的数据结构,它会自动调整二叉树的排列,把元素放到适当的位置.set容器所包含的元素的值是唯一的,集合中的元素按一定的顺序排列. 我们构造set集合的目的是为了快速的检索,不可直接去修改键值. set的一些常见操作: begin() 返回指向第一个元素的迭代器 clear() 清除所有元素 count() 返回某个值元素的个数 empty() 如果集合为空,返回true(真) end() 返回

探索PowerShell (四) PowerShell的对象、格式与参数

今天贴博文晚了,感谢各位能继续关注! 本节将要给大家介绍一下PowerShell下的对象,基本格式以及参数.依然属于PowerShell的基础. PowerShell中的对象 在本教程开篇我们说过,PowerShell是基于面向对象化的,不像传统的shell那样基于文本.这其中最主要的原因就是因为Win平台在管理操作上主要以面向对象为主,因此为了符合系统特点和我们的操作习惯,PowerShell也继承了这一特色.因此,不像传统的shell,在PowerShell中,我们可以随意地与对象进行互动,

js 面向对象的技术创建高级 Web 应用程序

JavaScript 对象是词典 在 C++ 或 C# 中,在谈论对象时,是指类或结构的实例.对象有不同的属性和方法,具体取决于将它们实例化的模板(即类).而 JavaScript 对象却不是这样.在 JavaScript 中,对象只是一组名称/值对,就是说,将 JavaScript 对象视为包含字符串关键字的词典.我们可以使用熟悉的"."(点)运算符或"[]"运算符,来获得和设置对象的属性,这是在处理词典时通常采用的方法.以 下代码段 复制代码 代码如下: var

详解Angular 中 ngOnInit 和 constructor 使用场景

1. constructor constructor应该是ES6中明确使用constructor来表示构造函数的,构造函数使用在class中,用来做初始化操作.当包含constructor的类被实例化时,构造函数将被调用. 来看例子: class AppComponent { public name: string; constructor(name) { console.log('Constructor initialization'); this.name = name; } } let a

java中treemap和treeset实现红黑树

TreeMap 的实现就是红黑树数据结构,也就说是一棵自平衡的排序二叉树,这样就可以保证当需要快速检索指定节点. TreeSet 和 TreeMap 的关系 为了让大家了解 TreeMap 和 TreeSet 之间的关系,下面先看 TreeSet 类的部分源代码: public class TreeSet<E> extends AbstractSet<E> implements NavigableSet<E>, Cloneable, java.io.Serializab

Python中str is not callable问题详解及解决办法

Python中str is not callable问题详解及解决办法 问题提出: 在Python的代码,在运行过程中,碰到了一个错误信息: python代码: def check_province_code(province, country): num = len(province) while num <3: province = ''.join([str(0),province]) num = num +1 return country + province 运行的错误信息: check

PHP之十六个魔术方法详细介绍

PHP中把以两个下划线__开头的方法称为魔术方法(Magic methods),这些方法在PHP中充当了举足轻重的作用. 魔术方法包括: __construct(),类的构造函数 __destruct(),类的析构函数 __call(),在对象中调用一个不可访问方法时调用 __callStatic(),用静态方式中调用一个不可访问方法时调用 __get(),获得一个类的成员变量时调用 __set(),设置一个类的成员变量时调用 __isset(),当对不可访问属性调用isset()或empty(