5种消息模式
- 简单消息模式:1个生产者 + 1个队列 + 1个消费者;生产者只负责生产,消费者只负责消费,两者在同一个队列中操作
- 工作队列消息模式:1个生产者 + 1个队列 + 多个消费者; 一条消息只能被消费一次
- 订阅消息模式之 fanout(订阅模式):1个生产者 + 1个交换机 + 多个队列 + 多个消费者,一条消息可以被多个消费者消费;
- 订阅消息模式之durect/router(路由模式):1个生产者 + 1个交换机 + 多个队列 + 多个消费者 ,routingKey ,一条消息发送给符合 routingKey 的队列;
- 订阅消息模式之topic(主题模式):通配符,#:匹配一个或者多个 *:一个词;
1.简单消息模式 simple Exchange
P:消息的生产者
C:消息的消费者
红色:队列
生产者将消息发送到队列,消费者从队列中获取消息。
需要导入RabbitMQ的客户端依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.4.1</version>
</dependency>
创建工具类ConnectionUtil ,用来获取MQ的连接
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
public class ConnectionUtil {
public static Connection getConnection() throws Exception {
//定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置服务地址
factory.setHost("47.96.115.67");
//端口
factory.setPort(5672);
//设置账号信息,用户名、密码、vhost
//factory.setVirtualHost("testhost");
factory.setUsername("guest");
factory.setPassword("guest");
// 通过工程获取连接
Connection connection = factory.newConnection();
return connection;
}
}
创建生产者发送消息到队列
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String QUEUE_NAME = "q_test_01";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明(创建)队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息内容
String message = "我是一条简单模型的消息";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//关闭通道和连接
channel.close();
connection.close();
}
}
生产者发送消息到队列后,可以在RabbitMQ的管理控制台中查看该队列的消息
消费者从队列中拿消息
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class Recv {
private final static String QUEUE_NAME = "q_test_01";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
// 从连接中创建通道
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列
channel.basicConsume(QUEUE_NAME, true, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}
当消费者从队列中将该消息拿出后,队列中的该消息就会被弹出队列.
此时我们在到控制台中查看该队列,可以发现该队列的消息已经被消费了,队列为空
2.工作消息模式 work Exchange
一个生产者、2个消费者。
一个消息只能被一个消费者获取。
**生产者:**我们来定义一个生产者向队列中发送100条信息;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String QUEUE_NAME = "test_queue_work";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 100; i++) {
// 消息内容
String message = "工作消息" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//睡眠时间逐渐递增
Thread.sleep(i * 10);
}
channel.close();
connection.close();
}
}
消费者1:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class RecvOne {
private final static String QUEUE_NAME = "test_queue_work";
public static void start() throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者
//channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, true, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("消费者1获得" + message + "'");
//休眠
Thread.sleep(10);
// 返回确认状态,注释掉表示使用自动确认模式
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消费者2:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class RecvTwo {
private final static String QUEUE_NAME = "test_queue_work";
public static void start() throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 同一时刻服务器只会发一条消息给消费者
//channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, true, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("消费者2获得" + message + "'");
// 休眠1秒
Thread.sleep(1000);
//下面这行注释掉表示使用自动确认模式
//channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
编写线程启动类testMain,同时启动两个线程:
import java.util.concurrent.*;
public class testMain {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(2);
//线程池的创建方式推荐使用ThreadPoolExecutor,这也是阿里官方推荐的方式,这样可以避免线程过多导致内存占用一直增加的问题
ExecutorService threadPool = new ThreadPoolExecutor(
2,
2,
1,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
threadPool.submit(()->{
try {
RecvOne.start();
//后续在此处进行业务处理
} catch (Exception e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
});
threadPool.submit(()->{
try {
RecvTwo.start();
//后续在此处进行业务处理
} catch (Exception e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
});
//关闭线程处理
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
//关闭线程池
threadPool.shutdown();
}
}
查看结果:
内容基本上是由线程睡眠时间决定的, 随眠时间长的获取的消息少,睡眠时间短的,获得的消息多.
但是可以看到大致上消费者1和消费者2获得的消息差不多是一样多的,然而代码中的线程2的睡眠时间要远大于线程1,出现这种差不多的结果并不是我们想实现的,应该根据线程的能力来分配消息
总结:
- 消费者1和消费者2获取到的消息内容是不同的,同一个消息只能被一个消费者获取。
- 消费者1和消费者2获取到的消息的数量是相同的,一个是消费奇数号消息,一个是偶数。
- 其实,这样是不合理的,因为消费者1线程停顿的时间短。应该是消费者1要比消费者2获取到的消息多才对。 RabbitMQ 默认将消息顺序发送给下一个消费者,这样,每个消费者会得到相同数量的消息。即轮询(round-robin)分发消息。
- 怎样才能做到按照每个消费者的能力分配消息呢?联合使用 Qos 和 Acknowledge 就可以做到。
basicQos 方法设置了当前信道最大预获取(prefetch)消息数量为1。消息从队列异步推送给消费者,消费者的 ack 也是异步发送给队列,从队列的视角去看,总是会有一批消息已推送但尚未获得 ack 确认,Qos 的 prefetchCount 参数就是用来限制这批未确认消息数量的。设为1时,队列只有在收到消费者发回的上一条消息 ack 确认后,才会向该消费者发送下一条消息。prefetchCount 的默认值为0,即没有限制,队列会将所有消息尽快发给消费者。- 2个概念
- 轮询分发 :使用任务队列的优点之一就是可以轻易的并行工作。如果我们积压了好多工作,我们可以通过增加工作者(消费者)来解决这一问题,使得系统的伸缩性更加容易。在默认情况下,RabbitMQ将逐个发送消息到在序列中的下一个消费者(而不考虑每个任务的时长等等,且是提前一次性分配,并非一个一个分配)。平均每个消费者获得相同数量的消息。这种方式分发消息机制称为Round-Robin(轮询)。
- 公平分发 :虽然上面的分配法方式也还行,但是有个问题就是:比如:现在有2个消费者,所有的奇数的消息都是繁忙的,而偶数则是轻松的。按照轮询的方式,奇数的任务交给了第一个消费者,所以一直在忙个不停。偶数的任务交给另一个消费者,则立即完成任务,然后闲得不行。而RabbitMQ则是不了解这些的。这是因为当消息进入队列,RabbitMQ就会分派消息。它不看消费者为应答的数目,只是盲目的将消息发给轮询指定的消费者。
为了解决这个问题,我们使用basicQos( prefetchCount = 1)方法,来限制RabbitMQ只发不超过1条的消息给同一个消费者。当消息处理完毕后,有了反馈,才会进行第二次发送。
还有一点需要注意,使用公平分发,必须关闭自动应答,改为手动应答。
Work模式的“能者多劳”
打开上述代码的注释:
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
//开启这行 表示使用手动确认模式
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
同时改为手动确认:
// 监听队列,false表示手动返回完成状态,true表示自动
channel.basicConsume(QUEUE_NAME, false, consumer);
测试:
消费者1比消费者2获取的消息更多。
消息的确认模式
消费者从队列中获取消息,服务端如何知道消息已经被消费呢?
- 模式1:自动确认:只要消息从队列中获取,无论消费者获取到消息后是否成功消息,都认为是消息已经成功消费。
- 模式2:手动确认:消费者从队列中获取消息后,服务器会将该消息标记为不可用状态,等待消费者的反馈,如果消费者一直没有反馈,那么该消息将一直处于不可用状态。
手动模式:
自动模式:
3.订阅模式 Fanout Exchange
解读:
- 1个生产者,多个消费者
- 每一个消费者都有自己的一个队列
- 生产者没有将消息直接发送到队列,而是发送到了交换机
- 每个队列都要绑定到交换机
- 生产者发送的消息,经过交换机,到达队列,实现,一个消息被多个消费者获取的目的
注意: 一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费
订阅模型中,通过交换机连接的多个队列之间的消息一致,区别在于消费者的不同,发送方的消息可以通过交换机将信息存储在各个队列中,消费者在对所属的队列进行消费,各个队列之间的消息互不影响.
生产者(看作是后台系统)
向交换机中发送消息。
注意 : 消息发送到没有队列绑定的交换机时,消息将丢失,因为,交换机没有存储消息的能力,消息只能存在在队列中。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
//定义所连接的交换机名称
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
for (int i = 0; i < 100; i++) {
// 消息内容
String message = "工作消息" + i;
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//睡眠时间逐渐递增
Thread.sleep(i * 10);
}
channel.close();
connection.close();
}
}
消费者1(看作是前台系统)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class RecvOne {
private final static String QUEUE_NAME = "test_queue_work1";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void start() throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("消费者1:" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消费者2(看作是搜索系统)
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class RecvTwo {
private final static String QUEUE_NAME = "test_queue_work2";
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void start() throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("消费者2:" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
测试类:
import java.util.concurrent.*;
public class testMain {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(2);
//线程池的创建方式推荐使用ThreadPoolExecutor,这也是阿里官方推荐的方式,这样可以避免线程过多导致内存占用一直增加的问题
ExecutorService threadPool = new ThreadPoolExecutor(
2,
2,
1,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
threadPool.submit(()->{
try {
RecvOne.start();
//后续在此处进行业务处理
} catch (Exception e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
});
threadPool.submit(()->{
try {
RecvTwo.start();
//后续在此处进行业务处理
} catch (Exception e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
});
//关闭线程处理
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
//关闭线程池
threadPool.shutdown();
}
}
测试结果:
同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,只有其中一个消费者实例会消费到消息。
在管理工具中查看队列和交换机的绑定关系:
4.路由模式 Direct Exchange
路由模型就是通过配置,连接到交换机的路由名称进行匹配
交换机虽然可以连接多个队列,通过路由配置信息通过交换机后只会存储到对应路由路径的队列中
生产者:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
//定义所连接的交换机名称
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String message = "路由1 消息传递";
//配置消息所要送达的路径,通过这里的配置,该消息只会到”route1“路径下的队列中去
channel.basicPublish(EXCHANGE_NAME, "route1", null, message.getBytes());
System.out.println("[生产者] 生产"+message);
channel.close();
connection.close();
}
}
消费者1:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class RecvOne {
private final static String QUEUE_NAME = "test_queue_direct_1";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void start() throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
//设置绑定到交换机的路径为”route2“
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "route2");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("消费者1 只能接收到来route2中的消息:" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消费者2:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class RecvTwo {
private final static String QUEUE_NAME = "test_queue_direct_2";
private final static String EXCHANGE_NAME = "test_exchange_direct";
public static void start() throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
//设置该队列包含路径 ”route1“、”route2“
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "route2");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "route1");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("消费者2 可以接收到来route1、route2中的消息:" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
测试:
可以看到 无论生产者怎么生产数据,由于路由的绑定,产品只能进入direct2中给消费者2消费。
生产者配置:
String message = "路由1 消息传递 ";
channel.basicPublish(EXCHANGE_NAME, "route1", null, message.getBytes());
System.out.println("[生产者] 生产"+message);
消费者1 配置:
// 绑定队列到交换机
//设置绑定到交换机的路径为”route2“
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "route2");
消费者2 配置:
// 绑定队列到交换机
//设置该队列包含路径 ”route1“、”route2“
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "route2");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "route1");
5.主题模式(通配符模式)Topic Exchange
主题模式:与路由模式类似,只不过是在路由模式的基础上,对路由匹配进行了解析配置
同一个消息被多个消费者获取。一个消费者队列可以有多个消费者实例,但是在单个队列中只有其中一个消费者实例会消费到消息。
生产者:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Send {
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void main(String[] argv) throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明exchange
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
// 消息内容
String message = "Hello World!!";
channel.basicPublish(EXCHANGE_NAME, "routekey.1", null, message.getBytes());
System.out.println(" [生产者] Sent '" + message + "'");
channel.close();
connection.close();
}
}
消费者1:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class RecvOne {
private final static String QUEUE_NAME = "test_queue_topic_work_1";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void start() throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "routekey.*");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [消费者1] 获得 ' " + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
消费者2:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
public class RecvTwo {
private final static String QUEUE_NAME = "test_queue_topic_work_2";
private final static String EXCHANGE_NAME = "test_exchange_topic";
public static void start() throws Exception {
// 获取到连接以及mq通道
Connection connection = ConnectionUtil.getConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 绑定队列到交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "*.*");
// 同一时刻服务器只会发一条消息给消费者
channel.basicQos(1);
// 定义队列的消费者
QueueingConsumer consumer = new QueueingConsumer(channel);
// 监听队列,手动返回完成
channel.basicConsume(QUEUE_NAME, false, consumer);
// 获取消息
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [消费者2] 获得 '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
多线程启动测试类:
import java.util.concurrent.*;
public class testMain {
public static void main(String[] args) {
CountDownLatch countDownLatch = new CountDownLatch(2);
//线程池的创建方式推荐使用ThreadPoolExecutor,这也是阿里官方推荐的方式,这样可以避免线程过多导致内存占用一直增加的问题
ExecutorService threadPool = new ThreadPoolExecutor(
2,
2,
1,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
threadPool.submit(()->{
try {
RecvOne.start();
//后续在此处进行业务处理
} catch (Exception e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
});
threadPool.submit(()->{
try {
RecvTwo.start();
//后续在此处进行业务处理
} catch (Exception e) {
e.printStackTrace();
}finally {
countDownLatch.countDown();
}
});
//关闭线程处理
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
//关闭线程池
threadPool.shutdown();
}
}
测试结果:
可以看到消费者1、消费者2都通过配置的路由,匹配到了对应的路径队列,拿到了消息
Springboot集成RabbitMQ
- springboot集成RabbitMQ非常简单,如果只是简单的使用配置非常少,springboot提供了spring-boot-starter-amqp对消息各种支持。
配置pom文件,主要是添加spring-boot-starter-amqp的支持
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置application.properties文件
配置rabbitmq的安装地址、端口以及账户信息
spring.application.name=spirng-boot-rabbitmq
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
1.简单队列
配置队列
package com.zpc.rabbitmq;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitConfig {
@Bean
public Queue queue() {
return new Queue("q_hello");
}
}
** 发送者**
package com.zpc.rabbitmq;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.text.SimpleDateFormat;
import java.util.Date;
@Component
public class HelloSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
String context = "hello " + date;
System.out.println("Sender : " + context);
//简单对列的情况下routingKey即为Q名
this.rabbitTemplate.convertAndSend("q_hello", context);
}
}
接收者
package com.zpc.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "q_hello")
public class HelloReceiver {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver : " + hello);
}
}
测试
package com.zpc.rabbitmq;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitMqHelloTest {
@Autowired
private HelloSender helloSender;
@Test
public void hello() throws Exception {
helloSender.send();
}
}
2.多对多使用(Work模式)
注册两个Receiver:
package com.zpc.rabbitmq;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "q_hello")
public class HelloReceiver2 {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver2 : " + hello);
}
}
@Test
public void oneToMany() throws Exception {
for (int i=0;i<100;i++){
helloSender.send(i);
Thread.sleep(300);
}
}
public void send(int i) {
String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());//24小时制
String context = "hello " + i + " " + date;
System.out.println("Sender : " + context);
//简单对列的情况下routingKey即为Q名
this.rabbitTemplate.convertAndSend("q_hello", context);
}
3.Topic Exchange(主题模式)
- topic 是RabbitMQ中最灵活的一种方式,可以根据routing_key自由的绑定不同的队列
首先对topic规则配置,这里使用两个队列(消费者)来演示。
配置队列,绑定交换机
package com.zpc.rabbitmq.topic;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class TopicRabbitConfig {
final static String message = "q_topic_message";
final static String messages = "q_topic_messages";
@Bean
public Queue queueMessage() {
return new Queue(TopicRabbitConfig.message);
}
@Bean
public Queue queueMessages() {
return new Queue(TopicRabbitConfig.messages);
}
/**
* 声明一个Topic类型的交换机
* @return
*/
@Bean
TopicExchange exchange() {
return new TopicExchange("mybootexchange");
}
/**
* 绑定Q到交换机,并且指定routingKey
* @param queueMessage
* @param exchange
* @return
*/
@Bean
Binding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {
return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
}
@Bean
Binding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {
return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
}
}
创建2个消费者
q_topic_message 和q_topic_messages
package com.zpc.rabbitmq.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "q_topic_message")
public class Receiver1 {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver1 : " + hello);
}
}
package com.zpc.rabbitmq.topic;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "q_topic_messages")
public class Receiver2 {
@RabbitHandler
public void process(String hello) {
System.out.println("Receiver2 : " + hello);
}
}
消息发送者(生产者)
package com.zpc.rabbitmq.topic;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MsgSender {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send1() {
String context = "hi, i am message 1";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("mybootexchange", "topic.message", context);
}
public void send2() {
String context = "hi, i am messages 2";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("mybootexchange", "topic.messages", context);
}
}
send1方法会匹配到topic.#和topic.message,两个Receiver都可以收到消息,发送send2只有topic.#可以匹配所有只有Receiver2监听到消息。
测试
package com.zpc.rabbitmq.topic;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitTopicTest {
@Autowired
private MsgSender msgSender;
@Test
public void send1() throws Exception {
msgSender.send1();
}
@Test
public void send2() throws Exception {
msgSender.send2();
}
}
4.Fanout Exchange(订阅模式)
- Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout交换机发送消息,绑定了这个交换机的所有队列都收到这个消息。
1)配置队列,绑定交换机
package com.zpc.rabbitmq.fanout;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutRabbitConfig {
@Bean
public Queue aMessage() {
return new Queue("q_fanout_A");
}
@Bean
public Queue bMessage() {
return new Queue("q_fanout_B");
}
@Bean
public Queue cMessage() {
return new Queue("q_fanout_C");
}
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange("mybootfanoutExchange");
}
@Bean
Binding bindingExchangeA(Queue aMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(aMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeB(Queue bMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(bMessage).to(fanoutExchange);
}
@Bean
Binding bindingExchangeC(Queue cMessage, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(cMessage).to(fanoutExchange);
}
}
2)创建3个消费者
package com.zpc.rabbitmq.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "q_fanout_A")
public class ReceiverA {
@RabbitHandler
public void process(String hello) {
System.out.println("AReceiver : " + hello + "/n");
}
}
package com.zpc.rabbitmq.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "q_fanout_B")
public class ReceiverB {
@RabbitHandler
public void process(String hello) {
System.out.println("BReceiver : " + hello + "/n");
}
}
package com.zpc.rabbitmq.fanout;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(queues = "q_fanout_C")
public class ReceiverC {
@RabbitHandler
public void process(String hello) {
System.out.println("CReceiver : " + hello + "/n");
}
}
3)生产者
package com.zpc.rabbitmq.fanout;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MsgSenderFanout {
@Autowired
private AmqpTemplate rabbitTemplate;
public void send() {
String context = "hi, fanout msg ";
System.out.println("Sender : " + context);
this.rabbitTemplate.convertAndSend("mybootfanoutExchange","", context);
}
}
4)测试
package com.zpc.rabbitmq.fanout;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitFanoutTest {
@Autowired
private MsgSenderFanout msgSender;
@Test
public void send1() throws Exception {
msgSender.send();
}
}
结果如下:
三个消费者都收到消息:
AReceiver : hi, fanout msg
CReceiver : hi, fanout msg
BReceiver : hi, fanout msg