接上文 RabbitMQ-死信队列
1 工作队列模式
xx模式只是一种设计思路,并不是指具体的某种实现,可理解为实现XX模式需要怎么去写业务代码。
之前的是简单的一个消费者一个生产者模式,下边是一个生产者多个消费者的情况:
这里先定义两个监听器
@RabbitListener(queues = "yyds")
public void receiver(String data){ //这里直接接收String类型的数据
System.out.println("一号消息队列监听器 "+data);
}
@RabbitListener(queues = "yyds")
public void receiver2(String data){
System.out.println("二号消息队列监听器 "+data);
}
把配置类的JSON转换和死信队列定义删掉
在rabbitmq网页管理端把死信队列和yyds队列都删除,死信交换机也删除
启动服务
出现队列
然后在amq.direct交换机发送消息
可以发现默认是轮询方式发送的。
此时若将服务关闭,先在交换机发送几条信息,然后再启动服务,那么这几条信息会首先给一号监听器。
关闭服务
发送3条消息,
启动服务器
进入队列详情,可以看到每个消费者都有prefetch count =250。这代表若开启服务前预先有消息在队列,然后开启了监听,会将前250个预存的消息给一号,251-500给二号这样轮询。
这样的话若一开始存在消息,会被一个消费者一次性全部消耗,因为没有对prefetch count(预获取数量,一次性获取消息的最大数量)进行限制,若希望消费者一次只拿一个消息,而不是将所有消息全部获取,可进行配置。
在配置类定义一个自定义的ListenerContainerFactory,可以在这里设定消费者Channel的PrefetchCount的大小
@Resource
private CachingConnectionFactory connectionFactory;
@Bean(name = "listenerContainer")
public SimpleRabbitListenerContainerFactory listenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setPrefetchCount(1); //将PrefetchCount设定为1表示一次只能取一个
return factory;
}
监听器指定工厂
然后关闭服务,在第二个交换机amq.direct发送3条数据
启动服务,出现轮询效果,二号连着两条还是因为没有添加休眠
此时队列详情的一次最大获取数量变为1
除了定义连个相同的监听方法,还可以在注解中定义,比如定义10个同样的消费者:
@RabbitListener(queues = "yyds", containerFactory = "listenerContainer", concurrency = "10")
public void receiver(String data){
System.out.println("一号消息队列监听器 "+data);
}
重启服务,出现10个消费者