Reactor中的onErrorContinue 和 onErrorResume

目录
  • 前言
  • 1 基础功能
  • 2 只有 onErrorResume ()
  • 3 只有 onErrorContinue()
  • 4 onErrorResume() 然后 onErrorContinue()
  • 5 使用 onErrorResume() 模拟 onErrorContinue()
  • 6 使用 onErrorResume() 和下游的 onErrorContinue() 模拟 onErrorContinue()

前言

这似乎是 Reactor 的热门搜索之一,至少当我在谷歌中输入 onErrorContinue 时,onErrorResume 会在它旁边弹出。让我把我的测试代码和我的一些解释粘贴在下面。

1 基础功能

这是一个简单的函数,将 5 个连续的数字分别乘以 2,然后相加,当 i==2 时抛出一个异常:

public static void main(String... args) {
      Flux.range(1,5)
              .doOnNext(i -> System.out.println("input=" + i))
              .map(i -> i == 2 ? i / 0 : i)
              .map(i -> i * 2)
              .reduce((i,j) -> i+j)
              .doOnNext(i -> System.out.println("sum=" + i))
              .block();
  }

显然,输出如下:

input=1
input=2
Exception in thread "main" java.lang.ArithmeticException: / by zero

2 只有 onErrorResume ()

public static void main(String... args) {
      Flux.range(1,5)
              .doOnNext(i -> System.out.println("input=" + i))
              .map(i -> i == 2 ? i / 0 : i)
              .map(i -> i * 2)
              .onErrorResume(err -> {
                  log.info("onErrorResume");
                  return Flux.empty();
              })
              .reduce((i,j) -> i+j)
              .doOnNext(i -> System.out.println("sum=" + i))
              .block();
  }

输出如下:

input=1
input=2
17:40:47.828 [main] INFO com.example.demo.config.TestRunner - onErrorResume
sum=2

正如官方文档描述(onErrorResume)的那样,onErrorResume 用返回内容替换 Flux,因此在 2 之后的数据不会被处理。唯一值得一提的是,onErrorResume() 不必马上在错误之后捕获异常。在 onErrorContinue() 的官网文档中(onErrorContinue),只有 onErrorContinue() 强调了了 upstream 关键词,但显然,它还有其他含义。

3 只有 onErrorContinue()

public static void main(String... args) {
  Flux.range(1,5)
          .doOnNext(i -> System.out.println("input=" + i))
          .map(i -> i == 2 ? i / 0 : i)
          .map(i -> i * 2)
          .onErrorContinue((err, i) -> {log.info("onErrorContinue={}", i);})
          .reduce((i,j) -> i+j)
          .doOnNext(i -> System.out.println("sum=" + i))
          .block();
}

输出:

input=1
input=2
17:43:10.656 [main] INFO com.example.demo.config.TestRunner - onErrorContinue=2
input=3
input=4
input=5
sum=26

显然,onErrorContinue 会丢弃错误数据 2, 然后继续数字 3 直到 5。

4 onErrorResume() 然后 onErrorContinue()

public static void main(String... args) {
    Flux.range(1,5)
            .doOnNext(i -> System.out.println("input=" + i))
            .map(i -> i == 2 ? i / 0 : i)
            .map(i -> i * 2)
            .onErrorResume(err -> {
                log.info("onErrorResume");
                return Flux.empty();
            })
            .onErrorContinue((err, i) -> {log.info("onErrorContinue={}", i);})
            .reduce((i,j) -> i+j)
            .doOnNext(i -> System.out.println("sum=" + i))
            .block();
}

输出和上面一样:

input=1
input=2
17:47:05.789 [main] INFO com.example.demo.config.TestRunner - onErrorContinue=2
input=3
input=4
input=5
sum=26

这样的结果,你想到了吗?onErrorContinue() 会在 onErrorResume() 得到错误之前处理这个错误。当两个错误处理函数在同一个函数中的时候很明显,但是当你的函数中只有 onErrorResume(),而一些调用者实际上有 onErrorContinue() 时,你的 onErrorResume() 没有被调用的原因可能就不那么明显了。

5 使用 onErrorResume() 模拟 onErrorContinue()

有些博客建议我们完全不用 onErrorContinue(),且在所有场景中仅用 onErrorResume()。但是上述示例已经展示了它们会产生不同的结果。那我们怎么实现呢?

public static void main(String... args) {
    Flux.range(1,5)
            .doOnNext(i -> System.out.println("input=" + i))
            .flatMap(i -> Mono.just(i)
                    .map(j -> j == 2 ? j / 0 : j)
                    .map(j -> j * 2)
                    .onErrorResume(err -> {
                        System.out.println("onErrorResume");
                        return Mono.empty();
                    })
            )
            .reduce((i,j) -> i+j)
            .doOnNext(i -> System.out.println("sum=" + i))
            .block();
}

因此,本质上是将可能在 flatMap 或 concatMap 中抛出错误的操作包装起来,并在其上使用 onErrorResume()。这样,它会产生相同的结果:

input=1
input=2
onErrorResume
input=3
input=4
input=5
sum=26

6 使用 onErrorResume() 和下游的 onErrorContinue() 模拟 onErrorContinue()

有时候,onErrorContinue() 放在调用程序中,您无法控制它。但你仍然需要 onErrorResume()。你该怎么办?

public static void main(String... args) {
    Flux.range(1,5)
            .doOnNext(i -> System.out.println("input=" + i))
            .flatMap(i -> Mono.just(i)
                    .map(j -> j == 2 ? j / 0 : j)
                    .map(j -> j * 2)
                    .onErrorResume(err -> {
                        System.out.println("onErrorResume");
                        return Mono.empty();
                    })
                    .onErrorStop()
            )
            .onErrorContinue((err, i) -> {log.info("onErrorContinue={}", i);})
            .reduce((i,j) -> i+j)
            .doOnNext(i -> System.out.println("sum=" + i))
            .block();
}

秘诀是在 onErrorResume() 代码块的末尾添加 onErrorStop() ——这会阻塞 onErrorContinue(),这样它就不会在 onErrorResume() 之前占用错误。尝试删除 onErrorStop(),你会看到 onErrorContinue() 在 onErrorResume 之前弹出。

到此这篇关于Reactor中的onErrorContinue 和 onErrorResume的文章就介绍到这了,更多相关Reactor onErrorContinue 内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

时间: 2022-09-20

Reactor反应器的实现方法详解

大多数应用都会使用ACE_Reactor::instance()提供的默认反应器实例.但是你也可以选择自己的反应器,这是因为ACE使用了Bridge模式(使用两个不同的类:一个是编程接口,另一个是实现,第一个类会把各个操作传给第二个类).例如使用线程池反应器实现:ACE_TP_Reactor* tp_reactor = new ACE_TP_Reactor;ACE_Reactor* my_reactor = new ACE_Reactor(tp_reactor, 1);//1表示my_react

Project Reactor源码解析publishOn使用示例

目录 功能分析 代码示例 prefetch delayError 源码分析 Flux#publishOn() Flux#subscribe() FluxPublishOn#subscribeOrReturn() FluxPublishOn#onSubscribe() 非融合 FluxPublishOn#onNext() FluxPublishOn#trySchedule() FluxPublishOn#run() FluxPublishOn#runAsync() FluxPublishOn#ch

如何使用Reactor完成类似Flink的操作

一.背景 Flink在处理流式任务的时候有很大的优势,其中windows等操作符可以很方便的完成聚合任务,但是Flink是一套独立的服务,业务流程中如果想使用需要将数据发到kafka,用Flink处理完再发到kafka,然后再做业务处理,流程很繁琐. 比如在业务代码中想要实现类似Flink的window按时间批量聚合功能,如果纯手动写代码比较繁琐,使用Flink又太重,这种场景下使用响应式编程RxJava.Reactor等的window.buffer操作符可以很方便的实现. 响应式编程框架也早已

Reactor 多任务并发执行且结果按顺序返回第一个

目录 1 场景 2 创建 service 2.1 创建基本接口和实体类 2.2 创建 service 实现 3 主体方法 4 实现异步 4.1 subcribeOn 实现异步 4.2 CompletableFuture 实现异步 1 场景 调用多个平级服务,按照服务优先级返回第一个有效数据. 具体case:一个页面可能有很多的弹窗,弹窗之间又有优先级.每次只需要返回第一个有数据的弹窗.但是又希望所有弹窗之间的数据获取是异步的.这种场景使用 Reactor 怎么实现呢? 2 创建 service

Java中多线程Reactor模式的实现

目录 1. 主服务器 2.IO请求handler+线程池 3.客户端 多线程Reactor模式旨在分配多个reactor每一个reactor独立拥有一个selector,在网络通信中大体设计为负责连接的主Reactor,其中在主Reactor的run函数中若selector检测到了连接事件的发生则dispatch该事件. 让负责管理连接的Handler处理连接,其中在这个负责连接的Handler处理器中创建子Handler用以处理IO请求.这样一来连接请求与IO请求分开执行提高通道的并发量.同时

老生常谈java中的Future模式

jdk1.7.0_79 本文实际上是对上文<简单谈谈ThreadPoolExecutor线程池之submit方法>的一个延续或者一个补充.在上文中提到的submit方法里出现了FutureTask,这不得不停止脚步将方向转向Java的Future模式. Future是并发编程中的一种设计模式,对于多线程来说,线程A需要等待线程B的结果,它没必要一直等待B,可以先拿到一个未来的Future,等B有了结果后再取真实的结果. ExecutorService executor = Executors.

详解Java中多线程异常捕获Runnable的实现

详解Java中多线程异常捕获Runnable的实现 1.背景: Java 多线程异常不向主线程抛,自己处理,外部捕获不了异常.所以要实现主线程对子线程异常的捕获. 2.工具: 实现Runnable接口的LayerInitTask类,ThreadException类,线程安全的Vector 3.思路: 向LayerInitTask中传入Vector,记录异常情况,外部遍历,判断,抛出异常. 4.代码: package step5.exception; import java.util.Vector

java 中多线程生产者消费者问题详细介绍

java 中多线程生产者消费者问题 前言: 一般面试喜欢问些线程的问题,较基础的问题无非就是死锁,生产者消费者问题,线程同步等等,在前面的文章有写过死锁,这里就说下多生产多消费的问题了 import java.util.concurrent.locks.*; class BoundedBuffer { final Lock lock = new ReentrantLock();//对象锁 final Condition notFull = lock.newCondition(); //生产者监视

Java 中责任链模式实现的三种方式

责任链模式 责任链模式的定义:使多个对象都有机会处理请求,从而避免请求的发送者和接受者之间的耦合关系, 将这个对象连成一条链,并沿着这条链传递该请求,直到有一个对象处理他为止.这里就不再过多的介绍什么是责任链模式,主要来说说java中如何编写.主要从下面3个框架中的代码中介绍. servlet中的filter dubbo中的filter mybatis中的plugin 这3个框架在实现责任链方式不尽相同. servlet中的Filter servlet中分别定义了一个 Filter和Filter

java中多线程的超详细介绍

1.线程概述 几乎所有的操作系统都支持同时运行多个任务,一个任务通常就是一个程序,每个运行中的程序就是一个进程.当一个程序运行时,内部可能包含了多个顺序执行流,每个顺序执行流就是一个线程. 2.线程与进程 进程概述: 几乎所有的操作系统都支持进程的概念,所有运行中的任务通常对应一个进程( Process).当一个程序进入内存运行时,即变成一个进程.进程是处于运行过程中的程序,并且具有一定的独立功能,进程是系统进行资源分配和调度的一个独立单位. 进程特征: 1.独立性:进程是系统中独立存在的实体,

Java中的代理模式详解及实例代码

java 代理模式详解 前言: 在某些情况下,一个客户不想或者不能直接引用一个对象,此时可以通过一个称之为"代理"的第三者来实现间接引用.代理对象可以在客户端和目标对象之间起到 中介的作用,并且可以通过代理对象去掉客户不能看到 的内容和服务或者添加客户需要的额外服务. 简单来说代理模式就是通过一个代理对象去访问一个实际对象,并且可以像装饰模式一样给对象添加一些功能. 静态代理 所谓静态代理即在程序运行前代理类就已经存在,也就是说我们编写代码的时候就已经把代理类的代码写好了,而动态代理则

Java中多线程同步类 CountDownLatch

在多线程开发中,常常遇到希望一组线程完成之后在执行之后的操作,java提供了一个多线程同步辅助类,可以完成此类需求: 类中常见的方法: 其中构造方法: CountDownLatch(int count) 参数count是计数器,一般用要执行线程的数量来赋值. long getCount():获得当前计数器的值. void countDown():当计数器的值大于零时,调用方法,计数器的数值减少1,当计数器等数零时,释放所有的线程. void await():调所该方法阻塞当前主线程,直到计数器减

对比Java设计模式编程中的状态模式和策略模式

为了能在Java应用程序中正确的使用状态模式和策略模式,开发人员需要清楚地知道这两种模式之间的区别.尽管状态模式和策略模式的结构非常相似,它们又同样遵循开闭原则,都代表着SOLID设计原则的'O',但它们的意图是完全不同的.Java中的策略模式是对一组相关的算法进行封装,给调用方提供了运行时的灵活性.调用方可以在运行时选择不同的算法,而不用修改使用策略的那个Context类.使用策略模式的经典例子包括实现加密算法,压缩算法,以及排序算法.另一方面,状态模式使用一个对象可以在不同的状态下表现出不同

Java中的双重检查(Double-Check)详解

在 Effecitve Java 一书的第 48 条中提到了双重检查模式,并指出这种模式在 Java 中通常并不适用.该模式的结构如下所示: public Resource getResource() { if (resource == null) { synchronized(this){ if (resource==null) { resource = new Resource(); } } } return resource; } 该模式是对下面的代码改进: public synchron

Java Thread多线程全面解析

多线程是java中很重要的知识点,在此小编给大家总结Java Thread多线程,非常有用,希望大家可以掌握哦. 一.线程的生命周期及五种基本状态 关于Java中线程的生命周期,首先看一下下面这张较为经典的图: 上图中基本上囊括了Java中多线程各重要知识点.掌握了上图中的各知识点,Java中的多线程也就基本上掌握了.主要包括: Java线程具有五种基本状态 新建状态(New):当线程对象对创建后,即进入了新建状态,如:Thread t = new MyThread(); 就绪状态(Runnab