RabbitMQ
【黑马程序员RabbitMQ全套教程,rabbitmq消息中间件到实战】
文章目录
- RabbitMQ
- 第一天 基础
- 4 RabbitMQ 的工作模式
- 4.1 Work queues 工作队列模式
- 4.1.1 模式说明
- 4.1.2 代码编写
- 4.1.3 小结
第一天 基础
4 RabbitMQ 的工作模式
4.1 Work queues 工作队列模式
4.1.1 模式说明
看看官网
之前我们 已经完成了 这个简单模式的编写,这种工作模式 是一个生产者 对一个 消费者
不同的工作模式其实就是 指的是 它的消息的路由策略以及 方式 不太一样
比如 Work queues 工作队列 模式
它就是一个生产者 对应两个【多个】消费者【它们是竞争关系,不是共享:意思就是一条消息只能被 一个消费者消费】
下面就来说说这种 模式
-
Work Queues:与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。
-
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度
4.1.2 代码编写
Work Queues 与入门程序的简单模式的代码几乎是一样的。
可以完全复制,并多复制一个消费者进行多个消费者同时对消费消息的测试。
【生产者】
package com.dingjiaxiong.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* ClassName: Producer_WorkQueues
* date: 2022/11/16 10:58
* 发送消息
* @author DingJiaxiong
*/
public class Producer_WorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("xxxxxxxxxxxxxxxxx"); // 服务器IP【默认本机 localhost】
factory.setPort(5672); //端口【默认也是 5672】
factory.setVirtualHost("/ding"); //虚拟机【 默认是 /】
factory.setUsername("dingjiaxiong"); // 用户名【默认 guest】
factory.setPassword("12345"); //密码【默认 guest】
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
/**
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* 参数解释:
* 1. queue:队列名称
* 2. durable:是否持久化【当mq 重启之后,还在】
* 3. exclusive:
* - 是否独占。只能有一个消费者监听这队列
* - 当Connection 关闭时,是否删除队列
* 4.autoDelete:是否自动删除【当没有Consumer 时,自动删除掉】
* 5.arguments:一些参数信息
* */
//【如果没有一个名字叫hello_world 的队列,则会创建该队列,如果有则不会创建】
channel.queueDeclare("work_queues",true,false,false,null);
//6. 发送消息
/*
* basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
* 参数解释:
* 1. exchange:交换机名称。简单模式下交换机会使用默认的【""】
* 2. routingKey:路由名称。
* 3. props:配置信息
* 4. body:发送消息数据
* */
for (int i = 1; i <= 10; i++) {
String body = i + "hello,rabbitmq";
channel.basicPublish("","work_queues",null,body.getBytes());
}
//7. 释放资源
channel.close();
connection.close();
}
}
OK, 让它发10 条消息
【消费者】
package com.dingjiaxiong.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* ClassName: Consumer_WorkQueue1
* date: 2022/11/16 12:37
*
* @author DingJiaxiong
*/
public class Consumer_WorkQueue1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2. 设置参数
factory.setHost("xxxxxxxxxxxxxxxx"); // 服务器IP【默认本机 localhost】
factory.setPort(5672); //端口【默认也是 5672】
factory.setVirtualHost("/ding"); //虚拟机【 默认是 /】
factory.setUsername("dingjiaxiong"); // 用户名【默认 guest】
factory.setPassword("12345"); //密码【默认 guest】
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5. 创建队列Queue
/**
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* 参数解释:
* 1. queue:队列名称
* 2. durable:是否持久化【当mq 重启之后,还在】
* 3. exclusive:
* - 是否独占。只能有一个消费者监听这队列
* - 当Connection 关闭时,是否删除队列
* 4.autoDelete:是否自动删除【当没有Consumer 时,自动删除掉】
* 5.arguments:一些参数信息
* */
//【如果没有一个名字叫hello_world 的队列,则会创建该队列,如果有则不会创建】
channel.queueDeclare("work_queues",true,false,false,null);
// 接收消息【消费消息】
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* 参数解释:
* 1. queue:队列名称
* 2. autoAck:是否自动确认
* 3. callback:回调对象
* */
Consumer consumer = new DefaultConsumer(channel){
//回调方法,当收到消息后,会自动执行这个 方法
/*
* 1. consumerTag:消息的标识
* 2. envelope:获取一些 信息【交换机、路由key...】
* 3. properties: 配置信息
* 4. body: 数据
* */
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag:" + consumerTag);
// System.out.println("Exchange:" + envelope.getExchange());
// System.out.println("RoutingKey:" + envelope.getRoutingKey());
// System.out.println("properties:" + properties);
System.out.println("body:" + new String(body));
}
};
channel.basicConsume("work_queues",true,consumer);
// 不要关闭资源,让它一直监听
}
}
这是消费者 1
2 是一样的
OK,这样消费者 也准备好了,先把消费者 跑起来
OK
现在 直接看看管控台
队列已经 创建出来了 ,而且有两个 消费者
运行生产者
OK,生产10 条消息完毕
查看消费者 的输出日志
没毛病,一人拿了 5 条 ,而且还有规律
这就是 工作队列 的工作模式【分担压力】
4.1.3 小结
- 在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关系是竞争的关系。
- Work Queues 对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。例如:短信服务部署多个,只需要有一个节点成功发送即可。