SpringBoot整合RabbitMQ实现延迟队列的示例详解

目录
  • 如何保证消息不丢失
    • 什么是消息投递可靠性
  • ttl死信队列
    • 什么是死信队列
    • 消息有哪几种情况成为死信
  • 延迟队列
  • springboot整合rabbitmq实现订单超时自动关闭

如何保证消息不丢失

rabbitmq消息投递路径

生产者->交换机->队列->消费者

总的来说分为三个阶段。

  • 1.生产者保证消息投递可靠性。
  • 2.mq内部消息不丢失。
  • 3.消费者消费成功。

什么是消息投递可靠性

简单点说就是消息百分百发送到消息队列中。

我们可以开启confirmCallback

生产者投递消息后,mq会给生产者一个ack.根据ack,生产者就可以确认这条消息是否发送到mq.

开启confirmCallback

修改配置文件

#NONE:禁用发布确认模式,是默认值,CORRELATED:发布消息成功到交换器后会触发回调方法
spring:
  rabbitmq:
    publisher-confirm-type: correlated

测试代码

@Test
public void testConfirmCallback() throws InterruptedException {
    rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
    /**
    *
    * @param correlationData 配置
    * @param ack 交换机是否收到消息,true是成功,false是失败
    * @param cause 失败的原因
    */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        System.out.println("confirm=====>");
        System.out.println("confirm==== ack="+ack);
        System.out.println("confirm==== cause="+cause);
        //根据ACK状态做对应的消息更新操作 TODO
    }
    });
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"ikun.mei", "鸡你太美");
    Thread.sleep(10000);
}

通过returnCallback保证消息从交换器发送到队列成功。 修改配置文件

spring:
  rabbitmq:
    #开启returnCallback
    publisher-returns: true
    #交换机处理消息到路由失败,则会返回给生产者
    template:
      mandatory: true

测试代码

@Test
void testReturnCallback() {
    //为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定
    rabbitTemplate.setMandatory(true);
    //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
    rabbitTemplate.setReturnsCallback(returned -> {
        int code = returned.getReplyCode();
        System.out.println("code="+code);
        System.out.println("returned="+ returned);
    });
    rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"123456","测试returnCallback");
}

消费者消费消息时需要通过ack手动确认消息已消费。

修改配置文件

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: manual

编写测试代码

@RabbitHandler
public void consumer(String body, Message message, Channel channel) throws IOException {
    long msgTag = message.getMessageProperties().getDeliveryTag();
    System.out.println("msgTag="+msgTag);
    System.out.println("message="+ message);
    System.out.println("body="+body);  

    //成功确认,使用此回执方法后,消息会被 rabbitmq broker 删除
    channel.basicAck(msgTag,false);
    // channel.basicNack(msgTag,false,true);  

}

deliveryTags是消息投递序号,每次消费消息或者消息重新投递后,deliveryTag都会增加

ttl死信队列

什么是死信队列

没有被及时消费的消息存放的队列

消息有哪几种情况成为死信

  • 消费者拒收消息 (basic.reject/ basic.nack) ,并且没有重新入队 requeue=false
  • 消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time-to-live)
  • 队列的消息长度达到极限
  • 结果:消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

死信队列经常用来做延迟队列消费。

延迟队列

生产者投递到mq中并不希望这条消息立马被消费,而是等待一段时间后再去消费。

springboot整合rabbitmq实现订单超时自动关闭

package com.fandf.test.rabbit;  

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;  

import java.util.HashMap;
import java.util.Map;  

/**
* @author fandongfeng
* @date 2023/4/15 15:38
*/
@Configuration
public class RabbitMQConfig {  

    /**
    * 订单交换机
    */
    public static final String ORDER_EXCHANGE = "order_exchange";
    /**
    * 订单队列
    */
    public static final String ORDER_QUEUE = "order_queue";
    /**
    * 订单路由key
    */
    public static final String ORDER_QUEUE_ROUTING_KEY = "order.#";  

    /**
    * 死信交换机
    */
    public static final String ORDER_DEAD_LETTER_EXCHANGE = "order_dead_letter_exchange";
    /**
    * 死信队列 routingKey
    */
    public static final String ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY = "order_dead_letter_queue_routing_key";  

    /**
    * 死信队列
    */
    public static final String ORDER_DEAD_LETTER_QUEUE = "order_dead_letter_queue";  

    /**
    * 创建死信交换机
    */
    @Bean("orderDeadLetterExchange")
    public Exchange orderDeadLetterExchange() {
        return new TopicExchange(ORDER_DEAD_LETTER_EXCHANGE, true, false);
    }  

    /**
    * 创建死信队列
    */
    @Bean("orderDeadLetterQueue")
    public Queue orderDeadLetterQueue() {
        return QueueBuilder.durable(ORDER_DEAD_LETTER_QUEUE).build();
    }  

    /**
    * 绑定死信交换机和死信队列
    */
    @Bean("orderDeadLetterBinding")
    public Binding orderDeadLetterBinding(@Qualifier("orderDeadLetterQueue") Queue queue, @Qualifier("orderDeadLetterExchange")Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY).noargs();
    }  

    /**
    * 创建订单交换机
    */
    @Bean("orderExchange")
    public Exchange orderExchange() {
        return new TopicExchange(ORDER_EXCHANGE, true, false);
    }  

    /**
    * 创建订单队列
    */
    @Bean("orderQueue")
    public Queue orderQueue() {
        Map<String, Object> args = new HashMap<>(3);
        //消息过期后,进入到死信交换机
        args.put("x-dead-letter-exchange", ORDER_DEAD_LETTER_EXCHANGE);  

        //消息过期后,进入到死信交换机的路由key
        args.put("x-dead-letter-routing-key", ORDER_DEAD_LETTER_QUEUE_ROUTING_KEY);  

        //过期时间,单位毫秒
        args.put("x-message-ttl", 10000);  

        return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();
    }  

    /**
    * 绑定订单交换机和队列
    */
    @Bean("orderBinding")
    public Binding orderBinding(@Qualifier("orderQueue") Queue queue, @Qualifier("orderExchange")Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(ORDER_QUEUE_ROUTING_KEY).noargs();
    }  

}

消费者

package com.fandf.test.rabbit;  

import cn.hutool.core.date.DateUtil;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;  

import java.io.IOException;  

/**
* @author fandongfeng
* @date 2023/4/15 15:42
*/
@Component
@RabbitListener(queues = RabbitMQConfig.ORDER_DEAD_LETTER_QUEUE)
public class OrderMQListener {  

    @RabbitHandler
    public void consumer(String body, Message message, Channel channel) throws IOException {
        System.out.println("收到消息:" + DateUtil.now());
        long msgTag = message.getMessageProperties().getDeliveryTag();
        System.out.println("msgTag=" + msgTag);
        System.out.println("message=" + message);
        System.out.println("body=" + body);
        channel.basicAck(msgTag, false);
    }  

}

测试类

@Test
void testOrder() throws InterruptedException {
//为true,则交换机处理消息到路由失败,则会返回给生产者 配置文件指定,则这里不需指定
    rabbitTemplate.setMandatory(true);
    //开启强制消息投递(mandatory为设置为true),但消息未被路由至任何一个queue,则回退一条消息
    rabbitTemplate.setReturnsCallback(returned -> {
    int code = returned.getReplyCode();
    System.out.println("code=" + code);
    System.out.println("returned=" + returned);
    });
    rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, "order", "测试订单延迟");
    System.out.println("发送消息:" + DateUtil.now());
    Thread.sleep(20000);
}

程序输出

发送消息:2023-04-16 15:14:34
收到消息:2023-04-16 15:14:44
msgTag=1
message=(Body:'测试订单延迟' MessageProperties [headers={spring_listener_return_correlation=03169cfc-5061-41fe-be47-c98e36d17eac, x-first-death-exchange=order_exchange, x-death=[{reason=expired, count=1, exchange=order_exchange, time=Mon Apr 16 15:14:44 CST 2023, routing-keys=[order], queue=order_queue}], x-first-death-reason=expired, x-first-death-queue=order_queue}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=order_dead_letter_exchange, receivedRoutingKey=order_dead_letter_queue_routing_key, deliveryTag=1, consumerTag=amq.ctag-Eh8GMgrsrAH1rvtGj7ykOQ, consumerQueue=order_dead_letter_queue])
body=测试订单延迟

到此这篇关于SpringBoot整合RabbitMQ实现延迟队列的示例详解的文章就介绍到这了,更多相关SpringBoot RabbitMQ延迟队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • RabbitMQ 实现延迟队列的两种方式详解

    目录 1. 用插件 1.1 安装插件 1.2 消息收发 2. DLX 实现延迟队列 2.1 延迟队列实现思路 2.2 案例 3. 小结 定时任务各种各样,常见的定时任务例如日志备份,我们可能在每天凌晨 3 点去备份,这种固定时间的定时任务我们一般采用 cron 表达式就能轻松的实现,还有一些比较特殊的定时任务,向大家看电影中的定时炸弹,3分钟后爆炸,这种定时任务就不太好用 cron 去描述,因为开始时间不确定,我们开发中有的时候也会遇到类似的需求,例如: 在电商项目中,当我们下单之后,一般需要

  • SpringBoot整合RabbitMQ处理死信队列和延迟队列

    目录 简介 实例代码 路由配置 控制器 发送器 接收器 application.yml 实例测试 简介 说明 本文用示例介绍SpringBoot整合RabbitMQ时如何处理死信队列/延迟队列. RabbitMQ消息简介 RabbitMQ的消息默认不会超时. 什么是死信队列?什么是延迟队列? 死信队列: DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,

  • RabbitMQ死信机制实现延迟队列的实战

    目录 延迟队列 应用场景 Time To Live(TTL) Dead Letter Exchanges(DLX) 延迟队列 延迟队列存储的对象肯定是对应的延时消息,所谓"延时消息"是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 应用场景 三方支付,扫码支付调用上游的扫码接口,当扫码有效期过后去调用查询接口查询结果.实现方式:每当一笔扫码支付请求后,立即将此订单号放入延迟队列中(RabbitMQ),队列过期时间为二维码有效期,此队列

  • 详解RabbitMQ中死信队列和延迟队列的使用详解

    目录 简介 死信队列 简介 示例 延迟队列 简介 使用场景 简介 本文介绍RabbitMQ的死信队列和延迟队列. 本内容也是Java后端面试中常见的问题. 死信队列 简介 DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列. 以下几种情况会导致消息变成死信: 消息被拒绝(Basic.Reject/Ba

  • SpringBoot2 整合FreeMarker实现页面静态化示例详解

    一.页面静态化 1.动静态页面 静态页面 即静态网页,指已经装载好内容HTML页面,无需经过请求服务器数据和编译过程,直接加载到客户浏览器上显示出来.通俗的说就是生成独立的HTML页面,且不与服务器进行数据交互. 优缺点描述: 静态网页的内容稳定,页面加载速度极快: 不与服务器交互,提升安全性: 静态网页的交互性差,数据实时性很低: 维度成本高,生成很多HTML页面: 动态页面 指跟静态网页相对的一种网页编程技术,页面的内容需要请求服务器获取,在不考虑缓存的情况下,服务接口的数据变化,页面加载的

  • SpringBoot整合Swagger和Actuator的使用教程详解

    前言 本篇文章主要介绍的是SpringBoot整合Swagger(API文档生成框架)和SpringBoot整合Actuator(项目监控)使用教程. SpringBoot整合Swagger 说明:如果想直接获取工程那么可以直接跳到底部,通过链接下载工程代码. Swagger 介绍 Swagger 是一套基于 OpenAPI 规范构建的开源工具,可以帮助我们设计.构建.记录以及使用 Rest API.Swagger 主要包含了以下三个部分: Swagger Editor:基于浏览器的编辑器,我们

  • springboot整合websocket最基础入门使用教程详解

    项目最终的文件结构 1 添加maven依赖 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-websocket</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <

  • springboot整合websocket实现群聊思路代码详解

    实现思路 发送者向服务器发送大家早上好.其它客户端可以收到对应消息. 项目展示 通过springboot引入websocket,实现群聊,通过在线websocket测试进行展示. 核心代码 pom引入jar <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2

  • SpringBoot整合阿里云开通短信服务详解

    准备工作 开通短信服务 如果开通不成功,就只能借下别人已经开通好的短信,如果不想重复,可在其下创建一个新的模板管理 这里只是介绍如何使用 导入依赖 com.aliyun aliyun-java-sdk-core 4.5.1 com.aliyun aliyun-java-sdk-dysmsapi 1.1.0 com.alibaba fastjson 1.2.62 发送验证码到手机上,验证码生成工具类(内容较为固定,也可根据需求改) package com.xsha.msmservice.utils

  • Springboot整合minio实现文件服务的教程详解

    首先pom文件引入相关依赖 <!--minio--> <dependency> <groupId>io.minio</groupId> <artifactId>minio</artifactId> <version>3.0.10</version> </dependency> springboot配置文件application.yml 里配置minio信息 #minio配置 minio: endpo

  • SpringBoot整合EasyExcel进行大数据处理的方法详解

    目录 EasyExcel 需要的Maven 基础读案例 操作的excel 实体类 读取监听器 测试 基础写案例 实体类 测试 Excel模板方式 准备模块 实体类 测试 EasyExcel EasyExcel文档 我用过Poi和EasyPoi这些工具总体来说: POI 优点我觉得自由,但是迎来的就是复杂度,和大数据量时候性能的缺点 EasyPoi基于POI 的二次封装,解决了大部分的常用场景,简化了代码,但是特别复杂表格处理还是不行,而且性能的话和poi差不多,简单来说就是简化了Poi的操作,少

  • C语言实现队列的示例详解

    目录 前言 一. 什么是队列 二. 使用什么来实现栈 三. 队列的实现 3.1头文件 3.2 函数的实现 四.完整代码 前言 前一段时间,我们试着用C语言实现了数据结构中的顺序表,单链表,双向循环链表,栈.今天我们再用C语言来实现另一种特殊的线性结构:队列 一. 什么是队列 队列是一种特殊的线性表,特殊之处在于它只允许在表的前端(head)进行删除操作,而在表的后端(tail)进行插入操作,和栈一样,队列是一种操作受限制的线性表.进行插入操作的端称为队尾,进行删除操作的端称为队头. 这个队列就可

随机推荐