docker启动rabbitmq以及使用方式详解

目录
  • 搜索rabbitmq镜像
  • 下载镜像
  • 启动容器
  • 打印容器
  • 访问RabbitMQ Management
  • 编写生产者类
  • 消费者
  • 工作队列
    • RabbitMqUtils工具类
    • 启动2个工作线程
    • 启动发送线程
  • 消息应答机制
    • 生产者
    • 消费者
    • 模拟
    • 不公平分发
  • 总结

搜索rabbitmq镜像

docker search rabbitmq:management

下载镜像

docker pull rabbitmq:management

启动容器

docker run -d --hostname localhost --name rabbitmq -p 15672:15672 -p 5672:5672 rabbitmq:management

打印容器

docker logs rabbitmq

访问RabbitMQ Management

http://localhost:15672

账户密码默认:guest

编写生产者类

package com.xun.rabbitmqdemo.example;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");

        Connection connection = factory.newConnection();

        Channel channel = connection.createChannel();
        /**
         * 生成一个queue队列
         * 1、队列名称 QUEUE_NAME
         * 2、队列里面的消息是否持久化(默认消息存储在内存中)
         * 3、该队列是否只供一个Consumer消费 是否共享 设置为true可以多个消费者消费
         * 4、是否自动删除 最后一个消费者断开连接后 该队列是否自动删除
         * 5、其他参数
         */
        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        String message = "Hello world!";
        /**
         * 发送一个消息
         * 1、发送到哪个exchange交换机
         * 2、路由的key
         * 3、其他的参数信息
         * 4、消息体
         */
        channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
        System.out.println(" [x] Sent '"+message+"'");

        channel.close();
        connection.close();
    }
}

运行该方法,可以看到控制台的打印

name=hello的队列收到Message

消费者

package com.xun.rabbitmqdemo.example;

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Receiver {
    private final static String QUEUE_NAME = "hello";
    public static void main(String[] args) throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setUsername("guest");
        factory.setPassword("guest");
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setConnectionTimeout(600000);//milliseconds
        factory.setRequestedHeartbeat(60);//seconds
        factory.setHandshakeTimeout(6000);//milliseconds
        factory.setRequestedChannelMax(5);
        factory.setNetworkRecoveryInterval(500);

        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME,false,false,false,null);
        System.out.println("Waiting for messages. ");

        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            }
        };
        channel.basicConsume(QUEUE_NAME,true,consumer);
    }
}

工作队列

RabbitMqUtils工具类

package com.xun.rabbitmqdemo.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMqUtils {
    public static Channel getChannel() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}

启动2个工作线程

package com.xun.rabbitmqdemo.workQueue;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;

public class Work01 {
    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收消息:"+receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C1 消费者启动等待消费....");
        /**
         * 消费者消费消息
         * 1、消费哪个队列
         * 2、消费成功后是否自动应答
         * 3、消费的接口回调
         * 4、消费未成功的接口回调
         */
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}
package com.xun.rabbitmqdemo.workQueue;

import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;

public class Work02 {
    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String receivedMessage = new String(delivery.getBody());
            System.out.println("接收消息:"+receivedMessage);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        System.out.println("C2 消费者启动等待消费....");
        channel.basicConsume(QUEUE_NAME,true,deliverCallback,cancelCallback);
    }
}

启动工作线程

启动发送线程

package com.xun.rabbitmqdemo.workQueue;

import com.rabbitmq.client.Channel;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import java.util.Scanner;

public class Task01 {
    private static final String QUEUE_NAME = "hello";
    public static void main(String[] args) throws Exception{
        try(Channel channel= RabbitMqUtils.getChannel();){
            channel.queueDeclare(QUEUE_NAME,false,false,false,null);
            //从控制台接收消息
            Scanner scanner = new Scanner(System.in);
            while(scanner.hasNext()){
                String message = scanner.next();
                channel.basicPublish("",QUEUE_NAME,null,message.getBytes());
                System.out.println("发送消息完成:"+message);
            }
        }
    }
}

启动发送线程,此时发送线程等待键盘输入

发送4个消息

可以看到2个工作线程按照顺序分别接收message。

消息应答机制

rabbitmq将message发送给消费者后,就会将该消息标记为删除。

但消费者在处理message过程中宕机,会导致消息的丢失。

因此需要设置手动应答。

生产者

import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import java.util.Scanner;

public class Task02 {
    private static final String TASK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception{
        try(Channel channel = RabbitMqUtils.getChannel()){
            channel.queueDeclare(TASK_QUEUE_NAME,false,false,false,null);
            Scanner scanner = new Scanner(System.in);
            System.out.println("请输入信息");
            while(scanner.hasNext()){
                String message = scanner.nextLine();
                channel.basicPublish("",TASK_QUEUE_NAME,null,message.getBytes());
                System.out.println("生产者task02发出消息"+ message);
            }
        }
    }
}

消费者

package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import com.xun.rabbitmqdemo.utils.SleepUtils;

public class Work03 {
    private static final String ACK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("Work03 等待接收消息处理时间较短");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            SleepUtils.sleep(1);
            System.out.println("接收到消息:"+message);
            /**
             * 1、消息的标记tag
             * 2、是否批量应答
             */
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        //采用手动应答
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}
package com.xun.rabbitmqdemo.workQueue;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xun.rabbitmqdemo.utils.RabbitMqUtils;
import com.xun.rabbitmqdemo.utils.SleepUtils;

public class Work04 {
    private static final String ACK_QUEUE_NAME = "ack_queue";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("Work04 等待接收消息处理时间较长");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
            String message = new String(delivery.getBody());
            SleepUtils.sleep(30);
            System.out.println("接收到消息:"+message);
            channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
        };
        CancelCallback cancelCallback = (consumerTag)->{
            System.out.println(consumerTag+"消费者取消消费接口回调逻辑");
        };
        //采用手动应答
        boolean autoAck = false;
        channel.basicConsume(ACK_QUEUE_NAME,autoAck,deliverCallback,cancelCallback);
    }
}

工具类SleepUtils

package com.xun.rabbitmqdemo.utils;
public class SleepUtils {
    public static void sleep(int second){
        try{
            Thread.sleep(1000*second);
        }catch (InterruptedException _ignored){
            Thread.currentThread().interrupt();
        }
    }
}

模拟

work04等待30s后发出ack

在work04处理message时手动停止线程,可以看到message:dd被rabbitmq交给了work03

不公平分发

上面的轮询分发,生产者依次向消费者按顺序发送消息,但当消费者A处理速度很快,而消费者B处理速度很慢时,这种分发策略显然是不合理的。
不公平分发:

int prefetchCount = 1;
channel.basicQos(prefetchCount);

通过此配置,当消费者未处理完当前消息,rabbitmq会优先将该message分发给空闲消费者。

总结

到此这篇关于docker启动rabbitmq以及使用的文章就介绍到这了,更多相关docker启动rabbitmq及使用内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • docker安装rabbitmq无法进入管理页面的问题

    1.环境准备 腾讯云服务器 CENTOS 7 版本 安装docker容器 2.开始安装 docker pull rabbitmq:management 说明:为什么不直接安装 docker pull rabbitmq 这个,因为这个安装后,开启对应端口后是不能直接访问它的管理后台,需要额外的命令开启,后面会讲这种情况 容器运行,对应的端口开启 docker run -di --name=mycloud_rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369

  • 用docker部署RabbitMQ环境的详细介绍

    前置条件: 已经安装好docker 1.查找镜像(有2种方式) ①登录rabbitmq官网找到docker镜像,选择想要的镜像的tag https://www.rabbitmq.com/download.html https://hub.docker.com/_/rabbitmq 如果需要访问web管理页面,就选择tag为management的 ps:带有alpine的是用最小linux镜像构建的,体积最小可以达5M初学者不建议这么折腾,而且 Alpine Linux使用了muslmusl实现的

  • Docker安装配置RabbitMQ的实现步骤

    目录 单机部署 在线拉取 查看镜像 创建并运行RabbitMQ 创建并运行MQ容器成功 添加防火墙规则 进入RabbitMQ管理平台的端口 Overview总览 Connections连接 Channels通道 Exchanges交换机 Queues队列 Users用户管理 单机部署 本文将在Centos7系统下演示使用Docker来安装RabbitMQ. 在线拉取 rabbitmq镜像 docker pull rabbitmq:3-management 注:rabbitmq:3-managem

  • docker安装RabbitMQ详细步骤

    目录 1.查找镜像 2.下载RabbitMQ镜像 3.创建并启动RabbitMQ容器 4.进入容器交互页面 5.下载插件 6.阿里云控制台 开放端口号 7.登录 1.查找镜像 docker search rabbitmq 2.下载RabbitMQ镜像 直接下载最新的镜像如果需要下载其他版本 自行Docker官网查看并添加版本号再下载 # 下载镜像 docker pull rabbitmq #查看镜像 docker images 3.创建并启动RabbitMQ容器 第一个-p :用于页面访问使用第

  • docker部署rabbitmq集群的实现方法

    拉取rabbitmq management镜像 docker pull rabbitmq:3.7-rc-management 若不使用Rabbitmq的management功能,可以拉取镜像:rabbitmq:3.7-rc 参考: https://hub.docker.com/_/rabbitmq/ 创建网络 创建rabbitmq私有网络 # docker network create rabbitmqnet # docker network ls NETWORK ID NAME DRIVER

  • docker安装并运行rabbitmq的实例代码

    拉取镜像: [mall@VM_0_7_centos ~]$ sudo docker pull rabbitmq:3.7.15 [sudo] password for mall: 3.7.15: Pulling from library/rabbitmq 5b7339215d1d: Pull complete 14ca88e9f672: Pull complete a31c3b1caad4: Pull complete b054a26005b7: Pull complete eef17c6cb6c

  • docker搭建rabbitmq集群环境的方法

    本文主要讲述如何用docker搭建rabbitmq的集群.分享给大家,希望此文章对各位有所帮助. 下载镜像 采用bijukunjummen该镜像. git clone https://github.com/bijukunjummen/docker-rabbitmq-cluster.git 运行 启动集群 cd docker-rabbitmq-cluster/cluster docker-compose up -d ...... Status: Downloaded newer image for

  • docker快速安装rabbitmq的方法步骤

    一.获取镜像 #指定版本,该版本包含了web控制页面 docker pull rabbitmq:management 二.运行镜像 #方式一:默认guest 用户,密码也是 guest docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq:management #方式二:设置用户名和密码 docker run -d --hostname my-rabbit --name rabb

  • docker启动rabbitmq以及使用方式详解

    目录 搜索rabbitmq镜像 下载镜像 启动容器 打印容器 访问RabbitMQ Management 编写生产者类 消费者 工作队列 RabbitMqUtils工具类 启动2个工作线程 启动发送线程 消息应答机制 生产者 消费者 模拟 不公平分发 总结 搜索rabbitmq镜像 docker search rabbitmq:management 下载镜像 docker pull rabbitmq:management 启动容器 docker run -d --hostname localho

  • RabbitMQ 实现延迟队列的两种方式详解

    目录 1. 用插件 1.1 安装插件 1.2 消息收发 2. DLX 实现延迟队列 2.1 延迟队列实现思路 2.2 案例 3. 小结 定时任务各种各样,常见的定时任务例如日志备份,我们可能在每天凌晨 3 点去备份,这种固定时间的定时任务我们一般采用 cron 表达式就能轻松的实现,还有一些比较特殊的定时任务,向大家看电影中的定时炸弹,3分钟后爆炸,这种定时任务就不太好用 cron 去描述,因为开始时间不确定,我们开发中有的时候也会遇到类似的需求,例如: 在电商项目中,当我们下单之后,一般需要

  • spring boot linux启动方式详解

    前台启动 java -jar XXX.jar 后台启动 java -jar xxx.jar & 区别:前台启动ctrl+c就会关闭程序,后台启动ctrl+c不会关闭程序 制定控制台的标准输出 java -jar xxx.jar > catalina.out 2>&1 & catalina.out将标准输出指向制定文件catalina.out 2>&1 输出所有的日志文件 & 后台启动  脚本启动 #!/bin/sh #功能简介:启动上层目录下的ja

  • go-micro集成RabbitMQ实战和原理详解

    目录 Broker的核心功能 发布 订阅 go-micro集成RabbitMQ实战 启动一个RabbitMQ 编写收发函数 编写主体代码 go-micro集成RabbitMQ的处理流程 填的几个坑 不能接收其它框架发布的消息 RabbitMQ重启后订阅者和发布者无限阻塞 在go-micro中异步消息的收发是通过Broker这个组件来完成的,底层实现有RabbitMQ.Kafka.Redis等等很多种方式,这篇文章主要介绍go-micro使用RabbitMQ收发数据的方法和原理. Broker的核

  • Docker run 命令的使用方法详解

    注意,本文基于最新的Docker 1.4文档翻译. Docker会在隔离的容器中运行进程.当运行 docker run命令时,Docker会启动一个进程,并为这个进程分配其独占的文件系统.网络资源和以此进程为根进程的进程组.在容器启动时,镜像可能已经定义了要运行的二进制文件.暴露的网络端口等,但是用户可以通过docker run命令重新定义(译者注:docker run可以控制一个容器运行时的行为,它可以覆盖docker build在构建镜像时的一些默认配置),这也是为什么run命令相比于其它命

  • Docker守护进程安全配置项目详解

    本文将为大家介绍docker守护进程的相关安全配置项目. 一.测试环境 1.1 安装 CentOS 7 CentOS Linux release 7.7.1908 (Core) 升级内核,重启 # yum update kernel [root@localhost docker]# uname -a Linux localhost 3.10.0-1062.12.1.el7.x86_64 #1 SMP Tue Feb 4 23:02:59 UTC 2020 x86_64 x86_64 x86_64

  • 对docker中的overlay式网络详解

    翻译自docker官方文档,原文:https://docs.docker.com/network/overlay/ overlay(覆盖)式网络会在多个docker守护进程所在的主机之间创建一个分布式的网络.这个网络会覆盖宿主机特有的网络,并允许容器连接它(包括集群服务中的容器)来安全通信.显然,docker会处理docker守护进程源容器和目标容器之间的数据报的路由. 当你初始化一个集群(swarm)或把一个docker宿主机加入一个已经存在的集群时,宿主机上会新建两个网络: 一个叫ingre

  • Docker Secret的管理和使用详解

    一.什么是Docker Secret (一)情景展现 我们知道有的service是需要设置密码的,比如mysql服务是需要设置密码的: version: '3' services: web: image: wordpress ports: - 8080:80 volumes: - ./www:/var/www/html environment: WORDPRESS_DB_NAME=wordpress WORDPRESS_DB_HOST: mysql WORDPRESS_DB_PASSWORD:

  • 基于docker 搭建Prometheus+Grafana的过程详解

    一.介绍Prometheus Prometheus(普罗米修斯)是一套开源的监控&报警&时间序列数据库的组合,起始是由SoundCloud公司开发的.随着发展,越来越多公司和组织接受采用Prometheus,社会也十分活跃,他们便将它独立成开源项目,并且有公司来运作.Google SRE的书内也曾提到跟他们BorgMon监控系统相似的实现是Prometheus.现在最常见的Kubernetes容器管理系统中,通常会搭配Prometheus进行监控. Prometheus基本原理是通过HTT

  • SpringBoot项目jar和war打包部署方式详解

    目录 jar与war jar包部署运行 war包部署运行 jar与war Spring Boot项目开发完成后,需要以jar或war的方式将项目打包部署到测试开发环境. jar即Java Archive,是Java归档文件,该文件格式与平台无关,它允许将许多文件组合成一个压缩文件.Java程序都可以打成jar包,目前Docker广泛使用,Java项目都会打成可执行的jar包,最终构建为镜像文件来运行. jar文件格式基于流行的ZIP文件格式.与ZIP文件不同的是,jar文件不仅用于压缩和发布,而

随机推荐