RabbitMQ:消息分发模型

news2024/11/22 6:30:27

Work queues,也被称为 Task queues,任务模型,当消息处理比较耗时时的时候,可能产生消息的速度会远远大于消息的消费速度,消息会堆积越来越多,无法及时处理,此时就可以使用work模型:让多个消费者绑定一个队列,共同消费队列中的消息,队列中的消息一旦消费,就会消失,因此任务不会被重复执行。

RabbitMQ 单生产单消费模型主要有以下三个角色构成:

  • 生产者(producer/ publisher):一个发送消息的用户应用程序。
  • 消费者1(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序,领取任务并完成
  • 消费者2(consumer):领取任务并完成…
  • 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

本篇内容包括:RabbitMQ 消息分发模型、RabbitMQ 消息分发模型实现、RabbitMQ 手动消息确认


文章目录

    • 一、RabbitMQ 消息分发模型
        • 1、消息分发模型(Work Queue 模型)
        • 2、消息分发模型组成
    • 二、RabbitMQ 消息分发模型实现
        • 1、添加 Maven 依赖
        • 2、封装工具类 ConnectionUtil
        • 3、生产者实现
        • 4、消费者-1 实现
        • 5、消费者-2 实现
        • 6、消息队列的循环机制
    • 三、RabbitMQ 手动消息确认
        • 1、消费者-1 实现
        • 2、消费者-2 实现
        • 3、实现能者多劳


一、RabbitMQ 消息分发模型

1、消息分发模型(Work Queue 模型)

Work queues,也被称为 Task queues,任务模型,当消息处理比较耗时时的时候,可能产生消息的速度会远远大于消息的消费速度,消息会堆积越来越多,无法及时处理,此时就可以使用work模型:让多个消费者绑定一个队列,共同消费队列中的消息,队列中的消息一旦消费,就会消失,因此任务不会被重复执行。

image-20221212152117164

2、消息分发模型组成

RabbitMQ 单生产单消费模型主要有以下四个角色构成:

  • 生产者(producer/ publisher):一个发送消息的用户应用程序。
  • 消费者1(consumer):消费和接收有类似的意思,消费者是一个主要用来等待接收消息的用户应用程序,领取任务并完成
  • 消费者2(consumer):领取任务并完成…
  • 队列:RabbitMQ 内部类似于邮箱的一个概念。虽然消息流经 RabbitMQ 和你的应用程序,但是它们只能存储在队列中。队列只受主机的内存和磁盘限制,实质上是一个大的消息缓冲区。许多生产者可以发送消息到一个队列,许多消费者可以尝试从一个队列接收数据。

二、RabbitMQ 消息分发模型实现

1、添加 Maven 依赖

# 在 pom.xml 文件中添加以下依赖

<dependencies>
    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.16.0</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.13.2</version>
        <scope>test</scope>
    </dependency>
  	...
</dependencies>

2、封装工具类 ConnectionUtil

# 连接工具类封装-ConnectionUtil

package com.lizhengi.work.demo1;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @author liziheng
 * @version 1.0.0
 * @description 连接工具类封装
 * @date 2022-12-23 11:29 上午
 **/
public class ConnectionUtil {

    /**
     * 创建 MQ 连接工厂对象
     */
    private static final ConnectionFactory CONNECTION_FACTORY;

    // 类加载时,重量级资源加载
    static {
        CONNECTION_FACTORY = new ConnectionFactory();
        // 设置连接MQ主机
        CONNECTION_FACTORY.setHost("127.0.0.1");
        // 设置连接MQ端口号
        CONNECTION_FACTORY.setPort(5672);
        // 设置连接MQ虚拟主机
        CONNECTION_FACTORY.setVirtualHost("/test");
        // 设置连接MQ虚拟主机的用户名密码
        CONNECTION_FACTORY.setUsername("admin");
        CONNECTION_FACTORY.setPassword("123456");
    }

    /**
     * 建立与RabbitMQ连接 工具方法
     *
     * @return Connection
     */
    public static Connection getConnection() {
        try {
            // 返回连接对象
            return CONNECTION_FACTORY.newConnection();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 关闭通道和连接 工具方法
     *
     * @param channel    Channel
     * @param connection Connection
     */
    public static void closeConnectionAndChanel(Channel channel, Connection connection) {
        try {
            if (channel != null) {
                channel.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

3、生产者实现

package com.lizhengi.work.demo1;

import com.lizhengi.hello.demo2.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import org.testng.annotations.Test;

import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description Rabbit 消息分发模型 生产者
 * @date 2022-12-23 11:32 上午
 **/
public class Producer {

    /**
     * 消息生产
     *
     * @throws IOException IOException
     */
    @Test
    public void testSendMessage() throws IOException {
        // 获取连接对象
        Connection connection = ConnectionUtil.getConnection();
        // 获取通道
        assert connection != null;
        Channel channel = connection.createChannel();
        // 通过通道声明队列
        channel.queueDeclare("WorkQueue", true, false, false, null);
        // 生产消息
        for (int i = 0; i < 100; i++) {
            channel.basicPublish("", "WorkQueue", null, ("Hello RabbitMQ " + i).getBytes());
        }
        // 资源关闭
        ConnectionUtil.closeConnectionAndChanel(channel, connection);
    }
}

4、消费者-1 实现

package com.lizhengi.work.demo1;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description Rabbit 消息分发模型 消费者-1
 * @date 2022-12-23 11:32 上午
 **/
public class Customer1 {
    public static void main(String[] msg) throws IOException {

        Connection connection = ConnectionUtil.getConnection();

        assert connection != null;
        Channel channel = connection.createChannel();

        channel.queueDeclare("WorkQueue", true, false, false, null);

        channel.basicConsume("WorkQueue", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("消费者1:" + new String(body));
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });

    }
}

5、消费者-2 实现

package com.lizhengi.work.demo1;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description Rabbit 消息分发模型 消费者-2
 * @date 2022-12-23 11:32 上午
 **/
public class Customer2 {
    public static void main(String[] msg) throws IOException {

        Connection connection = ConnectionUtil.getConnection();

        assert connection != null;
        Channel channel = connection.createChannel();

        channel.queueDeclare("WorkQueue", true, false, false, null);

        channel.basicConsume("WorkQueue", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
                System.out.println("消费者2:" + new String(body));
            }
        });

    }
}

6、消息队列的循环机制

运行上面的消费者与生产者,观察消费者打印:

消费者1:Hello RabbitMQ 1
消费者1:Hello RabbitMQ 3
消费者1:Hello RabbitMQ 5
消费者1:Hello RabbitMQ 7
消费者1:Hello RabbitMQ 9

消费者2:Hello RabbitMQ 0
消费者2:Hello RabbitMQ 2
消费者2:Hello RabbitMQ 4
消费者2:Hello RabbitMQ 6
消费者2:Hello RabbitMQ 8

可以观察到,两个消费者依次打印了队列中的消息,哪怕是在其中一个加上 Thread.sleep(1000); 依然是这种循环机制!


三、RabbitMQ 手动消息确认

1、消费者-1 实现

package com.lizhengi.work.demo2;

import com.lizhengi.work.demo1.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description Rabbit 消息分发模型 消费者-1
 * @date 2022-12-23 11:32 上午
 **/
public class Customer1 {
    public static void main(String[] msg) throws IOException {

        Connection connection = ConnectionUtil.getConnection();

        assert connection != null;
        Channel channel = connection.createChannel();
        // 设置每次只能消费一个消息
        channel.basicQos(1);

        channel.queueDeclare("WorkQueue", true, false, false, null);
        // 参数2:自动消息确认关闭
        channel.basicConsume("WorkQueue", true, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                try {
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
                System.out.println("消费者1:" + new String(body));
                /*  手动消息确认
                 *  参数1:确认队列中的那个具体消息
                 *  参数2:是否开启多个消息同时确认
                 */
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

    }
}

2、消费者-2 实现

package com.lizhengi.work.demo2;

import com.lizhengi.work.demo1.ConnectionUtil;
import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @author liziheng
 * @version 1.0.0
 * @description Rabbit 消息分发模型 消费者-2
 * @date 2022-12-23 11:32 上午
 **/
public class Customer2 {
    public static void main(String[] msg) throws IOException {

        Connection connection = ConnectionUtil.getConnection();

        assert connection != null;
        Channel channel = connection.createChannel();

        //设置每次只能消费一个消息
        channel.basicQos(1);

        channel.queueDeclare("WorkQueue", true, false, false, null);
        channel.basicConsume("WorkQueue", false, new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("消费者2:" + new String(body));
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        });

    }
}

3、实现能者多劳

运行上面的消费者与生产者,观察消费者打印:

消费者1:Hello RabbitMQ 0
消费者1:Hello RabbitMQ 2
消费者1:Hello RabbitMQ 3
消费者1:Hello RabbitMQ 4
消费者1:Hello RabbitMQ 5
消费者1:Hello RabbitMQ 6
消费者1:Hello RabbitMQ 7

消费者2:Hello RabbitMQ 1
消费者2:Hello RabbitMQ 64
消费者2:Hello RabbitMQ 76
消费者2:Hello RabbitMQ 89

可以观察到,消费者1开启了手动消费机制后进行了更大比重消息数量的消费!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/115350.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

圣诞的荒诞小故事并记录互联网协议-五层模型

今天敲代码敲着敲着灵光乍现&#xff0c;突然一个荒诞的故事&#x1f4a1;映入脑海。 1.未来和过去&#xff1a; 人高度发达&#xff08;以下称之为渡&#xff09; 渡可以打开时空穿越过去&#xff08;以下称之为旧迹&#xff09;&#xff0c;并且可以进随心所欲的来去自如&a…

【微服务】Nacos的寻址机制

目录 一、 Nacos的寻址机制 1、前提 2、设计 3、内部实现 3.1、单机寻址 3.2、文件寻址 3.3、地址服务器寻址 4、未来可扩展点 4.1、集群节点自动扩缩容 &#x1f496; Spring家族及微服务系列文章 一、 Nacos的寻址机制 1、前提 Nacos 支持单机部署以及集群部署&am…

xxl-job基本原理以及底层流程讲解

任务执行策略 任务阻塞策略 整体架构部署 这里主要讲解下每个模块的作用 调度模块&#xff1a;负责管理调度信息&#xff0c;按照调度配置发出调度请求&#xff0c;自身不承担任何业务代码。调度系统于任务解耦&#xff0c;提高了系统可用性和稳定性&#xff0c;同时调度系统性…

MFC工程的文件说明

工程创建 使用VS创建一个MFC的工程&#xff0c;这里不做说明 文件说明 使用VS创建好的MFC工程有如下文件&#xff1a; MFC全称Microsoft Foundation Classes&#xff0c;也就是微软基础类库;是VC的核心&#xff0c;是C与Windows API的结合&#xff0c;很彻底的用C封装了Win…

这个医生说的防疫措施,我挺认可的

上面那个语音是一个朋友发给我的&#xff0c;语音时间比较长&#xff0c;但是里面讲的很多内容我觉得挺不错的&#xff0c;现在疫情反复&#xff0c;我们会听到很多关于疫情的信息&#xff0c;有的人说奥密克戎感染性很强&#xff0c;之前专家说的无症状是骗人的&#xff0c;根…

元宇宙产业委联席秘书长叶毓睿:去中心化和去中介化的定义、区别,以及和元宇宙的关系

原创 Peter Ye 转自&#xff1a;乐生活与爱IT Plus 近日有个有关元宇宙的线上分享&#xff0c;有位名叫谢晓雪的听众提了一个我之前没思考过的问题&#xff1a;去中心化和去中介化的区别&#xff1f; 当时我回答了一部分&#xff0c;但主要是讲的之间的联系&#xff0c;区…

【年终总结】求职面试一定要扬长避短

时光荏苒&#xff0c;这周日就是元旦了&#xff0c;我也把年终总结提上了日程。 前言 今年的年终总结我打算多写几篇&#xff0c;每篇瞄准一个方向&#xff0c;写一些对大家有帮助、有启发的内容。 初步的想法会整理三篇&#xff1a; 第一篇分享求职面试的经验第二篇分享接私…

Mybaits(环境搭建和基本使用)

目录   一、什么是 Mybaits   二、配置环境     2.1 导入 MyBatis Framework     2.2 连接 MyBatis   三、增删改查功能     3.1 创建实体类     3.2 select     3.3 delete 和 update     3.4 insert   四、SQL 注入     4.1 什么是 SQL…

2022年终总结、展望2023

2022年终总结、展望2023前言一、2022年工作成绩二、2022年工作不足即如何改进三、可以传承的工作方法或者经验四、2023年工作目标 &#xff08;目标细化、可落地&#xff09;<font colorred> 1、薪资待遇2、云端高效的实时智能视频处理平台架构图3、 云端高效的实时智能视…

Redis从部署群集到ASK路由

目录 数据库简介 一、数据库分类 二、Redis重要特性 三、redis应用场景 安装redis redis基本命令 redis持久化 redis主从复制 redis集群 群集实施 配置节点发现 Redis Cluster 通讯流程 Redis Cluster手动分配槽位 Redis Cluster ASK路由介绍 模拟故障转移 自动搭建部署Redis C…

前缀.因式分解.求和 .C

前缀和:在输入同时获得结果. s[i]s[i-1]input(a[i]). 区间和:前缀做差 Sum[A,B]s[B]-s[A-1]. for(i1,i<n,i){ input(a[i]); s[i]s[i-1]input(a[i]). input(a,b); counts[b]-s[a-1]; } 分解伪代码 Sa2*a1a3*(a2a1)a4*(a3a2a1)a5*(a4a3a2a1) 因式分解复杂度on; >&g…

一文告诉你如何选择低代码供应商?

低代码&#xff08;零代码&#xff09;软件平台、套件、工具和相关服务正在快速地广泛普及和扩展。现在许多人都知道&#xff0c;低代码软件解决方案提供的加速器和自动化&#xff0c;可以加速软件应用程序开发人员的工作……这就意味着&#xff08;在这个开发人员匮乏的星球上…

动态规划算法典型例题

这里写目录标题1、动态规划算法2、动态规划&分治3、动态规划算法典型例题3.1选数问题3.1.1递归解法3.1.2动态规划解法3.2最长公共子序列3.3钢条切割问题3.3.1递归解法3.3.2动态规划解法3.4斐波那契数列3.4.1递归解法3.4.2递归解法3.5背包问题&#xff08;0-1背包&#xff0…

SRS服务器搭建以及展现配置说明

对于企业而言&#xff0c;数字化建设是一项全面的、系统的工程&#xff0c;不仅仅只是部署几套软件、实现办公自动化而已&#xff0c;尤其是大型企业&#xff0c;数字化的建设往往涉及到了服务器、硬件、软件、网络等一系列内容。如门禁系统和人力、认证等系统集成&#xff0c;…

计算机视觉与图形学-神经渲染专题-非刚体NeRF II

《TiNeuVox:Fast Dynamic Radiance Fields with Time-Aware Neural Voxel》链接&#xff1a;https://jaminfong.cn/tineuvox/摘要作者通过表示具有时间感知体素特征的场景提出了一个辐射场框架&#xff0c;并将其命名为 TiNeuVox。作者引入了一个微小的坐标变形网络来模拟粗略的…

Flink-使用合流操作进行实时对账需求的实现

学Flink第八章多流转换的时候&#xff0c;进行合流操作.connect()使用到了第九章状态编程的知识&#xff0c;感觉总体不是很清晰&#xff0c;因此学完状态编程后现在进行重温并细化一些细节 业务背景 步骤一&#xff1a; 用户进行支付的时候&#xff0c;后台是需要调用第三方…

leetcode 834. Sum of Distances in Tree(树中的距离和)

无向连接的树&#xff08;不一定是二叉树&#xff09;&#xff0c;求每个节点到其他节点的距离和。 返回一个数组&#xff0c;数组的第i个元素就是第i个节点到其他所有节点的距离之和。 思路&#xff1a; 涉及无向图的构造和遍历&#xff0c;树的前序后序遍历&#xff0c;问题…

论文复现-1:Perturbation CheckLists for Evaluating NLG Evaluation Metrics

以data2text任务为例&#xff0c;探讨generation metric矩阵对于一些句子扰动是否敏感&#xff0c;在多个维度上的敏感性如何&#xff1f; 1数据集 data2text数据集是由3025条samples构成&#xff0c;关键词由“ID”和“reference”构成。 每个子任务由对应的criteria&#…

python基础语法19-calendar模块

一、简介 有了time及datetime模块&#xff0c;再结合日历&#xff08;Calendar&#xff09;模块就可以更好的覆盖到时间处理的各个方面的应用。日历模块主要是用于处理日历及星期相关操作。 calendar模块的内置函数如下: 序号 函数及描述 1 calendar.calendar(yea…

Keras深度学习实战(42)——强化学习基础

Keras深度学习实战&#xff08;42&#xff09;——强化学习基础0. 前言1. 强化学习基础1.1 基本概念1.2 马尔科夫决策过程1.3 目标函数2. 在具有非负奖励的模拟游戏中获取最佳动作2.1 问题设定2.2 模型分析2.3 模型构建与训练3. 在模拟游戏中获取最佳动作3.1 问题定义3.2 模型分…