本文我们学习下使用RabbitMQ实现的几种数据发送的模型——发后即忘模型和远程RPC调用。二者实际上是从业务的角度定义的一个RabbitMQ的使用模型。发后即忘模型,强调发送时不太关心消息接收者的执行结果,仅仅是为了发送信息。而远程RPC调用模型强调,另外开辟通道获取消息接收者的执行结果,而且执行的结果直接影响业务。
从业务上来划分,通常我们通过MQ发出的信息可以分为三种:消息、命令和事件。对于消息来说,我们发送之后不期望会得到回复,或者说不期望马上得到回复,类似于我们接收到手机短信,只是知道这件事情。然后我们怎么去处理或者去不去处理,实际上给我们发送短信的人并不关心,所以这种情况下比较适合使用发后即忘模型。当发送的是命令时,信息的发送者明确知道接受者是谁,通过命令的方式让接收者去进行某项业务,并期望得到反馈,这种情况下比较适合采用远程RPC调用的模型。而最后一种事件,更像是在EDA(Event Driven Architecture)的系统中定义的一种命令,不过命令的格式紧紧和业务模型绑定,所以这里单独提出来叫做事件。很显然,也是使用远程RPC调用的数据发送模型比较合适。
接下来,我们将以实例的方式分别介绍发后即忘模型和远程RPC调用模型的使用。
一、发后即忘模型
我们用代码模拟这样一种业务——业务日志的记录。业务日志其实最符合发后即忘模型的要求,因为日志的记录和我们完成一个业务无关(日志记录成功与否都不会影响业务的成败)。有过编程经验的童鞋都知道,日志按照级别来划分从低到高,可以分为三种:debug、info和error。在这个模型中,我们创建一个topic exchange,然后分别以debug、info和error为主题分别绑定到三个队列。不同级别的日志消费者订阅不同的队列,然后记录到不同的日志文件(或者同一个文件使用不同的标识区分)中。
整个消息的流通图如下:
消息由生产者产生之后,通过一个topic交换机,根据不同的topic发送到响应的队列中,然后定义了3个消费者,每个消费者订阅了存放不同级别日志的通道,获取消息后进行相应的处理。
我们决定采用spring boot集成RabbitMQ的方式实现,首先配置相关的exchange、binding和queue,如下代码:
@Configuration
public class RabbitConfig {
@Value("${spring.rabbitmq.host}")
private String host;
@Value("${spring.rabbitmq.port}")
private int port;
@Value("${spring.rabbitmq.username}")
private String username;
@Value("${spring.rabbitmq.password}")
private String password;
@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host, port);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost("/");
connectionFactory.setPublisherConfirms(true);
return connectionFactory;
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
return template;
}
@Bean
TopicExchange logTopicExchange() {
return new TopicExchange("logTopicExchange", true, false);
}
@Bean
public Queue debugQueue() {
return new Queue("debugQueue", true, false, false);
}
@Bean
public Queue infoQueue() {
return new Queue("infoQueue", true, false, false);
}
@Bean
public Queue errorQueue() {
return new Queue("errorQueue", true, false, false);
}
@Bean
Binding bindingDebugQueue() {
return BindingBuilder.bind(debugQueue()).to(logTopicExchange()).with("debug");
}
@Bean
Binding bindingInfoQueue() {
return BindingBuilder.bind(infoQueue()).to(logTopicExchange()).with("info");
}
@Bean
Binding bindingErrorQueue() {
return BindingBuilder.bind(errorQueue()).to(logTopicExchange()).with("error");
}
}
第1行:通过@Configuration注解开启配置支持
第3~10行:引入配置文件中的RabbitMQ的配置信息
第11~19行:创建链接工厂ConnectionFactory 实例。
第20~25行:创建RabbitTemplate实例,后面将用它来发送消息。
第26~29行:创建了名称为logTopicExchange的主题交换机
第30~41行:创建三个队列。
第42~53行:分别将队列绑定到交换机上。
完成了上述生产者端的配置,接下来我们看下发送消息的代码:
public class LogServiceImpl implements LogService {
private ExecutorService executorService = Executors.newFixedThreadPool(10);
@Autowired
RabbitTemplate rabbitTemplate;
@Override
public void sendMsg(String routeKey, String msg) {
MessageProperties messageProperties = new MessageProperties();
// 设置过期时间,单位:毫秒,30分钟
messageProperties.setExpiration("1800000");
messageProperties.setContentType("text/plain");
messageProperties.setContentEncoding("UTF-8");
byte[] msgBytes = msg.getBytes();
Message message = new Message(msgBytes, messageProperties);
CompletableFuture.runAsync(() -> rabbitTemplate.convertAndSend(
"logTopicExchange",
routeKey,
message), executorService);
}
}
第10~16行:这是在通过传过来的消息来设置Message对象,可以看到,为了防止消息不能被及时读取而大量堆积,这里设置了消息的超时为半个小时。
第17~20行:我们选择了异步发送消息的方法,主要是考虑到业务日志的写入不应该影响业务的实现,而又不会关心日志写入的结果,所以这里采用了异步的方式。
建立单元测试,发送消息之后可以看到,交换机和队列都已经创建,而且消息已经正确路由到了队列中。
生成的交换机:
生成的队列:
生产者一方的准备工作做好之后,我们看下消费者的处理。相比生产者,消费者的实现要简单的多,有关RabbitMQ的配置这里不再重复列举,只看下消费者的监听部分代码:
@Service
public class LogServiceImpl implements LogService {
@Override
@RabbitListener(queues = {"debugQueue"})
public void writeDebug(Message message) {
String str=new String(message.getBody());
System.out.println(str);
}
@Override
@RabbitListener(queues = {"infoQueue"})
public void writeInfo(Message message) {
String str=new String(message.getBody());
System.out.println(str);
}
@Override
@RabbitListener(queues = {"errorQueue"})
public void writeError(Message message) {
String str=new String(message.getBody());
System.out.println(str);
}
}
第1行,@Service必不可少,需要将监听的服务类托管到IOC中。
第4、11、18行,使用3个 @RabbitListener注解来监听debugQueue、infoQueue和errorQueue三个队列
以上就是我们简单实现的一个发后即忘模型的案例。虽然简单,但是足以作为一个经典案例。而且有些细节需要注意,比如:在发送消息时要考虑异步发送,才不会对业务代码进行干扰。接下来我们开始用实例解释下RabbitMQ远程RPC调用的方式。
二、远程RPC调用模型
所谓远程RPC方式调用模型,在上文中我们已经介绍过,简单理解就是发送信息后,生产者一直等待消费者返回消费后的结果。那么问题来了,消费者是怎么把消费的结果返回给生产者呢?毋庸置疑,消费者返回的肯定也是一个消息,那么这个消息要通过哪个交换机?到达哪个通道?下面我们就来一一解决这些问题。
首先,看下远程RPC方式调用模型的示意:
笔者来解释下整个过程:
(1)生产者向业务交换机里面发送业务命令或者事件,同时需要创建一个只有自己能够监听的而且是保证队列名称唯一的私密队列,然后开始监听这个队列。
(2)发送消息的消息头中具有一个叫做reply_to的字段,这个字段设置为上一步骤创建的队列名称。
(3)消费者获取到业务命令或者事件之后,开始执行业务。执行完成业务之后,将回复消息通过默认的交换机传递到reply_to队列里面。
(4)生产者接收到消费者回复的消息之后,完成业务,结束等待。
下面,我们来看下生产者端的代码。ConnectionFactory等基本配置我这里不再展示,需要特别注意的是引入了一个新的Bean——simpleMessageListenerContainer,主要用来手动添加监听的队列以及监听器。
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());
return container;
}
接下来是远程RPC调用的方法:
public void sendRPCMsg(String routeKey, String msg) {
RabbitAdmin admin = new RabbitAdmin(connectionFactory);
Queue replytoQueue = admin.declareQueue();
MessageProperties messageProperties = new MessageProperties();
messageProperties.setReplyTo(replytoQueue.getName());
byte[] msgBytes = msg.getBytes();
Message message = new Message(msgBytes, messageProperties);
rabbitTemplate.convertAndSend(
"eventTopicExchange",
routeKey,
message);
Thread currentThread = Thread.currentThread();
simpleMessageListenerContainer.addQueues(replytoQueue);
simpleMessageListenerContainer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
String str = new String(message.getBody());
System.out.println(str);
simpleMessageListenerContainer.removeQueues(replytoQueue);
LockSupport.unpark(currentThread);
}
});
LockSupport.park();
}
第1~2行:创建一个RabbitAdmin对象,这个对象可以手动创建交换机、队列等等。
第4~11行:将队列名称放到消息的reply_to头部,并且进行消息的发送。
第13~14行:使用simpleMessageListenerContainer监听新创建的队列,并且设置监听对象。
第24行:保持线程阻塞,然后在第21行解除阻塞状态。
我们在第2行创建了一个队列,我们看下declareQueue方法的定义:
public Queue declareQueue() {
try {
DeclareOk declareOk = this.rabbitTemplate.execute(Channel::queueDeclare);
return new Queue(declareOk.getQueue(), false, true, true); // NOSONAR never null
}
catch (AmqpException e) {
logOrRethrowDeclarationException(null, "queue", e);
return null;
}
}
注意上述代码的第4行,在这里实际上创建了一个随机名称的队列,RabbitMQ会保证队列名称的唯一,而创建的Queue对象的后面三个boolean类型的参数指明了队列是不可持久化的、排他的、以及自动删除,也就是说创建的队列只能当前的channel自己监听,而且一旦队列里面没有消息或者channel关闭队列就会消失。就是这些属性,保证了创建了一个临时性的队列,而且其他消费者无法进行监听。
最后,我们再看下消费者的处理逻辑:
@RabbitListener(queues = {"eventQueue"})
public void getMsg(Message message) {
String str = new String(message.getBody());
System.out.println(str);
String replayTo = message.getMessageProperties().getReplyTo();
System.out.println("replayTo =" + replayTo);
byte[] msgBytes = "我收到了".getBytes();
MessageProperties messageProperties = new MessageProperties();
Message replayMessage = new Message(msgBytes, messageProperties);
try {
rabbitTemplate.send(replayTo, replayMessage);
} catch (AmqpException e) {
e.printStackTrace();
}
}
第1行:使用RabbitListener监听名称为eventQueue的队列。
第5行:从接受到的消息中获取replay_to的队列名称。
第11行:向生产者回复消息
我们看到,相比生产者,消费者代码要简单的多,就是多了一个获取replay_to队列并发送消息的过程。下面看下replay_to队列的庐山真面目,如下图红色圈出部分:
三、总结
本文主要介绍了RabbitMQ发后即忘和远程RPC调用两种数据发送模型,现总结如下:
(1)发后即忘数据发送模型针对发送的信息生产者不关心对方的处理结果这一业务前提实现,实现起来比较简单,但是需要注意发送消息时应该采用异步发送,避免消息的发送影响业务。
(2)如果需要等待消费者的返回结果,应该采用远程RPC调用数据发送模型。生产者自己创建接受回复消息的队列,而且应该保证队列名称唯一、队列私有和支持自动删除,通过消息的reply_to头部将队列名称发送给消费者,消费者再通过RabbitMQ的默认交换机向reply_to队列回复消息。