RabbitMQ 第一天 基础 4 RabbitMQ 的工作模式 4.1 Work queues 工作队列模式

news2024/11/17 19:37:03

RabbitMQ

【黑马程序员RabbitMQ全套教程,rabbitmq消息中间件到实战】

文章目录

      • RabbitMQ
      • 第一天 基础
      • 4 RabbitMQ 的工作模式
        • 4.1 Work queues 工作队列模式
          • 4.1.1 模式说明
          • 4.1.2 代码编写
          • 4.1.3 小结

第一天 基础

4 RabbitMQ 的工作模式

4.1 Work queues 工作队列模式

4.1.1 模式说明

看看官网

在这里插入图片描述

之前我们 已经完成了 这个简单模式的编写,这种工作模式 是一个生产者 对一个 消费者

不同的工作模式其实就是 指的是 它的消息的路由策略以及 方式 不太一样

比如 Work queues 工作队列 模式

在这里插入图片描述

它就是一个生产者 对应两个【多个】消费者【它们是竞争关系,不是共享:意思就是一条消息只能被 一个消费者消费】

下面就来说说这种 模式

  • Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

  • 应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度

4.1.2 代码编写

Work Queues 与入门程序的简单模式的代码几乎是一样的。

可以完全复制,并多复制一个消费者进行多个消费者同时对消费消息的测试。

【生产者】

package com.dingjiaxiong.producer;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * ClassName: Producer_WorkQueues
 * date: 2022/11/16 10:58
 * 发送消息
 * @author DingJiaxiong
 */

public class Producer_WorkQueues {

    public static void main(String[] args) throws IOException, TimeoutException {

        //1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2. 设置参数
        factory.setHost("xxxxxxxxxxxxxxxxx"); // 服务器IP【默认本机 localhost】
        factory.setPort(5672); //端口【默认也是 5672】

        factory.setVirtualHost("/ding"); //虚拟机【 默认是 /】
        factory.setUsername("dingjiaxiong"); // 用户名【默认 guest】
        factory.setPassword("12345"); //密码【默认 guest】

        //3. 创建连接 Connection
        Connection connection = factory.newConnection();

        //4. 创建Channel
        Channel channel = connection.createChannel();

        //5. 创建队列Queue
        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * 参数解释:
         *        1. queue:队列名称
         *        2. durable:是否持久化【当mq 重启之后,还在】
         *        3. exclusive:
         *              - 是否独占。只能有一个消费者监听这队列
         *              - 当Connection 关闭时,是否删除队列
         *        4.autoDelete:是否自动删除【当没有Consumer 时,自动删除掉】
         *        5.arguments:一些参数信息
         * */
        //【如果没有一个名字叫hello_world 的队列,则会创建该队列,如果有则不会创建】
        channel.queueDeclare("work_queues",true,false,false,null);

        //6. 发送消息
        /*
         * basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
         * 参数解释:
         *        1. exchange:交换机名称。简单模式下交换机会使用默认的【""】
         *        2. routingKey:路由名称。
         *        3. props:配置信息
         *        4. body:发送消息数据
         * */

        for (int i = 1; i <= 10; i++) {
            String body = i + "hello,rabbitmq";
            channel.basicPublish("","work_queues",null,body.getBytes());
        }


        //7. 释放资源
        channel.close();
        connection.close();

    }

}

OK, 让它发10 条消息

【消费者】

package com.dingjiaxiong.consumer;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * ClassName: Consumer_WorkQueue1
 * date: 2022/11/16 12:37
 *
 * @author DingJiaxiong
 */


public class Consumer_WorkQueue1 {

    public static void main(String[] args) throws IOException, TimeoutException {

        //1. 创建连接工厂
        ConnectionFactory factory = new ConnectionFactory();

        //2. 设置参数
        factory.setHost("xxxxxxxxxxxxxxxx"); // 服务器IP【默认本机 localhost】
        factory.setPort(5672); //端口【默认也是 5672】

        factory.setVirtualHost("/ding"); //虚拟机【 默认是 /】
        factory.setUsername("dingjiaxiong"); // 用户名【默认 guest】
        factory.setPassword("12345"); //密码【默认 guest】

        //3. 创建连接 Connection
        Connection connection = factory.newConnection();

        //4. 创建Channel
        Channel channel = connection.createChannel();

        //5. 创建队列Queue
        /**
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * 参数解释:
         *        1. queue:队列名称
         *        2. durable:是否持久化【当mq 重启之后,还在】
         *        3. exclusive:
         *              - 是否独占。只能有一个消费者监听这队列
         *              - 当Connection 关闭时,是否删除队列
         *        4.autoDelete:是否自动删除【当没有Consumer 时,自动删除掉】
         *        5.arguments:一些参数信息
         * */
        //【如果没有一个名字叫hello_world 的队列,则会创建该队列,如果有则不会创建】
        channel.queueDeclare("work_queues",true,false,false,null);

        // 接收消息【消费消息】
        /**
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 参数解释:
         *      1. queue:队列名称
         *      2. autoAck:是否自动确认
         *      3. callback:回调对象
         * */
        Consumer consumer = new DefaultConsumer(channel){
            //回调方法,当收到消息后,会自动执行这个 方法
            /*
            * 1. consumerTag:消息的标识
            * 2. envelope:获取一些 信息【交换机、路由key...】
            * 3. properties: 配置信息
            * 4. body: 数据
            * */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//                System.out.println("consumerTag:" + consumerTag);
//                System.out.println("Exchange:" + envelope.getExchange());
//                System.out.println("RoutingKey:" + envelope.getRoutingKey());
//                System.out.println("properties:" + properties);
                System.out.println("body:" + new String(body));
            }
        };
        channel.basicConsume("work_queues",true,consumer);

        // 不要关闭资源,让它一直监听

    }

}

这是消费者 1

2 是一样的

在这里插入图片描述

OK,这样消费者 也准备好了,先把消费者 跑起来

在这里插入图片描述

OK

现在 直接看看管控台

在这里插入图片描述

队列已经 创建出来了 ,而且有两个 消费者

运行生产者

在这里插入图片描述

OK,生产10 条消息完毕

查看消费者 的输出日志

在这里插入图片描述

没毛病,一人拿了 5 条 ,而且还有规律

这就是 工作队列 的工作模式【分担压力】

4.1.3 小结
  1. 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
  2. Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/113271.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ELK第四讲之【docker安装Logstash8.4.3、集成springboot】

docker安装elasticsearch8.4.3 docker安装kibana8.4.3 一、docker安装logstash8.4.3 官方地址 https://github.com/elastic/logstash/releases 1、拉取镜像 docker pull elastic/logstash:8.4.3 2、启动容器 docker run -it -d --name logstash -p 9600:9600 -p 5044:…

十六、Docker Compose容器编排第一篇

1、概述 Compose 是一个用于定义和运行多容器 Docker 应用程序的工具。使用 Compose&#xff0c;您可以使用 YAML 文件来配置应用程序的服务。然后&#xff0c;使用一个命令&#xff0c;您可以从您的配置中创建并启动所有服务。 Compose 适用于所有环境&#xff1a;生产、暂存、…

node.js+uni计算机毕设项目高校自习室座位网上预约小程序(程序+小程序+LW)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程。欢迎交流 项目运行 环境配置&#xff1a; Node.js Vscode Mysql5.7 HBuilderXNavicat11VueExpress。 项目技术&#xff1a; Express框架 Node.js Vue 等等组成&#xff0c;B/S模式 Vscode管理前后端分离等…

获取淘宝价格区间l-r的商品a的详细信息(商品名等)

看了一眼&#xff0c;上次更新距今2个月&#xff0c;看起来我好咕咕啊&#xff08;感叹&#xff09;&#xff0c;可是感觉这两个月也没闲着捏&#xff08;比赛&#xff0c;cf&#xff0c;期末等等&#xff0c;幸亏期末考延期了&#xff0c;我这被期末作业都整死了快&#xff09…

SpringBoot+Vue项目艺术摄影预约系统设计与实现

文末获取源码 开发语言&#xff1a;Java 使用框架&#xff1a;spring boot 前端技术&#xff1a;JavaScript、Vue.js 、css3 开发工具&#xff1a;IDEA/MyEclipse/Eclipse、Visual Studio Code 数据库&#xff1a;MySQL 5.7/8.0 数据库管理工具&#xff1a;phpstudy/Navicat JD…

Python pandas有几千个库函数,你用过几个?(2)

上一篇链接&#xff1a; Python pandas有几千个库函数&#xff0c;你用过几个&#xff1f;&#xff08;1&#xff09;_Hann Yang的博客-CSDN博客 I~Q&#xff1a; Function10~25 Types[Function][9:25] [infer_freq, interval_range, isna, isnull, json_normalize, lreshap…

微信HOOK 协议接口 实战开发篇 1.登录

使用HOOK也有不短的时间&#xff0c;写的各类接口杂乱无章 于是便有了将所有接口重构&#xff0c;整理一下的想法 顺手将整理的要点作为日志记录下来 预计每类接口写一篇日志&#xff0c;本次使用的是2022.12.24&#xff0c;当前微信最新版3.8.1.26版 言归正传&#xff0c;开始…

【秋招总结】双非本小菜鸡的坎坷秋招之路(附面经)

前言 因为大环境的影响&#xff0c;今年秋招hc骤缩&#xff0c;导致竞争的激烈程度比往年高了不少。 在秋招的时候&#xff0c;经历过简历石沉大海的无奈&#xff0c;也体验过人家收割offer而自己却依旧0offer的焦虑&#xff0c;不过好在最终也拿到了还算满意的结果。 如今我…

python爬虫把数据保存到csv、mysql中

啧&#xff0c;放假几天游戏玩腻了&#xff0c;啥都不想干&#xff0c;突然想起来python这玩意&#xff0c;无聊就来玩玩 目录 先是保存csv里面 然后保存到mysql里 目标&#xff1a;起点 主要是拿到这几个数据 分析下网页 一个li对应一本小说&#xff0c;打开li看里面的东西 …

Android ViewPager2 实现阅读器横向翻页效果(三)--- 实时动态分页及章节切换效果的原理及实现

文章目录Android ViewPager2 实现阅读器横向翻页效果&#xff08;三&#xff09;--- 实时动态分页及章节切换效果的原理及实现关键概念引入初始数据准备ViewPager Adapter 动态分页 及 第一次分页分页后更新窗口 及 首页尾页的特殊处理翻页状态监听 及 动态章节切换Android Vie…

BIT.4 Linux进程控制

目录进程创建fork函数初识写实拷贝fork常规用法fork调用失败的原因补充知识进程终止进程退出场景进程常见退出方法exit函数与_exit函数return 退出补充知识进程等待进程等待必要性进程等待的方法wait方法waitpid方法wait / waitpid 阻塞代码WIFEXITEDwait / waitpid 非阻塞代码…

LeetCode刷题复盘笔记—一文搞懂动态规划之718. 最长重复子数组问题(动态规划系列第三十一篇)

今日主要总结一下动态规划的一道题目&#xff0c;718. 最长重复子数组 题目&#xff1a;718. 最长重复子数组 Leetcode题目地址 题目描述&#xff1a; 给两个整数数组 nums1 和 nums2 &#xff0c;返回 两个数组中 公共的 、长度最长的子数组的长度 。 示例 1&#xff1a; …

华熙LIVE·五棵松商业北区明年国庆亮相 互动体验升级

京城着名的活力聚集地——华熙LIVE五棵松明年将增添两万多平米商业区&#xff0c;新增商业区位于现有商业区北侧并与之相连通&#xff0c;业态在承袭现有沉浸式互动体验业态基础上&#xff0c;将引进元宇宙等前沿科技和跳楼机等娱乐设施&#xff0c;使互动体验进一步升级。项目…

一文搞懂Linux内核中断机制原理与实现

为什么需要中断&#xff1f; 如果让内核定期对设备进行轮询&#xff0c;以便处理设备&#xff0c;那会做很多无用功&#xff0c;因为外设的处理速度一般慢于CPU&#xff0c;而CPU不能一直等待外部事件。所以能让设备在需要内核时主动通知内核&#xff0c;会是一个聪明的方式&a…

JWT渗透与攻防(一)

目录 前言 JWT漏洞介绍 案列演示之Leaky_JWT JWT漏洞具体的实现方式&#xff1a; 案列演示之JWT None Algorithm JWT漏洞工具的利用 JWT利用工具介绍 jwt_tool 漏洞利用 jwt-cracker c-jwt-cracker 前言 Json web token (JWT)相关漏洞对于渗透测试人员而言可能是一种…

node.js+uni计算机毕设项目店内点餐微信小程序LW(程序+小程序+LW)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程。欢迎交流 项目运行 环境配置&#xff1a; Node.js Vscode Mysql5.7 HBuilderXNavicat11VueExpress。 项目技术&#xff1a; Express框架 Node.js Vue 等等组成&#xff0c;B/S模式 Vscode管理前后端分离等…

【Pandas入门教程】如何从现有列派生新列

如何从现有列派生新列 来源&#xff1a;Pandas官网&#xff1a;https://pandas.pydata.org/docs/getting_started/intro_tutorials/index.html 笔记托管&#xff1a;https://gitee.com/DingJiaxiong/machine-learning-study 文章目录如何从现有列派生新列导包数据集准备【1】如…

C++——STL之stack和queue详解

C——STL之stack和queue详解&#x1f3d0;什么是stack和queue&#x1f3d0;stack和queue的实现&#x1f3c0;什么是deque&#x1f3c0;stack的模拟实现&#x1f3c0;queue的模拟实现&#x1f3d0;优先级队列&#xff08;priority_queue)&#x1f3c0;优先级队列的实现⚽push⚽p…

Spring Authorization Server1.0 介绍与使用

一、版本使用 1、Java&#xff1a;17或者更高的版本。 2、springboot 3.0 3、Spring Authorization Server 1.0版本。 <dependency><groupId>org.springframework.security</groupId><artifactId>spring-security-oauth2-authorization-server</ar…

使用proxy_pool来为爬虫程序自动更换代理IP

文章目录1. 前言2. 教程3. 官网4. 在线demo4.1. 本地部署4.2. 安装4.2.1. Python源码构建安装4.2.1.1. 安装redis数据库4.2.1.1.1. 下载redis源码4.2.1.1.2. 启动redis服务4.2.1.1.3. 安装redis服务4.2.1.1.4. 再次通过命令启动redis服务4.2.1.1.5. 测试redis服务是否可用4.2.1…