java开发RocketMQ生产者高可用示例详解

目录
  • 引言
  • 1 消息
    • 1.1 topic
    • 1.2 Body
    • 1.3 tag
    • 1.4 key
    • 1.5 延迟级别
  • 2 生产者高可用
    • 2.1 客户端保证生产者高可用
      • 2.1.1 重试机制
      • 2.1.2 客户端容错
    • 2.2 Broker端保证生产者高可用

引言

前边两章说了点基础的,从这章开始,我们挖挖源码。看看RocketMQ是怎么工作的。

首先呢,这个生产者就是送孩子去码头的家长,孩子们呢,就是消息了。

我们看看消息孩子们都长啥样。

1 消息

public class Message implements Serializable {
    private static final long serialVersionUID = 8445773977080406428L;
    //主题名字
    private String topic;
    //消息扩展信息,Tag,keys,延迟级别都存在这里
    private Map<String, String> properties;
    //消息体,字节数组
    private byte[] body;
    //设置消息的key,
    public void setKeys(String keys) {}
    //设置topic
    public void setTopic(String topic) {}
    //延迟级别
    public int setDelayTimeLevel(int level) {}
    //消息过滤的标记
    public void setTags(String tags) {}
    //扩展信息存放在此
    public void putUserProperty(final String name, final String value) {}
}

消息就是孩子们,这些孩子们呢,有各自的特点,也有共性。同一个家长送来的两个孩子可以是去同一个地方的,也可以是去不同的地方的。

1.1 topic

首先呢,每个孩子消息都有一个属性topic,这个我们上文说到了,是一个候船大厅。孩子们进来之后,走到自己指定的候船大厅的指定区域(平时出门坐火车高铁不也是指定的站台乘车么),坐到message queue座位上等,等着出行。

Broker有一个或者多个topic,消息会存放到topic内的message queue内,等待被消费。

1.2 Body

孩子消息,也有一个Body属性,这就是他的能力,他会画画,他会唱歌,他会干啥干啥,就记录在这个Body属性里。等走出去了,体现价值的地方也是这个Body属性。

Body就是消息体,消费者会根据消息体执行对应的操作。

1.3 tag

这个tag我们上节说了,就是一个标记,有的孩子背着画板,相机,有的游船就特意找到这些孩子拉走,完成他们的任务。

可以给消息设置tag属性,消费者可以选择含有特定tag属性的消息进行消费。

1.4 key

key就是每个孩子消息的名字了。要找哪个孩子,喊他名就行。

对发送的消息设置好 Key,以后可以根据这个Key 来查找消息。比如消息异常,消息丢失,进行查找会很方便。

1.5 延迟级别

当然,还有的孩子来就不急着走,来之前就想好了,要恰个饭,得30分钟,所以自己来了会等30分钟后被接走。

设置延迟级别可以规定多久后消息可以被消费。

2 生产者高可用

每个送孩子来的家长都希望能送到候船大厅里,更不希望孩子被搞丢了,这个时候这个候船大厅就需要一些保证机制了。

2.1 客户端保证生产者高可用

2.1.1 重试机制

就是说家长送来了,孩子进到候船大厅之后,没能成功坐到message queue座位上,这个时候工作人员会安排重试,再去看是否有座位坐。重试次数默认是2次,也就是说,消息孩子共有3次找座位坐的机会。

看源码,我特意加了注解,大致可以看懂一些了。

//这里取到了重试的次数
int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
int times = 0;
String[] brokersSent = new String[timesTotal];
for (; times < timesTotal; times++) {
    String lastBrokerName = null == mq ? null : mq.getBrokerName();
    //获取消息队列
    MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
    if (mqSelected != null) {
        mq = mqSelected;
        brokersSent[times] = mq.getBrokerName();
        try {
            beginTimestampPrev = System.currentTimeMillis();
            if (times > 0) {
                //Reset topic with namespace during resend.
                msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
            }
            long costTime = beginTimestampPrev - beginTimestampFirst;
            if (timeout < costTime) {
                callTimeout = true;
                break;
            }
            //发送消息
            sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
            ...
        } catch (RemotingException e) {
            ...
            continue;
        } catch (MQClientException e) {
            ...
            continue;
        } catch (MQBrokerException e) {
            ...
            continue;
        } catch (InterruptedException e) {
            //可以看到只有InterruptedException抛出了异常,其他的exception都会继续重试
            throw e;
        }
    } else {
        break;
    }
}

重试代码如上,这个sendDefaultImpl方法中,会尝试发送三次消息,若是都失败,才会抛出对应的错误。

2.1.2 客户端容错

若是有多个Broker候车大厅的时候,服务人员会安排消息孩子选择一个相对不拥挤比较容易进入的来进入。当然那些已经关闭的停电的没有服务能力的,我们是不会进的。

MQ Client会维护一个Broker的发送延迟信息,根据这个信息会选择一个相对延迟较低的Broker来发送消息。会主动剔除哪些已经宕机,不可用或发送延迟级别较高的Broker.

选择Broker就是在选择message queue,对应的代码如下:

这里会先判断延迟容错开关是否开启,这个开关默认是关闭的,若是开启的话,会优先选择延迟较低的Broker。

public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
    //判断发送延迟容错开关是否开启
    if (this.sendLatencyFaultEnable) {
        try {
            //选择一个延迟上可以接受,并且和上次发送相同的Broker
            int index = tpInfo.getSendWhichQueue().incrementAndGet();
            for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                if (pos < 0)
                    pos = 0;
                MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                //若是Broker的延迟时间可以接受,则返回这个Broker
                if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                    return mq;
            }
            //若是第一步没能选中一个Broker,就选择一个延迟较低的Broker
            final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
            int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
            if (writeQueueNums > 0) {
                final MessageQueue mq = tpInfo.selectOneMessageQueue();
                if (notBestBroker != null) {
                    mq.setBrokerName(notBestBroker);
                    mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                }
                return mq;
            } else {
                latencyFaultTolerance.remove(notBestBroker);
            }
        } catch (Exception e) {
            log.error("Error occurred when selecting message queue", e);
        }
        //若是前边都没选中一个Broker,就随机选一个Broker
        return tpInfo.selectOneMessageQueue();
    }
    return tpInfo.selectOneMessageQueue(lastBrokerName);
}

但是当延迟容错开关关闭状态的时候,执行的代码如下:

为了均匀分散Broker的压力,会选择与之前不同的Broker

public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
    //若是没有上次的Brokername做参考,就随机选一个
    if (lastBrokerName == null) {
        return selectOneMessageQueue();
    } else {
        //如果有,那么就选一个其他的Broker
        for (int i = 0; i < this.messageQueueList.size(); i++) {
            int index = this.sendWhichQueue.incrementAndGet();
            int pos = Math.abs(index) % this.messageQueueList.size();
            if (pos < 0)
                pos = 0;
            MessageQueue mq = this.messageQueueList.get(pos);
            //这里判断遇上一个使用的Broker不是同一个
            if (!mq.getBrokerName().equals(lastBrokerName)) {
                return mq;
            }
        }
        //若是上边的都没选中,那么就随机选一个
        return selectOneMessageQueue();
    }
}

2.2 Broker端保证生产者高可用

Broker候船大厅为了能确切的接收到消息孩子,至少会有两个厅,一个主厅一个副厅,一般来说孩子都会进入到主厅,然后一顿操作,卡该忙信那机资(影分身之术),然后让分身进入到副厅,这样当主厅停电了,不工作了,副厅的分身只要去完成了任务就ok的。一般来说都是主厅的消息孩子去坐船完成任务。

之后我们会聊到Broker的主从复制,分为同步复制和异步复制,同步复制时指当master 收到消息之后,同步到slaver才算消息发送成功。异步复制是只要master收到消息就算成功。生产中建议至少部署两台master和两台slaver。

下一篇,我们聊聊,消息的发送流程,就是说,一个消息孩子,从进码头的门到坐到message queue座位上,都经历了啥。

以上就是java开发RocketMQ生产者高可用示例详解的详细内容,更多关于java RocketMQ生产者高可用的资料请关注我们其它相关文章!

(0)

相关推荐

  • RocketMQ消息发送流程源码剖析

    目录 正文 读源码 1 调用defaultMQProducerImpl.send() 2 设置过期时间 3 执行defaultMQProducerImpl.sendDefaultImpl()方法 sendDefaultImpl是发送消息的核心方法. 1 两个校验 2 获取topic路由信息 3 计算重试次数 4 执行队列选择方法 5 发送消息 正文 就是说,我们打了个比方,把RocketMQ比作码头上的一个小房子,来送孩子登船的家长比作生产者,拉走孩子们的船夫比作消费者,所以,RocketMQ的

  • Springboot详解RocketMQ实现广播消息流程

    RocketMQ消息模式主要有两种:广播模式.集群模式(负载均衡模式) 广播模式是每个消费者,都会消费消息: 负载均衡模式是每一个消费只会被某一个消费者消费一次: 我们业务上一般用的是负载均衡模式,当然一些特殊场景需要用到广播模式,比如发送一个信息到邮箱,手机,站内提示: 我们可以通过@RocketMQMessageListener的messageModel属性值来设置,MessageModel.BROADCASTING是广播模式,MessageModel.CLUSTERING是默认集群负载均衡

  • Springboot详解RocketMQ实现消息发送与接收流程

    springboot+rockermq 实现简单的消息发送与接收 普通消息的发送方式有3种:单向发送.同步发送和异步发送. 下面来介绍下 springboot+rockermq 整合实现 普通消息的发送与接收 创建Springboot项目,添加rockermq 依赖 <!--rocketMq依赖--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-

  • Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程

    目录 一.创建Springboot项目添加rockermq依赖 二.配置rocketmq 三.新建一个controller来做消息发送 四.创建消费端监听消息消费消息 五.启动服务测试顺序消息发送与消费 如何实现顺序消息? 需要程序保证发送和消费的是同一个 Queue rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是順序消费消息的:有時候,我们需要顺序消费一批消息,比如电商系统 订单创建.支付.完成操作,需要順序执行: RocketMQTemplat

  • SpringBoot集成RocketMQ发送事务消息的原理解析

    目录 简介 原理 具体实现 消费者 消费者 生产者消息监听器 消息事务测试 正常测试 异常测试 代码调整 执行结果 总结 简介 RocketMQ 事务消息(Transactional Message)是指应用本地事务和发送消息操作可以被定义到全局事务中,要么同时成功,要么同时失败.RocketMQ 的事务消息提供类似 X/Open XA 的分布事务功能,通过事务消息能达到分布式事务的最终一致. 原理 RocketMQ事务消息通过异步确保方式,保证事务的最终一致性.设计的思想可以借鉴两个阶段提交事

  • java开发线上事故理解RocketMQ异步精髓

    目录 引言 1 业务场景 2 线程池模式 3 本地内存 + 定时任务 4 MQ 模式 5 Agent 服务 + MQ 模式 6 总结 第一层:什么场景下需要异步 第二层:异步的外功心法 第三层:异步的本质 引言 在高并发的场景下,异步是一个极其重要的优化方向. 前段时间,生产环境发生一次事故,笔者认为事故的场景非常具备典型性 . 写这篇文章,笔者想和大家深入探讨该场景的架构优化方案.希望大家读完之后,可以对异步有更深刻的理解. 1 业务场景 老师登录教研平台,会看到课程列表,点击课程后,课程会以

  • Java使用FileInputStream流读取文件示例详解

    一.File流概念 JAVA中针对文件的读写操作设置了一系列的流,其中主要有FileInputStream,FileOutputStream,FileReader,FileWriter四种最为常用的流 二.FileInputStream 1)FileInputStream概念  FileInputStream流被称为文件字节输入流,意思指对文件数据以字节的形式进行读取操作如读取图片视频等 2)构造方法 2.1)通过打开与File类对象代表的实际文件的链接来创建FileInputStream流对象

  • JAVA多线程实现生产者消费者的实例详解

    JAVA多线程实现生产者消费者的实例详解 下面的代码实现了生产者消费者的问题 Product.Java package consumerProducer; public class Product { private String id; public String getId() { return id; } public void setId(String id) { this.id = id; } public Product(String id) { this.id=id; } publ

  • Java使用Collections.sort()排序的示例详解

    Java中Collections.sort()排序详解,通过实例代码给大家讲解,具体代码如下所示: public static void main(String[] args) { List<String> list = new ArrayList<String>(); list.add("beijing"); list.add("shanghai"); list.add("hangzhou"); Collections.

  • java时区转换的理解及示例详解

    一.时区的基本概念 GMT(Greenwich Mean Time),即格林威治标准时,是东西经零度的地方.人们将地球人为的分为24等份,每一等份为一个时区,每时区横跨经度15度,时间正好为1小时.往西一个时区,则减去一小时:往东一个时区,则加上一小时.中国在东经120度上,(东经120°-东经0°)所得度数再除以15,即得8. UTC(Coordinated Universal Time),即世界协调时间,是经过平均太阳时(以格林威治时间GMT为准).地轴运动修正后的新时标以及以「秒」为单位的

  • Java数组的声明与创建示例详解

    今天在刷Java题的时候,写惯了C++发现忘记了Java数组的操作,遂把以前写的文章发出来温习一下. 首先,数组有几种创建方式? Java程序中的数组必须先进行初始化才可以使用,所谓初始化,就是为数组对象的元素分配内存空间,并为每个数组元素指定初始值,而在Java中,数组是静态的,数组一旦初始化,长度便已经确定,不能再随意更改. 声明数组变量 首先必须声明数组变量,才能在程序中使用数组.下面是声明数组变量的语法: dataType[] arrayRefVar; // 首选的方法 或 dataTy

  • java开发中遇到的异常汇总详解

    异常 算术异常类:ArithmeticExecption 空指针异常类:NullPointerException 类型强制转换异常:ClassCastException 数组负下标异常:NegativeArrayException 数组下标越界异常:ArrayIndexOutOfBoundsException 违背安全原则异常:SecturityException 文件已结束异常:EOFException 文件未找到异常:FileNotFoundException 字符串转换为数字异常:Numb

  • Java开发中的23种设计模式详解(推荐)

    设计模式(Design Patterns)                                   --可复用面向对象软件的基础 设计模式(Design pattern)是一套被反复使用.多数人知晓的.经过分类编目的.代码设计经验的总结.使用设计模式是为了可重用代码.让代码更容易被他人理解.保证代码可靠性. 毫无疑问,设计模式于己于他人于系统都是多赢的,设计模式使代码编制真正工程化,设计模式是软件工程的基石,如同大厦的一块块砖石一样.项目中合理的运用设计模式可以完美的解决很多问题,每

  • java开发 线上问题排查命令详解

    前言 作为一个合格的开发人员,不仅要能写得一手还代码,还有一项很重要的技能就是排查问题.这里提到的排查问题不仅仅是在coding的过程中debug等,还包括的就是线上问题的排查.由于在生产环境中,一般没办法debug(其实有些问题,debug也白扯...),所以我们需要借助一些常用命令来查看运行时的具体情况,这些运行时信息包括但不限于运行日志.异常堆栈.堆使用情况.GC情况.JVM参数情况.线程情况等. 给一个系统定位问题的时候,知识.经验是关键,数据是依据,工具是运用知识处理数据的手段.为了便

  • 在java poi导入Excel通用工具类示例详解

    前言 本文主要给大家介绍了关于java poi导入Excel通用工具类的相关内容,分享出来供大家参考学习,下面话不多说了,来一起看看详细的介绍吧. 问题引入和分析 提示:如果不想看罗嗦的文章,可以直接到最后点击源码下载运行即可 最近在做一个导入Excel的功能,在做之前在百度上面查找"java通用导入Excel工具类",没有查到,大多数都是java通用导出Excel.后来仔细想想,导出可以利用java的反射,做成通用的,放进相应的实体成员变量中,导入为什么不可以呢?也是可以的,不过在做

  • ios开发中的容错处理示例详解

    前言 后台服务器返回给客户端的值有时会是null,有时会是"<null>",直接赋值并进行后续操作有时会导致崩溃. 之前的处理方式都是尽量让后台服务器返回数据时不返回null或者是"<null>",但是他们还是时不时返回这些数据,所以app时不时就会出现闪退现象.一出现这种问题,调试后发现是他们返回null或者是"null"的数据类型,因为是线上的问题,所以让他们直接在后台将出现问题的字段进行处理就好了.久而久之,发现这种

随机推荐

其他