RabbitMQ的6种工作模式
官方文档:
http://www.rabbitmq.com/
https://www.rabbitmq.com/getstarted.html
RabbitMQ 常见的 6 种工作模式:
1、simple简单模式
1)、消息产生后将消息放入队列。
2)、消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除。
3)、存在的问题:消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失。
4)、应用场景:聊天(中间有一个过度的服务器)。
5)、代码实现:
pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>rabbitmq-java</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.10.0</version>
</dependency>
</dependencies>
</project>
工具类
package com.example;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
// 连接rabbitmq服务,共享一个工厂对象
private static ConnectionFactory factory;
static {
factory=new ConnectionFactory();
//设置rabbitmq属性
factory.setHost("127.0.0.1");
factory.setUsername("zsx242030");
factory.setPassword("zsx242030");
factory.setVirtualHost("/");
factory.setPort(5672);
}
public static Connection getConnection(){
Connection connection=null;
try {
//获取连接对象
connection = factory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return connection;
}
}
消息提供者
package com.example.simple;
import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Provider {
public static void main(String[] args) {
try {
//获取连接对象
Connection connection = ConnectionUtil.getConnection();
//获取通道对象
Channel channel = connection.createChannel();
//通过通道创建队列,后续所有的操作都是基于channel实现(队列也可以由消费方创建)
channel.queueDeclare("queue1", false, false, false, null);
//向队列中发送消息
channel.basicPublish("", "queue1", null, "Hello RabbitMQ!!!".getBytes());
//断开连接
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消息消费者
package com.example.simple;
import com.example.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) {
try {
Connection connection = ConnectionUtil.getConnection();
//获取通道对象
Channel channel = connection.createChannel();
//监听队列中的消息(消费的是队列,而不是交换机)
channel.basicConsume("queue1", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者获得消息为:" + new String(body, "utf-8"));
}
});
//消费方不需要关闭连接,保持一直监听队列状态
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者获得消息为:Hello RabbitMQ!!!
2、work工作模式(资源的竞争)
1)、消息产生者将消息放入队列,消费者可以有多个,消费者1,消费者2,同时监听同一个队列。消息被消费,
C1 和 C2 共同争抢当前的消息队列内容,谁先拿到谁负责消费消息。
2)、存在的问题:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关
(synchronized,与同步锁的性能不一样),保证一条消息只能被一个消费者使用。
3)、应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到
消息队列中,空闲的系统自动争抢);对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
4)、代码实现:
消息提供者
package com.example.work;
import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Provider {
public static void main(String[] args) {
try {
//获取连接对象
Connection connection = ConnectionUtil.getConnection();
//获取通道对象
Channel channel = connection.createChannel();
//通过通道创建队列
channel.queueDeclare("queue1", false, false, false, null);
//向队列中发送消息
for (int i = 1; i <= 10; i++) {
channel.basicPublish("", "queue1", null, ("Hello RabbitMQ!!!" + i).getBytes());
}
//断开连接
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消息消费者1
package com.example.work;
import com.example.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) {
try {
Connection connection = ConnectionUtil.getConnection();
//获取通道对象
Channel channel = connection.createChannel();
//监听队列中的消息
channel.basicConsume("queue1", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));
}
});
//消费方不需要关闭连接,保持一直监听队列状态
// channel.close();
// connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消息消费者2
package com.example.work;
import com.example.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) {
try {
Connection connection = ConnectionUtil.getConnection();
//获取通道对象
Channel channel = connection.createChannel();
//监听队列中的消息
channel.basicConsume("queue1", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));
}
});
//消费方不需要关闭连接,保持一直监听队列状态
// channel.close();
// connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!9
消费者2获得消息为:Hello RabbitMQ!!!2
消费者2获得消息为:Hello RabbitMQ!!!4
消费者2获得消息为:Hello RabbitMQ!!!6
消费者2获得消息为:Hello RabbitMQ!!!8
消费者2获得消息为:Hello RabbitMQ!!!10
3、publish/subscribe发布订阅(共享资源)
1)、X代表交换机,rabbitMQ 内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消
息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费。
Exchange 有常见以下 3 种类型:
-
Fanout
:广播,将消息交给所有绑定到交换机的队列。 -
Direct
:定向,把消息交给符合指定 routing key 的队列。 -
Topic
:通配符,把消息交给符合 routing pattern (路由模式)的队列。
Exchange
(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者
没有符合路由规则的队列,那么消息会丢失。
2)相关场景:邮件群发,群聊天,广播(广告)。
3)、代码实现:
消息提供者
package com.example.publishsubscribe;
import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
// 交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建
public class Provider {
public static void main(String[] args) {
try {
//获取连接对象
Connection connection = ConnectionUtil.getConnection();
//获取通道对象
Channel channel = connection.createChannel();
//创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)
// 1.参数一:交换机名称 参数二:交换机类型
channel.exchangeDeclare("fanout_exchange", "fanout");
//通过通道创建队列
//channel.queueDeclare("queue1",false,false,false,null);
//向队列中发送消息
for (int i = 1; i <= 10; i++) {
channel.basicPublish("fanout_exchange", "", null, ("Hello RabbitMQ!!!" + i).getBytes());
}
//断开连接
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消息消费者1
package com.example.publishsubscribe;
import com.example.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) {
try {
Connection connection = ConnectionUtil.getConnection();
//获取通道对象
Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare("fanout_queue1", false, false, false, null);
//给队列绑定交换机
channel.queueBind("fanout_queue1", "fanout_exchange", "");
//监听队列中的消息
channel.basicConsume("fanout_queue1", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));
}
});
//消费方不需要关闭连接,保持一直监听队列状态
// channel.close();
//connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消息消费者2
package com.example.publishsubscribe;
import com.example.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) {
try {
Connection connection = ConnectionUtil.getConnection();
//获取通道对象
Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare("fanout_queue2", false, false, false, null);
//给队列绑定交换机
channel.queueBind("fanout_queue2", "fanout_exchange", "");
//监听队列中的消息
channel.basicConsume("fanout_queue2", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));
}
});
//消费方不需要关闭连接,保持一直监听队列状态
// channel.close();
//connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10
消费者2获得消息为:Hello RabbitMQ!!!1
消费者2获得消息为:Hello RabbitMQ!!!2
消费者2获得消息为:Hello RabbitMQ!!!3
消费者2获得消息为:Hello RabbitMQ!!!4
消费者2获得消息为:Hello RabbitMQ!!!5
消费者2获得消息为:Hello RabbitMQ!!!6
消费者2获得消息为:Hello RabbitMQ!!!7
消费者2获得消息为:Hello RabbitMQ!!!8
消费者2获得消息为:Hello RabbitMQ!!!9
消费者2获得消息为:Hello RabbitMQ!!!10
4、routing路由模式
1)、消息生产者将消息发送给交换机按照路由判断,路由是字符串,当前产生的消息携带路由字符,交换机根据路
由的 key,只能匹配上路由 key 对应的消息队列,对应的消费者才能消费消息。队列与交换机的绑定,不能是任意
绑定了,而是要指定一个 RoutingKey (路由 key)。消息的发送方在向 Exchange 发送消息时,也必须指定消息的
RoutingKey 。Exchange 不再把消息交给每一个绑定的队列,而是根据消息的 Routing Key 进行判断,只有队列
的 Routingkey 与消息的 Routing key 完全一致,才会接收到消息。
2)、根据业务功能定义路由字符串。
3)、从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中。
4)、业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可
以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误。
5)、代码实现:
消息提供者
package com.example.souting;
import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
// 交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建
public class Provider {
public static void main(String[] args) {
try {
//获取连接对象
Connection connection = ConnectionUtil.getConnection();
//获取通道对象
Channel channel = connection.createChannel();
//创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失)
// 1.参数一:交换机名称 参数二:交换机类型
channel.exchangeDeclare("direct_exchange", "direct");
//向队列中发送消息
for (int i = 1; i <= 10; i++) {
channel.basicPublish("direct_exchange",
//设置路由键,符合路由键的队列,才能拿到消息
"insert",
null,
("Hello RabbitMQ!!!" + i).getBytes());
}
//断开连接
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消息消费者1
package com.example.souting;
import com.example.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) {
try {
Connection connection = ConnectionUtil.getConnection();
//获取通道对象
Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare("direct_queue1", false, false, false, null);
//绑定交换机(routingKey:路由键)
channel.queueBind("direct_queue1", "direct_exchange", "select");
channel.queueBind("direct_queue1", "direct_exchange", "insert");
//监听队列中的消息
channel.basicConsume("direct_queue1", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));
}
});
//消费方不需要关闭连接,保持一直监听队列状态
// channel.close();
//connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消息消费者2
package com.example.souting;
import com.example.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) {
try {
Connection connection = ConnectionUtil.getConnection();
//获取通道对象
Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare("direct_queue2", false, false, false, null);
//绑定交换机(routingKey:路由键)
channel.queueBind("direct_queue2", "direct_exchange", "delete");
channel.queueBind("direct_queue2", "direct_exchange", "select");
//监听队列中的消息
channel.basicConsume("direct_queue2", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));
}
});
//消费方不需要关闭连接,保持一直监听队列状态
// channel.close();
//connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10
5、topic 主题模式(路由模式的一种)
1)、Topic 类型与 Direct 相比,都是可以根据 RoutingKey 把消息路由到不同的队列。只不过 Topic 类型
Exchange 可以让队列在绑定 Routing key 的时候使用通配符。
2)、Routingkey 一般都是有一个或多个单词组成,多个单词之间以 . 分割,例如:item.insert。
通配符规则:
#
:匹配一个或多个词
*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.insert.abc
或者item.insert
item.*
:只能匹配 item.insert
usa.#
,因此凡是以 usa.
开头的 routing key
都会被匹配到
#.news
,因此凡是以 .news
结尾的 routing key
都会被匹配
3)、路由功能添加模糊匹配。
4)、消息产生者产生消息,把消息交给交换机。
5)、交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费。
6)、代码实现:
消息提供者
package com.example.topic;
import com.example.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
//交换机和队列可以在提供方和消费方某一方创建,在两边同时创建也可以,只要创建的名称一致。
// 保证,哪一方先运行则在哪一方创建
public class Provider {
public static void main(String[] args) {
try {
//获取连接对象
Connection connection = ConnectionUtil.getConnection();
//获取通道对象
Channel channel = connection.createChannel();
//创建交换机(交换机没有存储数据的能力,数据存储在队列上,如果有交换机没队列的情况下,数据会丢失) //1.参数一:交换机名称 参数二:交换机类型
channel.exchangeDeclare("topic_exchange", "topic");
//向队列中发送消息
for (int i = 1; i <= 10; i++) {
channel.basicPublish("topic_exchange",
// #:匹配0-n个单词(之间以.区分,两点之间算一个单词,可以匹配hello world空格的情况) *(匹配一个单词)
"emp.hello world",
null,
("Hello RabbitMQ!!!" + i).getBytes());
}
//断开连接
channel.close();
connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消息消费者1
package com.example.topic;
import com.example.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer1 {
public static void main(String[] args) {
try {
Connection connection = ConnectionUtil.getConnection();
//获取通道对象
Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare("topic_queue1", false, false, false, null);
//绑定交换机(routingKey:路由键) #:匹配0-n个单词(之间以.区分,两点之间算一个单词)
channel.queueBind("topic_queue1", "topic_exchange", "emp.#");
//监听队列中的消息
channel.basicConsume("topic_queue1", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1获得消息为:" + new String(body, "utf-8"));
}
});
//消费方不需要关闭连接,保持一直监听队列状态
// channel.close();
//connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消息消费者2
package com.example.topic;
import com.example.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer2 {
public static void main(String[] args) {
try {
Connection connection = ConnectionUtil.getConnection();
//获取通道对象
Channel channel = connection.createChannel();
//创建队列
channel.queueDeclare("topic_queue2", false, false, false, null);
//绑定交换机(routingKey:路由键) *:匹配1个单词(之间以.区分,两点之间算一个单词)
channel.queueBind("topic_queue2", "topic_exchange", "emp.*");
//监听队列中的消息
channel.basicConsume("topic_queue2", true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2获得消息为:" + new String(body, "utf-8"));
}
});
//消费方不需要关闭连接,保持一直监听队列状态
// channel.close();
//connection.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
消费者1获得消息为:Hello RabbitMQ!!!1
消费者1获得消息为:Hello RabbitMQ!!!2
消费者1获得消息为:Hello RabbitMQ!!!3
消费者1获得消息为:Hello RabbitMQ!!!4
消费者1获得消息为:Hello RabbitMQ!!!5
消费者1获得消息为:Hello RabbitMQ!!!6
消费者1获得消息为:Hello RabbitMQ!!!7
消费者1获得消息为:Hello RabbitMQ!!!8
消费者1获得消息为:Hello RabbitMQ!!!9
消费者1获得消息为:Hello RabbitMQ!!!10
6、RPC
RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:
1)、客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。
2)、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。
3)、服务端将RPC方法 的结果发送到RPC响应队列。
4)、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。
5)、代码实现:
Client端
package com.example.rpc;
import com.example.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
public class Client {
public static void main(String[] argv) throws IOException, InterruptedException {
String message = "Hello World!!!";
// 建立一个连接和一个通道,并为回调声明一个唯一的回调队列
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 定义一个临时变量的接受队列名
String replyQueueName = channel.queueDeclare().getQueue();
// 生成一个唯一的字符串作为回调队列的编号
String corrId = UUID.randomUUID().toString();
// 发送请求消息,消息使用了两个属性:replyTo和correlationId
// 服务端根据replyTo返回结果,客户端根据correlationId判断响应是不是给自己的
AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName)
.build();
// 发布一个消息,rpc_queue路由规则
channel.basicPublish("", "rpc_queue", props, message.getBytes("UTF-8"));
// 由于我们的消费者交易处理是在单独的线程中进行的,因此我们需要在响应到达之前暂停主线程。
// 这里我们创建的容量为1的阻塞队列ArrayBlockingQueue,因为我们只需要等待一个响应。
final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
// String basicConsume(String queue, boolean autoAck, Consumer callback)
channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
//检查它的correlationId是否是我们所要找的那个
if (properties.getCorrelationId().equals(corrId)) {
//如果是,则响应BlockingQueue
response.offer(new String(body, "UTF-8"));
}
}
});
System.out.println(" 客户端请求的结果:" + response.take());
}
}
Server端
package com.example.rpc;
import com.example.ConnectionUtil;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Server {
public static void main(String[] args) {
Connection connection = null;
try {
connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("rpc_queue", false, false, false, null);
channel.basicQos(1);
System.out.println("Awaiting RPC requests:");
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
.correlationId(properties.getCorrelationId()).build();
String response = "";
try {
response = new String(body, "UTF-8");
System.out.println("response (" + response + ")");
} catch (RuntimeException e) {
System.out.println("错误信息 " + e.toString());
} finally {
// 返回处理结果队列
channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
// 确认消息,已经收到后面参数 multiple:是否批量.true:将一次性确认所有小于envelope.getDeliveryTag()的消息。
channel.basicAck(envelope.getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC
// server owner thread
synchronized (this) {
this.notify();
}
}
}
};
// 取消自动确认
boolean autoAck = false;
channel.basicConsume("rpc_queue", autoAck, consumer);
// Wait and be prepared to consume the message from RPC client.
while (true) {
synchronized (consumer) {
try {
consumer.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
connection.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
Awaiting RPC requests:
response (Hello World!!!)
response (Hello World!!!)
response (Hello World!!!)
# 客戶端发起3次请求
客户端请求的结果:Hello World!!!
客户端请求的结果:Hello World!!!
客户端请求的结果:Hello World!!!
7、发布订阅模式与工作队列模式的区别
1、工作队列模式不用定义交换机,而发布/订阅模式需要定义交换机。
2、发布/订阅模式的生产方是面向交换机发送消息,工作队列模式的生产方是面向队列发送消息(底层使
用默认交换机)。
3、发布/订阅模式需要设置队列和交换机的绑定,工作队列模式不需要设置,实际上工作队列模式会将
队列绑定到默认的交换机 。