详解RabbitMQ延迟队列的基本使用和优化

目录
  • 1.延迟队列基本介绍
  • 2.延迟队列使用场景
  • 3.Spring Boot集成RabbitMQ
    • 3.1创建项目,引入依赖
    • 3.2application.properties配置文件
    • 3.3 队列TTL-代码结构图
    • 3.4MQ配置类
    • 3.5生产者代码
    • 3.6消费者代码
    • 3.7测试
  • 4.延迟队列优化
    • 4.1代码结构图
    • 4.2配置类
    • 4.3生产者
    • 4.4消费者
    • 4.5测试

1.延迟队列基本介绍

一般队列中的元素总是希望能够早点被取出来进行处理,但是延迟队列中的元素则是希望可以在指定时间内被取出和处理,延迟队列中的元素都是带有时间属性的。延迟队列就是用来存放需要在指定时间被处理的元素的队列

延迟队列就是想要消息延迟一段时间后被处理,TTL可以让消息在延迟一段时间后变成死信。变成死信的消息都会被投递到死信队列中,这样的话,只要消费者一直消费死信队列里面的消息就可以了,因为里面的消息都是希望被马上处理的消息 生产者生产一条延时消息,根据需要延时时间的不同,通过不同的routing key把消息路由到不同的延迟队列,每一个队列都设置了不同的TTL属性,并且绑定在同一个死信交换机中,消息过期了以后,根据routing key的不同,又会被路由到不同的死信队列中,消费者只需要监听对应的死信队列进行处理就可以了。注意:不要造成重复消费

2.延迟队列使用场景

下面的场景需要使用延迟队列

  1. 订单在十分钟内没有支付就自动取消
  2. 新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒
  3. 账单在一周内没有支付,就会自动结算
  4. 用户注册成功以后,如果三天内没有登录就进行短信题提醒
  5. 用户发起退款,如果三天内没有得到处理则通知相关运营人员。
  6. 预定会议以后,需要提前十分钟通知各个参会人员参加会议。

3.Spring Boot集成RabbitMQ

3.1创建项目,引入依赖

相关依赖

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.amqp</groupId>
            <artifactId>spring-rabbit-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

3.2application.properties配置文件

# RabbitMQ/配置
#服务器地址
spring.rabbitmq.host=服务器地址
#服务端口号
spring.rabbitmq.port=5672
#虚拟主机名称
spring.rabbitmq.virtual-host=/myhost
#用户名
spring.rabbitmq.username=admin
#密码
spring.rabbitmq.password=123456

3.3 队列TTL-代码结构图

3.4MQ配置类

package com.zyh.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * @author zengyihong
 * @create 2022--10--04 16:44
 */
@Configuration
public class TtlQueueConfiguration {
    //普通交换机
    public static final String X_EXCHANGE = "X";
    //普通队列
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    //死信交换机
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //死信队列QD
    public static final String QUEUE_D = "QD";
    /**
     * 声明普通交换机X
     *
     * @return
     */
    @Bean
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }
    /**
     * 声明队列QA
     *
     * @return
     */
    @Bean
    public Queue queueA() {
        //创建集合保存队列属性
        Map<String, Object> map = new HashMap<>();
        //设置该队列绑定的死信交换机名称
        map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //设置routing key
        map.put("x-dead-letter-routing-key", "YD");
        //设置队列延迟时间 10秒
        map.put("x-message-ttl", 10000);
        //创建队列
        return QueueBuilder.durable(QUEUE_A).withArguments(map).build();
    }
    /**
     * 把QA队列和交换机X进行绑定
     *
     * @return
     */
    @Bean
    public Binding queueA_BindingX(@Qualifier("queueA") Queue queue, @Qualifier("xExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("XA");
    }
    /**
     * 声明队列QB
     *
     * @return
     */
    @Bean
    public Queue queueB() {
        //创建集合保存队列属性
        Map<String, Object> map = new HashMap<>();
        //设置该队列绑定的死信交换机名称
        map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //设置routing key
        map.put("x-dead-letter-routing-key", "YD");
        //设置队列延迟时间 10秒
        map.put("x-message-ttl", 40000);
        //创建队列
        return QueueBuilder.durable(QUEUE_A).withArguments(map).build();
    }
    /**
     * 把QB队列和交换机X进行绑定
     *
     * @return
     */
    @Bean
    public Binding queueB_BindingX(@Qualifier("queueB") Queue queue, @Qualifier("xExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("XB");
    }
    /**
     * 声明死信交换机Y
     *
     * @return
     */
    @Bean
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }
    /**
     * 声明死信队列QD
     *
     * @return
     */
    @Bean
    public Queue queueD() {
        return new Queue(QUEUE_D);
    }
    /**
     * 把死信交换机和死信队列进行绑定
     * @param queue
     * @param exchange
     * @return
     */
    @Bean
    public Binding deadLetterBindingQD(@Qualifier("queueD") Queue queue, @Qualifier("yExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("YD");
    }
}

3.5生产者代码

@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMessageController {
    @Resource
    private RabbitTemplate rabbitTemplate;
    /**
     * 生产者发送消息
     * @param message
     */
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        //记录日志
        log.info("当前时间:{},发送一条信息给两个TTL队列:{}",new Date(),message);
        //给QA队列发送消息
        rabbitTemplate.convertSendAndReceive("X","XA", "消息来自TTL为10秒的队列:"+message);
        rabbitTemplate.convertSendAndReceive("X","XB", "消息来自TTL为40秒的队列:"+message);
    }
}

3.6消费者代码

@Slf4j
@Component
public class DeadLetterQueueConsumer {
    @RabbitListener(queues = TtlQueueConfiguration.QUEUE_D)
    public void receiveQD(Message message, Channel channel){
        //获取消息
        String msg=new String(message.getBody());
        log.info("当前时间:{},收到死信队列消息:{}",new Date(),msg);
    }
}

3.7测试

启动boot项目,在浏览器输入localhost:8080/ttl/sendMessage/Hello

但是这种方式有一种缺点,现在我们只有TTL为10s和40s的延迟队列,如果我们需要其他延时时间的队列的话,那么我们又得新增其他队列,这样其实并不方便,我们想要的是能够动态设置TTL,这样就不需要为每个TTL设置新的延迟队列了。

4.延迟队列优化

4.1代码结构图

4.2配置类

在之前写的代码基础上新增一个配置类

package com.zyh.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
 * @author zengyihong
 * @create 2022--10--05 10:44
 */
@Configuration
public class MessageTtlQueueConfiguration {
    //死信交换机
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    //普通队列
    public static final String QUEUE_C = "QC";
    /**
     * 声明QC队列
     * @return
     */
    @Bean
    public Queue queueC(){
        //创建集合保存队列属性
        Map<String, Object> map = new HashMap<>();
        //设置该队列绑定的死信交换机名称
        map.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        //设置routing key
        map.put("x-dead-letter-routing-key", "YD");
        //设置队列延迟时间 10秒
        map.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_C).withArguments(map).build();
    }
    /**
     * 把QC队列和正常交换机X进行绑定
     *
     * @return
     */
    @Bean
    public Binding queueC_BindingX(@Qualifier("queueC") Queue queue, @Qualifier("xExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("XC");
    }
}

4.3生产者

package com.zyh.controller;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import javax.annotation.Resources;
import java.util.Date;
/**
 * @author zengyihong
 * @create 2022--10--04 19:36
 */
@Slf4j
@RestController
@RequestMapping("/ttl")
public class SendMessageController {
    @Resource
    private RabbitTemplate rabbitTemplate;
    /**
     * 生产者发送消息
     *
     * @param message
     */
    @GetMapping("/sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        //记录日志
        log.info("当前时间:{},发送一条信息给两个TTL队列:{}", new Date(), message);
        //给QA队列发送消息
        rabbitTemplate.convertSendAndReceive("X", "XA", "消息来自TTL为10秒的队列:" + message);
        rabbitTemplate.convertSendAndReceive("X", "XB", "消息来自TTL为40秒的队列:" + message);
    }
    /**
     * 生产者发送消息(动态设置有效期)
     *
     * @param message
     */
    @GetMapping("/sendMessage/{message}/{ttlTime}")
    public void sendMessage(@PathVariable String message, @PathVariable String ttlTime) {
        MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //设置消息有效期
                message.getMessageProperties().setExpiration(ttlTime);
                return message;
            }
        };
        //记录日志
        log.info("当前时间:{},发送一条时长{}毫秒信息给队列QC:{}", new Date(),ttlTime, message);
        //给QC队列发送消息
        rabbitTemplate.convertAndSend("X", "XC", message, messagePostProcessor);
    }
}

4.4消费者

@Slf4j
@Component
public class DeadLetterQueueConsumer {
    @RabbitListener(queues = TtlQueueConfiguration.QUEUE_D)
    public void receiveQD(Message message, Channel channel){
        //获取消息
        String msg=new String(message.getBody());
        log.info("当前时间:{},收到死信队列消息:{}",new Date(),msg);
    }
}

4.5测试

启动boot项目

在浏览器输入

http://localhost:8080/ttl/sendMessage/Hello/20000
http://localhost:8080/ttl/sendMessage/你好/2000

如果在消息属性上设置TTL的方式,那么消息可能不会按时死亡,因为RabbitMQ只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行

到此这篇关于详解RabbitMQ延迟队列的基本使用和优化的文章就介绍到这了,更多相关RabbitMQ延迟队列内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • SpringBoot整合RabbitMQ实现延迟队列的示例详解

    目录 如何保证消息不丢失 什么是消息投递可靠性 ttl死信队列 什么是死信队列 消息有哪几种情况成为死信 延迟队列 springboot整合rabbitmq实现订单超时自动关闭 如何保证消息不丢失 rabbitmq消息投递路径 生产者->交换机->队列->消费者 总的来说分为三个阶段. 1.生产者保证消息投递可靠性. 2.mq内部消息不丢失. 3.消费者消费成功. 什么是消息投递可靠性 简单点说就是消息百分百发送到消息队列中. 我们可以开启confirmCallback 生产者投递消息后

  • RabbitMQ 延迟队列实现订单支付结果异步阶梯性通知(实例代码)

    在第三方支付中,例如支付宝.或者微信,对于订单请求,第三方支付系统采用的是消息同步返回.异步通知+主动补偿查询的补偿机制. 由于互联网通信的不可靠性,例如双方网络.服务器.应用等因素的影响,不管是同步返回.异步通知.主动查询报文都可能出现超时无响应.报文丢失等情况,所以像支付业务,对结果的通知一般采用几种方案结合的补偿机制,不能完全依赖某一种机制.例如一个支付结果的通知,一方面会在支付页面跳转时候返回支付结果(一般只用作前端展示使用,非最终状态),同时会采用后台异步通知机制(有前台.后台通知的,

  • RabbitMQ 实现延迟队列的两种方式详解

    目录 1. 用插件 1.1 安装插件 1.2 消息收发 2. DLX 实现延迟队列 2.1 延迟队列实现思路 2.2 案例 3. 小结 定时任务各种各样,常见的定时任务例如日志备份,我们可能在每天凌晨 3 点去备份,这种固定时间的定时任务我们一般采用 cron 表达式就能轻松的实现,还有一些比较特殊的定时任务,向大家看电影中的定时炸弹,3分钟后爆炸,这种定时任务就不太好用 cron 去描述,因为开始时间不确定,我们开发中有的时候也会遇到类似的需求,例如: 在电商项目中,当我们下单之后,一般需要

  • RabbitMQ死信机制实现延迟队列的实战

    目录 延迟队列 应用场景 Time To Live(TTL) Dead Letter Exchanges(DLX) 延迟队列 延迟队列存储的对象肯定是对应的延时消息,所谓"延时消息"是指当消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个消息进行消费. 应用场景 三方支付,扫码支付调用上游的扫码接口,当扫码有效期过后去调用查询接口查询结果.实现方式:每当一笔扫码支付请求后,立即将此订单号放入延迟队列中(RabbitMQ),队列过期时间为二维码有效期,此队列

  • 手把手带你掌握SpringBoot RabbitMQ延迟队列

    目录 1. 简介 2. 安装插件 3. 实现延迟队列 3.1 引入所需依赖 3.2 application.yaml 3.3 RabbitConfig 3.4 Producer 3.5 Consumer 3.6 测试代码 3.7 启动测试 1. 简介 我们在上一篇博文中遗留了一个小问题,就是虽然TTL + DLX能实现延迟队列的功能,但是有两个问题. 首先业务场景为:比如海底捞预约,每个人预约的时间段不一致,有个可能一个小时后,有的可能三个小时等,当快到预约时间点需要给用户进行短信通知. 通过给

  • SpringBoot整合RabbitMQ处理死信队列和延迟队列

    目录 简介 实例代码 路由配置 控制器 发送器 接收器 application.yml 实例测试 简介 说明 本文用示例介绍SpringBoot整合RabbitMQ时如何处理死信队列/延迟队列. RabbitMQ消息简介 RabbitMQ的消息默认不会超时. 什么是死信队列?什么是延迟队列? 死信队列: DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,

  • 详解RabbitMQ中死信队列和延迟队列的使用详解

    目录 简介 死信队列 简介 示例 延迟队列 简介 使用场景 简介 本文介绍RabbitMQ的死信队列和延迟队列. 本内容也是Java后端面试中常见的问题. 死信队列 简介 DLX,全称为Dead-Letter-Exchange,可以称之为死信交换器,也有人称之为死信邮箱.当消息在一个队列中变成死信(dead message)之后,它能被重新被发送到另一个交换器中,这个交换器就是DLX,绑定DLX的队列就称之为死信队列. 以下几种情况会导致消息变成死信: 消息被拒绝(Basic.Reject/Ba

  • RabbitMQ延迟队列及消息延迟推送实现详解

    这篇文章主要介绍了RabbitMQ延迟队列及消息延迟推送实现详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 应用场景 目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如: 淘宝七天自动确认收货.在我们签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能. 12306 购票支付确认页面.我们在选好票点击确定跳转的页面中往往都会有倒计时,代表着 3

  • 从实战角度详解Disruptor高性能队列

    目录 一.背景 二.Java内置队列 三.ArrayBlockingQueue的问题 1.加锁 a.关于锁和CAS b.锁 c.原子变量 2.伪共享 a.什么是共享 b.缓存行 c.什么是伪共享 四.Disruptor的设计方案 1.一个生产者 2.多个生产者 a.读数据 b.写数据 五.总结 六.性能 七.等待策略 生产者的等待策略 消费者的等待策略 八.Log4j 2应用场景 1.性能差异 一.背景 Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的

  • 详解RabbitMq如何做到消息的可靠性投递

    目录 前言 RabbitMq的投递及消费流程 提供者如何确保消息的成功投递 单条消息的同步确认 多条消息的同步确认 异步消息确认 消息的返回机制 前言 现在的一些互联网项目或者是高并发的项目中很少有没有引入消息队列的. 引入消息队列可以给这个项目带来很多的好处:比如 削峰 这个就很好的理解,在系统中的请求量是固定的,但是有的时候会多出很多的突发流量,比如在有秒杀活动的时候,这种瞬时的高流量可能会打垮系统,这个时候就可以很好的引入MQ,将这些请求积压到MQ中,然后消费端在按照自已的能力去处理这里请

  • Rabbitmq延迟队列实现定时任务的方法

    场景 开发中经常需要用到定时任务,对于商城来说,定时任务尤其多,比如优惠券定时过期.订单定时关闭.微信支付2小时未支付关闭订单等等,都需要用到定时任务,但是定时任务本身有一个问题,一般来说我们都是通过定时轮询查询数据库来判断是否有任务需要执行,也就是说不管怎么样,我们需要先查询数据库,而且有些任务对时间准确要求比较高的,需要每秒查询一次,对于系统小倒是无所谓,如果系统本身就大而且数据也多的情况下,这就不大现实了,所以需要其他方式的,当然实现的方式有多种多样的,比如Redis实现定时队列.基于优先

  • 详解MySQL中的数据类型和schema优化

    最近在学习MySQL优化方面的知识.本文就数据类型和schema方面的优化进行介绍. 1. 选择优化的数据类型 MySQL支持的数据类型有很多,而如何选择出正确的数据类型,对于性能是至关重要的.以下几个原则能够帮助确定数据类型: 更小的通常更好 应尽可能使用可以正确存储数据的最小数据类型,够用就好.这样将占用更少的磁盘.内存和缓存,而在处理时也会耗时更少. 简单就好 当两种数据类型都能胜任一个字段的存储工作时,选择简单的那一方,往往是最好的选择.例如整型和字符串,由于整型的操作代价要小于字符,所

  • 详解Python中4种超参自动优化算法的实现

    目录 一.网格搜索(Grid Search) 二.随机搜索(Randomized Search) 三.贝叶斯优化(Bayesian Optimization) 四.Hyperband 总结 大家好,要想模型效果好,每个算法工程师都应该了解的流行超参数调优技术. 今天我给大家总结超参自动优化方法:网格搜索.随机搜索.贝叶斯优化 和 Hyperband,并附有相关的样例代码供大家学习. 一.网格搜索(Grid Search) 网格搜索是暴力搜索,在给定超参搜索空间内,尝试所有超参组合,最后搜索出最优

  • C#实现rabbitmq 延迟队列功能实例代码

    最近在研究rabbitmq,项目中有这样一个场景:在用户要支付订单的时候,如果超过30分钟未支付,会把订单关掉.当然我们可以做一个定时任务,每个一段时间来扫描未支付的订单,如果该订单超过支付时间就关闭,但是在数据量小的时候并没有什么大的问题,但是数据量一大轮训数据库的方式就会变得特别耗资源.当面对千万级.上亿级数据量时,本身写入的IO就比较高,导致长时间查询或者根本就查不出来,更别说分库分表以后了.除此之外,还有优先级队列,基于优先级队列的JDK延迟队列,时间轮等方式.但如果系统的架构中本身就

随机推荐