1
、模式介绍
⚫
Work Queues
:与入门程序的简单模式相比,多了一个或一些消费端,
多个消费端共同消费同一个队列中的消息。
⚫
应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处
理的速度。
小结
:
1
、在一个队列中如果有多个消费者,那么消费者之间对于同一个消息的关
系是
竞争
的关系
2
、
Work Queues
对于任务过重或任务较多情况使用工作队列可以提高任
务处理的速度。例如:短信服务部署多个,
只需要有一个节点成功发送即可。

2
、代码实现
Work Queues
与入门程序的简单模式的代码几乎是一样的。可以完全复
制,并多复制一个消费者进行多
个消费者同时对消费消息的测试。
1
、生产者
生产者代码
Producer_WorkQueues:
package com.dxw.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 生产者:发送消息
*/
public class Producer_WorkQueues {
public static void main(String[] args) throws
IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new
ConnectionFactory();
//2、设置参数
factory.setHost("localhost");//ip 默认localhost
factory.setPort(5672);//端口 默认5672
factory.setVirtualHost("/dxw");//虚拟机 默认/
factory.setUsername("dxw");//用户名 默认guest
factory.setPassword("1234");//密码 默认guest
//3、创建连接
Connection connection = factory.newConnection();
//4、创建Channel
Channel channel = connection.createChannel();
//5、创建队列
/*
* 参数解释:
* queueDeclare(String queue,
* boolean durable,
* boolean exclusive,
* boolean autoDelete,
* Map<String, Object> arguments)
* 1. queue:队列名称
* 如果没有一个名字叫hello_world的队列,则会创建该队
列,如果有则不会创建
* 2. durable:是否持久化,当mq重启之后,队列中消息还在
* 3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
* 4. autoDelete:是否自动删除。当没有Consumer时,自动
删除掉
* 5. arguments:参数。
*/
channel.queueDeclare("work_queues",true,false,false,null)
;
//6、发送消息
/*
* 参数解释:
* basicPublish(String exchange,
* String routingKey,
* BasicProperties props,
* byte[] body)
* 1. exchange:交换机名称。简单模式下交换机会使用默认的
""
* 2. routingKey:路由名称
* 3. props:配置信息
* 4. body:发送消息数据
启动生产者观察控制台
2、消费者
第一个消费者代码Consumer_WorkQueues1:
*/
for(int i=1;i<=10;i++){
String body = i+"hello rabbitmq~~~";
channel.basicPublish("","work_queues",null,body.getBytes(
));
}
//7、释放资源
//channel.close();
//connection.close();
}
}
启动生产者观察控制台

2
、消费者
第一个消费者代码
Consumer_WorkQueues1:
package com.dxw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:接收消息
*/
public class Consumer_WorkQueues1 {
public static void main(String[] args) throws
IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new
ConnectionFactory();
//2. 设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/dxw");//虚拟机 默认/
factory.setUsername("dxw");//用户名 默认guest
factory.setPassword("1234");//密码 默认guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5、创建队列
/*
* 参数解释:
* queueDeclare(String queue,
* boolean durable,
* boolean exclusive,
* boolean autoDelete,
* Map<String, Object> arguments)
* 1. queue:队列名称
* 如果没有一个名字叫hello_world的队列,则会创建该
队列,如果有则不会创建
* 2. durable:是否持久化,当mq重启之后,队列中消息还在
* 3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
* 4. autoDelete:是否自动删除。当没有Consumer时,自
动删除掉
* 5. arguments:参数。
*/
channel.queueDeclare("work_queues",true,false,false,null)
;
//6、接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag,
Envelope envelope, AMQP.BasicProperties properties, byte[]
body) throws IOException {
/*System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey(
));
System.out.println("properties:"+properties);*/
System.out.println("body:"+new
String(body));
}
};
/*
* 参数解释:
* basicConsume(String queue, boolean autoAck,
Consumer callback)
* 1. queue:队列名称
* 2. autoAck:是否自动确认
* 3. callback:回调对象
*/
channel.basicConsume("work_queues",true,consumer);
//关闭资源?不要
}
}
第二个消费者代码
Consumer_WorkQueues2:
package com.dxw.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 消费者:接收消息
*/
public class Consumer_WorkQueues2 {
public static void main(String[] args) throws
IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory factory = new
ConnectionFactory();
//2. 设置参数
factory.setHost("localhost");//ip 默认值 localhost
factory.setPort(5672); //端口 默认值 5672
factory.setVirtualHost("/dxw");//虚拟机 默认/
factory.setUsername("dxw");//用户名 默认guest
factory.setPassword("1234");//密码 默认guest
//3. 创建连接 Connection
Connection connection = factory.newConnection();
//4. 创建Channel
Channel channel = connection.createChannel();
//5、创建队列
/*
* 参数解释:
* queueDeclare(String queue,
* boolean durable,
* boolean exclusive,
* boolean autoDelete,
* Map<String, Object> arguments)
* 1. queue:队列名称
* 如果没有一个名字叫hello_world的队列,则会创建该
队列,如果有则不会创建
* 2. durable:是否持久化,当mq重启之后,队列中消息还在
* 3. exclusive:
* 是否独占。只能有一个消费者监听这队列
* 当Connection关闭时,是否删除队列
* 4. autoDelete:是否自动删除。当没有Consumer时,自
动删除掉
* 5. arguments:参数。
*/
channel.queueDeclare("work_queues",true,false,false,null)
;
//6、接收消息
Consumer consumer = new DefaultConsumer(channel){
/*
回调方法,当收到消息后,会自动执行该方法
1. consumerTag:标识
2. envelope:获取一些信息,交换机,路由key...
3. properties:配置信息
4. body:数据
*/
@Override
public void handleDelivery(String consumerTag,
Envelope envelope, AMQP.BasicProperties properties, byte[]
body) throws IOException {
/*System.out.println("consumerTag:"+consumerTag);
System.out.println("Exchange:"+envelope.getExchange());
System.out.println("RoutingKey:"+envelope.getRoutingKey(
));
System.out.println("properties:"+properties);*/
System.out.println("body:"+new
String(body));
}
};
/*
* 参数解释:
* basicConsume(String queue, boolean autoAck,
Consumer callback)
* 1. queue:队列名称
* 2. autoAck:是否自动确认
* 3. callback:回调对象
注意:先启动两个消费者,然后再启动生产者,然后观察两个生产者控制台输出
3、Pub/Sub订阅模式
1、模式介绍
在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:
⚫ P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是
发给X(交换机)
⚫ C:消费者,消息的接收者,会一直等待消息到来
⚫ Queue:消息队列,接收消息、缓存消息
⚫ Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方
面,知道如何处理消息,例如递交给某个特别队 列、递交给所有队列、
*/
channel.basicConsume("work_queues",true,consumer);
//关闭资源?不要
}
}
注意
:
先启动两个消费者
,
然后再启动生产者
,
然后观察两个生产者控制台输出
