任务模型
概念
当消息处理比较耗时的时候,可能生产消息的速度大于消费的速度,长此以往,就会导致消息堆积,无法及时处理,此时可以使用任务模型,当多个消费者绑定一个队列,共同消费其中的消息。
代码
-
生产者
/** * 生产者 */ public class Provider { /** * 生产消息 */ @Test public void sendMessage() throws IOException, TimeoutException { //获取连接对象 Connection connection = RabbitMQUtils.getConnection(); //获取连接中通道 Channel channel = connection.createChannel(); /** * 通过通道声明队列 * 队列名称 不存在自动创建 * 是否持久化 * 是否独占队列 * 是否在消费完成后自动删除队列 * 附加参数 */ channel.queueDeclare("work",true,false,false,null); /** * 生产消息 * 1. 交换机名称 * 2. 队列名称 * 3. 传递消息额外设置 * 4. 具体消息 */ for (int i = 0; i < 10; i++) { channel.basicPublish("","work",null,("我是工作模型中的消息" + i).getBytes()); } RabbitMQUtils.close(connection,channel); } }
-
消费者1
/** * 消费者1 */ public class Customer1 { public static void main(String[] args) throws IOException, TimeoutException { //获取连接对象 Connection connection = RabbitMQUtils.getConnection(); //获取连接中通道 Channel channel = connection.createChannel(); /** * 通道绑定队列 * 队列名称 不存在自动创建 * 是否持久化 * 是否独占队列 * 是否在消费完成后自动删除队列 * 附加参数 */ channel.queueDeclare("work",true,false,false,null); /** * 消费消息 * 1. 消费的队列 * 2. 开始消息的自动确认机制 * 3. 回调接口 重写该接口的handleDelivery方法 处理消息 */ channel.basicConsume("work",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者1消费的消息:"+ new String(body) ); } }); } }
-
消费者2
/** * 消费者2 */ public class Customer2 { public static void main(String[] args) throws IOException, TimeoutException { //获取连接对象 Connection connection = RabbitMQUtils.getConnection(); //获取连接中通道 Channel channel = connection.createChannel(); /** * 通道绑定队列 * 队列名称 不存在自动创建 * 是否持久化 * 是否独占队列 * 是否在消费完成后自动删除队列 * 附加参数 */ channel.queueDeclare("work",true,false,false,null); /** * 消费消息 * 1. 消费的队列 * 2. 开始消息的自动确认机制 * 3. 回调接口 重写该接口的handleDelivery方法 处理消息 */ channel.basicConsume("work",true,new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { System.out.println("消费者2消费的消息:"+ new String(body) ); } }); } }
-
先启动连个消费者,然后启动生产者
官方总结
默认情况下,RabbitMQ将按顺序将每个消息发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为轮询算法。
默认情况下,通道是自动确认的,拿到消息之后不论是否消费完毕就会向Rabbit MQ发送确认。这样的话,如上两个消费者有一个有延迟也是平均分配。我们希望有延迟的消费者少消费,没有延迟的消费者多消费。可以关闭其自动确认机制。完成消费之后手动确认即可。
消费者代码
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接对象
Connection connection = RabbitMQUtils.getConnection();
//获取连接中通道
Channel channel = connection.createChannel();
//每次消费一个消息
channel.basicQos(1);
/**
* 通道绑定队列
* 队列名称 不存在自动创建
* 是否持久化
* 是否独占队列
* 是否在消费完成后自动删除队列
* 附加参数
*/
channel.queueDeclare("work",true,false,false,null);
/**
* 消费消息
* 1. 消费的队列
* 2. 自动确认机制
* 3. 回调接口 重写该接口的handleDelivery方法 处理消息
*/
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2消费的消息:"+ new String(body) );
//手动确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
}
此时,我们的消费者1加入sleep模拟延迟,消费者2 正常执行。这样的话,结果如下
即达到了我们想要的效果。能者多劳。