RocketMQ Push 消费模型示例详解

目录
  • 使用 DefaultMQPushConsumer 消费消息
  • 基于长轮询机制的伪 push 实现
    • 客户端侧发起的长轮询请求
    • 服务端阻塞请求
    • 客户端回调处理
    • 客户端发起请求的底层逻辑
    • PullCallback 回调
  • 总结

Push 模式是指由 Server 端来控制消息的推送,即当有消息到 Server 之后,会将消息主动投递给 client(Consumer 端)。

使用 DefaultMQPushConsumer 消费消息

下面是使用 DefaultMQPushConsumer 消费消息的官方示例代码:

// 初始化consumer,并设置consumer group name
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("MyGroup");
// 设置NameServer地址
consumer.setNamesrvAddr("localhost:9876");
//订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
consumer.subscribe("TopicTest", "*");
//注册回调接口来处理从Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS 为消费成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
// 启动Consumer
consumer.start();

这里看到主要是通过 consumer 注册回调接口来处理从 Broker 中收到的消息。这种监听回调的机制很容易想到是一种观察者模式或者事件机制;对于这种 C-S 模型的架构来说,如果要做到 Server 在有新消息时立即推送给 Client,那么 Client 和 Server 之间应该是有连接存在的,Client 端开放端口来 watch Server 的推送。这里好论证,即可以查看当前 Client 端所在进程开启了什么端口即可,通过如下指令查看:

  • 1、先通过 jps 查看 Consumer Client 的进程号
➜  rocketmq-4.9.4 git:(06f2208a3) jps
10722 Jps
4676 rocketmq-dashboard-1.0.1-SNAPSHOT.jar
1766
4121 BrokerStartup
4009 NamesrvStartup
9419 PushConsumer
9692 RemoteMavenServer36

可以看到 PushConsumer 的进程号是 9419

  • 2、通过 lsof 命令查看进程端口占用
➜  rocketmq-4.9.4 git:(06f2208a3) lsof -nP -p 9419| grep LISTEN
➜

这里没有看到 PushConsumer 有开启端口。同样,这里可以看看 Broker 的进程端口占用

➜  rocketmq-4.9.4 git:(06f2208a3) lsof -nP -p 4121| grep LISTEN
java    4121 glmapper  137u    IPv6 0xca1142b0f200067d        0t0                 TCP *:10912 (LISTEN)
java    4121 glmapper  141u    IPv6 0xca1142b0f1fc8cfd        0t0                 TCP *:10911 (LISTEN)
java    4121 glmapper  142u    IPv6 0xca1142b0f1fc935d        0t0                 TCP *:10909 (LISTEN)

所以得到一个初步的结论是,在 Push 模式下,Consumer Client 并没有启动端口来接收 Server 的消息推送。 那么 RocketMQ 是怎么实现的?

基于长轮询机制的伪 push 实现

真正的 Push 方式,是 Server 端接收到消息后,主动把消息推送给 Client 端,这种情况一般需要 Client 和 Server 之间建立长连接。通过前面的分析,Client 既然没有开启端口用于接收 Server 的信息推送,那么只有一种可能就是 Client 自己去拉了消息,但是这种主动拉消息的方式是对于用户无感的,从使用上体验上来看,做到了和 push 一样的效果;这种机制就是“长轮询”。

为啥不用长连接方式,让 Server 主动 Push 呢?其实很好理解,对于一个提供队列服务的 Server 来说,用 Push方式主动推送有两个问题:

  • 1、会增加 Server 端的工作量,进而影响 Server 的性能
  • 2、Client 的处理能力存在差异,Client 的状态不受 Server 控制,如果 Client 不能及时处理 Server 推送过来的消息,会造成各种潜在问题

客户端侧发起的长轮询请求

下图是初始化相关资源的过程,DefaultMQPushConsumer 是面向用户使用的 API client 类,内部处理实际上是委托给 DefaultMQPushConsumerImpl 来处理的。DefaultMQPushConsumerImpl#start 时,会初始化 MQClientInstance ,MQClientInstance 初始化过程中又会初始化一堆资源,比如请求-响应的通道,开启各种各样的调度任务(定期拉去 NameServerAddress、定期更新 Topic 路由信息、定期清理 Offline状态的 Broker、定期发送心跳给 Broker、定期持久化所有 Consumer Offset等等),开启 pullMessageService,开启 rebalance Service 等等。大致的调用链如下图

下面这个代码片段是 pullMessageService 的 run 方法(pullMessageService 是 Runnable 子类)

@Override
public void run() {
    log.info(this.getServiceName() + " service started");
    while (!this.isStopped()) {
        try {
            // 从 pullRequestQueue 中取 pullRequest
            PullRequest pullRequest = this.pullRequestQueue.take();
            this.pullMessage(pullRequest);
        } catch (InterruptedException ignored) {
        } catch (Exception e) {
            log.error("Pull Message Service Run Method exception", e);
        }
    }
    log.info(this.getServiceName() + " service end");
}

通过代码,可以直观的看起,pullMessageService 会一直从 pullRequestQueue 中取 pullRequest,然后执行 pullMessage 请求。实际上 MessageQueue 是和 pullRequest 一一对应的 ,pullRequest 全部存储到该 Consumer 的 pullRequestQueue 队列里面;消费者会不停的从 PullRequest 的队列里取 request 然后向broker 请求消息。

这里还有一个问题是队列取出之后什么时候放回去的?在 pullMessage 的回调方法中,如果正常得到了 broker 的响应,那么会把 PullRequest放回队列,相关代码可以从 org.apache.rocketmq.client.consumer.PullCallbackonSuccess 方法中得到答案。

服务端阻塞请求

服务端处理 pullRequest 请求的是 PullMessageProcessor,当没有消息时,则通过 PullRequestHoldService 将当前请求先 hold 住。

case ResponseCode.PULL_NOT_FOUND:
    if (brokerAllowSuspend && hasSuspendFlag) {
        long pollingTimeMills = suspendTimeoutMillisLong;
        // 如果是 LongPolling,则 hold 住
        if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();
        }
        String topic = requestHeader.getTopic();
        long offset = requestHeader.getQueueOffset();
        int queueId = requestHeader.getQueueId();
        PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,
            this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);
        this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);
        response = null;
        break;
    }

PullRequestHoldService 中会将所有的 PullRequest 缓存到 pullRequestTable。PullRequestHoldService 也是一个 task,默认每次 hold 5s 然后再去检查是否有新的消息过来,如果有新的消息到来,则唤醒对应的线程来将消息返回给客户端。

// 已省略无关代码
public void run() {
    // loop
    while (!this.isStopped()) {
        // default hold 5s
        if (this.brokerController.getBrokerConfig().isLongPollingEnable()) {
            this.waitForRunning(5 * 1000);
        } else {
            this.waitForRunning(this.brokerController.getBrokerConfig().getShortPollingTimeMills());
        }
        long beginLockTimestamp = this.systemClock.now();
        // 检查是否有新的消息到达
        this.checkHoldRequest();
        long costTime = this.systemClock.now() - beginLockTimestamp;
        if (costTime > 5 * 1000) {
            log.info("[NOTIFYME] check hold request cost {} ms.", costTime);
        }
	}
}

客户端回调处理

我们在编写 consumer 代码时,基于 push 模式是通过如下方式来监听消息的

//注册回调接口来处理从Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
        // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS 为消费成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});

通过前面的分析,对于如何通过“长轮询”实现伪“push” 有了大概得了解;客户端通过一个定时任务不断向 Broker 发请求,Broker 在没有消息时先 hold 住一小段时间,当有新的消息时会立即将消息返回给 consumer;本节就主要探讨 consumer 在收到消息之后的处理逻辑,以及是怎么触发 MessageListener 回调执行的。

客户端发起请求的底层逻辑

以异步调用为例,代码在

org.apache.rocketmq.client.impl.MQClientAPIImpl#pullMessageAsync中,截取部分代码如下:

this.remotingClient.invokeAsync(addr, request, timeoutMillis, new InvokeCallback() {
    @Override
    public void operationComplete(ResponseFuture responseFuture) {
        RemotingCommand response = responseFuture.getResponseCommand();
        if (response != null) {
            try {
                PullResult pullResult = MQClientAPIImpl.this.processPullResponse(response, addr);
                assert pullResult != null;
                // 成功回调
                pullCallback.onSuccess(pullResult);
            } catch (Exception e) {
                // 异常回调
                pullCallback.onException(e);
            }
        } else {
            if (!responseFuture.isSendRequestOK()) {
                 // 异常回调
                pullCallback.onException(new MQClientException("send request failed to " + addr + ". Request: " + request, responseFuture.getCause()));
            } else if (responseFuture.isTimeout()) {
                 // 异常回调
                pullCallback.onException(new MQClientException("wait response from " + addr + " timeout :" + responseFuture.getTimeoutMillis() + "ms" + ". Request: " + request,
                    responseFuture.getCause()));
            } else {
                 // 异常回调
                pullCallback.onException(new MQClientException("unknown reason. addr: " + addr + ", timeoutMillis: " + timeoutMillis + ". Request: " + request, responseFuture.getCause()));
            }
        }
    }
});

PullCallback 回调

PullCallback 回调逻辑在 org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#pullMessage方法中,以正常返回消息为例:

// 已省略无关代码
public void onSuccess(PullResult pullResult) {
    // 将接收到的消息 交给 consumeMessageService 处理
    DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
        pullResult.getMsgFoundList(),
        processQueue,
        pullRequest.getMessageQueue(),
        dispatchToConsume);
    // 将 pullRequest 放回 pullRequestQueue
 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
}

ConsumeRequest 是一个 Runnable,submitConsumeRequest 就是将返回结果丢在一个单独的线程池中去处理返回结果的。ConsumeRequest 的 run 方法中,会拿到 messageListener,然后执行 consumeMessage 方法。

总结

到此,关于 RocketMQ push 消费模型基本就探讨完了。从实现机制上来看,push 本质上并不是在建立双向通道的前提下,由 Server 主动推送给 Client 的,而是由 Client 端触发 pullRequest 请求,以长轮询的方式“伪装”的结果。从代码上来,RocketMQ 代码中使用了非常多的异步机制,如 pullRequestQueue 来解耦发送请求和等待结果,各种定时任务等等。

整体看,PushConsumer 采用了 长轮询+超时时间+Pull的模式, 这种方式带来的好处总结如下

  • 1、减少 Broker 的压力,避免由于不同 Consumer 消费能力导致 Broker 出现问题
  • 2、确保了 Consumer 不会负载过高,Consumer 在校验自己的缓存消息没有超过阈值才会去从 Broker 拉取消息,Broker 不会主动推过来
  • 3、兼顾了消息的即时性,Broker 在没有消息的时候会先 hold 一小段时间,有消息会立即唤起线程将消息返回给 Consumer
  • 4、Broker 端无效请求的次数大大降低,Broker 在没有消息时会挂起 PullRequest,而 Consumer 在未接收到Response 且未超时时,也不会重新发起 PullRequest

以上就是RocketMQ Push 消费模型示例详解的详细内容,更多关于RocketMQ Push 消费模型的资料请关注我们其它相关文章!

时间: 2022-09-19

RocketMQ源码解析topic创建机制详解

目录 1. RocketMQ Topic创建机制 2. 自动Topic 3. 手动创建--预先创建 通过界面控制台创建 1. RocketMQ Topic创建机制 以下源码基于Rocket MQ 4.7.0 RocketMQ Topic创建机制分为两种:一种自动创建,一种手动创建.可以通过设置broker的配置文件来禁用或者允许自动创建.默认是开启的允许自动创建 autoCreateTopicEnable=true/false 下面会结合源码来深度分析一下自动创建和手动创建的过程. 2. 自动T

java开发RocketMQ生产者高可用示例详解

目录 引言 1 消息 1.1 topic 1.2 Body 1.3 tag 1.4 key 1.5 延迟级别 2 生产者高可用 2.1 客户端保证生产者高可用 2.1.1 重试机制 2.1.2 客户端容错 2.2 Broker端保证生产者高可用 引言 前边两章说了点基础的,从这章开始,我们挖挖源码.看看RocketMQ是怎么工作的. 首先呢,这个生产者就是送孩子去码头的家长,孩子们呢,就是消息了. 我们看看消息孩子们都长啥样. 1 消息 public class Message implemen

RocketMQ消息发送流程源码剖析

目录 正文 读源码 1 调用defaultMQProducerImpl.send() 2 设置过期时间 3 执行defaultMQProducerImpl.sendDefaultImpl()方法 sendDefaultImpl是发送消息的核心方法. 1 两个校验 2 获取topic路由信息 3 计算重试次数 4 执行队列选择方法 5 发送消息 正文 就是说,我们打了个比方,把RocketMQ比作码头上的一个小房子,来送孩子登船的家长比作生产者,拉走孩子们的船夫比作消费者,所以,RocketMQ的

rocketmq消费负载均衡--push消费详解

前言 本文介绍了DefaultMQPushConsumerImpl消费者,客户端负载均衡相关知识点.本文从DefaultMQPushConsumerImpl启动过程到实现负载均衡,从源代码一步一步分析,共分为6个部分进行介绍,其中第6个部分 rebalanceByTopic 为负载均衡的核心逻辑模块,具体过程运用了图文进行阐述. 介绍之前首先抛出几个问题: 1. 要做负载均衡,首先要解决的一个问题是什么? 2. 负载均衡是Client端处理还是Broker端处理? 个人理解: 1. 要做负载均衡

RocketMQ的push消费方式实现示例

目录 引言 MQ消费方式 1.push(推方式) 2.pull(拉方式) RocketMQ对于消费方式的实现 RocketMQ聪明地实现push的原因 轮询与长轮询 轮询 长轮询 push消费方式源码探究 1.消费者拉取消息控制压力源码 2.MQ将请求hold住源码 3.MQ收到消息响应给消费者的源码 最后 引言 最近仍然畅游在RocketMQ的源码中,这几天刚好翻到了消费者的源码,发现RocketMQ的对于push消费方式的实现简直太聪明了,所以趁着我脑子里还有点印象的时候,赶紧来写一篇文章,

RocketMQ普通消息实战演练详解

目录 引言 普通消息同步发送 普通消息异步发送 普通消息单向发送 集群消费模式 广播消费模式 引言 之前研究了RocketMQ的源码,在这里将各种消息发送与消费的demo进行举例,方便以后使用的时候CV. 相关的配置,安装和启动在这篇文章有相关讲解  https://www.jb51.net/article/260237.htm 普通消息同步发送 同步消息是指发送出消息后,同步等待,直到接收到Broker发送成功的响应才会继续发送下一个消息.这个方式可以确保消息发送到Broker成功,一些重要的

JMS 之 Active MQ 的消息传输(详解)

本文使用Active MQ5.6 一.消息协商器(Message Broker) broke:消息的交换器,就是对消息进行管理的容器.ActiveMQ 可以创建多个 Broker,客户端与ActiveMQ交互,实际上都是与ActiveMQ中的Broker交互,Broker配置在${MQ_HOME}\conf\activemq.xml. 二.连接器(Connectors)(一).传输连接器 (transportConnectors) transportConnectors 连接器:就是建立brok

hmac模块生成加入了密钥的消息摘要详解

hmac模块 hmac模块用于生成HMAC码.这个HMAC码可以用于验证消息的完整性,其原理也很简单,就是一种加入了密钥的消息摘要,相比起MAC更加安全.JWT(JSON Web Token)中第三部分的消息摘要就是使用了HMAC. HMAC(Hash-based Message Authentication Code) 先大致介绍一下HMAC吧.HMAC是一种消息摘要算法,是一种特殊的MAC(消息认证码),内部使用别的摘要算法进行摘要的计算(比如MD5).相比MAC,HMAC在生成摘要的时候加

wxpython多线程防假死与线程间传递消息实例详解

wxpython中启用线程的方法,将GUI和功能的执行分开. 网上关于python多线程防假死与线程传递消息是几年前的,这里由于wxpython和threading模块已经更新最新,因此给出最新修改代码,能在2017年最新版的python和模块中运行. 原来的publisher()和callafter都无法使用. 修改后的代码. import time import wx from threading import Thread from wx.lib.pubsub import pub cla

iOS开发系列--通知与消息机制详解

概述 在多数移动应用中任何时候都只能有一个应用程序处于活跃状态,如果其他应用此刻发生了一些用户感兴趣的那么通过通知机制就可以告诉用户此时发生的事情.iOS中通知机制又叫消息机制,其包括两类:一类是本地通知:另一类是推送通知,也叫远程通知.两种通知在iOS中的表现一致,可以通过横幅或者弹出提醒两种形式告诉用户,并且点击通知可以会打开应用程序,但是实现原理却完全不同.今天就和大家一块去看一下如何在iOS中实现这两种机制,并且在文章后面会补充通知中心的内容避免初学者对两种概念的混淆. 本地通知 本地通

Android Intent发送广播消息实例详解

Android Intent发送广播消息 Intent的另一种用途是发送广播消息,应用程序和Android系统都可以使用Intent发送广播消息,广播消息的内容是可以与应用程序密切相关的数据信息,也可以是Android的系统信息,例如网络连接变化.电池电量变化.接收的短信或系统设置变化等.如果应用程序注册了BroadcastReceiver,则可以接受到指定的广播信息. 使用Intent发送广播消息非常简单,只须创建一个Intent,并调用sendBroadcast()函数就可把Intent携带

Windows窗口消息实例详解

本文实例总结了Windows窗口消息.分享给大家供大家参考.具体如下: 复制代码 代码如下: //////////////////////////////////////////////////////////////////////////    #include "AFXPRIV.H"//消息值的定义来源    #include "Dde.h"//DDE消息值的定义来源    #include "CPL.H"//控制面板消息值的定义来源   

Android 消息机制详解及实例代码

Android 消息机制 1.概述 Android应用启动时,会默认有一个主线程(UI线程),在这个线程中会关联一个消息队列(MessageQueue),所有的操作都会被封装成消息队列然后交给主线程处理.为了保证主线程不会退出,会将消息队列的操作放在一个死循环中,程序就相当于一直执行死循环,每循环一次,从其内部的消息队列中取出一个消息,然后回调相应的消息处理函数(handlerMessage),执行完成一个消息后则继续循环,若消息队列为空,线程则会阻塞等待.因此不会退出.如下图所示: Handl

python实现的自动发送消息功能详解

本文实例讲述了python实现的自动发送消息功能.分享给大家供大家参考,具体如下: 一个简单的脚本 #-*- coding:utf-8 -*- from __future__ import unicode_literals from threading import Timer import itchat import requests # 抓取金山毒霸每日一句,英文和翻译 def get_news(): url = "http://open.iciba.com/dsapi/" r =

Android异步消息机制详解

Android中的异步消息机制分为四个部分:Message.Handler.MessageQueue和Looper. 其中,Message是线程之间传递的消息,其what.arg1.arg2字段可以携带整型数据,obj字段可以携带一个Object对象. Handler是处理者,主要用于发送消息和处理消息.发送消息的方法是sendMessage:处理消息的方法是handleMessage(),Message字段携带的信息在该方法中用作判别. MessageQueue是消息队列,存放所有Handle

Python用sndhdr模块识别音频格式详解

本文主要介绍了Python编程中,用sndhdr模块识别音频格式的相关内容,具体如下. sndhdr模块 功能描述:sndhdr模块提供检测音频类型的接口. 唯一一个API sndhdr模块提供了sndhdr.what(filename)和sndhdr.whathdr(filename)两个函数.但实际上它们的功能是一样的.(不知道多写一个的意义何在,what函数在内部调用了whathdr函数并把数据完完整整地返回) 在之前的版本,whathdr函数返回元组类型的数据,在Python3.5版本之