RocketMQ Namesrv架构工作原理详解

目录
  • 1 概念
  • 2 核心数据结构和API
    • 2.1 Namesrv的核心数据结构
    • 2.2 Namesrv的API
  • 3 Namesrv架构
    • 3.1组件
    • 3.2 Namesrv四个功能模块

1 概念

Namesrv的作用是保存元数据提高Broker的可用性

Namesrv的主要功能是临时存储管理Topic路由信息,各个Namesrv节点之间是不通信无状态的,互相不知道对方的存在。

当Broker,生产者,消费者启动的时候,会轮询全部的Namesrv节点,获取路由信息。

2 核心数据结构和API

2.1 Namesrv的核心数据结构

Namesrv中保存的信息是Topic的路由信息,Topic的路由决定了Topic的信息发送给哪些Broker,或者从哪些Broker获取消息。

路由数据结构的实现代码都在org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager中

public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    //Broker存活的时间周期,默认120秒
    private final static long DEFAULT_BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    //保存Topic和队列的路由信息
    private final Map<String/* topic */, Map<String, QueueData>> topicQueueTable;
    //Broker名字和Broker信息的对应信息
    private final Map<String/* brokerName */, BrokerData> brokerAddrTable;
    //集群和Broker的对应关系
    private final Map<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    //在线的Broker地址和Broker信息的对应关系
    private final Map<BrokerAddrInfo/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    //过滤服务器消息
    private final Map<BrokerAddrInfo/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    private final Map<String/* topic */, Map<String/*brokerName*/, TopicQueueMappingInfo>> topicQueueMappingInfoTable;
    private final BatchUnRegisterService unRegisterService;
    private final NamesrvController namesrvController;
    private final NamesrvConfig namesrvConfig;

2.2 Namesrv的API

Namesrv的的API在org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor中,根据方法名很容易判断出来方法的作用。

switch (request.getCode()) {
    case RequestCode.PUT_KV_CONFIG:
        return this.putKVConfig(ctx, request);
    case RequestCode.GET_KV_CONFIG:
        return this.getKVConfig(ctx, request);
    case RequestCode.DELETE_KV_CONFIG:
        return this.deleteKVConfig(ctx, request);
    case RequestCode.QUERY_DATA_VERSION:
        return this.queryBrokerTopicConfig(ctx, request);
    case RequestCode.REGISTER_BROKER:
        //Broker注册自身信息到Namesrv
        return this.registerBroker(ctx, request);
    case RequestCode.UNREGISTER_BROKER:
        //Broker取消注册自身信息到Namesrv
        return this.unregisterBroker(ctx, request);
    case RequestCode.BROKER_HEARTBEAT:
        return this.brokerHeartbeat(ctx, request);
    case RequestCode.GET_BROKER_MEMBER_GROUP:
        return this.getBrokerMemberGroup(ctx, request);
    case RequestCode.GET_BROKER_CLUSTER_INFO:
        return this.getBrokerClusterInfo(ctx, request);
    case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
        return this.wipeWritePermOfBroker(ctx, request);
    case RequestCode.ADD_WRITE_PERM_OF_BROKER:
        return this.addWritePermOfBroker(ctx, request);
    case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
        return this.getAllTopicListFromNameserver(ctx, request);
    case RequestCode.DELETE_TOPIC_IN_NAMESRV:
        return this.deleteTopicInNamesrv(ctx, request);
    case RequestCode.REGISTER_TOPIC_IN_NAMESRV:
        return this.registerTopicToNamesrv(ctx, request);
    case RequestCode.GET_KVLIST_BY_NAMESPACE:
        return this.getKVListByNamespace(ctx, request);
    case RequestCode.GET_TOPICS_BY_CLUSTER:
        return this.getTopicsByCluster(ctx, request);
    case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
        return this.getSystemTopicListFromNs(ctx, request);
    case RequestCode.GET_UNIT_TOPIC_LIST:
        return this.getUnitTopicList(ctx, request);
    case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
        return this.getHasUnitSubTopicList(ctx, request);
    case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
        return this.getHasUnitSubUnUnitTopicList(ctx, request);
    case RequestCode.UPDATE_NAMESRV_CONFIG:
        return this.updateConfig(ctx, request);
    case RequestCode.GET_NAMESRV_CONFIG:
        return this.getConfig(ctx, request);
    case RequestCode.GET_CLIENT_CONFIG:
        return this.getClientConfigs(ctx, request);
    default:
        String error = " request type " + request.getCode() + " not supported";
        return RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
}

3 Namesrv架构

下图是一个消息的常规流转过程,生产者,消费者,Broker通过与Namesrv交换信息来实现自己的功能。

3.1组件

  • Broker

Broker在启动的时候,将自己的元数据信息,上报给Namesrv,这部分信息也就是Topic路由。

这里的元数据包含Broker本身的元数据和该Broker中Topic的信息。

  • 生产者

生产者只关注Topic路由,从namesrv获取到Topic路由后就可以知道这个Topic的消息存放到了哪些Broker中。

  • 消费者

消费者也只关注Topic路由,从namesrv获取到获取到Topic路由之后,才能知道自己订阅的Topic的Broker地址,从而获取消息。

3.2 Namesrv四个功能模块

  • Topic功能管理模块

这是Namesrv最核心的模块,Topic路由决定,Topic的数据会保存在哪些Broker上。Broker启动的时候,会将自身的信息注册到Namesrv中,以供消费者和生产者获取。生产者和消费者与Namesrv之间会有心跳通信,从而获取最新的Broker信息。

  • Remoting通信模块

这个模块是基于Netty的网络通信封装,担任各个组件之间的网络通信任务。

  • 定时任务模块

定时任务模块包括:定时扫描宕机的Broker,定时打印KV配置,定时扫描超时请求。

  • KV管理模块

Namesrv维护了一个全局的KV配置魔窟啊,方便全局配置。

以上就是RocketMQ Namesrv架构工作原理详解的详细内容,更多关于RocketMQ Namesrv架构的资料请关注我们其它相关文章!

时间: 2022-08-10

SpringBoot整合RocketMQ实现消息发送和接收的详细步骤

我们使用主流的SpringBoot框架整合RocketMQ来讲解,使用方便快捷: 最终项目结构如下: 具体步骤如下: 第一步:新建SpringBoot项目rocketmq-test,引入rocketmq依赖,以及项目配置 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <vers

RocketMQ4.5.2 修改mqnamesrv 和 mqbroker的日志路径操作

此解决方案是针对window的,因为日志默认保存路径在C盘,linux忽略. 学习RocketMQ过程中,总是出现 com.alibaba.rocketmq.client.exception.MQBrokerException: CODE: 14 DESC: service not available now, maybe disk full, CL: 0.87 CQ: 0.87 INDEX: 0.87, maybe your broker machine memory too small. 这

Springboot详细讲解RocketMQ实现顺序消息的发送与消费流程

目录 一.创建Springboot项目添加rockermq依赖 二.配置rocketmq 三.新建一个controller来做消息发送 四.创建消费端监听消息消费消息 五.启动服务测试顺序消息发送与消费 如何实现顺序消息? 需要程序保证发送和消费的是同一个 Queue rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是順序消费消息的:有時候,我们需要顺序消费一批消息,比如电商系统 订单创建.支付.完成操作,需要順序执行: RocketMQTemplat

SpringCloud+RocketMQ实现分布式事务的实践

目录 一.RocketMQ的分布式事务结构和说明 二.搭建RocketMQ 三.事务场景,然后准备工程,运行代码 随着互联网公司的微服务越来越多,分布式事务已经成为了我们的经常使用的.所以我们来一步一步的实现基于RocketMQ的分布式事务.接下来,我们将要做的主题写出来. RocketMQ的分布式事务结构和说明 搭建RocketMQ步骤 事务场景,然后准备工程,运行代码 一.RocketMQ的分布式事务结构和说明 我们通过下图来了解一下RocketMQ实现分布式事务的结构.采用半消息机制实现分

RocketMQ实现随缘分BUG小功能示例详解

目录 正文 实现过程 生产者: 正文 以前公司的产品已经上线20多年了,主要是维护,也就是改bug.每周我们Team会从Jira上拿我们可以改的bug,因为每个团队负责的业务范围不一样,我们团队只能改我们自己业务范围的.这样每周大概有20个左右的新bug,假如团队一共10个人,那么均分就是每人两个,改完下班. 但是这BUG肯定有难有简单,大家肯定都愿意改简单的,在家办公,任务量完了不就等于放假么.开始是自己给自己抢,就忒卷,是欧美项目,所以客服晚上报出来的bug多.有的哥们早上5点起来看有没有新

解决springboot集成rocketmq关于tag的坑

springboot集成rocketmq关于tag的坑 新项目使用springboot的若依框架集成rocketmq,选择集成RocketMQTemplate这种方式实现消息的发送和接收. 1.客户端发送代码 此处回调方法里有些业务不用关注,只关心发送方法 @Component public class RocketMqHelper { Logger logger = LoggerFactory.getLogger(RocketMqHelper.class); @Resource private

如何解决SpringBoot集成百度UEditor图片上传后直接访问404

SpringBoot项目上传图片一般是上传至远程服务器存储,开发过程中可能会上传至当前项目的某个静态目录中,此时就会遇到这个问题,文件在上传之后直接访问并不能被访问到,必须重新加载项目. 首先分析一下原因: 我们知道,如果使用类似 /upload/image/1.jpg 这种格式进行图片的访问的时候,SpringBoot读取的并不是本项目中直接的静态目录,而是在进行编译的时候生成target目录下的文件,如下图所示: 那么问题就来了,我们在运行的过程中上传一个图片的话,并不能重新加载当前这个项目

详解SpringBoot集成jsp(附源码)+遇到的坑

本文介绍了SpringBoot集成jsp(附源码)+遇到的坑 ,分享给大家 1.大体步骤 (1)创建Maven web project: (2)在pom.xml文件添加依赖: (3)配置application.properties支持jsp (4)编写测试Controller (5)编写JSP页面 (6)编写启动类App.java 2.新建SpringInitialzr 3.pom文件 <dependencies> <dependency> <groupId>org.s

SpringBoot集成Swagger2实现Restful(类型转换错误解决办法)

pom.xml增加依赖包 <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.2.2</version> </dependency> <dependency> <groupId>io.springfox</groupId> <

解决SpringBoot框架因post数据量过大没反应问题(踩坑)

此处网上最多的做法是需要修改tomcat的参数配置大致如下: <Connector port="8080" protocol="HTTP/1.1" connectionTimeout="2000" redirectPort="8443" URIEncoding="UTF-8" maxThreads="3000" compression="on" compress

SpringBoot集成gRPC微服务工程搭建实践的方法

前言 本文将使用Maven.gRPC.Protocol buffers.Docker.Envoy等工具构建一个简单微服务工程,笔者所使用的示例工程是以前写的一个Java后端工程,因为最近都在 学习微服务相关的知识,所以利用起来慢慢的把这个工程做成微服务化应用.在实践过程踩过很多坑,主要是经验不足对微服务还是停留在萌新阶段,通过本文 记录创建微服务工程碰到一些问题,此次实践主要是解决以下问题: 如何解决.统一服务工程依赖管理 SpringBoot集成gRPC 管理Protocol buffers文

SpringBoot集成Zipkin实现分布式全链路监控

Zipkin 简介 Zipkin is a distributed tracing system. It helps gather timing data needed to troubleshoot latency problems in service architectures. Features include both the collection and lookup of this data. If you have a trace ID in a log file, you ca

SpringBoot集成WebSocket长连接实际应用详解

前言: 一.WebSocket之初出茅驴 官方定义:WebSocket是一种在单个TCP连接上进行全双工通信的协议.WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据.在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输.是真正的双向平等对话,属于服务器推送技术的一种. 太官方啦,还是博主过来翻译一下吧 :WebSocket技术只需要service和client建立一次连接,就能实现服

SpringBoot集成Spring Data JPA及读写分离

相关代码: github OSCchina JPA是什么 JPA(Java Persistence API)是Sun官方提出的Java持久化规范,它为Java开发人员提供了一种对象/关联映射工具 来管理Java应用中的关系数据.它包括以下几方面的内容: 1.ORM映射 支持xml和注解方式建立实体与表之间的映射. 2.Java持久化API 定义了一些常用的CRUD接口,我们只需直接调用,而不需要考虑底层JDBC和SQL的细节. 3.JPQL查询语言 这是持久化操作中很重要的一个方面,通过面向对象

解决springboot 获取form-data里的file文件的问题

解决springboot 获取form-data里的file文件的问题 前言: 这两天用 springboot 和同事的 iOS 客户端上传文件对接.在客户端他使用的是 afnetworking 第三方库.我使用的是 springboot 集成的 StandardMultipartHttpServletRequest 的解析方式. 写好服务器端的接口以后,使用 postman 模拟 form-data 混合上传普通文本数据和 file 文件是没问题的.后来再 iOS 端混合上传文本和 file

SpringBoot集成swagger的实例代码

Swagger 是一款RESTFUL接口的文档在线自动生成+功能测试功能软件.本文简单介绍了在项目中集成swagger的方法和一些常见问题.如果想深入分析项目源码,了解更多内容,见参考资料. Swagger 是一个规范和完整的框架,用于生成.描述.调用和可视化 RESTful 风格的 Web 服务.总体目标是使客户端和文件系统作为服务器以同样的速度来更新.文件的方法,参数和模型紧密集成到服务器端的代码,允许API来始终保持同步.Swagger 让部署管理和使用功能强大的API从未如此简单. 对于