并行Stream与Spring事务相遇会发生什么?

目录
  • 事务不生效的代码
  • JDK 8 的Stream
  • @Transactional事务处理
  • Bug综合分析
  • 问题拓展
  • 小结

前言:

事情是这样的:运营人员反馈,通过Excel导入数据时,有一部分成功了,有一部分未导入。初步猜测,是事务未生效导致的。查看代码,发现导入部分已经通过@Transcational注解进行事务控制了,为什么还会出现事务不生效的问题呢?下面我们就进行具体的案例分析,Let's go!

事务不生效的代码

这里写一段简单的伪代码来演示展示一下事务不生效的代码:

  @Transactional(rollbackFor = Exception.class)
  public void batchInsert(List<Order> list) {
    list.parallelStream().forEach(order -> orderMapper.save(order));
  }

逻辑很简单,遍历list,然后批量插入Order数据到数据库。在该方法上使用@Transactional来声明出现异常时进行回滚。

但事实情况是,其中某一条数据执行异常时,事务并没有进行回滚。这到底是为什么呢?

下面一探究竟。

JDK 8 的Stream

上面代码中涉及到了两个知识点:parallelStream和@Transactional,我们先来铺垫一下parallelStream相关知识。

在JDK8 中引入了Stream API的概念和实现,这里的Stream有别于 InputStream 和OutputStream,Stream API 是处理对象流而不是字节流。

比如,我们可以通过如下方式来基于Stream进行实现:

List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
numbers.stream().forEach(num->System.out.println(num));

输出:1 2 3 4 5 6 7 8 9

代码看起来方便清爽多了。

关于Stream的基本处理流程如下:

在这些Stream API中,还提供了一个并行处理的API,也就是parallelStream。它可以将任务拆分子任务,分发给多个处理器同时处理,之后合并。这样做的目的很明显是为了提升处理效率。

parallelStream的基本使用方式如下:

// 并行执行流
list.stream().parallel().filter(e -> e > 10).count()

针对上述代码,对应的流程如下:

而parallelStream会将流划分成多个子流,分散到不同的CPU并行处理,然后合并处理结果。其中,parallelStream默认是基于ForkJoinPool.commonPool()线程池来实现并行处理的。

通常情况下,我们可以认为并行会比串行快,但还是有前提条件的:

  • 处理器核心数量:并行处理核心数越多,处理效率越高;
  • 处理数据量:处理数据量越大优势越明显;

但并行处理也面临着一系列的问题,比如:资源竞争、死锁、线程切换、事务、可见性、线程安全等问题。

@Transactional事务处理

上面了解了parallelStream的基本原理及特性之后,再来看看@Transactional的事务处理特性。

@Transactional是Spring提供的基于注解的一种声明式事务方式,该注解只能运用到public的方法上。

基本原理:当一个方法被@Transactional注解之后,Spring会基于AOP在方法执行之前开启一个事务。当方法执行完毕之后,根据方法是否报错,来决定回滚或提交事务。

在默认代理模式下,只有目标方法由外部方法调用时,才能被Spring的事务拦截器拦截。所以,在同一个类中的两个方法直接调用,不会被Spring的事务拦截器拦截。这是事务不生效的一个场景,但在上述案例中,并不存在这种情况。

Spring在处理事务时,会从连接池中获得一个jdbc connection,将连接绑定到线程上(基于ThreadLocal),那么同一个线程中用到的就是同一个connection了。具体实现在DataSourceTransactionManager#doBegin方法中。

Bug综合分析

在了解了parallelStream和@Transactional的相关知识之后,我们会发现:parallelStream处理时开启了多线程,而@Transactional在处理事务时会(基于ThreadLocal)将连接绑定到当前线程,由于@Transactional绑定管理的是主线程的事务,而parallelStream开启的新的线程与主线程无关。因此,事务也就无效了。

此时,将parallelStream改为普通的stream,事务可正常回滚。这就提示我们,在使用基于@Transactional方式管理事务时,慎重使用多线程处理。

问题拓展

虽然parallelStream带来了更高的性能,但也要区分场景进行使用。即便是在不需要事务管理的情况下,如果parallelStream使用不当,也会造成同一时间对数据库发起大量请求等问题。

因此,在stream与parallelStream之间进行选择时,还要考虑几个问题:

  • 是否需要并行?数据量比较大,处理器核心数比较多的情况下才会有性能提升。
  • 任务之间是否是独立的,是否会引起任何竞态条件?比如:是否共享变量。
  • 执行结果是否取决于任务的调用顺序?并行执行的顺序是不确定的。

小结

文章讲述的Bug虽然简单,但如果不了解parallelStream与@Transactional注解的特性,还是很难排查的。而且也让我们意识到,虽然Spring通过@Transactional将事务管理进行了简化处理,但作为开发者,还是需要深入了解一下它的基本运作原理。不然,在排查bug时,很容易踩坑。

到此这篇关于并行Stream与Spring事务相遇会发生什么?的文章就介绍到这了,更多相关 Stream与Spring 内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • Spring Cloud Stream简单用法

    目录 简单使用Spring Cloud Stream 构建基于RocketMQ的生产者和消费者 生产者 消费者 Stream其他特性 消息发送失败的处理 消费者错误处理 Spring Cloud Stream对Spring Cloud体系中的Mq进⾏了很好的上层抽象,可以让我们与具体消息中间件解耦合,屏蔽掉了底层具体MQ消息中间件的细节差异,就像Hibernate屏蔽掉了具体数据库(Mysql/Oracle⼀样).如此⼀来,我们学习.开发.维护MQ都会变得轻松.⽬前Spring Cloud St

  • springboot 中 inputStream 神秘消失之谜(终破)

    序言 最近小明接手了前同事的代码,意料之外.情理之中的遇到了坑. 为了避免掉入同一个坑两次,小明决定把这个坑记下来,并在坑前立一个大牌子,避免其他小伙伴掉进去. HTTPClient 模拟调用 为了把这个问题说明,我们首先从最简单的 http 调用说起. 设置 body 服务端 服务端的代码如下: @Controller @RequestMapping("/") public class ReqController { @PostMapping(value = "/body&

  • Springcloud整合stream,rabbitmq实现消息驱动功能

    springcloud整合stream,rabbitmq实现消息驱动功能 1.代码实现: 创建项目stream 添加依赖 <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.6.2</version> <relativePath/>

  • 解决spring 处理request.getInputStream()输入流只能读取一次问题

    一般我们会在InterceptorAdapter拦截器中对请求进行验证 正常普通接口请求,request.getParameter()可以获取,能多次读取 如果我们的接口是用@RequestBody来接受数据,那么我们在拦截器中 需要读取request的输入流 ,因为 ServletRequest中getReader()和getInputStream()只能调用一次 这样就会导致controller 无法拿到数据. 解决方法 : 1.自定义一个类 BodyReaderHttpServletReq

  • spring-cloud-stream结合kafka使用详解

    1.pom文件导入依赖 <!-- kafka --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-kafka</artifactId> </dependency> 2.application.yml文件配置 spring: cloud: stream: kafka: bind

  • SpringCloud Stream 整合RabbitMQ的基本步骤

    目录 一.项目介绍 二.生产者 三.消费者 四.验证 在postman 访问生产者接口: 本篇简单介绍SpringCloud Stream 整合RabbitMQ基本步骤: 引入SpringCloud 引入SpringCloud Stream相关依赖 定义绑定接口: 消息生产者(Output…Binding) .消息消费者(Input…Binding) @EnableBinding 在对应类上进行定义 @StreamListener 在对应方法上创建监听用来消费消息 调用output的send()

  • springMarchal集成xStream的完整示例代码

    Xstream是一个库, 用于将对象序列化为xml, 反之亦然, 而无需任何映射文件.注意, castor需要一个映射文件. XStreamMarshaller类提供了将对象封送为xml的工具, 反之亦然. 1.maven GAV(1.4.11.1版本才兼容): <dependency> <groupId>com.thoughtworks.xstream</groupId> <artifactId>xstream</artifactId> <

  • springboot整合mongodb changestream的示例代码

    目录 前言 ChangeStream介绍 环境准备 Java客户端操作changestream 1.引入maven依赖 2.测试类核心代码 下面来看看具体的整合步骤 1.引入核心依赖 2.核心配置文件 3.编写实体类,映射comment集合中的字段 4.编写一个服务类 5.编写一个接口 6.接下来,只需要依次添加下面3个配置类即可 典型应用场景 数据迁移 应用监控 对接大数据应用 前言 changestream是monggodb的3.6版本之后出现的一种基于collection(数据库集合)的变

  • 并行Stream与Spring事务相遇会发生什么?

    目录 事务不生效的代码 JDK 8 的Stream @Transactional事务处理 Bug综合分析 问题拓展 小结 前言: 事情是这样的:运营人员反馈,通过Excel导入数据时,有一部分成功了,有一部分未导入.初步猜测,是事务未生效导致的.查看代码,发现导入部分已经通过@Transcational注解进行事务控制了,为什么还会出现事务不生效的问题呢?下面我们就进行具体的案例分析,Let's go! 事务不生效的代码 这里写一段简单的伪代码来演示展示一下事务不生效的代码: @Transact

  • Spring事务管理只对出现运行期异常进行回滚

    一.结论 Spring的事务管理默认只对出现运行期异常(java.lang.RuntimeException及其子类)进行回滚. 如果一个方法抛出Exception或者Checked异常,Spring事务管理默认不进行回滚. 关于异常的分类一下详细介绍: 1.基本概念 看java的异常结构图  Throwable是所有异常的根,java.lang.Throwable Error是错误,java.lang.Error Exception是异常,java.lang.Exception 2.Excep

  • Spring事务传播属性和隔离级别详细介绍

    1 事务的传播属性(Propagation) 1) REQUIRED ,这个是默认的属性 Support a current transaction, create a new one if none exists. 如果存在一个事务,则支持当前事务.如果没有事务则开启一个新的事务. 被设置成这个级别时,会为每一个被调用的方法创建一个逻辑事务域.如果前面的方法已经创建了事务,那么后面的方法支持当前的事务,如果当前没有事务会重新建立事务. 2) MANDATORY Support a curren

  • Spring事务失效场景原理及解决方案

    1.事务失效-自身调用(通过REQUIRES.REQUIRES_NEW传播属性):自身调用即调该类自己的方法. 同类OrderServiceImpl 中 doSomeThing()方法 不存在事务,该方法去调用本类中的存在事务注解的 insertAndUpdateOrderInfo() 方法.但是insertAndUpdateOrderInfo() 其实是无法保证预想的事务性. 示列验证: OrderServiceImpl.insertAndUpdateOrderInfo方法中upateData

  • 深入理解spring事务

    事务介绍 一个事务要么同时成功,要么同时失败 特性 Atomic原子性 事务是由一个或多个活动组成的一个工作单元.原子性确保事务中的所有操作全部发生或全部不发生 Consistent一致性 一旦事务完成,系统必须确保它所建模的业务处于一致的状态 Isolated隔离性 事务允许多个用户对数据进行操作,每个用户的操作不会与其他用户纠缠在一起 Durable持久性 一旦事务完成,事务的结果应该持久化 事务隔离级别 DEFAULT 使用底层数据库预设的隔离层级 READ_UNCOMMITTED (读未

  • 详细谈谈Spring事务是如何管理的

    目录 前言 Spring事务抽象 PlatformTransactionManager是事务管理器接口 常见的事务管理器有以下几种 定义事务的一些参数: 7种事务传播特性: 四种事务隔离级别: Spring之编程式事务 声明式事务 为什么加了@Transactional注解事物就生效了? 通过注解怎么实现指定的传播特性和隔离级别的? 事务失效的8种情况及解决办法 总结 前言 我们都知道Spring给我们提供了很多抽象,比如我们在操作数据库的过程中,它为我们提供了事务方面的抽象,让我们可以非常方便

  • Spring事务传播中嵌套调用实现方法详细介绍

    目录 前言 7种传播方式 注解式事务 事务的方法之间的调用 注意事项 前言 最近在使用Spring框架时遇到了一些问题,主要是Spring的事务传播问题,一个不带事务的方法调用带事务的方法,有时候会出现不回滚的情况,所以写了这篇文章来记录一下. 7种传播方式 我们先来看Spring事务的7中传播方式以及对应的描述 属性名称 值 描述 PROPAGATION__REQUIRED REQUIRED 表示的是当前这个方法必须运行在一个事务环境中,如果当前方法已经处于事务环境中,就可以直接使用该方法,否

  • 深入理解Spring事务原理

    一.事务的基本原理 Spring事务的本质其实就是数据库对事务的支持,没有数据库的事务支持,spring是无法提供事务功能的.对于纯JDBC操作数据库,想要用到事务,可以按照以下步骤进行:  1.获取连接 Connection con = DriverManager.getConnection()  2.开启事务con.setAutoCommit(true/false);  3.执行CRUD  4.提交事务/回滚事务 con.commit() / con.rollback();  5.关闭连接

  • Spring 事务隔离与事务传播的详解与对比

    Spring 事务隔离与事务传播的详解与对比 Spring是SSH中的管理员,负责管理其它框架,协调各个部分的工作.今天一起学习一下Spring的事务管理.Spring的事务管理分为声明式跟编程式.声明式就是在Spring的配置文件中进行相关配置:编程式就是用注解的方式写到代码里. Spring配置文件中关于事务配置总是由三个组成部分,分别是DataSource.TransactionManager和代理机制这三部分,无论哪种配置方式,一般变化的只是代理机制这部分. DataSource. Tr

  • JSP 中spring事务配置详解

    JSP 中spring事务配置详解 前几天被问到,如何防止服务器宕机,造成的数据操作的不完全. 问了一下同事,是事务.哎,恍然大悟,迷糊一时了. 声明式的事务配置,这个是最推荐的,配置到service层. <?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:context=&

随机推荐