Docker启动RabbitMQ,实现生产者与消费者

news2025/2/24 8:05:40

目录

一、Docker拉取镜像并启动RabbitMQ

二、Hello World

(一)依赖导入

(二)消息生产者

(三)消息消费者

三、实现轮训分发消息

(一)抽取工具类

(二)启动两个工作线程

(三)启动发送线程

四、实现手动应答

(一)消息应答概念

(二)消息应答的方法

(三)消息自动重新入队 

(四)消息手动应答代码 

1、生产者

2、睡眠工具类模拟业务执行

3、消费者


一、Docker拉取镜像并启动RabbitMQ

拉取镜像

docker pull rabbitmq:3.8.8-management

查看镜像

docker images rabbitmq

 启动镜像

docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.8.8-management

Linux虚拟机记得开放5672端口或者关闭防火墙,在window通过 主机ip:15672 访问rabbitmq控制台

 用户名密码默认为guest

 

 

二、Hello World

(一)依赖导入

<!--指定 jdk 编译版本-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
    <dependencies>
        <!--rabbitmq 依赖客户端-->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.8.0</version>
        </dependency>
        <!--操作文件流的一个依赖-->
        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>2.6</version>
        </dependency>
    </dependencies>

(二)消息生产者

工作原理

  • Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message Broker
  • Connectionpublisherconsumer broker 之间的 TCP 连接
  • Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销
  • Exchangemessage 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast)
  • Queue消息最终被送到这里等待 consumer 取走

我们需要先获取连接(Connection),然后通过连接获取信道(Channel),这里我们演示简单例子,可以直接跳过交换机(Exchange)发送队列(Queue)

public class Producer {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置主机ip
        factory.setHost("182.92.234.71");
        // 设置用户名
        factory.setUsername("guest");
        // 设置密码
        factory.setPassword("guest");
        //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
        Connection connection = factory.newConnection();
        // 获取信道
        Channel channel = connection.createChannel();
        /*
         * 生成一个队列
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments)
         * 1.队列名称
         * 2.队列里面的消息是否持久化 默认消息存储在内存中
         * 3.该队列是否只供一个消费者进行消费 是否进行共享 true 可以多个消费者消费
         * 4.是否自动删除 最后一个消费者端开连接以后 该队列是否自动删除 true 自动删除
         * 5.其他参数
         **/
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "hello rabbitmq";
        /*
         * 发送一个消息
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * 1.发送到哪个交换机
         * 2.路由的key是哪个
         * 3.其他的参数信息
         * 4.发送消息的消息体
         *
         **/
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println("发送成功");
    }
}

(三)消息消费者

public class Consumer {

    private static final String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        //创建一个连接工厂
        ConnectionFactory factory = new ConnectionFactory();
        // 设置主机ip
        factory.setHost("182.92.234.71");
        // 设置用户名
        factory.setUsername("guest");
        // 设置密码
        factory.setPassword("guest");
        //channel 实现了自动 close 接口 自动关闭 不需要显示关闭
        Connection connection = factory.newConnection();
        // 获取信道
        Channel channel = connection.createChannel();

        // 推送的消息如何进行消费的回调接口
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println(new String(message.getBody()));
        };
        // 取消消费的一个回调接口,如在消费的时候队列被删除了
        CancelCallback cancelCallback = (consumerTag) -> {
            System.out.println("消息消费被中断");
        };
        /*
         * 消费者消费消息
         * basicConsume(String queue, boolean autoAck, 
         * DeliverCallback deliverCallback, CancelCallback cancelCallback)
         * 1.消费哪个队列
         * 2.消费成功之后是否要自动应答 true 代表自动应答 false 手动应答
         * 3.消费者未成功消费的回调
         **/
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}

 

三、实现轮训分发消息

(一)抽取工具类

可以发现,上面获取连接工厂,然后获取连接,再获取信道的步骤是一致的,我们可以抽取成一个工具类来调用,并使用单例模式-饿汉式完成信道的初始化

public class RabbitMqUtils {

    private static Channel channel;

    static {
        ConnectionFactory factory = new ConnectionFactory();
        // 设置ip地址
        factory.setHost("192.168.23.100");
        // 设置用户名
        factory.setUsername("guest");
        // 设置密码
        factory.setPassword("guest");
        try {
            // 创建连接
            Connection connection = factory.newConnection();
            // 获取信道
            channel = connection.createChannel();
        } catch (Exception e) {
            System.out.println("创建信道失败,错误信息:" + e.getMessage());
        }
    }

    public static Channel getChannel() {
        return channel;
    }
}

(二)启动两个工作线程

相当于前面的消费者,我们只需要写一个类,通过ideal实现多线程启动即可模拟两个线程

public class Worker01 {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = ( consumerTag,  message) -> {
            System.out.println("接受到消息:" + new String(message.getBody()));
        };
        CancelCallback cancelCallback = (cunsumerTag) -> {
            System.out.println("消费者取消消费接口回调逻辑");
        };
        // 启动两次,第一次为C1, 第二次为C2
        System.out.println("C2消费者等待消费消息");
        channel.basicConsume(QUEUE_NAME, true, deliverCallback,cancelCallback);
    }
}

(三)启动发送线程

public class Test01 {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // 通过控制台输入充当消息,使轮训演示更明显
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );
            System.out.println("消息发送完成:" + message);
        }
    }
}

结果 

四、实现手动应答

(一)消息应答概念

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成
了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消
息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续
发送给该消费这的消息,因为它无法接收到。
为了保证消息在发送过程中不丢失,rabbitmq 引入消息应答机制,消息应答就是: 消费者在接
收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。

自动应答:消费者发送后立即被认为已经传送成功。这种模式需要在高吞吐量和数据传输安全性方面做权,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了。

当然另一方面这种模式消费者那边可以传递过载的消息, 没有对传递的消息数量进行限制
当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终 使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并 以某种速率能够处理这些消息的情况下使用

手动应答:消费者接受到消息并顺利完成业务后再调用方法进行确认,rabbitmq 才可以把该消息删除

(二)消息应答的方法

 

  • Channel.basicAck(用于肯定确认)
    • RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了
  • Channel.basicNack(用于否定确认)
  • Channel.basicReject(用于否定确认)
    • 与 Channel.basicNack 相比少一个参数Multiple
      • ​​​​​​​multiple 的 true 和 false 代表不同意思
                true 代表批量应答 channel 上未应答的消息
                比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时
                5-8 的这些还未应答的消息都会被确认收到消息应答
                false 同上面相比
                只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答
    • 不处理该消息了直接拒绝,可以将其丢弃了

 

(三)消息自动重新入队 

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确
保不会丢失任何消息。

(四)消息手动应答代码 

1、生产者

public class Test01 {

    private final static String QUEUE_NAME = "ack";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        Scanner scanner = new Scanner(System.in);
        while(scanner.hasNext()) {
            String message = scanner.next();
            channel.basicPublish("", QUEUE_NAME,null, message.getBytes() );
            System.out.println("消息发送完成:" + message);
        }
    }
}

2、睡眠工具类模拟业务执行

public class SleepUtils {

    public static void sleep(int second) {
        try {
            Thread.sleep(1000 * second);
        } catch (InterruptedException _ignored) {
            Thread.currentThread().interrupt();
        }
    }
}

3、消费者

public class Worker01 {

    private final static String QUEUE_NAME = "ack";

    public static void main(String[] args) throws Exception {
        System.out.println("C1,业务时间短");
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = ( consumerTag,  message) -> {
            SleepUtils.sleep(1);  // 模拟业务执行1秒
            System.out.println("接受到消息:" + new String(message.getBody()));
            /*
             * 1、消息标识
             * 2、是否启动批量确认,false:否。
             *    启用批量有可能造成消息丢失,比如未消费的消息提前被确然删除,后面业务消费该消息
             *    时出现异常会导致该消息的丢失
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (cunsumerTag) -> {
            System.out.println("消费者取消消费接口回调逻辑");
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);
    }
}

==============================================================================
public class Worker02 {

    private final static String QUEUE_NAME = "ack";

    public static void main(String[] args) throws Exception {
        System.out.println("C2,业务时间长");
        Channel channel = RabbitMqUtils.getChannel();
        DeliverCallback deliverCallback = ( consumerTag,  message) -> {
            SleepUtils.sleep(15);  // 模拟业务执行15秒
            System.out.println("接受到消息:" + new String(message.getBody()));
            /*
             * 1、消息标识
             * 2、是否启动批量确认,false:否。
             *    启用批量有可能造成消息丢失,比如未消费的消息提前被确然删除,后面业务消费该消息
             *    时出现异常会导致该消息的丢失
             */
            channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
        };
        CancelCallback cancelCallback = (cunsumerTag) -> {
            System.out.println("消费者取消消费接口回调逻辑");
        };
        boolean autoAck = false;
        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback,cancelCallback);
    }
}

worker01业务时间短,worker02业务时间长,我们提前终止worker02模拟出异常,可以看到消息dd会被放回队列由worker01接收处理。

注意:这里需要先启动生产者声明队列ack,不然启动消费者会报错

 

最后一个案例我们可以看到消息轮训+消息自动重新入队+手动应答。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/363893.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

网络安全——数链路层据安全协议

作者简介&#xff1a;一名云计算网络运维人员、每天分享网络与运维的技术与干货。 座右铭&#xff1a;低头赶路&#xff0c;敬事如仪 个人主页&#xff1a;网络豆的主页​​​​​​ 目录 前言 一.数据链路层安全协议简介 1.数据链路安全性 二.局域网数据链路层协议 1.…

git应用笔记(三)

在新增虚拟机linux的基础上&#xff0c;做git的下载与提交 1、初始化自己的用户名和邮箱。 git config --global user.name “输入你的用户名” git config --global user.email “输入你的邮箱” 2、将本地公钥及配置如图1复制粘贴到虚拟机当前用户.ssh\目录下 4929a0205f43…

面渣逆袭:分布式十二问,万字图文详解

大家好&#xff0c;我是老三&#xff0c;不管今年金三银四如何&#xff0c;面渣逆袭系列继续&#xff0c;这期我们来看看分布式。 分布式理论 1. 说说CAP原则&#xff1f; CAP原则又称CAP定理&#xff0c;指的是在一个分布式系统中&#xff0c;Consistency&#xff08;一致性…

硬件学习 软件Cadence day07 PCB 底板电路图布线

1.根据原理图的元器件&#xff0c; 选择在 PCB 芯片制作的元器件 &#xff08;allegro中原理图和pcb中元件的交互&#xff09; 1.首先完成下列操作 可以尝试先关闭再打开&#xff0c; 等下操作的时候就好 发现新增的发光物体&#xff01;&#xff01; 2.完成操作 &#xff0c;…

Web3中文|香港拟允许比特币交易,瞄准“全球web3中心”

香港再次成为全球加密行业关注的焦点。在美国SEC对于加密货币交易所Kraken、BUSD发行商Paxos以及BA的重磅打击对比下&#xff0c;香港从去年开始持续拥抱Web3的姿态&#xff0c;让投资者开始押注香港。2023年2月20日&#xff0c;香港证监会宣布&#xff0c;就适用于虚拟资产交易…

Linux:基于libevent读写管道代码

基于libevent读写管道代码&#xff1a; 读端&#xff1a; #include <stdlib.h> #include <stdio.h> #include <unistd.h> #include <sys/types.h> #include <sys/stat.h> #include <string.h> #include <event2/event.h> #include…

gdb的简单练习

题目来自《ctf安全竞赛入门》1.用vim写代码vim gdb.c#include "stdio.h" #include "stdlib.h" void main() {int i 100;int j 101;if (i j){printf("bingooooooooo.");system("/bin/sh");}elseprintf("error............&quo…

Python 算法交易实验48 表字段设计

说明 虽然说的是表&#xff0c;实际上用的是Mongo集合 基于ADBS(APIFunc DataBase Service)可以构造一个供后续研究、生产长时间使用的数据基础&#xff0c;这个基础包括了&#xff1a; 1 队列服务。通过队列&#xff0c;数据可以通过API实现零担和批量两种模式的快速存储。2 …

ideal创建maven项目

前置工作本机安装mavenIdea 设置使用本机maven 工具Settings--->Maven开始创建maven项目创建maven项目&#xff0c;勾选通过模板创建&#xff0c;选择 maven-archetype-webapp 模板GroupId: 公司名倒序ArtifactId: 项目名设置本地maven仓库配置项目文件显示名&#xff0c;和…

外卖点餐小程序开发

前言 餐饮行业是一个传统的行业。根据当前发展现状,网络信息时代的全面普及,餐饮行业也在发生着变化,单就点餐这一方面,利用手机点单正在逐步进入人们的生活。传统的点餐方式,不仅会耗费大量的人力、时间,有时候还会出错。小程序系统伴随智能手机为我们提供了新的方向。 手机…

如何使用码匠连接 MySQL

目录 在码匠中集成 MySQL 在码匠中使用 MySQL 关于码匠 目前码匠已经实现了与 MySQL 数据源的连接&#xff0c;支持书写 SQL 语句&#xff0c;也支持通过图形化界面对数据进行增、删、改、查&#xff0c; 同时还支持将数据绑定至各种组件&#xff0c;并通过简单的代码实现数据…

Collecting package metadata (current_repodata.json): failed

一、问题描述 安装anaconda之后&#xff0c;想创建环境&#xff0c;用了下面这段代码&#xff1a; conda create -n pytorch python3.7 conda创建环境报错了&#xff0c;报了如下这一堆&#xff1a; Collecting package metadata (current_repodata.json): failedUnavailab…

echart中x轴数据过多时展示不全

项目中遇到需要展示一些柱状图&#xff0c;之前做相关功能时&#xff0c;横坐标x轴一直用的是时间&#xff0c;所以没有注意到这个问题。 如下图所示&#xff1a; 当x轴显示的是”人名“这种类型的值的时候&#xff0c;这种显示情况就有问题了&#xff0c;这样就不会知道&…

python学习笔记之例题篇NO.3

获得用户输入的一个整数N&#xff0c;输出N中所出现不同数字的和。‪‬‪‬‪‬‪‬‪‬‮‬‪‬‫‬‪‬‪‬‪‬‪‬‪‬‮‬‫‬‪‬‪‬‪‬‪‬‪‬‪‬‮‬‫‬‭‬‪‬‪‬‪‬‪‬‪‬‮‬‫‬‮‬‪‬‪‬‪‬‪‬‪‬‮‬‫‬‫‬ s list(set(list(input())))# ① r…

全局组件和局部组件

全局组件第一种定义方法&#xff1a;A、创建自己的组件&#xff1a;Loading.vueB、在main.js文件中引入组件并注册import Vue from vue import App from ./App.vue import * as filters from ./filterimport quanjuzujian from ./components/quanjuzujian.vueVue.component(qua…

carla0.9.13-UE4添加4轮车模型(Linux系统)

前期准备建模工具&#xff1a;blender:v3.4.1&#xff1b;可以在Ubuntu Software商店直接下载虚拟引擎&#xff1a;carla-UE4 (carla v0.9.13)&#xff0c;无需额外安装UE4&#xff0c;carla中自带插件编译carla参照官方文档&#xff1a;https://carla.readthedocs.io/en/0.9.1…

mysql存储引擎、事务、索引

目录MySQL进阶存储引擎什么是存储引擎常用存储引擎事务什么是事务怎么理解提交事务 和回滚事务事务特性事务的隔离级别索引什么是索引索引的实现原理什么条件下&#xff0c;我们会考虑给字段添加索引呢?什么条件下&#xff0c;索引会失效&#xff1f;索引分类MySQL进阶 存储引…

Scala的变量声明

文章目录变量声明&#xff08;一&#xff09;简单说明&#xff08;二&#xff09;利用val声明变量1&#xff0c;声明方式2&#xff0c;案例演示&#xff08;三&#xff09;利用var声明变量1&#xff0c;声明方式2&#xff0c;案例演示&#xff08;四&#xff09;换行输入语句&a…

横道图时间标尺在P6软件中的设置

卷首语 由于其直观简洁且易于管理的特性&#xff0c;使其成为展示项目活动顺序及时间安排的最常用的进度管理工具。 甘特图 甘特图&#xff08;Gantt Chart&#xff09;&#xff0c;又称为横道图或棒条图&#xff0c;是最早的项目进度管理工具之一。由于其直观简洁且易于管理…

从WEB到PWA 开发-发布-安装

见意如题&#xff01;本文主要来说说PWA开发&#xff01;作为一个前端程序员&#xff0c;在没有任何Android/IOS的开发情况下&#xff0c;想想我们有多少种方法来开发一个原生移动应用程序&#xff01;我们可以有非原生、混合开发&#xff0c;PWA等等手段。类似uniapp&#xff…