RabbitMQ 客户端 连接、发送、接收处理消息

news2024/12/23 23:07:04

RabbitMQ 客户端 连接、发送、接收处理消息

一. RabbitMQ 的机制跟 Tcp、Udp、Http 这种还不太一样

RabbitMQ 服务,不是像其他服务器一样,负责逻辑处理,然后转发给客户端
而是所有客户端想要向 RabbitMQ服务发送消息,

第一步:创建一个链接 RabbitMQ 服务的连接

需要传入 RabbitMQ服务地址、用户名、密码,然后在连接代码中传入一个 queue 的字符串作为 标志
连接成功后,RabbitMQ服务上就可以看到这个链接了
如下图,可以看到有一个 Name = queueL1 的连接,后边有链接状态、消息数
Ready 和 Total 都是 0
在这里插入图片描述

向 RabbitMQ 发送消息的:

(1) 如果没有建立连接,执行第一步,建立一个链接
(2) 通过 发送消息接口向 RabbitMQ 服务 发消息
(3) RabbitMQ 服务接收到消息,只是按照连接的 queue 分别把消息放在自己名字的 queue 下, RabbitMQ 服务只是存着客户端发送的消息,服务什么都不处理

向 RabbitMQ 服务发送几条消息
下图可以看到 queueL1 的队列已经接收了 5 条消息,这五条消息如果没有客户端接收处理,就一直在这存着
在这里插入图片描述

接收 RabbitMQ 服务消息:

(1) 如果没有建立连接,执行第一步,建立一个链接
(2) 注册接收消息接口,在 RabbitMQ 中叫 消费消息,可以标记消费消息后是否将 RabbitMQ 的数据删除
(3) 如果 RabbitMQ 服务收到消息,就转发给 注册接收消息接口的 连接,如果接收的连接标记了 AutoDelete,那么发送给客户端后,RabbitMQ 就会将消息从消息队列中删除

注册接收消息,我的客户端就会收到 RabbitMQ 发送过来的消息,消息中包含发送上来的消息内容,还有发送消息的 queue 名字

此时再看,就会发现 Ready 和 Total 又变成 0 了
在这里插入图片描述

为什么上面讲解中将 接收 RabbitMQ 服务消息、向 RabbitMQ 发送消息的 分开说
是因为 RabbitMQ 发送消息就仅仅是发消息,发送完就不管了
而 RabbitMQ 的消费消息(接收消息) 也仅仅是接收消息,它不管是谁发的消息,只要是发送的 RabbitMQ 服务的消息,它都能接收,

(3.1) 比如我创建了 一个 连接,queue名为 xxxA,
它发送了消息 “Hello World”,
xxxA 连接自己又注册了 消费消息(接收消息),那么xxxA 自己就会接收到 xxxA 队列发送的 Hello World 信息

(3.2) 我又创建了 新的连接,queue 名还是 xxxA
那么新的连接也可以收到 (3.1) 发的 消息 HelloWorld

二. 客户端连接服务器

  1. 实例化一个 连接 RabbitMQ 服务的客户端连接
    实例化需要传入 服务地址、端口、用户名、密码
using RabbitMQ.Client;
using System;
using System.Threading.Tasks;
using RabbitMQ.Client.Exceptions;
using UnityEngine;
using System.Text;
using RabbitMQ.Client.Events;

namespace Network
{
    /// <summary>
    /// RabbitMQ 创建一个链接
    /// 供 RabbitMQReceive、RabbitMQSend 使用
    /// </summary>
    public class RabbitMQConnect
    {
        private RabbitMQConnectData connectData;

        private ConnectionFactory factory;
        private IChannel channel;
        private IConnection connection;

        private NetWorkState state;

        private Action<string, byte[]> receivedCallBack;

        private const int TimeOut = 10; //连接超时 10 秒
        private bool dispose = false;

        public RabbitMQConnect(RabbitMQConnectData connectData)
        {
            this.connectData = connectData;
            State = NetWorkState.Disconnected;
            dispose = false;
        }

        public string Queue
        {
            get { return connectData.queue; }
        }

        public NetWorkState State
        {
            get { return state; }
            private set { state = value; }
        }

        public IChannel Channel
        {
            get { return channel; }
        }

        /// <summary>
        /// 网络是否连接中
        /// </summary>
        public bool IsConnect
        {
            get
            {
                if (null == channel || null == connection)
                {
                    return false;
                }
                return channel.IsOpen && connection.IsOpen;
            }
        }

        public async Task StartConnect()
        {
            if (State == NetWorkState.Connecting)
            {
                await Task.Delay(TimeOut * 1000);
            }

            if (State == NetWorkState.Connected)
            {
                return;
            }
            // 创建连接工厂
            // 如果初始化失败,不会启动恢复连接
            //factory = new ConnectionFactory()
            //{
            //    HostName = hostName, // 替换为你的 RabbitMQ 服务器地址
            //    UserName = userName, // 替换为用户名
            //    Password = password  // 替换为密码
            //};

            string url = $"amqp://{connectData.userName}:{connectData.password}@{connectData.hostName}:{connectData.port}"; //string.Format("amqp://unity:unity@139.9.137.14:5672");
            factory = new ConnectionFactory()
            {
                Uri = new Uri(url)
            };

            // 自动恢复连接
            factory.AutomaticRecoveryEnabled = true;
            // 如果由于异常导致恢复失败(例如RabbitMQ节点仍然不可达),它将在固定的时间间隔(默认为5秒)后重试。间隔时间可配置如下
            // Connection.CloseAsync 关闭的连接不会启动自动恢复连接
            factory.NetworkRecoveryInterval = TimeSpan.FromSeconds(10);
            factory.TopologyRecoveryEnabled = true;

            while (State != NetWorkState.Connected)
            {
                if (!dispose)
                {
                    await Connect();
                }
            }

            await Task.Delay(1);

            if (!string.IsNullOrEmpty(connectData.receiveQueeu))
            {
                await BasicConsumer();
            }
        }

        private async Task Connect()
        {
            try
            {
                State = NetWorkState.Connecting;
                // 异步创建连接
                connection = await factory.CreateConnectionAsync();
                channel = await connection.CreateChannelAsync();
                // 声明队列
                QueueDeclareOk queueDeclareOk = await channel.QueueDeclareAsync(
                    queue: connectData.queue,
                    durable: false,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);

                /*
                    autoDelete = true:没有消费者时队列自动删除,通常用于临时或一次性的队列。
                    autoDelete = false:队列不会自动删除,通常用于需要长期存在的队列。
                    选择是否设置 autoDelete = true 取决于你是否希望队列在没有消费者时自动删除。如果你的队列是临时的、一次性的,那么使用 autoDelete = true 会更适合;如果队列是长期需要使用的,则设置为 autoDelete = false 会更为合适 
                */

                State = NetWorkState.Connected;

                // 设置消费者的预取计数为10,允许同时处理10条消息
                await channel.BasicQosAsync(
                    prefetchSize: 0, 
                    prefetchCount: 10, 
                    global: false);
                Debug.Log("RabbitMQ Connect Success");
                GameNotifycation.GetInstance().Notify<NetWorkState>(ENUM_MSG_TYPE.MSG_NETWORK_STATE_CHANGE, State);
            }
            catch (BrokerUnreachableException e)
            {
                await Task.Delay(5000);
                State = NetWorkState.ConnectFailed;
                Debug.LogError("ConnectError:" + e.ToString());
                // apply retry logic
            }

            await Task.Delay(1);
        }

        /// <summary>
        /// 发送消息
        /// exchange:   要发布消息的交换机名称。
        /// routingKey: 路由键,决定消息应该路由到哪个队列。
        /// mandatory:  如果设置为 true,RabbitMQ 会确保消息至少被投递到一个队列。如果没有队列接收该消息,RabbitMQ 会触发 basic.return。
        /// immediate:  如果设置为 true,RabbitMQ 会在消息无法立即被消费时丢弃消息。
        /// basicProperties: 消息的属性,类型为 IBasicProperties。这些属性可以设置消息的优先级、持久性等。
        /// body: 消息体的字节数组。
        /// 
        /// BasicPublishAsync 方法 没有返回消息投递的结果。它仅仅表示“请求已经被成功发送到 RabbitMQ 的交换机”。如果发布操作成功,Task 会正常完成,不会抛出异常。你可以通过异常处理来捕获潜在的错误。
        /// </summary>
        /// <param name="msg"></param>
        public async Task SendAsync(string message)
        {
            if (!IsConnect)
            {
                UnityEngine.Debug.Log("Send not IsConnect");
                await StartConnect();
            }

            try
            {
                IChannel channel = Channel;
                var body = Encoding.UTF8.GetBytes(message);
                var props = new BasicProperties();
                props.ContentType = "text/plain";
                props.DeliveryMode = DeliveryModes.Transient;
                await channel.BasicPublishAsync(
                    exchange: "",
                    routingKey: Queue,
                    mandatory: false,
                    basicProperties: props,
                    body: body).ConfigureAwait(false);

                //Debug.Log($"[x] Sent: Complete");
            }
            catch (Exception ex)
            {
                UnityEngine.Debug.LogError($"Error publishing message: {ex.Message}");
            }
        }

        /// <summary>
        /// 设置接收消息回调
        /// </summary>
        /// <param name="receivedCallBack"></param>
        public void SetReceive(Action<string, byte[]> receivedCallBack)
        {
            this.receivedCallBack = receivedCallBack;
        }

        /// <summary>
        /// 创建异步消费者
        /// </summary>
        /// <returns></returns>
        public async Task<string> BasicConsumer()
        {
            if (!IsConnect)
            {
                await StartConnect();
            }

            var consumer = new AsyncEventingBasicConsumer(Channel);
            // 处理消息的异步回调逻辑
            consumer.ReceivedAsync += ReceivedAsync;

            // 开始消费
            string result = await Channel.BasicConsumeAsync(
                queue: connectData.receiveQueeu,  // 指定消费者要监听的队列名称
                autoAck: false,        // 决定是否自动确认消息。如果 true,消息在交付时会自动确认。如果 false,则需要手动调用 BasicAck 确认消息
                consumer: consumer);  // 指定消息的处理方式,通过实现 IBasicConsumer 接口来定义如何处理从队列中接收到的消息

            /*
                autoAck = true:消息一旦传递给消费者,RabbitMQ 就认为该消息被成功处理,无需再确认。
                autoAck = false:消费者需要显式地调用 channel.BasicAck 来确认消息的处理,通常用于消息处理失败时能够重试消息。
            */

            return result;
        }

        /// <summary>
        /// 异步接收消息
        /// 如果 Channel.BasicConsumeAsync 方法中 autoAck 设置为 true,那么 channel.BasicAckAsync 调用是不允许的
        /// 想在  Channel.BasicConsumeAsync 消费消息收到消息时 调用 channel.BasicAckAsync,必须将 Channel.BasicConsumeAsync 方法中 autoAck 设置为 false
        /// </summary>
        /// <param name="sender"></param>
        /// <param name="eventArgs"></param>
        /// <returns></returns>
        private async Task ReceivedAsync(object sender, BasicDeliverEventArgs eventArgs)
        {
            try
            {
                //Debug.Log("ReceivedAsync");
                AsyncEventingBasicConsumer consumer = sender as AsyncEventingBasicConsumer;
                string queue = consumer.Channel.CurrentQueue;
                var body = eventArgs.Body.ToArray();
                receivedCallBack?.Invoke(queue, body);
                // 模拟异步任务处理(比如访问数据库或调用其他服务)
                await channel.BasicAckAsync(eventArgs.DeliveryTag, false);
            }
            catch (Exception ex)
            {
                Debug.LogError($"Error processing message: {ex.Message}");
                // 如果处理失败,可以拒绝并重新入队(可选)
                //await Channel.BasicNackAsync(eventArgs.DeliveryTag, multiple: false, requeue: true);
            }
            await Task.Delay(1);
        }

        /// <summary>
        /// 关闭连接
        /// </summary>
        public async void Dispose()
        {
            dispose = true;
            // 先关闭通道、再关闭连接
            if (channel != null)  // 通道关闭
            {
                await channel.CloseAsync();
                channel = null;
            }

            if (connection != null)  // 连接关闭
            {
                UnityEngine.Debug.Log("ConnectDispose");
                await connection.CloseAsync();
                connection = null;
            }
            await Task.Delay(1);
        }
    }
}

RibbitMQ 服务通过 queue 来区分每一个连接的客户端,代码部分如下

QueueDeclareOk queueDeclareOk = await channel.QueueDeclareAsync(
                    queue: queue,
                    durable: false,
                    exclusive: false,
                    autoDelete: false,
                    arguments: null);
  1. 客户端实例

  2. 测试用例

using UnityEngine;
using Network;
using LitJson;
using System.Text;
using System.Collections;
using System.Collections.Generic;

public class RabbitMQDemo : MonoBehaviour
{
    // 客户端
    private RabbitMQConnect rabbitMQConnect;
    private Queue<string> receiveQueue = new Queue<string>();

    void Start()
    {
        RabbitMQConnectData connectData = new RabbitMQConnectData();
        connectData.queue = "TestA";
        connectData.receiveQueeu = "TestA";
        connectData.hostName = "XXX.XXX.XXX.XXX";
        connectData.port = "5672";
        connectData.userName = "unity";
        connectData.password = "unity";

        // 实例化
        rabbitMQConnect = new RabbitMQConnect(connectData);
        rabbitMQConnect.SetReceive(Receive);
        StartConnect();
    }

    private async void StartConnect()
    {
        await rabbitMQConnect.StartConnect();
    }

    private async void Send(string meg)
    {
        await rabbitMQConnect.SendAsync(meg);
    }

    private void Receive(string queue, byte[] byteData)
    {
        var json = Encoding.UTF8.GetString(byteData);
        UnityEngine.Debug.Log($"[x] ReceivedAsync: {json}");
        receiveQueue.Enqueue(netWorkData);
    }

    private int number = 1000;
    // Update is called once per frame
    void Update()
    {
        if (Input.GetKeyDown(KeyCode.A))
        {
            ++number;
            Send("Hello RabbitMQ:" + number);
        }

        DispatchMessage();
    }

    private void DispatchMessage()
    {
        if (receiveQueue.Count <= 0)
        {
            return;
        }

        string json = receiveQueue.Dequeue();
    }

    private void OnDestroy()
    {
        Debug.LogError("OnDestroy");
        rabbitMQConnect.Dispose();
    }
}

	/// <summary>
	/// 网络连接状态
	/// </summary>
	public enum NetWorkState
	{
		// init
		/// <summary>
		/// 关闭/断开连接
		/// </summary>
		Closed,

		// client
		/// <summary>
		/// 已经建立连接
		/// </summary>
		Connected,

		/// <summary>
		/// 正在请求连接
		/// </summary>
		Connecting,

		/// <summary>
		/// 连接失败
		/// </summary>
		ConnectFailed,

		// both
		/// <summary>
		/// 连接超时
		/// </summary>
		Timeout,

		/// <summary>
		/// 断开连接
		/// </summary>
		Disconnected,
	}

扩展
可以在 网页上 Overview 页面,找到 Ports and contexts 部分
可以看到每种协议对应的端口是不一样的
每种协议都有一种独立的连接方式
需要根据自己选择的协议拼接路径

比如 我上面代码使用的 http 方式

    string localHost = "localhost"; // ip如 xxx.xxx.xxx.xxx
    string userName = "用户名";
    string password = "密码";
	// 创建连接工厂
	// 如果初始化失败,不会启动恢复连接
	factory = new ConnectionFactory()
	{
	    HostName = hostName, // 替换为你的 RabbitMQ 服务器地址
	    UserName = userName, // 替换为用户名
	    Password = password  // 替换为密码
	};
    

amqp 协议连接方式如下

	string url = $"amqp://{userName}:{password}@{hostName}:{port}"; 
	factory = new ConnectionFactory()
	{
	    Uri = new Uri(url)
	};

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2253695.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PyQt 中的无限循环后台任务

在 PyQt 中实现一个后台无限循环任务&#xff0c;需要确保不会阻塞主线程&#xff0c;否则会导致 GUI 无响应。常用的方法是利用 线程&#xff08;QThread&#xff09; 或 任务&#xff08;QRunnable 和 QThreadPool&#xff09; 来运行后台任务。以下是一些实现方式和关键点&a…

云计算vsphere 服务器上添加主机配置

这里是esxi 主机 先把主机打开 然后 先开启dns 再开启 vcenter 把每台设备桌面再vmware workstation 上显示 同上也是一样 &#xff0c;因为在esxi 主机的界面可能有些东西不好操作 我们选择主机和集群 左边显示172.16.100.200

Python酷库之旅-第三方库Pandas(255)

目录 一、用法精讲 1206、pandas.tseries.offsets.SemiMonthEnd.is_on_offset方法 1206-1、语法 1206-2、参数 1206-3、功能 1206-4、返回值 1206-5、说明 1206-6、用法 1206-6-1、数据准备 1206-6-2、代码示例 1206-6-3、结果输出 1207、pandas.tseries.offsets.S…

Envoy-istio

最近研究envoy-istio&#xff0c;发现这个博客&#xff0c;觉得很不错&#xff0c;这里记录一下 envoy-istio介绍 envoy-istio - 随笔分类 - yaowx - 博客园 envoy部分七&#xff1a;envoy的http流量管理基础 envoy部分六&#xff1a;envoy的集群管理 envoy部分五&#xff…

甘特图的绘制步骤:教你如何绘制甘特图

甘特图是项目管理中一种极为重要的可视化工具&#xff0c;它以直观的方式展示项目进度&#xff0c;包括任务的开始时间、结束时间、持续时长以及任务之间的先后顺序。在当今的项目管理领域&#xff0c;Excel 和专业的项目管理软件是制作甘特图的两大常用途径&#xff0c;它们各…

C++模拟堆

模板题目 图片来源Acwing 堆的基础知识 代码实现 #include<iostream> #include<algorithm>using namespace std;const int N 1e5 10; int a[N]; int n, m;void down(int u) {int t u;if (2 * u < n && a[2 * u] < a[u]){t 2 * u;}if (2 * u …

牛客linux

1、 统计文件的行数 # 方法 1 wc -l ./nowcoder.txt | awk {print $1} # 方法 2 &#xff0c;awk 可以打印所有行的行号, 或者只打印最后一行 awk {print NR} ./nowcoder.txt |tail -n 1 awk END{print NR} ./nowcoder.txt # 方法 3 grep -c 、-n等等 grep -c "" ./…

【unity小技巧】在 Unity 中,Application获取各种文件路径或访问不同类型的存储路径

文章目录 前言1. **Application.persistentDataPath**2. **Application.dataPath**3. **Application.streamingAssetsPath**4. **Application.temporaryCachePath**5. **Application.consoleLogPath**6. **Application.userDataPath**7. **Application.streamingAssetsPath 与 …

汇编语言学习-二

好吧&#xff0c;已经隔了两天&#xff0c;下完班看了两天&#xff0c;在电脑上装了虚拟机版的MS_DOS,主要是怕折腾坏我的电脑系统&#xff1b; 这个第二天应该是称为第二章更为合适&#xff0c;目前第二章已经看完&#xff0c;基本的命令也是敲了敲&#xff1b; 下面就进行一…

游戏引擎学习第33天

仓库: https://gitee.com/mrxiao_com/2d_game 位置表示的回顾 在之前的工作中&#xff0c;已经实现了将单位从像素空间转移到真实的空间&#xff0c;这样可以确保所有的动作和物体都按米为单位来进行。这个转变让游戏中的物体不再是基于像素的&#xff0c;而是按照真实世界的…

泷羽sec-burp(3)decodor comparer logger模块使用 学习笔记

声明&#xff01; 学习视频来自B站up主 **泷羽sec** 有兴趣的师傅可以关注一下&#xff0c;如涉及侵权马上删除文章&#xff0c;笔记只是方便各位师傅的学习和探讨&#xff0c;文章所提到的网站以及内容&#xff0c;只做学习交流&#xff0c;其他均与本人以及泷羽sec团队无关&a…

vue-cli创建项目报错:command failed: npm install --loglevel error

网上解决方法有很多&#xff0c;对于我都没用。 最后用这个方法起了作用&#xff1a; 尝试将npm源设置为HTTP&#xff0c;慎用&#xff0c;可能不安全 npm config set registry http://registry.npm.taobao.org/ 改为http就顺利创建项目了。

《船舶物资与市场》是什么级别的期刊?是正规期刊吗?能评职称吗?

问题解答 问&#xff1a;《船舶物资与市场》是不是核心期刊&#xff1f; 答&#xff1a;不是&#xff0c;是知网收录的正规学术期刊。 问&#xff1a;《船舶物资与市场》级别&#xff1f; 答&#xff1a;国家级。主管单位&#xff1a;中国船舶集团有限公司 主办单…

超详细MacBook Pro(M1)配置GO语言环境(图文超详细版)

前提 当我第一次使用MacBook配置Go语言环境时&#xff0c;网上的资料错综复杂&#xff0c;部分资料对于第一次使用MacBook的小白们非常不友好&#xff0c;打开终端时&#xff0c;终端的位置对应的访达中的位置不是很清楚&#xff0c;因此才有了这篇文章&#xff0c;该文章通过…

单端和差分信号的接线法

内容来源&#xff1a;【单端信号 差分信号与数据采集卡的【RSE】【 NRES】【 DIFF】 模式的连接】 此篇文章仅作笔记分享。 单端输入 单端信号指的是输入信号由一个参考端和一个信号端构成&#xff0c;参考端一般是地端&#xff0c;信号就是通过计算信号端口和地端的差值所得…

前端开发 之 15个页面加载特效中【附完整源码】

前端开发 之 15个页面加载特效中【附完整源码】 文章目录 前端开发 之 15个页面加载特效中【附完整源码】八&#xff1a;圆环百分比加载特效1.效果展示2.HTML完整代码 九&#xff1a;毒药罐加载特效1.效果展示2.HTML完整代码 十&#xff1a;无限圆环加载特效1.效果展示2.HTML完…

【H2O2|全栈】Node.js与MySQL连接

目录 前言 开篇语 准备工作 初始配置 创建连接池 操作数据库 封装方法 结束语 前言 开篇语 本节讲解如何使用Node.js实现与MySQL数据库的连接&#xff0c;并将该过程进行函数封装。 与基础部分的语法相比&#xff0c;ES6的语法进行了一些更加严谨的约束和优化&#…

spark-sql配置教程

1.前期准备 &#xff08;1&#xff09;首先要把hadoop集群&#xff0c;hive和spark等配置好 hadoop集群&#xff0c;hive的配置可以看看这个博主写的博客 大数据_蓝净云的博客-CSDN博客 或者看看黑马程序员的视频 黑马程序员大数据入门到实战教程&#xff0c;大数据开发必…

【网络安全】网站常见安全漏洞 - 网站基本组成及漏洞定义

文章目录 引言1. 一个网站的基本构成2. 一些我们经常听到的安全事件3. 网站攻击者及其意图3.1 网站攻击者的类型3.2 攻击者的意图 4. 漏洞的分类4.1 按来源分类4.2 按危害分类4.3 常见漏洞与OWASP Top 10 引言 在当今的数字化时代&#xff0c;安全问题已成为技术领域不可忽视的…

【最新免费PPT制作并下载】Kimi PPT助手:智能化演示文稿生成,职场效率的革命性提升

最新免费PPT制作方法在这里&#xff01;下面我想向大家介绍一款能够极大提升我们工作效率的工具——Kimi PPT助手。 Kimi PPT助手&#xff1a;智能化演示文稿生成 Kimi PPT助手是由Moonshot AI推出的一款革命性产品&#xff0c;它通过人工智能技术&#xff0c;实现了PPT的一键…