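When several consumers consume from the same queue, messages are dispatched to them in round-robin fashion: each message is delivered to only one consumer, and the consumers take turns.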
The RabbitMQUtil class is a utility class that opens a connection and returns a Channel.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitMQUtil {
    public static Channel getChannel() throws Exception {
        // Create the connection factory (local broker, default guest account)
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // Open a connection to the broker
        Connection connection = factory.newConnection();
        // Create a channel on that connection and return it
        return connection.createChannel();
    }
}
- Create a producer and start it.
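import com.rabbitmq.client.Channel;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class Producer {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        // Declare a non-durable, non-exclusive, non-auto-delete queue
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        // Publish each whitespace-separated token from standard input as one message
        Scanner sc = new Scanner(System.in);
        while (sc.hasNext()) {
            String message = sc.next();
            // The default exchange ("") routes the message to the queue named by the routing key
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
        }
    }
}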
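One detail of the producer loop is easy to miss: `Scanner.next()` returns whitespace-delimited tokens, not whole lines, so typing `AA BB` on one line publishes two messages. The sketch below reproduces only that tokenizing behavior, without a broker; the class name `ScannerTokenSketch` and the helper `tokens` are hypothetical names used for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Scanner;

public class ScannerTokenSketch {
    // Collects the tokens that a hasNext()/next() loop would read,
    // i.e. what the producer would publish, one message per token.
    static List<String> tokens(String input) {
        List<String> result = new ArrayList<>();
        try (Scanner sc = new Scanner(input)) {
            while (sc.hasNext()) {
                result.add(sc.next());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Two tokens on the first line, one on the second
        System.out.println(tokens("AA BB\nCC")); // prints [AA, BB, CC]
    }
}
```

If one message per input line is wanted instead, `hasNextLine()` and `nextLine()` would be the usual substitutes.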
- Create two consumers and start them.
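import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;

public class Consumer1 {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        // Called for each message delivered to this consumer
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Received message: " + new String(message.getBody(), StandardCharsets.UTF_8));
        };
        // Called when the broker cancels the consumer (for example, the queue is deleted)
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("Message consumption was interrupted");
        };
        System.out.println("Consumer C1 started, waiting for messages......");
        // Second argument is autoAck: true acknowledges messages automatically on delivery
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}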
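
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;

public class Consumer2 {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitMQUtil.getChannel();
        // Called for each message delivered to this consumer
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Received message: " + new String(message.getBody(), StandardCharsets.UTF_8));
        };
        // Called when the broker cancels the consumer (for example, the queue is deleted)
        CancelCallback cancelCallback = consumerTag -> {
            System.out.println("Message consumption was interrupted");
        };
        System.out.println("Consumer C2 started, waiting for messages......");
        // Second argument is autoAck: true acknowledges messages automatically on delivery
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
    }
}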
- The run output is shown in the figure below. The two consumers take turns receiving messages from the queue.
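
The alternation between the two consumers can also be mimicked without a broker. The following is a minimal sketch, not RabbitMQ code: it assumes messages are handed out strictly in arrival order, with message i going to consumer i modulo the number of consumers, which is a simplification of what the broker does. The class name `RoundRobinSketch`, the helper `dispatch`, and the sample messages are hypothetical and chosen only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RoundRobinSketch {
    // Assigns message i to consumer (i mod consumers.size()),
    // so the consumers take turns in a fixed order.
    static Map<String, List<String>> dispatch(List<String> consumers, List<String> messages) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            String target = consumers.get(i % consumers.size());
            received.get(target).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        Map<String, List<String>> result = dispatch(
                Arrays.asList("C1", "C2"),
                Arrays.asList("AA", "BB", "CC", "DD"));
        // prints:
        // C1 received [AA, CC]
        // C2 received [BB, DD]
        result.forEach((consumer, msgs) -> System.out.println(consumer + " received " + msgs));
    }
}
```

With four messages and two consumers, each consumer ends up with two messages, matching the turn-taking described above.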