工作队列模式,一个消息只能有一个消费者消费
生产者发送20条消息
消费者有两个
第一个消费
睡一秒取一个
第二个睡2秒取
public class WorkConsumerTest1 { public static void main(String[] args) throws IOException, TimeoutException { //1 创建连接工厂 ConnectionFactory connectionFactory=new ConnectionFactory(); //2 设置rabbitmq ip地址 connectionFactory.setHost("localhost"); //3 创建连接对象 Conection对象 Connection connection=connectionFactory.newConnection(); //4 创建管道 Chanel Channel channel=connection.createChannel(); //5 设置队列属性 /** * 第一个参数:队列的名称 * 第二个参数:队列是否要持久化 * 第三个参数:是否排他性(是否在同一个Connection,如果设置为true,不同的Connection是获得不到消息的) * 第四个参数:是否自动删除消息 * 第五个参数:是否要设置一些额外的参数 */ channel.queueDeclare("02-work",false,false,true,null); //channel.basicQos(1); //6 使用chanel 去 rabbitmq 获取消息进行消费 /** * 第一个参数:队列的名称 * 第二个参数:是否自动签收 * 第三个参数:消息属性 * 第四个参数:消息内容 */ channel.basicConsume("02-work", false,new DeliverCallback(){ /** * 当消息从mq 中取出来了会回调这个方法 * 消费者消费消息就在这个 handle中进行处理 */ @Override public void handle(String s, Delivery delivery) throws IOException { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("消息中的内容为:"+new String(delivery.getBody())); //channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false); } },new CancelCallback(){ /** * 当消息取消了会回调这个方法 */ @Override public void handle(String s) throws IOException { System.out.println(111); } }); //7 关闭消息 注意消费者 需要持续监听,不要关闭 //channel.close(); //connection.close(); } }
手动签收的好处:比如说你自动签收了,消息中间件上删除了,结果执行逻辑出错了
手动签收好处就是当出现异常 ,有余地可以再次执行