200、使用默认 Exchange 实现 P2P 消息 之 消息生产者(发送消息) 和 消息消费者(消费消息)

news2025/2/23 17:34:31

RabbitMQ 工作机制图:

Connection: 代表客户端(包括消息生产者和消费者)与RabbitMQ之间的连接。
Channel: 连接内部的Channel。channel:通道
Exchange: 充当消息交换机的组件。
Queue: 消息队列。
在这里插入图片描述

★ 消费消息

使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下:

(1)创建ConnectionFactory,设置连接信息,再通过ConnectionFactory获取Connection。

(2)通过Connection获取Channel。

(3)根据需要,调用Channel的queueDeclare()方法声明队列,如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。

(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,
该参数相当于JMS中的消息监听器。

这个 basicConsume()方法 相当于是异步消费。
而同步消费会出现阻塞情况,这就失去消息中间件存在的意义,所以先讲异步消费。

★ 发送消息

使用RabbitMQ Java Client依赖库开发消息生产者的大致步骤如下:

(1)创建ConnectionFactory,设置连接信息,再通过ConnectionFactory获取Connection。

(2)通过Connection获取Channel。

(3)根据需要调用exchangeDeclare()、queueDeclare()方法声明Exchange和队列、并完成队列与Exchange的绑定。
如果声明的Exchange还不存在,则创建该Exchange;否则直接使用已有的Exchange。
Declare:声明、宣布

(4)调用Channel的basicPublish()方法发送消息,调用该方法的第一个参数是exchange,
第二个参数为路由key,最后两个参数依次是消息属性和消息数据体。

【注意】:虽然消息生产者与队列是完全隔离的, 但如果消息生产者不声明消息队列,那系统中就可能暂时还没有任何消息队列。

在这种情况下,消息生产者向Exchange发送的消息将不会分发给任何队列,这些消息直接就被丢弃了。

【备注】:为了保证消息生产者能将消息发送到指定队列,消息生产者需要声明消息队列,保证消息队列的存在。

**问题:**消息生产者 和 消息队列 是完全隔离的,但是生产者为什么还要声明消息队列?
**原因:**因为程序如果先运行消息生产者,后运行消费者,而声明消息队列的方法又只存在消费者那边,那么在先运行消息生产者时,就会因为还没有生成消息队列,所以生产者发送到exchange的消息,会因为没有对应的消息队列而被丢弃。

代码演示:

先创建一个普通的 maven 项目。
在这里插入图片描述
添加一些属性 和 RabbitMQ的依赖
在这里插入图片描述

在这里插入图片描述

创建消息消费者

把创建连接的代码封装到一个方法里去。
在这里插入图片描述

消费者的代码
在这里插入图片描述
在这里插入图片描述

注意:channel.basicConsume 的第二个参数 autoAck:true,就是表示自动确认消息已经被消费完成了。就是当消费者接收到消息之后,就立马返回一个已经确认消费的消息回去给消息队列。
这样容易出现问题,就是消费者这边因为一收到消息就会自动确认消息被消费了并返回已经消费消息的结果回去给消息队列,但是可能消费者其实还没有把消息消费掉,而消息队列那边又以为消费者已经把消息消费了,所以就继续发消息给那个消费者。
而消费者一收到消息又自动确认消费并返回,就会导致这个消息队列的消息越来越多,然后消费者消费不完。
在这里插入图片描述
这个演示已消费未确认的演示放最后



执行消费者
在这里插入图片描述
控制台查看
原本没有这个消息队列,通过调用Channel的queueDeclare()方法声明队列,如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列

一开始消费者声明的这个消息队列,这个是否独占的exclusive 参数我是写true,
所以下图的 myQueue01的 Features 就是 Excl

在这里插入图片描述
在这里插入图片描述

这个就是创建的消费者。
用于消费这个 myQueue01 消息队列的消费者。
在这里插入图片描述

后面把 exclusive 改成了 false,是因为后面的生产者,需要也声明这个 myQueue01 消息队列,而如果这个消息队列是 独占的,就没法声明了,所以改成 false
在这里插入图片描述
在这里插入图片描述

创建消息生产者

生产者发送完消息就会关闭资源
消费者则是一直启动着
在这里插入图片描述

测试

先启动消费者或者启动生产者都一样,因为生产者和消费者都有调用queueDeclare() 方法声明消息队列,所以不存在发送消息后没找到对应的消息队列而导致消息被丢弃的情况。

启动消费者
在这里插入图片描述

然后启动生产者
生产者发送消息
在这里插入图片描述
再看消费者,已经消费了一条消息了。
因为先启动消费者,所以生产者发送的消息马上被消费了,在控制台的队列就看不到了。
在这里插入图片描述

再测试:

先启动生产者
关闭消费者,然后启动生产者发送消息
可以看出消息已经生产发送到消息队列了
在这里插入图片描述
在这里插入图片描述

这一步的流程图
在这里插入图片描述

启动消费者消费消息
在这里插入图片描述

流程图:
在这里插入图片描述

已消费未确认

注意:channel.basicConsume 的第二个参数 autoAck:true,就是表示自动确认消息已经被消费完成了。就是当消费者接收到消息之后,就立马返回一个已经确认消费的消息回去给消息队列。
这样容易出现问题,就是消费者这边因为一收到消息就会自动确认消息被消费了并返回已经消费消息的结果回去给消息队列,但是可能消费者其实还没有把消息消费掉,而消息队列那边又以为消费者已经把消息消费了,所以就继续发消息给那个消费者。
而消费者一收到消息又自动确认消费并返回,就会导致这个消息队列的消息越来越多,然后消费者消费不完。
在这里插入图片描述
在这里插入图片描述

如图:因为 autoAck 为false , 所以消费者消费消息后没有进行确认。这里的 unacked 条数就为1.

如果改成 autoAck 为false ,那么消费者消费消息的代码,要加上确认消息的方法。
在这里插入图片描述
这个就是手动确认消息。
在这里插入图片描述

完整代码:

ConnectionUtil 连接工具类

在这里插入图片描述

package cn.ljh.app.rabbitmq.util;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//连接工具
public class ConnectionUtil
{
    //获取连接的方法
    public static Connection getConnection() throws IOException, TimeoutException
    {
        //创建连接工厂----这个ConnectionFactory源码可以看出有构造器,所以直接new一个出来
        ConnectionFactory connectionFactory =  new ConnectionFactory();
        //设置连接信息
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("ljh");
        connectionFactory.setPassword("123456");
        connectionFactory.setVirtualHost("/"); //连接虚拟主机
        //从连接工厂获取连接
        Connection connection = connectionFactory.newConnection();
        //返回连接
        return connection;
    }
}

P2PProducer 生产者

package cn.ljh.app.rabbitmq.producer;

import cn.ljh.app.rabbitmq.consumer.P2PConsumer;
import cn.ljh.app.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

//消息生产者--使用默认的exchange
public class P2PProducer
{
    //(1)创建ConnectionFactory,设置连接信息,再通过ConnectionFactory获取Connection。
    //(2)通过Connection获取Channel。
    //(3)根据需要调用exchangeDeclare()、queueDeclare()方法声明Exchange和队列、并完成队列与Exchange的绑定。
    //    如果声明的Exchange还不存在,则创建该Exchange;否则直接使用已有的Exchange。
    //(4)调用Channel的basicPublish()方法发送消息,调用该方法的第一个参数是exchange,
    //    第二个参数为路由key,最后两个参数依次是消息属性和消息数据体。
    public static void main(String[] args) throws IOException, TimeoutException
    {
        //1、创建连接
        Connection conn = ConnectionUtil.getConnection();

        //2、通过Connection获取Channel。
        Channel channel = conn.createChannel();

        //3、调用exchangeDeclare()方法声明Exchange、调用queueDeclare()方法声明队列,并完成队列与Exchange的绑定
        //此处打算直接使用默认的Exchange来分发消息,因此无需声明 Exchange,只需声明队列
        channel.queueDeclare(P2PConsumer.QUEUE_NAME, true, false, false, null);

        String message = "生产者发送的消息的内容";

        //4、调用Channel的basicPublish()方法发送消息
        channel.basicPublish(""/*默认的 Exchange 没有名字,所以用空的字符串*/,
                P2PConsumer.QUEUE_NAME/*使用队列名作为路由key,表明该消息将会被路由到该队列*/,
                null /*指定额外的消息的属性*/,
                message.getBytes(StandardCharsets.UTF_8)/*消息体必须是字节数组类型-->byte[]*/
        );
        //5、关闭资源
        //关闭通道
        channel.close();
        //关闭连接
        conn.close();
    }
}

P2PConsumer 消费者

package cn.ljh.app.rabbitmq.consumer;

import cn.ljh.app.rabbitmq.util.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

//消息消费者
public class P2PConsumer
{
    // 使用 RabbitMQ Java Client 开发 消息消费者 的大致步骤如下:
    //(1)创建ConnectionFactory连接工厂,设置连接信息,再通过ConnectionFactory获取Connection连接。
    //(2)通过Connection获取Channel。
    //(3)根据需要、调用Channel的queueDeclare()方法声明队列,  Declare:声明、宣布
    //    如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列。
    //(4)调用Channel 的 basicConsume()方法开始处理消息,调用该方法时需要传入一个Consumer参数,该参数相当于JMS中的消息监听器。

    //常量
    public final static String QUEUE_NAME = "myQueue01";

    public static void main(String[] args) throws IOException, TimeoutException
    {
        //1、创建连接工厂,设置连接信息,然后再通过连接工厂获取连接
        Connection conn = ConnectionUtil.getConnection();

        //2、通过Connection获取Channel 消息通道
        Channel channel = conn.createChannel();

        //3、调用 Channel 的 queueDeclare() 方法声明队列
        //如果声明的队列已存在,该方法直接获取已有的队列;如果声明的队列还不存在,该方法将会创建新的队列
        //参数1:声明的队列名; 参数2:消息队列是否持久化
        //参数3:是否只允许该消息消费者消费该队列的消息,为true,则其他消费者在这个myQueue01队列消息积堆过多的情况下,也无法帮忙消费。
        //参数4:是否自动删除(如果为true,在该队列没消息的情况下,会自动删除该队列) 参数5:填写额外的参数
        channel.queueDeclare(QUEUE_NAME, true, false, false, null);

        //4、调用Channel 的 basicConsume()方法开始处理消费消息
        channel.basicConsume(QUEUE_NAME/*消费这个名字的消费队列里面的消息*/,
                true/*消息的确认模式:是否自动确认*/,
                new DefaultConsumer(channel)
                {
                    //处理消息:当这个消息队列收到消息的时候,这个方法就会被触发。重写这个方法:
                    @Override
                    public void handleDelivery(String consumerTag,
                                               Envelope envelope /*消息所在的信封,存放消息的exchange、路由key这些*/,
                                               AMQP.BasicProperties properties /*消息的那些属性*/,
                                               byte[] body /*body:消息的消息体*/) throws IOException
                    {
                        //把消息体中的消息拿出来
                        String message = new String(body, "UTF-8");
                        //printf:格式化输出函数   %s:输出字符串  %n:换行
                        System.err.printf("P2PConsumer收到来自Exchange为【%s】、路由key为【%s】的消息,消息内容为%s%n",
                                envelope.getExchange(),envelope.getRoutingKey(),message);
                    }
                }
        );
    }


}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>cn.ljh</groupId>
    <artifactId>rabbitmqtest</artifactId>
    <version>1.0.0</version>
    <name>rabbitmqtest</name>

    <!--  属性  -->
    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>11</java.version>
    </properties>

    <!--  依赖  -->
    <dependencies>
        <!-- RabbitMQ 的依赖库 -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.13.0</version>
        </dependency>

    </dependencies>

</project>

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1088258.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Leetcode---365周赛

题目列表 2873. 有序三元组中的最大值 I 2874. 有序三元组中的最大值 II 2875. 无限数组的最短子数组 2876. 有向图访问计数 一、有序三元组中的最大值I 看一眼该题的数据范围&#xff0c;直接三层for循环暴力枚举&#xff0c;时间复杂度O(n^3)&#xff0c;代码如下 class…

Avalonia常用小控件Svg

1.项目下载地址&#xff1a;https://gitee.com/confusedkitten/avalonia-demo 2.UI库Semi.Avalonia&#xff0c;项目地址 https://github.com/irihitech/Semi.Avalonia 3.SVG库&#xff0c;Avalonia.Svg.Skia&#xff0c;项目地址 https://github.com/wieslawsoltes/Svg.Ski…

淘宝店铺商品评论数据采集,淘宝商品评论数据接口,淘宝API接口

采集淘宝店铺商品评论数据的方法如下&#xff1a; 进入主界面&#xff0c;选择"自定义任务"。将商品信息页的网址复制粘贴到网站输入框中&#xff0c;点击"保存设置"。将页面下拉到底部&#xff0c;点击"下一页"按钮&#xff0c;在右侧的操作提…

IDEA 中SpringBoot对Run/Debug Configurations配置 SpringBoot的多环境参数指定

例如下面中有多种环境&#xff1a;dev、test、prod 等配置参数&#xff0c;运行服务时候指定其中一种 VM options:内部配置参数 -Dspring.config.namebootstrap -Dspring.spring.profilesdev # 也可以指定端口 -Dserver.port8080 -Dspring.profiles.activetest -Ddebug 参考链…

校招C#面试题整理—Unity客户端

前言 博客已经1年多没有更新了&#xff0c;这一年主要在实习并准备秋招和春招&#xff0c;目前已经上岸Unity客户端岗位&#xff0c;现将去年校招遇到的一些面试题的事后整理分享出来。答案是笔者自己整理的不一定保证准确&#xff0c;欢迎大家在评论区指出。 Unity客户端岗的…

【小巧玲珑】文件太大,怎么办?分卷压缩技术了解下,这才是压缩技术

【小巧玲珑】文件太大&#xff0c;怎么办&#xff1f;分卷压缩技术了解下&#xff0c;这才是压缩技术 1、痛点2、场景重现2.1 jar包2.1 ZIP压缩 3、压缩步骤3.1 新建压缩文件3.2 压缩结果 4、解压步骤5、效果6、jar压缩算法 1、痛点 通过浏览器客户端访问云服务&#xff0c;文…

【VR】【Unity】白马VR课堂系列-VR开发核心基础03-项目准备-VR项目设置

【内容】 详细说明 在设置Camera Rig前,我们需要针对VR游戏做一些特别的Project设置。 点击Edit菜单,Project Settings,选中最下方的XR Plugin Management,在右边面板点击Install。 安装完成后,我们需要选中相应安卓平台下的Pico VR套件,关于怎么安装PICO VR插件,请参…

硬盘格式化怎么选?NTFS/FAT32/exFAT

我们在初次使用硬盘时需要进行格式化&#xff0c;很多移动硬盘和U盘在使用时也有格式化的需求&#xff0c;不过在格式化的时候会面临3个选项&#xff0c;分别是FAT32、NTFS和exFAT&#xff0c;他们到底有什么区别&#xff0c;我们应该如何选呢&#xff1f; 首先简单介绍一下文件…

顶灯控制器OHC

OHC(Over Head Console)顶部控制终端系统&#xff0c;主要实现对车内饰灯以及天窗的控制功能。OHC产品采用平台化设计&#xff0c;并已通过多家整车厂的设计评审和试验验证&#xff0c;为特斯拉、福特、林肯、捷豹、路虎若干车型配套。 产品应用 车内照明灯控制天窗控制后排照…

网站的常见攻击与防护方法

在互联网时代&#xff0c;几乎每个网站都存在着潜在的安全威胁。这些威胁可能来自人为失误&#xff0c;也可能源自网络犯罪团伙所发起的复杂攻击。无论攻击的本质如何&#xff0c;网络攻击者的主要动机通常是谋求经济利益。这意味着无论您经营的是电子商务项目还是小型商业网站…

mongoDB 性能优化

文章目录 前言mongoDB 性能优化1. explain方法来查看查询的执行计划2. 查看mongoDB 集合的索引3. mongoDB 怎么添加索引4. 升序索引与降序索引是什么意思 前言 如果您觉得有用的话&#xff0c;记得给博主点个赞&#xff0c;评论&#xff0c;收藏一键三连啊&#xff0c;写作不易…

Linux - make命令 和 makefile

make命令和 makefile 如果之前用过 vim 的话&#xff0c;应该会对 vim 又爱又恨吧&#xff0c;刚开始使用感觉非常的别扭&#xff0c;因为这种编写代码的方式&#xff0c;和在 windows 当中用图形化界面的方式编写代码的方式差别是不是很大。当你把vim 用熟悉的之后&#xff0…

专业韩语论文翻译,论文中译韩哪里比较专业?

据了解&#xff0c;论文翻译是翻译工作中较常见的一种翻译题材&#xff0c;论文翻译的主要目的是在国外期刊上发表&#xff0c;加强国际学术交流。那么&#xff0c;如何做好论文翻译&#xff0c;论文中译韩哪里比较专业&#xff1f; 业内人士指出&#xff0c;翻译韩语论文&…

pycharm2020无法打开,点击无反应

pycharm 2020 无法打开&#xff0c;点击无反应&#xff0c;今天我碰到这现象&#xff0c;总结大体原因 C:\Users\ygw\AppData\Roaming\JetBrains &#xff08;删除该目录即可&#xff0c;一般由于升级安装 或 安装两个不同版本 会存在老旧文件影响导致&#xff09;

了解什么是JWT的原理及实际应用

目录 一、介绍&讲述 ( 1 ) 什么是JWT ( 2 ) 为什么要学 二、结构 三、Jwt的工具类的使用 1. 依赖 2. 工具类 3. 过滤器 4. 控制器 5. 配置 6. 测试类 用于生成JWT 解析Jwt 复制jwt&#xff0c;并延时30分钟 测试JWT的有效时间 测试过期JWT的解析 四、…

智能制造优化,RFID生产线管理系统解决方案

一、背景介绍 随着全球经济的发展&#xff0c;传统制造业面临着越来越高的成本和低利润的挑战&#xff0c;为了提升企业的整体利润率&#xff0c;优化管理流程成为必要的手段之一&#xff0c;在传统的制造企业中&#xff0c;生产线通常采用单件流生产模式&#xff0c;但这种模…

成立 15 年的美图分享,AI 视觉大模型的核心能力是什么?

出品 | CSDN 云计算 国民级美颜修图软件美图秀秀&#xff0c;从移动互联网时代火到现在&#xff0c;而它背后的美图公司也走过了十五年的发展&#xff0c;旗下拥有众多的专业影像与设计产品。最近&#xff0c;美图公司举办 15 周年生日会&#xff0c;生日会上美图还发布了自研 …

深度学习基础知识数据 数据预处理transforms流程讲解

深度学习基础知识数据 数据预处理transforms流程讲解 1、数据预处理2、使用节点2、transform.RandomResizedCrop 随机尺寸裁剪缩放3、水平翻转 与 垂直翻转4、ColorJitter变换5、ToTensor6、Normalization 归一化7、transforms.Compose8、重写transforms1、分类任务2、目标检测…

“Jwt认证在前后端分离架构中的应用与优化“

目录 引言1. JWT的简介1.1 什么是JWT1.2 JWT的优势 2. JWT工具类2.1 JWT生成与解析2.2 JWT与安全性 3. JWT案例演示后台改动前台改动 总结 引言 在当今互联网应用开发中&#xff0c;前后端分离架构已经成为一种主流的开发模式。而身份验证和授权是保证系统安全性的重要环节之一…

Unity 快捷键的一些记录

1.Unity Prefab Apply All 设置快捷键&#xff0c;修改预设体之后快捷键应用 打包会出问题&#xff1a;The type or namespace name ‘EditorWindow‘ could not be found EditorWindow类无法打包出EXE 添加unity关键字定义如下文所示&#xff1a; #if UNITY_EDITOR using Uni…