【MQ工作队列模式】

news2024/9/21 10:50:44
1 、模式介绍
Work Queues :与入门程序的简单模式相比,多了一个或一些消费端,
多个消费端共同消费同一个队列中的消息。
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处
理的速度。
小结 :
1 、在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关
系是 竞争 的关系
2 Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任
务处理的速度。例如:短信服务部署多个,
只需要有一个节点成功发送即可。

 

2 、代码实现
Work Queues 与入门程序的简单模式的代码几乎是一样的。可以完全复
制,并多复制一个消费者进行多
个消费者同时对消费消息的测试。
1 、生产者
生产者代码 Producer_WorkQueues:
package com.dxw.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者:发送消息
*/
public class Producer_WorkQueues {
public static void main(String[] args) throws
IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new
ConnectionFactory();
//2、设置参数
factory.setHost("localhost");//ip 默认localhost
factory.setPort(5672);//端口 默认5672
factory.setVirtualHost("/dxw");//虚拟机 默认/
factory.setUsername("dxw");//用户名 默认guest
factory.setPassword("1234");//密码 默认guest
//3、创建连接
Connection connection = factory.newConnection();
//4、创建Channel
Channel channel = connection.createChannel();
//5、创建队列
/*
* 参数解释:
* queueDeclare(String queue,
* boolean durable,
* boolean exclusive,
* boolean autoDelete,
* Map<String, Object> arguments)
* 1. queue:队列名称
* 如果没有一个名字叫hello_world的队列,则会创建该队
列,如果有则不会创建
* 2. durable:是否持久化,当mq重启之后,队列中消息还在
* 3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
* 4. autoDelete:是否自动删除。当没有Consumer时,自动
删除掉
* 5. arguments:参数。
*/
channel.queueDeclare("work_queues",true,false,false,null)
;
//6、发送消息
/*
* 参数解释:
* basicPublish(String exchange,
* String routingKey,
* BasicProperties props,
* byte[] body)
* 1. exchange:交换机名称。简单模式下交换机会使用默认的
""
* 2. routingKey:路由名称
* 3. props:配置信息
* 4. body:发送消息数据
启动生产者观察控制台
2、消费者
第一个消费者代码Consumer_WorkQueues1:
*/
for(int i=1;i<=10;i++){
String body = i+"hello rabbitmq~~~";
channel.basicPublish("","work_queues",null,body.getBytes(
));
}
//7、释放资源
//channel.close();
//connection.close();
}
}
启动生产者观察控制台

 

2 、消费者
第一个消费者代码 Consumer_WorkQueues1:
package com.dxw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:接收消息
*/
public class Consumer_WorkQueues1 {
public static void main(String[] args) throws
IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new
ConnectionFactory();
//2. 设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/dxw");//虚拟机 默认/
factory.setUsername("dxw");//用户名 默认guest
factory.setPassword("1234");//密码 默认guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5、创建队列
/*
* 参数解释:
* queueDeclare(String queue,
* boolean durable,
* boolean exclusive,
* boolean autoDelete,
* Map<String, Object> arguments)
* 1. queue:队列名称
* 如果没有一个名字叫hello_world的队列,则会创建该
队列,如果有则不会创建
* 2. durable:是否持久化,当mq重启之后,队列中消息还在
* 3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
* 4. autoDelete:是否自动删除。当没有Consumer时,自
动删除掉
* 5. arguments:参数。
*/
channel.queueDeclare("work_queues",true,false,false,null)
;
//6、接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag,
Envelope envelope, AMQP.BasicProperties properties, byte[]
body) throws IOException {
/*System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey(
));
System.out.println("properties:"+properties);*/
System.out.println("body:"+new
String(body));
}
};
/*
* 参数解释:
* basicConsume(String queue, boolean autoAck,
Consumer callback)
* 1. queue:队列名称
* 2. autoAck:是否自动确认
* 3. callback:回调对象
*/
channel.basicConsume("work_queues",true,consumer);
//关闭资源?不要
}
}
第二个消费者代码 Consumer_WorkQueues2:
package com.dxw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:接收消息
*/
public class Consumer_WorkQueues2 {
public static void main(String[] args) throws
IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new
ConnectionFactory();
//2. 设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/dxw");//虚拟机 默认/
factory.setUsername("dxw");//用户名 默认guest
factory.setPassword("1234");//密码 默认guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5、创建队列
/*
* 参数解释:
* queueDeclare(String queue,
* boolean durable,
* boolean exclusive,
* boolean autoDelete,
* Map<String, Object> arguments)
* 1. queue:队列名称
* 如果没有一个名字叫hello_world的队列,则会创建该
队列,如果有则不会创建
* 2. durable:是否持久化,当mq重启之后,队列中消息还在
* 3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
* 4. autoDelete:是否自动删除。当没有Consumer时,自
动删除掉
* 5. arguments:参数。
*/
channel.queueDeclare("work_queues",true,false,false,null)
;
//6、接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag,
Envelope envelope, AMQP.BasicProperties properties, byte[]
body) throws IOException {
/*System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey(
));
System.out.println("properties:"+properties);*/
System.out.println("body:"+new
String(body));
}
};
/*
* 参数解释:
* basicConsume(String queue, boolean autoAck,
Consumer callback)
* 1. queue:队列名称
* 2. autoAck:是否自动确认
* 3. callback:回调对象
注意:先启动两个消费者,然后再启动生产者,然后观察两个生产者控制台输出
3、Pub/Sub订阅模式
1、模式介绍
在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
⚫ P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是
发给X(交换机)
⚫ C:消费者,消息的接收者,会一直等待消息到来
⚫ Queue:消息队列,接收消息、缓存消息
⚫ Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方
面,知道如何处理消息,例如递交给某个特别队 列、递交给所有队列、
*/
channel.basicConsume("work_queues",true,consumer);
//关闭资源?不要
}
}
注意 : 先启动两个消费者 , 然后再启动生产者 , 然后观察两个生产者控制台输出

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/13107.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

初学Nodejs(3):http模块

初学Nodejs http模块 1、概念 什么是客户端与服务端 在网络节点中&#xff0c;负责消费资源的电脑&#xff0c;叫做客户端&#xff1b;负责对外提供网络资源的电脑叫做服务器 http模块是Nodejs官方提供的、用来创建web服务器的模块。通过http模块提供的http.createServe()方…

[附源码]java毕业设计流浪动物救助系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

活动sql语句索引基本优化

前言 最近接到了一个需求开发&#xff0c;然后开发完成以后打算对sql进行一些优化&#xff0c;于是等所有功能开发完成以后对mapper文件里面的sql,和service层的查询语句都摘出来&#xff0c;然后设计了一些索引&#xff0c;下面就来说说一些大概的优化思路&#xff0c;至于mys…

WPF上位机通信组件与Modbus协议

1、Modbus通信方式与分类 - 串口 RS485&#xff08;一主多从&#xff09;&#xff1a;不同的报文格式&#xff1a;ModbusAscii&#xff08;ASCII字符方式进行发送&#xff09;、ModbusRTU&#xff08;Remote Terminal Unit&#xff09; - 以太网&#xff08;TCP点对点&#…

[博士后申请]套磁信的五大误区

博士后申请有一些技巧需要注意&#xff0c;下面就随知识人网一起来看看博士后申请套磁信的五大误区。 误区一&#xff1a;字数越多越好 Email字数控制在200字左右。教授每天处理上百封邮件&#xff0c;简单明了的邮件内容是为别人节约时间的一种礼貌;简短易回复的信件也会加大…

supervisor常见报错问题处理及使用教程

Supervisor 是用Python开发的一套通用的进程管理程序&#xff0c;能将一个普通的命令行进程变为后台daemon&#xff0c;并监控进程状态&#xff0c;异常退出时能自动重启。 官网介绍 Supervisor已经过测试&#xff0c;可以在Linux&#xff08;Ubuntu 9.10&#xff09;&#xf…

MySql常见复合查询(重点)

复合查询&#xff08;重点&#xff09; 多表查询 实际开发中往往数据来自不同的表&#xff0c;所以需要多表查询。本节我们用一个简单的公司管理系统&#xff0c;有三张表 EMP,DEPT,SALGRADE来演示如何进行多表查询。 显示雇员名、雇员工资以及所在部门的名字因为上面的数据来…

如何解决Web前端安全问题?

我国网络技术水平的提升&#xff0c;带动着WEB前端业务量的显著增长&#xff0c;人们对于网络服务的需求也日益复杂&#xff0c;与此同时&#xff0c;越来越多的黑客出现&#xff0c;其攻击水平也有了明显提升&#xff0c;WEB前端也成为了众多黑客进行网络攻击的主要目标。 因…

什么是零代码?零代码与低代码有什么联系与区别?

传统的软件研发方式目前并不能很好地满足企业的需求&#xff1a;人员成本高、研发时间长、运维复杂。 这时零代码或低代码工具出现在市面上并被关注就是必然趋势了。对于不太了解两者的人来说&#xff0c;零代码和低代码是什么&#xff1f;又有什么联系与区别&#xff1f; 01 …

uni小程序——评论、文本域、发送、键盘调起、有值后按钮变色等

一、简介 文本域默认显示一行&#xff0c;最多显示4行&#xff0c;到了4行之后不再增高。 输入值后按钮变色 二、案例演示 三、代码 <template><view><view class"plBox"><textarea auto-height"true" maxlength"-1" :s…

[Linux安装软件详解系列]04 安装Redis

目录1、查看服务器是否已安装Redis2、安装Redis1&#xff09;下载2&#xff09;解压3&#xff09;安装4&#xff09;移动配置文件到安装目录下5&#xff09;配置redis为后台启动6&#xff09;将redis-cli&#xff0c;redis-server拷贝到bin下7&#xff09;启动redis8&#xff0…

RabbitMQ简介及在Linux中安装部署(yum)

一、RabbitMQ简介及其作用 RabbitMQ简介 RabbitMQ是在2007 年发布&#xff0c;是一个在 AMQP(高级消息队列协议)基础上完成的&#xff0c;可复用的企业消息系统&#xff0c;是当前最主流的消息中间件之一。RabbitMQ是一个由erlang开发的AMQP&#xff08;Advanced Message Queu…

Arcpy入门教程01:从零开始制作一个arcpy脚本

从零开始制作一个arcpy脚本 文章目录 需求分析代码实现构造临时工作目录数据处理过程及API解析脚本打包代码封装在红盒子中创建脚本报错提醒 EOL while scanning string literal完整代码需求分析 我们现在有一个GDB存储这西安市各个区的绿地面的GDB,以及碑林区和新城区的行政…

将时间序列转成图像——相对位置矩阵方法 Matlab实现

目录 1 方法 2 Matlab代码实现 3.结果 【若觉文章质量良好且有用&#xff0c;请别忘了点赞收藏加关注&#xff0c;这将是我继续分享的动力&#xff0c;万分感谢&#xff01;】 其他&#xff1a; 1.时间序列转二维图像方法及其应用研究综述_vm-1215的博客-CSDN博客 2.将时…

Nginx实现负载均衡

目录 一、环境准备 1、准备3台centos服务器 2、软件安装 二、负载均衡配置 三、其他分配策略 1、fair&#xff08;第三方&#xff09; 一、环境准备 1、准备3台centos服务器​​​​​​​ 服务器名称主机名IP安装服务备注Nginx反向代理服务器proxy192.168.1.10nginx关…

ES倒排序索引

前言 在学习Elasticsearch的使用前&#xff0c;我们先来了解下es是如何实现全文搜索的。 倒排索引是 Elasticsearch 中非常 重要的索引结构&#xff0c;从 文档单词到文档 ID 的过程 为什么要使用倒排索引 先看下面的商品数据goods id 标题 描述 1 小米手机 小米手机性…

【保姆级】新机器部署Redis

1、登录服务器&#xff0c;如果非root用户则切root用户 sudo su - 2、安装gcc yum install gcc-c 3、在/usr/tmp目录上传redis安装包 4、将安装包移到/opt/byd目录 mv redis-4.0.11.tar.gz /opt/byd 5、解压 & 重命名 tar -xzvf redis-4.0.11.tar.gz mv redis-4.0.11 …

安全狗受邀出席CIS 2022网络安全创新大会

11月16日&#xff0c;由网络安全行业门户Freebuf主办的CIS 2022网络安全创新大会&#xff08;简称CIS&#xff09;在上海主会场顺利开幕。 作为国内云原生安全领导厂商&#xff0c;安全狗也收到邀请出席此次活动。 据悉&#xff0c;此次大会分为上海、北京、深圳等多个会场&am…

Pytorch中的DDP

一. 概览 DDP的原理&#xff1f; 在分类上&#xff0c;DDP属于Data Parallel。简单来讲&#xff0c;就是通过提高batch size来增加并行度。为什么快&#xff1f; DDP通过Ring-Reduce的数据交换方法提高了通讯效率&#xff0c;并通过启动多个进程的方式减轻Python GIL的限制&am…

2022-11-17 mysql列存储引擎-聚合运算中间结果缓存磁盘文件以避免OOM-需求分析

摘要: mysql列存储引擎-聚合运算中间结果缓存磁盘文件以避免OOM-需求分析 关联ISSUE: https://github.com/stoneatom/stonedb/issues/21 需求分析ISSUE: https://github.com/stoneatom/stonedb/issues/949 上下文说明: 当前聚合运算的结果都缓存在了内存的HASH中, 一旦数据量…