RocketMQ整合SpringBoot实现生产级二次封装

目录
  • 前言说明
  • 一、为什么要二次封装
    • 1.1 二次封装不同观点
    • 1.2 封装的抽离点
    • 1.3 设计模式的应用
  • 二、二次封装核心要点
    • 2.1 二次封装核心点
      • 2.1.1 封装主要讨论点
      • 2.1.2 发送/消费的几种消息实体
    • 2.2 RocketMQTemplate封装
      • 2.2.1 封装基础实体类
      • 2.2.2 RocketMQTemplate
      • 3.2.3 增强RocketMQTemplate
    • 2.3 RocketMQListener封装
    • 2.4 广播消息的应用场景
    • 2.3 代码封装完结测试

前言说明

前置掌握:SpringBoot基础使用、RocketMQ和SpringBoot的整合使用

  • 主要使用参考第二节
  • 核心使用参考第一篇文章

文章难度:四颗星

代码不难,重点是封装的思想需要体会

不同观点欢迎大家在评论区一起讨论学习,没有对错之分,每个系统业务特性不同,适合系统的才是最好的~

源码地址https://gitee.com/tianxincoder/practice-rocketmq-enterprise文章只会说明核心代码,其他的基础整合配置和多环境自动隔离参考源码即可

一、为什么要二次封装

为了不产生歧义,文章中提到的二次封装均是基于原始使用方式的封装,而非源码级别的二次封装

换句话说:如果都需要对源码进行封装了,那么说明公司业务规模都到一定程度了,二次封装这种东西已经不需要讨论了,封装已经是一个共识

  • 首先明确一点:不进行二次封装完全不影响RocketMQ的使用,可以选择二次封装和不选择二次封装

    • 二次封装可以提供更多的功能和更简洁的使用方式
    • 如果一个封装搞得比原始使用方式更复杂,那么就失去了二次封装的意义
    • Q1:二次封装可不可以不要?完全可以,完全不影响正常使用
    • Q2:二次封装有没有必要?仁者见仁智者见智的问题,如果觉得没有必要那么这篇文章可以跳过~
  • ORM框架中典型的一个二次封装框架就是MyBatisPlus(简称MP),后者是对MyBatis原生使用的增加,不使用MP直接使用MyBatis可不可以?完全可以,那为什么要用二次封装后的MP?
    • 场景:大部分的数据库操作,无外乎CRUD,那么最常用的比如(根据名称就可以知道这个方法做什么用,就没有必要再二次说明了):updateById、batchUpdate、deleteById、saveOrUpdate、batchInsert。对于上面这5个操作,变化的只是表字段和表名,剩下的语法都是一样
    • 不封装:直接使用MyBatis完全可以自己实现上面方法的功能,但是每个表都需要写一遍自己上面的方法,假设有100张表,那么就会多出495个(下面说明)重复功能代码,而且所有代码都是冗余的
    • 封装后:由封装者提供上面5个方法的公共实现,然后所有需要使用上面功能的Service只需要继承封装好的类就自然的拥有了上面的5大功能,那么代码的冗余量就从100张表*5个方法==500,去掉封装的5个,节省了495的代码冗余量
  • 所以二次封装是为了更方便用、更简洁、更加适用于系统,量身打造可以大大提升开发效率。就如上面的5个方法,完全重复性的东西为什么要浪费开发时间来做这些冗余的事情呢?

1.1 二次封装不同观点

让我们以一个生活中的蛋炒饭开个头

原始框架好比提供了原材料:厨具、鸡蛋,米饭等食材、菜谱

  • 对于框架的使用通常有以下两种方式
  • 第一种:根据菜谱来进行做饭(使用原生的方法调用),洗菜、做饭、刷碗、打扫不管啥入参自己管
  • 第二种:找一个人来学会这道菜(负责二次封装的人)的多种做法(封装大部分业务场景)并做成一种点餐式的服务,谁想吃哪种类型的蛋炒饭直接点餐(调用封装好的方法)就可以吃上香喷喷的蛋炒饭

问题:哪种方案更好? 答案:两种各有各的优势(在说废话,哈哈~)

  • 第一种:原生方式

    • 优点:可以按照各种方式灵活调用,比如每个人都使用RocketMQTemplate原生send发送消息,想要发送什么类型的消息就发送什么类型的消息,比较随意
    • 缺点:代码大量冗余,从构建参数对象、发送消息、消费消息、异常处理、日志记录、异常重试啥啥的都是自己搞,每个消费队列就会出现上面所有的步骤。比如现在有一个订单处理中心A接收来自各种类型订单,此时如B、C两个原始订单来源想让A处理订单,那么B和C都需要按照A的要求进行调用,代码会冗余
  • 第二种:点餐式服务
    • 优点:封装了大部分统一的方法调用,比如 发送消息、异常处理、日志记录等等都是重复的,封装后点餐的人不需要再关系这一部分要怎么处理,只需要告诉点餐服务要不要进行异常、要不异常重试等等,那么此时对于点餐人来说只需要 付钱(调用服务)吃饭(消费消息),除此之外啥也不用管,全部由点餐服务提供者完成所有上述两个步骤外的其它操作
    • 缺点:无法满足所有点餐人的要求,有的人喜欢味道重一点,有的喜欢味道淡一点。但是这个缺点完全可以处理,比如点餐服务提供了自定义厨房(返回原始发送对象),此时调用者可以按照第一种方式进行使用
  • 选哪种?
    • 个人而言:业务系统复杂的优先选择第二种简单业务的选择第一种(尽量采用封装,后续维护方便)。对于一个复杂的系统,本身业务级的代码就已经很多了,结果还要每个人处理全部一样的东西,消费者越多代码冗余越多。如果一个系统只是为了使用MQ来进行业务分离,消费者也不多,那么可以选择最快的方式,但是最终会选择第二种,如果业务随着时间增长越复杂,越晚改成第二种花费的代价越大!
    • 第一种就好比此时我们要直接操作内存,原生操作就好比C++或C,可以直接操作内存,但是同时用完后还要自己写各种异常处理和释放内存;代码封装就好比Java,我们只需要告诉Java我们要使用内存,然后用完就不用管
  • 企业中,业务功能产出是一级优先级,在此之上才能有更高级的东西。技术服务于业务,而不是业务服务于技术!比如现在30个人的系统,我们要使用缓存加速访问,那么我们是选择 内部缓存(直接用集合或者map存起来)还是用Redis?
    • 内部缓存和Redis能不能达到目的?能
    • 哪个更方便更快?内部缓存!内部对象就很快实现
    • 如果业务发展迟早会转为Redis这种专业的缓存中间件,就好比业务发展前第一种,业务发展后选择第二种,但是对于大部分业务系统来说功能增加是很快的,特别是产品同事上一分钟提需求下一分钟就要上线这种(开个玩笑~),所以我们在引用一个技术需不需要进行二次封装时需要技术负责人对业务增长有一个预判。建议是都进行封装一下

1.2 封装的抽离点

  • 对于二次封装,其中最主要的就是找出该框架在日常使用中所出现的大部分涉及到的操作,然后找出变化操作和不变化操作
  • RocketMQ日常使用主要场景为例:
    • 发送消息阶段:准备需要发送的消息、发送消息、记录原始消息日志、发送失败处理、可靠性处理
    • 消费消息阶段:记录接收消息日志、业务处理、业务日志记录、异常处理、异常重试、异常通知、死信处理
  • 提取变化点和不变化点(可以抽取为公共处理的场景)
    • 发送消息阶段:

      • 变化点:准备需要发送的消息
      • 不变化点:发送消息、记录原始消息日志、发送失败处理、可靠性处理
    • 消费消息阶段
      • 变化点:业务处理、业务日志记录
      • 不变化点:记录接收消息日志、异常处理、异常重试、异常通知、死信处理
  • 从上可以看到,对于RocketMQ的使用,大部分场景都是可以抽离成一个公共的方法处理,只有业务级的需要自己处理,所以如果我们把不变化场景抽取后,每个同事只需要写自己业务相关部分即可
  • 抽取后的复杂度:对于新加一个消费者,只需要处理业务相关三个场景(准备需要发送的消息、业务处理、业务日志记录),剩下的九个场景,只需要封装一次就可以。需要现在就几十个消费者,可以想想一些减少了多少代码冗余

1.3 设计模式的应用

  • 要封装出一个好的抽象层,【设计模式】建议好好体会和学习一下
  • 设计模式对于用不到的人来说比较虚幻,对于用的到的人来说,这个真牛X

二、二次封装核心要点

2.1 二次封装核心点

2.1.1 封装主要讨论点

  • 对于RocketMQ或者说对于整个MQ体系来说(不管是RabbitMQ、RocketMQ、Kafka)等封装的核心主要有两个:发送消息、消费消息者两个场景
  • 对于RocketMQ我们主要讨论三个地方:RocketMQTemplate封装、RocketMQListener封装和广播消息的封装
  • 广播消息是分布式系统中同时让所有节点都干一件事情的一个好的方式,如果用不到忽略广播消息即可

2.1.2 发送/消费的几种消息实体

  • RocketMQ发送消息对于不同的使用来说,大部分选择下面的几种发送消息类型

    • A、发送Json对象,比如Fastjson的JSONObject
    • B、直接发送转Json后的String对象
    • C、根据业务封装对应实体类
    • D、直接使用原生MessageExt接收
  • 怎么选择?怎么选择才是最优?
    • 上面哪一种都可以达到目的,如果要统一封装就必须要有一个标准
    • 怎么选择只需要回答这个问题:在不看消息发送者的情况下,消费者怎么知道发送者发送的消息含义?
    • 比如现在有一个订单消息,如果我们不看消息发送者,怎么知道发送者给消费者发送哪些字段
      • A、B、D可以吗?一定不可以!JSON对象和String对象,如果我们不看消息发送者不可能知道到底发送了啥,这点我相信没有可以讨论的地方,因为类型决定了这个操作不可能
      • C可以吗?可以!此时不需要看消息发送者,只需要看消费者的实体类点进去,有哪些业务字段一清二楚
      • 可能有杠要抬了,有看实体类的功夫,我看消息发送者都看完了
        • 灵魂拷问1:如果消息发送者和消费者不在一个系统怎么看?邪魅一笑,不同业务线可能没代码权限吧?分布式系统完全独立可能吧?
        • 灵魂拷问2:如果现在需要一个功能,如果某些必须要的字段消息发送者如果没有给的话需要校验,普通String和JSONObject怎么实现?换成实体类呢?
  • 基于上述讨论点,封装建议基于实体类来,实体类不管是排查问题、新人熟悉系统代码、信息校验等String和JSONObject无法像实体类一样轻松胜任

2.2 RocketMQTemplate封装

2.2.1 封装基础实体类

  • 基础消息实体类封装了除了业务消息外所有其他公共字段,主要看下面代码中的字段和注释
  • 基础抽象消息实体,包含基础的消息、根据自己的业务消息设置更多的字段
    • 其中也可以包含所有消费者可能用得到的方法等,比如做些数据的加解密
package com.codecoord.rocketmq.domain;

import lombok.Data;

import java.time.LocalDateTime;
import java.util.UUID;

/**
 * 基础消息实体,包含基础的消息
 * 根据自己的业务消息设置更多的字段
 *
 * @author tianxincoord@163.com
 * @since 2022/6/16
 */
@Data
public abstract class BaseMqMessage {
    /**
     * 业务键,用于RocketMQ控制台查看消费情况
     */
    protected String key;
    /**
     * 发送消息来源,用于排查问题
     */
    protected String source = "";
    /**
     * 发送时间
     */
    protected LocalDateTime sendTime = LocalDateTime.now();
    /**
     * 跟踪id,用于slf4j等日志记录跟踪id,方便查询业务链
     */
    protected String traceId = UUID.randomUUID().toString();
    /**
     * 重试次数,用于判断重试次数,超过重试次数发送异常警告
     */
    protected Integer retryTimes = 0;
}
  • 有了此基础抽象实体类,那么剩下的所有业务消息实体只需要继承此基类,然后在自己业务类中包含自己需要的字段即可,因为这些公共字段不管是向上转型还是向下转型,子类和父类都可以看得到

2.2.2 RocketMQTemplate

  • RocketMQTemplate发送消息的代码如果不封装,我们发送消息需要这样

    • String destination = topic + ":" + tag;
    • template.syncSend(destination, message);
  • 每个人发送消息都要自己处理这个冒号,直接传入topic和tag不香吗?按照抽离变化点中的变化点,只有消息是变化的,除此之外的其他规则交给封装类
  • RocketMQTemplate主要封装发送消息的日志、异常的处理、消息key设置、等等其他配置
  • 封装代码类如下,下面包含了主要发送方式,更多自己添加即可
    • 这里就是消息发送的点餐机器,同时也提供了封装方法也提供原始RocketMQTemplate供使用
    • 此处只是提供一种方式,生产中按照项目组商量决定
package com.codecoord.rocketmq.template;

import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqSysConstant;
import com.codecoord.rocketmq.domain.BaseMqMessage;
import com.codecoord.rocketmq.util.JsonUtil;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.apache.rocketmq.spring.support.RocketMQHeaders;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

/**
 * RocketMQ模板类
 *
 * @author tianxincoord@163.com
 * @since 2022/4/15
 */
@Component
public class RocketMqTemplate {
    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMqTemplate.class);
    @Resource(name = "rocketMQTemplate")
    private RocketMQTemplate template;

    /**
     * 获取模板,如果封装的方法不够提供原生的使用方式
     */
    public RocketMQTemplate getTemplate() {
        return template;
    }

    /**
     * 构建目的地
     */
    public String buildDestination(String topic, String tag) {
        return topic + RocketMqSysConstant.DELIMITER + tag;
    }

    /**
     * 发送同步消息
     */
    public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message) {
        // 注意分隔符
        return send(topic + RocketMqSysConstant.DELIMITER + tag, message);
    }

    public <T extends BaseMqMessage> SendResult send(String destination, T message) {
        // 设置业务键,此处根据公共的参数进行处理
        // 更多的其它基础业务处理...
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        SendResult sendResult = template.syncSend(destination, sendMessage);
        // 此处为了方便查看给日志转了json,根据选择选择日志记录方式,例如ELK采集
        LOGGER.info("[{}]同步消息[{}]发送结果[{}]", destination, JsonUtil.toJson(message), JSONObject.toJSON(sendResult));
        return sendResult;
    }

    /**
     * 发送延迟消息
     */
    public <T extends BaseMqMessage> SendResult send(String topic, String tag, T message, int delayLevel) {
        return send(topic + RocketMqSysConstant.DELIMITER + tag, message, delayLevel);
    }

    public <T extends BaseMqMessage> SendResult send(String destination, T message, int delayLevel) {
        Message<T> sendMessage = MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.KEYS, message.getKey()).build();
        SendResult sendResult = template.syncSend(destination, sendMessage, 3000, delayLevel);
        LOGGER.info("[{}]延迟等级[{}]消息[{}]发送结果[{}]", destination, delayLevel, JsonUtil.toJson(message), JsonUtil.toJson(sendResult));
        return sendResult;
    }
}
  • 这个类是最基础的原始封装类,相当于餐馆提供的点餐服务。上面提供无业务特性的发送,比如想要发送日志消息或者动态发送消息目的场景

3.2.3 增强RocketMQTemplate

  • 以订单处理中心来说,变化点仅仅只是单号等业务数据不一样,发往订单处理中心的消息不管是topic还是tag等等其实完全都一样,那么此时可以根据业务来增加封装
  • 增强原始功能需要注意下面两个点
    • 所有父类能出现的地方,子类都能出现:也就是子类拥有功能 >= 父类 ,比如Java的List,只要入参是List的地方,传ArrayList和LinkedList完全可以
    • 增强功能不能改变原始功能的行为:比如父类有一个方法say是说话,结果子类覆写了say改成了行为是吃饭,然后当调用者调用say的时候得到了一个完全预期外的结果
  • 就以订单中心消息发送为例,封装OrderMessageTemplate继承自RocketMqTemplate,此时前者就拥有了封装父类的所有基础方法,拥有了所有父类的功能。然后可以在前者增加自身业务特性的发送方法,比如发送订单处理消息
package com.codecoord.rocketmq.template;

import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import javax.validation.constraints.NotNull;
import java.time.LocalDate;
import java.time.LocalDateTime;

/**
 * 订单类发送消息模板工具类
 *
 * @author tianxincode@163.com
 * @since 2022/6/16
 */
@Component
public class OrderMessageTemplate extends RocketMqTemplate {
    /// 如果不采用继承也可以直接注入使用
    /* @Resource
    private RocketMqTemplate rocketMqTemplate; */

    /**
     * 入参只需要传入是哪个订单号和业务体消息即可,其他操作根据需要处理
     * 这样对于调用者而言,可以更加简化调用
     */
    public SendResult sendOrderPaid(@NotNull String orderId, String body) {
        RocketMqEntityMessage message = new RocketMqEntityMessage();
        message.setKey(orderId);
        message.setSource("订单支付");
        message.setMessage(body);
        // 这两个字段只是为了测试
        message.setBirthday(LocalDate.now());
        message.setTradeTime(LocalDateTime.now());
        return send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.ORDER_PAID_TAG, message);
    }
}
  • 此时对于调用者只需要 orderMessageTemplate.sendOrderPaid("O001", "xxx");就可以把消息发送到订单处理中心
  • 封装后的好处,假如现在有10个订单来源,现在需要调整消息发送格式,如果不进行封装那么10个来源发送的地方都需要改;如果进行了二次封装,只需要改sendOrderPaid方法即可,而且还不会出错,此时优势就体现出来了

2.3 RocketMQListener封装

  • RocketMQListener是消费消息的核心,同时也涉及到更多的操作,比如:基础日志记录、异常处理、消息重试、警告通知等等等
  • 按照抽离变化点,RocketMQListener只应该处理与自身业务相关的,除此之外的其它应该交给父类,子类只需要告诉父类要不要异常处理、要不要重试等等,点餐式服务
  • 封装消息消费的抽象类
    • 注意泛型限定为标准基础消息类,这样能到消费者的一定有统一的标准类BaseMqMessage
    • 下面简单封装示例
package com.codecoord.rocketmq.listener;

import com.codecoord.rocketmq.constant.RocketMqSysConstant;
import com.codecoord.rocketmq.domain.BaseMqMessage;
import com.codecoord.rocketmq.template.RocketMqTemplate;
import com.codecoord.rocketmq.util.JsonUtil;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;

import javax.annotation.Resource;
import java.time.Instant;
import java.util.Objects;

/**
 * 抽象消息监听器,封装了所有公共处理业务,如
 * 1、基础日志记录
 * 2、异常处理
 * 3、消息重试
 * 4、警告通知
 * 5、....
 *
 * @author tianxincoord@163.com
 * @since 2022/4/17
 */
public abstract class BaseMqMessageListener<T extends BaseMqMessage> {
    /**
     * 这里的日志记录器是哪个子类的就会被哪个子类的类进行初始化
     */
    protected final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Resource
    private RocketMqTemplate rocketMqTemplate;

    /**
     * 消息者名称
     *
     * @return 消费者名称
     */
    protected abstract String consumerName();

    /**
     * 消息处理
     *
     * @param message 待处理消息
     * @throws Exception 消费异常
     */
    protected abstract void handleMessage(T message) throws Exception;

    /**
     * 超过重试次数消息,需要启用isRetry
     *
     * @param message 待处理消息
     */
    protected abstract void overMaxRetryTimesMessage(T message);
    /**
     * 是否过滤消息,例如某些
     *
     * @param message 待处理消息
     * @return true: 本次消息被过滤,false:不过滤
     */
    protected boolean isFilter(T message) {
        return false;
    }

    /**
     * 是否异常时重复发送
     *
     * @return true: 消息重试,false:不重试
     */
    protected abstract boolean isRetry();

    /**
     * 消费异常时是否抛出异常
     *
     * @return true: 抛出异常,false:消费异常(如果没有开启重试则消息会被自动ack)
     */
    protected abstract boolean isThrowException();

    /**
     * 最大重试次数
     *
     * @return 最大重试次数,默认10次
     */
    protected int maxRetryTimes() {
        return 10;
    }

    /**
     * isRetry开启时,重新入队延迟时间
     *
     * @return -1:立即入队重试
     */
    protected int retryDelayLevel() {
        return -1;
    }

    /**
     * 由父类来完成基础的日志和调配,下面的只是提供一个思路
     */
    public void dispatchMessage(T message) {
        MDC.put(RocketMqSysConstant.TRACE_ID, message.getTraceId());
        // 基础日志记录被父类处理了
        logger.info("[{}]消费者收到消息[{}]", consumerName(), JsonUtil.toJson(message));
        if (isFilter(message)) {
            logger.info("消息不满足消费条件,已过滤");
            return;
        }
        // 超过最大重试次数时调用子类方法处理
        if (message.getRetryTimes() > maxRetryTimes()) {
            overMaxRetryTimesMessage(message);
            return;
        }
        try {
            long start = Instant.now().toEpochMilli();
            handleMessage(message);
            long end = Instant.now().toEpochMilli();
            logger.info("消息消费成功,耗时[{}ms]", (end - start));
        } catch (Exception e) {
            logger.error("消息消费异常", e);
            // 是捕获异常还是抛出,由子类决定
            if (isThrowException()) {
                throw new RuntimeException(e);
            }
            if (isRetry()) {
                // 获取子类RocketMQMessageListener注解拿到topic和tag
                RocketMQMessageListener annotation = this.getClass().getAnnotation(RocketMQMessageListener.class);
                if (Objects.nonNull(annotation)) {
                    message.setSource(message.getSource() + "消息重试");
                    message.setRetryTimes(message.getRetryTimes() + 1);
                    SendResult sendResult;
                    try {
                        // 如果消息发送不成功,则再次重新发送,如果发送异常则抛出由MQ再次处理(异常时不走延迟消息)
                        // 此处捕获之后,相当于此条消息被消息完成然后重新发送新的消息
                        sendResult = rocketMqTemplate.send(annotation.topic(), annotation.selectorExpression(), message, retryDelayLevel());
                    } catch (Exception ex) {
                        throw new RuntimeException(ex);
                    }
                    // 发送失败的处理就是不进行ACK,由RocketMQ重试
                    if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                        throw new RuntimeException("重试消息发送失败");
                    }
                }
            }
        }
    }
}
  • 封装消费最终类

    • 注意:收到的消息是先委派给父类,父类进行调度管理
package com.codecoord.rocketmq.listener;

import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * 实体类消费监听器,在实现RocketMQListener中间还加了一层BaseMqMessageListener来处理基础业务消息
 *
 * @author tianxincoord@163.com
 * @since 2022/5/12
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = RocketMqBizConstant.SOURCE_TOPIC,
        consumerGroup = RocketMqBizConstant.SOURCE_GROUP,
        selectorExpression = RocketMqBizConstant.SOURCE_TAG,
        // 指定消费者线程数,默认64,生产中请注意配置,避免过大或者过小
        consumeThreadMax = 5
)
public class RocketEntityMessageListener extends BaseMqMessageListener<RocketMqEntityMessage>
                                         implements RocketMQListener<RocketMqEntityMessage> {
    /**
     * 此处只是说明封装的思想,更多还是要根据业务操作决定
     * 内功心法有了,无论什么招式都可以发挥最大威力
     */
    @Override
    protected String consumerName() {
        return "RocketMQ二次封装消息消费者";
    }

    @Override
    public void onMessage(RocketMqEntityMessage message) {
        // 注意,此时这里没有直接处理业务,而是先委派给父类做基础操作,然后父类做完基础操作后会调用子类的实际处理类型
        super.dispatchMessage(message);
    }

    @Override
    protected void handleMessage(RocketMqEntityMessage message) throws Exception {
        // 此时这里才是最终的业务处理,代码只需要处理资源类关闭异常,其他的可以交给父类重试
        System.out.println("业务消息处理");
    }

    @Override
    protected void overMaxRetryTimesMessage(RocketMqEntityMessage message) {
        // 当超过指定重试次数消息时此处方法会被调用
        // 生产中可以进行回退或其他业务操作
    }

    @Override
    protected boolean isRetry() {
        return false;
    }

    @Override
    protected int maxRetryTimes() {
        // 指定需要的重试次数,超过重试次数overMaxRetryTimesMessage会被调用
        return 5;
    }

    @Override
    protected boolean isThrowException() {
        // 是否抛出异常,到消费异常时是被父类拦截处理还是直接抛出异常
        return false;
    }
}
  • 封装后对于子类来说,只需要告诉父类要不要做就拥有了最开始说的所有功能,简化了使用,此时子类消费者只需要专注于自己的业务核心处理就可以了

2.4 广播消息的应用场景

  • 应用场景:多租户或者服务有内部缓存需要刷新情况下如果需要刷新租户信息或者缓存信息
  • 也就是需要所有服务节点都需要同事做某一件事情的时候,此时可以借助广播消息发送消息到所有节点刷新,无需一个节点一个节点的处理
  • 特别说明:广播消息默认会在家目录下创建消费进度文件,会以www.tianxincoord.com:9876@www.tianxincoord.com:9876这种地址形式生成文件路径,但是由于带有:符号,windows下是不允许此符号作为文件夹名称的,所以如果rocketMQ的链接地址不是连接串(不带有端口)可以取消下面的messageModel注释,否则启动的时候就会提示目标卷或者路径不存在,其实是因为这个问题
package com.codecoord.rocketmq.listener;

import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

/**
 * 广播消息
 * 应用场景:多租户或者服务有内部缓存需要刷新情况下如果需要刷新租户信息或者缓存信息
 *      也就是需要所有服务节点都需要同事做某一件事情的时候
 * 此时可以借助广播消息发送消息到所有节点刷新,无需一个节点一个节点的处理
 *
 * 特别说明:广播消息默认会在家目录下创建消费进度文件,会以www.tianxincoord.com:9876@www.tianxincoord.com:9876
 *      这种地址形式生成文件路径,但是由于带有:符号,windows下是不允许此符号作为文件夹名称的
 *      所以如果rocketMQ的链接地址不是连接串(不带有端口)可以取消下面的messageModel注释
 *      否则启动的时候就会提示目标卷或者路径不存在,其实是因为这个问题
 *
 * @author tianxincoord@163.com
 * @since 2022/5/12
 */
@Slf4j
@Component
@RocketMQMessageListener(
        topic = RocketMqBizConstant.SOURCE_TOPIC,
        consumerGroup = RocketMqBizConstant.SOURCE_BROADCASTING_GROUP,
        selectorExpression = RocketMqBizConstant.SOURCE_BROADCASTING_TAG
        // messageModel = MessageModel.BROADCASTING
)
public class RocketBroadcastingListener implements RocketMQListener<MessageExt> {

    /**
     * MessageExt:内置的消息实体,生产中根据需要自己封装实体
     */
    @Override
    public void onMessage(MessageExt message) {
        log.info("收到广播消息【{}】", new String(message.getBody()));
    }
}

2.3 代码封装完结测试

封装测试大家可以直接参考RocketMqController即可

package com.codecoord.rocketmq.controller;

import com.alibaba.fastjson.JSONObject;
import com.codecoord.rocketmq.constant.RocketMqBizConstant;
import com.codecoord.rocketmq.domain.RocketMqEntityMessage;
import com.codecoord.rocketmq.template.OrderMessageTemplate;
import com.codecoord.rocketmq.template.RocketMqTemplate;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.UUID;

/**
 * 消息发送
 *
 * @author tianxin01@huice.com
 * @since 2022/6/16
 */
@RestController
@RequestMapping("/rocketmq")
@Slf4j
public class RocketMqController {
    /**
     * 注意此处注入的是封装的RocketMqTemplate
     */
    @Resource
    private RocketMqTemplate rocketMqTemplate;
    /**
     * 注入对应业务的模板类
     */
    @Resource
    private OrderMessageTemplate orderMessageTemplate;

    /**
     * 通过实体类发送消息,发送注意事项请参考实体类
     * 说明:也可以在RocketMqTemplate按照业务封装发送方法,这样只需要调用方法指定基础业务消息接口
     */
    @RequestMapping("/entity/message")
    public Object sendMessage() {
        RocketMqEntityMessage message = new RocketMqEntityMessage();
        // 设置业务key
        message.setKey(UUID.randomUUID().toString());
        // 设置消息来源,便于查询we年
        message.setSource("封装测试");
        // 业务消息内容
        message.setMessage("当前消息发送时间为:" + LocalDateTime.now());
        // Java时间字段需要单独处理,否则会序列化失败
        message.setBirthday(LocalDate.now());
        message.setTradeTime(LocalDateTime.now());
        return rocketMqTemplate.send(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_TAG, message);
    }

    /**
     * 此时对于调用者而且,无需创建任何类
     * 如果某天需要调整消息发送来源,如果不封装,所有原来产生message的地方全部改
     * 如果封装了,只需要改sendOrderPaid就可以切换
     */
    @RequestMapping("/order/paid")
    public Object sendOrderPaidMessage() {
        return orderMessageTemplate.sendOrderPaid(UUID.randomUUID().toString(), "客户下单了...,快快备货");
    }

    /**
     * 直接将对象进行传输,也可以自己进行json转化后传输
     */
    @RequestMapping("/messageExt/message")
    public SendResult convertAndSend() {
        // 生产中不推荐使用jsonObject传递,不看发送者无法知道传递的消息包含什么信息
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("type", "messageExt");
        String destination = rocketMqTemplate.buildDestination(RocketMqBizConstant.SOURCE_TOPIC, RocketMqBizConstant.SOURCE_BROADCASTING_TAG);
        // 如果要走内部方法发送则必须要按照标准来,否则就使用原生的消息发送
        return rocketMqTemplate.getTemplate().syncSend(destination, jsonObject);
    }
}

到此这篇关于RocketMQ整合SpringBoot实现生产级二次封装的文章就介绍到这了,更多相关RocketMQ SpringBoot封装内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

时间: 2022-06-18

springBoot整合RocketMQ及坑的示例代码

版本: JDK:1.8 springBoot:1.5.10 rocketMQ:4.2.0 pom 配置: <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.10.RELEASE</version> </parent> <d

Springboot RocketMq实现过程详解

首先,在虚拟机上安装rocketmq和rocketMq可视化控制,安装不做描述. 1.pom.xml文件添加依赖 mq的版本与连接的rocketmq版本保持一致 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-remoting</artifactId> <version>4.4.0</version> </depende

springboot整合rocketmq实现分布式事务

1 执行流程 (1) 发送方向 MQ 服务端发送消息. (2) MQ Server 将消息持久化成功之后,向发送方 ACK 确认消息已经发送成功,此时消息为半消息. (3) 发送方开始执行本地事务逻辑. (4) 发送方根据本地事务执行结果向 MQ Server 提交二次确认(Commit 或是 Rollback),MQ Server 收到Commit 状态则将半消息标记为可投递,订阅方最终将收到该消息:MQ Server 收到 Rollback 状态则删除半消息,订阅方将不会接受该消息. (5)

解决springboot集成rocketmq关于tag的坑

springboot集成rocketmq关于tag的坑 新项目使用springboot的若依框架集成rocketmq,选择集成RocketMQTemplate这种方式实现消息的发送和接收. 1.客户端发送代码 此处回调方法里有些业务不用关注,只关心发送方法 @Component public class RocketMqHelper { Logger logger = LoggerFactory.getLogger(RocketMqHelper.class); @Resource private

SpringBoot中使用RocketMQ的示例代码

目录 1 订单微服务发送消息 1.1 订单微服务添加rocketmq的依赖 1.2 添加配置 1.3 编写测试代码 1.4 测试 2 用户微服务订阅消息 2.1 用户微服务增加rocketmq依赖 2.2 修改主类,启动nacos客户端 2.3 修改配置文件 2.4 编写消息接收服务 2.5 测试 接下来我们模拟一种场景:商品下单成功之后,向下单用户发送短信.以此来示例SpringBoot中RocketMQ的使用方式. 1 订单微服务发送消息 1.1 订单微服务添加rocketmq的依赖 <!-

解决SpringBoot整合RocketMQ遇到的坑

应用场景 在实现RocketMQ消费时,一般会用到@RocketMQMessageListener注解定义Group.Topic以及selectorExpression(数据过滤.选择的规则)为了能支持动态筛选数据,一般都会使用表达式,然后通过apollo或者cloud config进行动态切换. 引入依赖 <!-- RocketMq Spring Boot Starter--> <dependency> <groupId>org.apache.rocketmq<

Springboot&nbsp;整合&nbsp;RocketMQ&nbsp;收发消息

Springboot 整合 RocketMQ 收发消息 创建springboot项目 pom.xml添加rocketmq-spring-boot-starter依赖. <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.0</version>

SpringBoot整合RocketMQ实现消息发送和接收的详细步骤

我们使用主流的SpringBoot框架整合RocketMQ来讲解,使用方便快捷: 最终项目结构如下: 具体步骤如下: 第一步:新建SpringBoot项目rocketmq-test,引入rocketmq依赖,以及项目配置 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <vers

浅谈Springboot整合RocketMQ使用心得

一.阿里云官网---帮助文档 https://help.aliyun.com/document_detail/29536.html?spm=5176.doc29535.6.555.WWTIUh 按照官网步骤,创建Topic.申请发布(生产者).申请订阅(消费者) 二.代码 1.配置: public class MqConfig { /** * 启动测试之前请替换如下 XXX 为您的配置 */ public static final String PUBLIC_TOPIC = "test"

浅谈springboot项目中定时任务如何优雅退出

在一个springboot项目中需要跑定时任务处理批数据时,突然有个Kill命令或者一个Ctrl+C的命令,此时我们需要当批数据处理完毕后才允许定时任务关闭,也就是当定时任务结束时才允许Kill命令生效. 启动类 启动类上我们获取到相应的上下文,捕捉相应命令.在这里插入代码片 @SpringBootApplication /**指定mapper对应包的路径*/ @MapperScan("com.youlanw.kz.dao") /**开启计划任务*/ @EnableScheduling

浅谈springboot 属性定义

本文介绍了浅谈springboot 属性定义,分享给大家.具体如下: 简单属性自定义 一般属性可以定义在通用的配置文件application.properties里面 # 自定义属性 boot.userName = yuxi 如何获取呢? 按照spring的获取方式就可以了,很简单 @Value(value = "${boot.userName}") private String userName; 复杂属性自定义 在配置里配置属性 # 复杂属性 test.id=1 test.name

浅谈SpringBoot处理url中的参数的注解

1.介绍几种如何处理url中的参数的注解 @PathVaribale 获取url中的数据 @RequestParam 获取请求参数的值 @GetMapping 组合注解,是 @RequestMapping(method = RequestMethod.GET) 的缩写 (1)PathVaribale 获取url中的数据 看一个例子,如果我们需要获取Url=localhost:8080/hello/id中的id值,实现代码如下: @RestController public class Hello

浅谈spring-boot的单元测试中,@Before不被执行的原因

我们先来看下笔者的单元测试的依赖版本: <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.6.RELEASE</version> <relativePath/> <!-- lookup parent from reposi

浅谈SpringBoot主流读取配置文件三种方式

读取配置SpringBoot配置文件三种方式 一.利用Bean注解中的Value(${})注解 @Data @Component public class ApplicationProperty { @Value("${application.name}") private String name; } 该方式可以自动读取当前配置文件appliation.yml  或者application.properties中的配置值 区别在于读取yml文件时候支持中文编码,peoperties需

浅谈springboot中tk.mapper代码生成器的用法说明

问:什么是tk.mapper? 答:这是一个通用的mapper框架,相当于把mybatis的常用数据库操作方法封装了一下,它实现了jpa的规范,简单的查询更新和插入操作都可以直接使用其自带的方法,无需写额外的代码. 而且它还有根据实体的不为空的字段插入和更新的方法,这个是非常好用的哈. 而且它的集成非常简单和方便,下面我来演示下使用它怎么自动生成代码. pom中引入依赖,这里引入tk.mybatis.mapper的版本依赖是因为在mapper-spring-boot-starter的新版本中没有

浅谈springboot内置tomcat和外部独立部署tomcat的区别

前两天,我去面了个试,面试官问了我个问题,独立部署的tomcat跟springboot内置的tomcat有什么区别,为什么存在要禁掉springboot的tomcat然后将项目部署到独立的tomcat当中? 我就想,不都一个样?独立部署的tomcat可以配置优化?禁AJP,开多线程,开nio?而且springboot内置的tomcat多方便,部署上服务器写个java脚本运行即可.现在考虑下有什么条件能优于内置tomcat的. 1.tomcat的优化配置多线程?内置的也可以配置多线程 server

浅谈Springboot之于Spring的优势

Spring在Java EE开发中是实际意义上的标准,但我们在开发Spring的时候可能会遇到以下令人头疼的问题: (1)大量配置文件的定义; (2)与第三方软件整合的技术问题,Spring每个新版本的推出都以减少配置作为自己的主要目标,例如: (a)推出@Component, @Service, @Repository, @Controller等注解在类上声明Bean; (b)推出@Configuration, @Bean的Java配置来替代Xml配置. 在脚本语言和敏捷开发大行其道的时代,J

浅谈SpringBoot之事务处理机制

一.Spring的事务机制 所有的数据访问技术都有事务处理机制,这些技术提供了API用来开启事务.提交事务来完成数据操作,或者在发生错误的时候回滚数据. 而Spring的事务机制是用统一的机制来处理不同数据访问技术的事务处理.Spring的事务机制提供了一个PlatformTransactionManager接口,不同的数据访问技术的事务使用不同的接口实现: 在程序中定义事务管理器的代码如下: @Bean public PlatformTransactionManager transaction