.net平台的rabbitmq使用封装demo详解

目录
  • 前言
    • 什么是rabbitMQ
    • Rabbitmq的关键术语
  • Rabbitmq的运作
  • Publish(发布)的封装
  • Subscribe(订阅)的封装
  • Pull(拉)的封装
  • Rpc(远程调用)的封装
  • 结尾

前言

  RabbitMq大家再熟悉不过,这篇文章主要针对rabbitmq学习后封装RabbitMQ.Client的一个分享。文章最后,我会把封装组件和demo奉上。

什么是rabbitMQ

RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue 高级消息队列协议 )的开源实现,
        能够实现异步消息处理

RabbitMQ是一个消息代理:它接受和转发消息。
        你可以把它想象成一个邮局:当你把你想要发布的邮件放在邮箱中时,你可以确定邮差先生最终将邮件发送给你的收件人。在这个比喻中,RabbitMQ是邮政信箱,邮局和邮递员。
        RabbitMQ和邮局的主要区别在于它不处理纸张,而是接受,存储和转发二进制数据块

优点:异步消息处理
              业务解耦(下订单操作:扣减库存、生成订单、发红包、发短信),
                        将下单操作主流程:扣减库存、生成订单
                        然后通过MQ消息队列完成通知,发红包、发短信
              错峰流控 (通知量 消息量 订单量大的情况实现MQ消息队列机制,淡季情况下访问量会少)

灵活的路由(Flexible Routing)
                在消息进入队列之前,通过 Exchange 来路由消息的。对于典型的路由功能,RabbitMQ 已经提供了一些内置的 Exchange 来实现。针对更复杂的路由功能,可以将多个 Exchange 绑定在一起,也通过插件机制实现自己的 Exchange 。

RabbitMQ网站端口号:15672
        程序里面实现的端口为:5672

Rabbitmq的关键术语

  1、绑定器(Binding):根据路由规则绑定Queue和Exchange。

  2、路由键(Routing Key):Exchange根据关键字进行消息投递。

  3、交换机(Exchange):指定消息按照路由规则进入指定队列

  4、消息队列(Queue):消息的存储载体

  5、生产者(Producer):消息发布者。

  6、消费者(Consumer):消息接收者。

Rabbitmq的运作

  从下图可以看出,发布者(Publisher)是把消息先发送到交换器(Exchange),再从交换器发送到指定队列(Queue),而先前已经声明交换器与队列绑定关系,最后消费者(Customer)通过订阅或者主动取指定队列消息进行消费。

  那么刚刚提到的订阅和主动取可以理解成,推(被动),拉(主动)。

  推,只要队列增加一条消息,就会通知空闲的消费者进行消费。(我不找你,就等你找我,观察者模式)

  拉,不会通知消费者,而是由消费者主动轮循或者定时去取队列消息。(我需要才去找你)

  使用场景我举个例子,假如有两套系统 订单系统和发货系统,从订单系统发起发货消息指令,为了及时发货,发货系统需要订阅队列,只要有指令就处理。

  可是程序偶尔会出异常,例如网络或者DB超时了,把消息丢到失败队列,这个时候需要重发机制。但是我又不想while(IsPostSuccess == True),因为只要出异常了,会在某个时间段内都会有异常,这样的重试是没意义的。

  这个时候不需要及时的去处理消息,有个JOB定时或者每隔几分钟(失败次数*间隔分钟)去取失败队列消息,进行重发。

Publish(发布)的封装

  步骤:初始化链接->声明交换器->声明队列->换机器与队列绑定->发布消息。注意的是,我将Model存到了ConcurrentDictionary里面,因为声明与绑定是非常耗时的,其次,往重复的队列发送消息是不需要重新初始化的。

/// <summary>
          /// 交换器声明
          /// </summary>
          /// <param name="iModel"></param>
          /// <param name="exchange">交换器</param>
          /// <param name="type">交换器类型:
          /// 1、Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全
          /// 匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的
          /// 消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog
         /// 2、Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都
         /// 会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout
         /// 交换机转发消息是最快的。
         /// 3、Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多
         /// 个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*”
         /// 只会匹配到“audit.irs”。</param>
         /// <param name="durable">持久化</param>
         /// <param name="autoDelete">自动删除</param>
         /// <param name="arguments">参数</param>
         private static void ExchangeDeclare(IModel iModel, string exchange, string type = ExchangeType.Direct,
             bool durable = true,
             bool autoDelete = false, IDictionary<string, object> arguments = null)
         {
             exchange = exchange.IsNullOrWhiteSpace() ? "" : exchange.Trim();
             iModel.ExchangeDeclare(exchange, type, durable, autoDelete, arguments);
         }

         /// <summary>
         /// 队列声明
         /// </summary>
         /// <param name="channel"></param>
         /// <param name="queue">队列</param>
         /// <param name="durable">持久化</param>
         /// <param name="exclusive">排他队列,如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,
         /// 并在连接断开时自动删除。这里需要注意三点:其一,排他队列是基于连接可见的,同一连接的不同信道是可
         /// 以同时访问同一个连接创建的排他队列的。其二,“首次”,如果一个连接已经声明了一个排他队列,其他连
         /// 接是不允许建立同名的排他队列的,这个与普通队列不同。其三,即使该队列是持久化的,一旦连接关闭或者
         /// 客户端退出,该排他队列都会被自动删除的。这种队列适用于只限于一个客户端发送读取消息的应用场景。</param>
         /// <param name="autoDelete">自动删除</param>
         /// <param name="arguments">参数</param>
         private static void QueueDeclare(IModel channel, string queue, bool durable = true, bool exclusive = false,
            bool autoDelete = false, IDictionary<string, object> arguments = null)
         {
             queue = queue.IsNullOrWhiteSpace() ? "UndefinedQueueName" : queue.Trim();
             channel.QueueDeclare(queue, durable, exclusive, autoDelete, arguments);
        }

        /// <summary>
        /// 获取Model
        /// </summary>
       /// <param name="exchange">交换机名称</param>
        /// <param name="queue">队列名称</param>
         /// <param name="routingKey"></param>
         /// <param name="isProperties">是否持久化</param>
        /// <returns></returns>
         private static IModel GetModel(string exchange, string queue, string routingKey, bool isProperties = false)
        {
          return ModelDic.GetOrAdd(queue, key =>
            {
                 var model = _conn.CreateModel();
                ExchangeDeclare(model, exchange, ExchangeType.Fanout, isProperties);
                 QueueDeclare(model, queue, isProperties);
                model.QueueBind(queue, exchange, routingKey);
                ModelDic[queue] = model;
                 return model;
            });
        }

       /// <summary>
       /// 发布消息
         /// </summary>
       /// <param name="routingKey">路由键</param>
         /// <param name="body">队列信息</param>
        /// <param name="exchange">交换机名称</param>
        /// <param name="queue">队列名</param>
       /// <param name="isProperties">是否持久化</param>
        /// <returns></returns>
         public void Publish(string exchange, string queue, string routingKey, string body, bool isProperties = false)
         {
           var channel = GetModel(exchange, queue, routingKey, isProperties);

             try
             {
               channel.BasicPublish(exchange, routingKey, null, body.SerializeUtf8());
             }
            catch (Exception ex)
            {
               throw ex.GetInnestException();
           }
       }        

  下次是本机测试的发布速度截图:

  4.2W/S属于稳定速度,把反序列化(ToJson)会稍微快一些。

Subscribe(订阅)的封装

  发布的时候是申明了交换器和队列并绑定,然而订阅的时候只需要声明队列就可。从下面代码能看到,捕获到异常的时候,会把消息送到自定义的“死信队列”里,由另外的JOB进行定时重发,因此,finally是应答成功的。

/// <summary>
        /// 获取Model
        /// </summary>
        /// <param name="queue">队列名称</param>
        /// <param name="isProperties"></param>
        /// <returns></returns>
        private static IModel GetModel(string queue, bool isProperties = false)
        {
            return ModelDic.GetOrAdd(queue, value =>
             {
                 var model = _conn.CreateModel();
                 QueueDeclare(model, queue, isProperties);

                 //每次消费的消息数
                 model.BasicQos(0, 1, false);

                 ModelDic[queue] = model;

                 return model;
             });
        }    

        /// <summary>
        /// 接收消息
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="queue">队列名称</param>
        /// <param name="isProperties"></param>
        /// <param name="handler">消费处理</param>
        /// <param name="isDeadLetter"></param>
        public void Subscribe<T>(string queue, bool isProperties, Action<T> handler, bool isDeadLetter) where T : class
        {
            //队列声明
            var channel = GetModel(queue, isProperties);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var msgStr = body.DeserializeUtf8();
                var msg = msgStr.FromJson<T>();
                try
                {
                    handler(msg);
                }
                catch (Exception ex)
                {
                    ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
                    if (!isDeadLetter)
                        PublishToDead<DeadLetterQueue>(queue, msgStr, ex);
                }
                finally
                {
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(queue, false, consumer);
        }        

  下次是本机测试的发布速度截图:

  快的时候有1.9K/S,慢的时候也有1.7K/S

Pull(拉)的封装

  直接上代码:

 /// <summary>
        /// 获取消息
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="routingKey"></param>
        /// <param name="handler">消费处理</param>
        private void Poll<T>(string exchange, string queue, string routingKey, Action<T> handler) where T : class
        {
            var channel = GetModel(exchange, queue, routingKey);

            var result = channel.BasicGet(queue, false);
            if (result.IsNull())
                return;

            var msg = result.Body.DeserializeUtf8().FromJson<T>();
            try
            {
                handler(msg);
            }
            catch (Exception ex)
            {
                ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
            }
            finally
            {
                channel.BasicAck(result.DeliveryTag, false);
            }
        }    

  快的时候有1.8K/s,稳定是1.5K/S

Rpc(远程调用)的封装

  首先说明下,RabbitMq只是提供了这个RPC的功能,但是并不是真正的RPC,为什么这么说:

  1、传统Rpc隐藏了调用细节,像调用本地方法一样传参、抛出异常

  2、RabbitMq的Rpc是基于消息的,消费者消费后,通过新队列返回响应结果。

 /// <summary>
        /// RPC客户端
        /// </summary>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="routingKey"></param>
        /// <param name="body"></param>
        /// <param name="isProperties"></param>
        /// <returns></returns>
        public string RpcClient(string exchange, string queue, string routingKey, string body, bool isProperties = false)
        {
            var channel = GetModel(exchange, queue, routingKey, isProperties);

            var consumer = new QueueingBasicConsumer(channel);
            channel.BasicConsume(queue, true, consumer);

            try
            {
                var correlationId = Guid.NewGuid().ToString();
                var basicProperties = channel.CreateBasicProperties();
                basicProperties.ReplyTo = queue;
                basicProperties.CorrelationId = correlationId;

                channel.BasicPublish(exchange, routingKey, basicProperties, body.SerializeUtf8());

                var sw = Stopwatch.StartNew();
                while (true)
                {
                    var ea = consumer.Queue.Dequeue();
                    if (ea.BasicProperties.CorrelationId == correlationId)
                    {
                        return ea.Body.DeserializeUtf8();
                    }

                    if (sw.ElapsedMilliseconds > 30000)
                        throw new Exception("等待响应超时");
                }
            }
            catch (Exception ex)
            {
                throw ex.GetInnestException();
            }
        }    

        /// <summary>
        /// RPC服务端
        /// </summary>
        /// <typeparam name="T"></typeparam>
        /// <param name="exchange"></param>
        /// <param name="queue"></param>
        /// <param name="isProperties"></param>
        /// <param name="handler"></param>
        /// <param name="isDeadLetter"></param>
        public void RpcService<T>(string exchange, string queue, bool isProperties, Func<T, T> handler, bool isDeadLetter)
        {
            //队列声明
            var channel = GetModel(queue, isProperties);

            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var msgStr = body.DeserializeUtf8();
                var msg = msgStr.FromJson<T>();

                var props = ea.BasicProperties;
                var replyProps = channel.CreateBasicProperties();
                replyProps.CorrelationId = props.CorrelationId;

                try
                {
                    msg = handler(msg);
                }
                catch (Exception ex)
                {
                    ex.GetInnestException().WriteToFile("队列接收消息", "RabbitMq");
                }
                finally
                {
                    channel.BasicPublish(exchange, props.ReplyTo, replyProps, msg.ToJson().SerializeUtf8());
                    channel.BasicAck(ea.DeliveryTag, false);
                }
            };
            channel.BasicConsume(queue, false, consumer);
        }

  可以用,但不建议去用。可以考虑其他的RPC框架。grpc、thrift等。

结尾

  本篇文章,没有过多的写RabbitMq的知识点,因为园子的学习笔记实在太多了。下面把我的代码奉上 https://github.com/SkyChenSky/Sikiro.Mq.Rabbit。如果有发现写得不对的地方麻烦在评论指出,我会及时修改以免误导别人。

到此这篇关于.net平台的rabbitmq使用封装的文章就介绍到这了,更多相关.net使用rabbitmq内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • 运用.net core中实例讲解RabbitMQ高可用集群构建

    目录 一.集群架构简介 二.普通集群搭建 2.1 各个节点分别安装RabbitMQ 2.2 把节点加入集群 2.3 代码演示普通集群的问题 三.镜像集群 四.HAProxy环境搭建. 五.KeepAlived 环境搭建 一.集群架构简介 当单台 RabbitMQ 服务器的处理消息的能力达到瓶颈时,此时可以通过 RabbitMQ 集群来进行扩展,从而达到提升吞吐量的目的.RabbitMQ 集群是一个或多个节点的逻辑分组,集群中的每个节点都是对等的,每个节点共享所有的用户,虚拟主机,队列,交换器,绑

  • .Net使用RabbitMQ即时发消息Demo

    前言 最近项目要使用RabbitMQ,网上已经有很多优秀的文章了,百度百科对RabbitMQ阐述也非常明确,建议去看下,还有amqp协议.必须一提的是rabbitmq是由LShift提供的一个消息队列协议(AMQP)的开源实现,由以高性能.健壮以及可伸缩性出名的Erlang写成(因此也是继承了这些优点). 最近参考大神们的博客,自己做了一个RabbitMQ即时发消息的Demo.下面话不多说了,来一起看看详细的介绍吧. 步骤如下: 1.使用VS的NuGet安装包管理工具安装RabbitMQ.Cli

  • 运用.NetCore实例讲解RabbitMQ死信队列,延时队列

    目录 一.死信队列 二.延时队列 三.延时消息设置不同过期时间 四.延时消息用延时插件的方式实现 一.死信队列 描述:Q1队列绑定了x-dead-letter-exchange(死信交换机)为X2,x-dead-letter-routing-key(死信路由key)指向Q2(队列2) P(生产者)发送消息经X1(交换机1)路由到Q1(队列1),Q1的消息触发特定情况,自动把消息经X2(交换机2)路由到Q2(队列2),C(消费者)直接消息Q2的消息. 特定情况有哪些呢: 1.消息被拒(basic.

  • RabbitMQ .NET消息队列使用详解

    本文实例为大家分享了RabbitMQ .NET消息队列使用方法,供大家参考,具体内容如下 首先下载安装包,我都环境是win7 64位: 去官网下载 otp_win64_19.0.exe  和rabbitmq-server-3.6.3.exe安装好 然后开始编程了: (1)创建生产者类: class Program { private static void Main() { //建立RabbitMQ连接和通道 var connectionFactory = new ConnectionFacto

  • 如何用.NETCore操作RabbitMQ

    什么是RabbitMQ? RabbitMQ是由erlang语言开发的一个基于AMQP(Advanced Message Queuing Protocol)协议的企业级消息队列中间件.可实现队列,订阅/发布,路由,通配符等工作模式. 为什么要使用RabbitMQ? 异步处理:比如发送邮件,发送短信等不需要等待处理结果的操作 应用解耦:比如下单成功后,通知仓库发货,不需要等待仓库回应,通过消息队列去通知仓库,降低应用间耦合程序,可并行开发两个功能模块 流量削锋:在抢购或者其他的活动页,服务处于爆发式

  • .Net RabbitMQ实现HTTP API接口调用

    RabbitMQ Management插件还提供了基于RESTful风格的HTTP API接口来方便调用.一共涉及4种HTTP方法:GET.PUT.DELETE和POST.GET方法一般用来获取如集群.节点.队列.交换器等信息.PUT方法用来创建资源,如交换器.队列之类的.DELETE方法用来删除资源.POST方法也是用来创建资源的,与PUT不同的是,POST创建的是无法用具体名称的资源.比如绑定关系(bindings)和发布消息(publish)无法指定一个具体的名称. 点击Web管理界面左下

  • 运用.net core中实例讲解RabbitMQ

    目录 一.RabbitMQ简介 (1) AMQP协议 (2)AMQP专业术语 (3)RabbitMQ整体架构 二.安装RabbitMQ 三.RabbitMQ六种队列模式在.NetCore中使用 (1)简单队列 (2)工作队列模式 (3)发布订阅模式 (4)路由模式(推荐使用) (5)主题模式 (6)RPC模式 总结 一.RabbitMQ简介 是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用之间共享数据,RabbitMQ是使用Erlang(高并发语言)语言来编写的,并且Rabbi

  • .net平台的rabbitmq使用封装demo详解

    目录 前言 什么是rabbitMQ Rabbitmq的关键术语 Rabbitmq的运作 Publish(发布)的封装 Subscribe(订阅)的封装 Pull(拉)的封装 Rpc(远程调用)的封装 结尾 前言 RabbitMq大家再熟悉不过,这篇文章主要针对rabbitmq学习后封装RabbitMQ.Client的一个分享.文章最后,我会把封装组件和demo奉上. 什么是rabbitMQ RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue 高级消息

  • BootStrap实现带有增删改查功能的表格(DEMO详解)

    前言 bootstrap的表格样式,有类似EasyUI的表格,也有卡片式表格,放到移动端显示,各有千秋.但是BootStrap自带的表格是没有操作列的,网上的资源不少,但是都是比较单一.零碎,JS.CSS也经常给的不全,自己经过大概一个月左右的时间,把表格封装了一下,希望能分享给大家. 表格封装了3个版本,接下来给大家展示一下样式和代码. 版本一 1. 样式 表格布局: 添加:添加一行新的空白代码 修改:选中可修改的列,点击需要修改的单元格,即可变成可编辑的状态. 2.代码 @using Dat

  • Android异步下载图片并且缓存图片到本地DEMO详解

    在Android开发中我们经常有这样的需求,从服务器上下载xml或者JSON类型的数据,其中包括一些图片资源,本demo模拟了这个需求,从网络上加载XML资源,其中包括图片,我们要做的解析XML里面的数据,并且把图片缓存到本地一个cache目录里面,并且用一个自定义的Adapter去填充到LIstView,demo运行效果见下图: 通过这个demo,要学会有一下几点 1.怎么解析一个XML 2.demo中用到的缓存图片到本地一个临时目录的思想是怎样的? 3.AsyncTask类的使用,因为要去异

  • Kotlin + Retrofit + RxJava简单封装使用详解

    本文介绍了Kotlin + Retrofit + RxJava简单封装使用详解,分享给大家,具体如下: 实例化Retrofit object RetrofitUtil { val CONNECT_TIME_OUT = 30//连接超时时长x秒 val READ_TIME_OUT = 30//读数据超时时长x秒 val WRITE_TIME_OUT = 30//写数据接超时时长x秒 val retrofit: Retrofit by lazy { Log.d("RetrofitUtil"

  • rabbitmq五种模式详解(含实现代码)

    一.五种模式详解 1.简单模式(Queue模式) 当生产端发送消息到交换机,交换机根据消息属性发送到队列,消费者监听绑定队列实现消息的接收和消费逻辑编写.简单模式下,强调的一个队列queue只被一个消费者监听消费. 1.1 结构 生产者:生成消息,发送到交换机交换机:根据消息属性,将消息发送给队列消费者:监听这个队列,发现消息后,获取消息执行消费逻辑 1.2应用场景 常见的应用场景就是一发,一接的结构 例如: 手机短信邮件单发 2.争抢模式(Work模式) 强调的也是后端队列与消费者绑定的结构

  • Vue项目的网络请求代理到封装步骤详解

    目录 1.创建vue项目 2.安装axios 3.进行config.js配置 4.main.js里引入 5.src目录下新建Utils文件夹,在内封装request.js 6.以login路由为示例 src文件下新建api文件,在api内新建login.js 7.在页面内引入方法,并使用 1.创建vue项目 vue create demo demo是项目名称 2.安装axios 进入demo里面打开终端(黑窗口),执行 npm install axios 3.进行config.js配置 devS

  • ruby ftp封装实例详解

     ruby ftp封装实例详解 最近自己用ruby 封装了一个Net::FTP的工具类. class FtpTool def initialize() @current_ftp = create_ftp end # 获取指定格式的文件名称列表 # 例如: source = "test/*.txt" # 返回: [source/file_name.txt] def fetch_remote_filenames(source) return [] if source.blank? log_

  • Java基本数据类型与封装类型详解(int和Integer区别)

    int是java提供的8种原始数据类型之一. Java为每个原始类型提供了封装类,Integer是java为int提供的封装类(即Integer是一个java对象,而int只是一个基本数据类型).int的默认值为0,而Integer的默认值为null,即Integer可以区分出未赋值和值为0的区别,int则无法表达出未赋值的情况,例如,要想表达出没有参加考试和考试成绩为0的区别,则只能使用Integer.在JSP开发中,Integer的默认为null,所以用el表达式在文本框中显示时,值为空白字

  • Socket+JDBC+IO实现Java文件上传下载器DEMO详解

    该demo实现的功能有: 1.用户注册: 注册时输入两次密码,若两次输入不一致,则注册失败,需要重新输入.若用户名被注册过,则提示用户重新输入用户名: 2.用户登录: 需要验证数据库中是否有对应的用户名和密码,若密码输错三次,则终止用户的登录操作: 3.文件上传: 从本地上传文件到文件数据库中 4.文件下载: 从数据库中下载文件到本地 5.文件更新: 根据id可更新数据库中的文件名 6.文件删除: 根据id删除数据库中某一个文件 7.看数据库所有文件; 8.查看文件(根据用户名); 9.查看文件

  • iOS中关于信鸽推送的使用demo详解

    最近在看推送方面的知识,用的是信鸽推送主要是因为后台用的是信鸽 推送用第三方推送,也就是在客户端建一个广播接收器,当服务器发送消息时发送到信鸽,信鸽再发送一次,广播接受器接受下: 我实现的功能比较简单,当app在运行状态时,会在主页展示一个弹窗展示推送的消息:如果app不在运行状态且service没被销毁就展示默认的通知 那么如何在主页展示弹窗:当广播接受器收到我要的消息时,用观察者模式,收到消息在发送个消息个主界面 官方的Demo连接:http://xg.qq.com/xg/help/ctr_

随机推荐