【RabbitMq源码阅读】分析RabbitMq发送消息源码

news2025/1/11 16:49:44

一:基本介绍

        本文通过demo构建测试代码,debug分析的方法查看RabbitMq源码。

rabbit的中文文档: 官方中文文档

二:测试Demo

2.1 引入Springboot整合的RabbitMq依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.2 手写获取RabbitMq的连接,通道等信息

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * RabbitMq Source Test
 * @author c
 * date: 2024-9-26 19:12:27
 */
public class RabbitMqSourceTest {

    public static void main(String[] args) throws IOException, TimeoutException {

        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 设置rabbitmq的服务器地址
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setPort(AMQP.PROTOCOL.PORT);
        // 建立连接
        Connection conn = connectionFactory.newConnection();

        String exchange = "test-Exchange";
        String queueName = "test-Queue";
        String key = "test-Exchange-key";
        String msg = "测试消息";

        // 创建一个channel
        Channel channel = conn.createChannel();

        // 创建一个直连交换机
        channel.exchangeDeclare(exchange, BuiltinExchangeType.DIRECT);

        // 创建一个队列
        channel.queueDeclare(queueName, true, false, false, null);

        // 绑定队列
        channel.queueBind(queueName, exchange, key);

        // 发送消息
        channel.basicPublish(exchange, key, null, msg.getBytes());

        channel.close();
        conn.close();   
    }

}

上面的基本流程

简单理解为:通过连接,获取通道,数据传输

基本步骤:

  1. 获取连接(Connection)
  2. 获取通道(channel)
  3. 创建交换机(Exchange)
  4. 创建队列(Queue)
  5. 队列通过key绑定交换机(Bind)
  6. 往交换机中的key发送消息
  7. 其他方法

(上面创建消息的也是可以直接创建队列,进行消息的发送,源码中会将没有创建exchange设置成默认的,具体可以自己查看一下)

三:详细分析步骤

3.0 获取连接(Connection)

 public Connection newConnection(ExecutorService executor, AddressResolver addressResolver, String clientProvidedName)
        throws IOException, TimeoutException {
        if(this.metricsCollector == null) {
            this.metricsCollector = new NoOpMetricsCollector();
        }
        // make sure we respect the provided thread factory
        FrameHandlerFactory fhFactory = createFrameHandlerFactory();
        ConnectionParams params = params(executor);
        // set client-provided via a client property
        if (clientProvidedName != null) {
            Map<String, Object> properties = new HashMap<String, Object>(params.getClientProperties());
            properties.put("connection_name", clientProvidedName);
            params.setClientProperties(properties);
        }
        // 如果设置 自动发送为 true
        if (isAutomaticRecoveryEnabled()) {
            // see 
         
  com.rabbitmq.client.impl.recovery.RecoveryAwareAMQConnectionFactory#newConnection
            // 创建连接            
AutorecoveringConnection conn = new AutorecoveringConnection(params, fhFactory, addressResolver, metricsCollector);
            // 初始化
            conn.init();
            return conn;
        } else {
            // 通过 addrs  获取
            List<Address> addrs = addressResolver.getAddresses();
            Exception lastException = null;
            for (Address addr : addrs) {
                try {
                    // 创建对应的 FrameHandler 
                    FrameHandler handler = fhFactory.create(addr, clientProvidedName);
                    // 创建连接                    
                    AMQConnection conn = createConnection(params, handler, metricsCollector);    
                    conn.start();
                    this.metricsCollector.newConnection(conn);
                    return conn;
                } catch (IOException e) {
                    lastException = e;
                } catch (TimeoutException te) {
                    lastException = te;
                }
            }
            if (lastException != null) {
                if (lastException instanceof IOException) {
                    throw (IOException) lastException;
                } else if (lastException instanceof TimeoutException) {
                    throw (TimeoutException) lastException;
                }
            }
            throw new IOException("failed to connect");
        }
    }

3.1 创建渠道(Channel)

    @Override
    public Channel createChannel() throws IOException {
        // 确定开启状态
        ensureIsOpen();
        ChannelManager cm = _channelManager;
        if (cm == null) return null;
        // 重点可以看下这里: 创建channel
        Channel channel = cm.createChannel(this);
        // 通过 metricsCollector 创建新的channel
        metricsCollector.newChannel(channel);
        return channel;
    }

 createChannel 方法:

        这里主要是通过channelNumberAllocator分配到一个channelNumber,可以理解为一个唯一标识,具体可以自行看一下它的实现

    public ChannelN createChannel(AMQConnection connection) throws IOException {
        ChannelN ch;
        synchronized (this.monitor) {
            // 通过 channelNumberAllocator 获取到一个 channelNumber 
            int channelNumber = channelNumberAllocator.allocate();
            if (channelNumber == -1) {
                return null;
            } else {
                ch = addNewChannel(connection, channelNumber);
            }
        }
        ch.open(); // now that it's been safely added
        return ch;
    }

3.2 创建交换机(Exchange)

    public AMQP.Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments) throws IOException {
        final AMQP.Exchange.DeclareOk ok = delegate.exchangeDeclare(exchange, type, durable, autoDelete, internal, arguments);
        RecordedExchange x = new RecordedExchange(this, exchange).
          type(type).
          durable(durable).
          autoDelete(autoDelete).
          arguments(arguments);
        // 记录当前的交换机
        recordExchange(exchange, x);
        return ok;
    }

进入delegate.exchangeDeclare方法,可以看到控制台会创建成功exchange:

3.3 创建队列(Queue)

    @Override
    public AMQP.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException {
        // 这里执行完成会创建成功队列
        final AMQP.Queue.DeclareOk ok = delegate.queueDeclare(queue, durable, exclusive, autoDelete, arguments);
        RecordedQueue q = new RecordedQueue(this, ok.getQueue()).
            durable(durable).
            exclusive(exclusive).
            autoDelete(autoDelete).
            arguments(arguments);
        if (queue.equals(RecordedQueue.EMPTY_STRING)) {
            q.serverNamed(true);
        }
        recordQueue(ok, q);
        return ok;
    }
    @Override
    public Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive,
                                        boolean autoDelete, Map<String, Object> arguments)
        throws IOException
    {
        validateQueueNameLength(queue);
        return (Queue.DeclareOk)
               // 通过rpc申明 信息
               exnWrappingRpc(new Queue.Declare.Builder()
                               .queue(queue)
                               .durable(durable)
                               .exclusive(exclusive)
                               .autoDelete(autoDelete)
                               .arguments(arguments)
                              .build())
               .getMethod();
    }

可以看到此时虽然创建了queue,但是并未绑定到exchang上面,需要进行下面的绑定

3.4 绑定队列

    @Override
    public AMQP.Queue.BindOk queueBind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException {
        // 绑定队列
        AMQP.Queue.BindOk ok = delegate.queueBind(queue, exchange, routingKey, arguments);
        recordQueueBinding(queue, exchange, routingKey, arguments);
        return ok;
    }
    @Override
    public Queue.BindOk queueBind(String queue, String exchange,
                                  String routingKey, Map<String, Object> arguments)
        throws IOException
    {
        validateQueueNameLength(queue);
        return (Queue.BindOk)    
               // 通过 rpc 申明绑定信息
               exnWrappingRpc(new Queue.Bind.Builder()
                               .queue(queue)
                               .exchange(exchange)
                               .routingKey(routingKey)
                               .arguments(arguments)
                              .build())
               .getMethod();
    }

3.5 发送消息

    @Override
    public void basicPublish(String exchange, String routingKey,
                             boolean mandatory, boolean immediate,
                             BasicProperties props, byte[] body)
        throws IOException
    {
        if (nextPublishSeqNo > 0) {
            unconfirmedSet.add(getNextPublishSeqNo());
            nextPublishSeqNo++;
        }
        if (props == null) {
            props = MessageProperties.MINIMAL_BASIC;
        }
        // 组装消息
        AMQCommand command = new AMQCommand(
            new Basic.Publish.Builder()
                .exchange(exchange)
                .routingKey(routingKey)
                .mandatory(mandatory)
                .immediate(immediate)
                .build(), props, body);
        try {
            // 发送消息
            transmit(command);
        } catch (IOException e) {
            metricsCollector.basicPublishFailure(this, e);
            throw e;
        }
        // 推送当前的channel 进行发布
        metricsCollector.basicPublish(this);
    }

四:总结

发送消息可以理解为以下步骤:

  1. 通过Channel往Rabbit服务端发送消息
  2. 通过PRC申明交换机,队列,绑定等信息
  3. 通过AMQP协议发送消息

   👍如果对你有帮助,给博主一个免费的点赞以示鼓励
欢迎各位🔎点赞👍评论收藏⭐️

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2177405.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用npm link 把一个本地项目变成依赖,引入到另一个项目中

突然有天,发现线上的项目有块功能缺失,我以为是我优化的时候不小心改坏了什么代码,导致的,先上图 第一反应,就以为天塌了,完全无从入手,然后我就找了之前的离职的同事,他又给我两个包,让我打成依赖扔进去,这两个包分别是scratch-blocks,scratch-vm, 然后我就使用了npm link np…

淘宝api上货软件)一刻工具箱,一天上几万不出现爬虫违规,更新开放类目错放功能,淘宝电商必备软件!

天猫淘宝抖音上货神器&#xff0c;助力电商快速铺货 在当今这个信息爆炸、电商飞速发展的时代&#xff0c;如何快速有效地将产品铺货到各大电商平台&#xff0c;成为每一位电商从业者都需要面对的问题。 通过电商API接口能为电商从业者打造的综合辅助工具&#xff0c;支持天猫、…

uniapp学习(002 常用的内置组件)

零基础入门uniapp Vue3组合式API版本到咸虾米壁纸项目实战&#xff0c;开发打包微信小程序、抖音小程序、H5、安卓APP客户端等 总时长 23:40:00 共116P 此文章包含第5p-第p10的内容 文章目录 view组件相当于div标签按下松开例子冒泡例子 text组件 相当于span标签scroll-view纵…

第二百五十八节 JPA教程 - JPA查询选择两个实体示例

JPA教程 - JPA查询选择两个实体示例 以下JPQL从两个实体中选择。 List l em.createQuery("SELECT d, m FROM Department d, Professor m WHERE d m.department").getResultList();例子 以下代码来自Professor.java。 package cn.w3cschool.common;import java.…

想跳槽,我懂你!

在职场的长河中&#xff0c;每个人都是自己航行船的舵手&#xff0c;时而顺流而下享受平静&#xff0c;时而逆流而上追求挑战。跳槽&#xff0c;作为职业生涯中常见且重要的决策之一&#xff0c;往往承载着对现状的不满、对未来的憧憬以及对自我价值的重新定位。本文将从跳槽的…

紫光 FPGA固化RAM位置的操作流程

1. 前提条件&#xff1a;需要已经编译出一个功能完整的没有时序违例的版本出来&#xff1b; 2. 将RAM导出至txt文件&#xff1a; 这个过程需要几分钟&#xff0c;耐心等待一下。 等待提示成功就可以进行下一步操作了。 3. 将【2】中的txt文件中的内容全选复制粘贴到pcf文件的…

离职赔偿一览表-这年头每人都应该备一份

离职赔偿一览表 离职时一定要知道N、N1&#xff0c;2N的计算方法 N&#xff08;经济补偿金&#xff09;、N1&#xff08;经济补偿金代通知金&#xff09;&#xff0c;2N&#xff08;赔偿金&#xff09;其实都是简称。 01 经济补偿金&#xff08;N&#xff09; 经济补偿金工…

led灯什么牌子的质量好?五款市面上非常适合孩子使用的护眼台灯

在当今这个数字化时代&#xff0c;孩子们从小就开始频繁接触各种数码设备&#xff0c;每日长时间面对着电子屏幕。由于疫情的影响&#xff0c;居家上网课更是让孩子们不得不持续面对电子屏幕。而儿童和青少年时期正是眼睛发育的关键阶段&#xff0c;许多孩子因为在这个时期过度…

Python从入门到精通-基础篇

1.Python的起源 1989年&#xff0c;为了打发圣诞节假期&#xff0c;Gudio van Rossum&#xff08;吉多范罗苏姆&#xff08;龟叔&#xff09;&#xff09;决心开发一个新的解释程序&#xff08;Python雏形&#xff09; 1991年&#xff0c;第一个Python解释器诞生 Python这个…

根据ip地址查网页怎么查询?

一、通过命令提示符查询查网页&#xff08;Windows系统&#xff09; ①按“WinR”键&#xff0c;打开运营窗口。 ②输入“cmd”“回车”&#xff0c;打开命令提示符窗口。 ③输入“nslookup ip地址”将ip地址换成查询的实际ip地址“回车” ⑤系统返回输入ip地址对应的域名信息…

Latex 首字母下沉,lettrine冲突报错,手动解决办法

在文章最开始&#xff0c;\usepackage{*}下面&#xff0c;设置两个命令&#xff0c; \newcommand{\calcfirstletterheight}[3]{ % #1 是高度变量&#xff0c;#2 是首字母&#xff0c;#3 是比例变量% 测量首字母的高度\settoheight{#1}{#2} % 测量首字母的高度% 计算比例&…

FristiLeaks靶场打靶记录

一、靶机介绍 靶机下载地址&#xff1a;https://download.vulnhub.com/fristileaks/FristiLeaks_1.3.ova 二、信息收集 扫描靶机ip arp-scan -l 确认靶机ip为&#xff1a;192.168.5.132 扫描端口 nmap -p- -A 192.168.5.132 扫描目录 dirb http://192.168.5.132/ 进入网…

【STM32】 TCP/IP通信协议--LwIP介绍

一、前言 TCP/IP是干啥的&#xff1f;它跟SPI、IIC、CAN有什么区别&#xff1f;它如何实现stm32的通讯&#xff1f;如何去配置&#xff1f;为了搞懂这些问题&#xff0c;查询资料可解决如下疑问&#xff1a; 1.为什么要用以太网通信? 以太网(Ethernet) 是指遵守 IEEE 802.3 …

一体式远程IO:纺织行业数字化转型的新引擎

在纺织行业这片古老而又充满活力的土地上&#xff0c;技术的每一次飞跃都深刻地改变着生产模式与效率。随着纺织技术的飞速发展和人工成本的日益提高&#xff0c;纺织企业正积极寻求通过自动化和智能化手段来降低生产成本、提升市场竞争力。一体式远程IO&#xff08;输入输出&a…

mysql怎么修改一个字段中的所有部分数据

UPDATE videos SET VideoCode replace(VideoCode,flv,mp4); update 表名 set 字段名 replace&#xff08;字段名&#xff0c;‘修改前’&#xff0c;‘修改后’&#xff09;&#xff1b;

Python精选200Tips:183-185

针对序列&#xff08;时间、文本&#xff09;数据的网络结构 P183--循环神经网络&#xff08;RNN, Recurrent Neural Network 1980s&#xff09;&#xff08;1&#xff09;模型结构说明&#xff08;2&#xff09;创新性说明&#xff08;3&#xff09;示例代码&#xff1a;类似古…

无人机在农业方面的应用!

一、提高农业生产效率 通过搭载农业智能传感器和喷洒设备&#xff0c;可以实现对农田的精准施肥和喷药。这种方式不仅减少了农药和化肥的浪费&#xff0c;还降低了对环境的污染&#xff0c;提高了农业生产效率。 无人机利用热、多光谱和高光谱技术&#xff0c;可以高效、准确…

3种方法解决Docker容器中配置运行环境问题

1. dockerfile用于通过脚本生成镜像 2.进入docker容器后&#xff0c;配置环境完&#xff0c;导出容器快照为镜像&#xff0c;拷贝到另一个主机&#xff0c;再进行加载&#xff1b; 3.在本地将依赖库等需要的文件按照目录整理好&#xff0c;映射到docker中。 1. dockerfile用于…

这几个高含金量证书,网工真的该拿

在这个技术日新月异的行业中&#xff0c;不断学习新技能和知识是保持竞争力的关键。 而证书&#xff0c;作为一种专业能力的认证&#xff0c;不仅能够证明你的技术实力&#xff0c;还能为你打开更多的职业发展大门。 在众多的IT认证中&#xff0c;有些证书因其高含金量而备受推…

四川财谷通信息技术有限公司抖音小店领域的强势力量

在数字化浪潮汹涌的今天&#xff0c;电商行业以其独特的魅力和无限潜力&#xff0c;成为了推动经济发展的重要力量。而在这片充满机遇与挑战的电商蓝海中&#xff0c;四川财谷通信息技术有限公司凭借其敏锐的市场洞察、创新的技术实力以及优质的服务品质&#xff0c;迅速崛起为…