RabbitMq汇总
- 1.RabbitMq的传统实现方式
- 2.SpringAMQP简化RabbitMq开发
- 2.1 基本消息队列(BasicQueue)
- 2.2 工作消息队列(WorkQueue)
- 2.3 发布订阅 -- 广播(Fanout)
- 2.4 发布订阅 -- 路由(Direct)
- 2.5 发布订阅 -- 主题(Topic)
- 2.SpringAMQP声明交换机和队列
- 2.1 使用bean的方式声明交换机和队列
- 2.2 使用注解的方式声明交换机和队列
1.RabbitMq的传统实现方式
动手实现一个简单的消息队列
无论时发布消息还是消费消息,都要建立连接, 所以我们可以将这个步骤抽取出来
public class ConnectionUtil {
/**
* 建立与RabbitMQ的连接
* @return
* @throws Exception
*/
public static Connection getConnection() throws Exception {
// 定义连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置服务地址
factory.setHost("192.168.202.128");
// 端口
factory.setPort(5672);
// 设置账号信息,用户名、密码、vhost
factory.setVirtualHost("/");
factory.setUsername("itcast");
factory.setPassword("123321");
// 通过工厂获取连接
Connection connection = factory.newConnection();
return connection;
}
}
一、发布消息
- 建立连接
- 创建通道
- 创建队列
- 发送消息
- 关闭通道和连接
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
Connection connection = ConnectionUtil.getConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
}
二、订阅消息
- 建立连接
- 创建通道
- 创建队列
- 订阅消息
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
Connection connection = ConnectionUtil.getConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
2.SpringAMQP简化RabbitMq开发
一、引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
二、在发布者和消费者两端,都要配置MQ地址
SpringAMQP提供了配置来简化手动创建连接这一复杂的过程
spring:
rabbitmq:
host: 192.168.202.128 # 主机名
port: 5672 # 端口
virtual-host: / # 虚拟主机
username: itcast # 用户名
password: 123321 # 密码
三、简化发送消息
SpringAMQP提供了RabbitTemplate类来简化发送消息的步骤
@Autowired
private RabbitTemplate rabbitTemplate;
四、简化订阅消息
SpringAMQP提供了@RabbitListener注解来简化订阅消息的步骤
@RabbitListener(queues = "simple.queue")
public void listenMessage(String msg) throws InterruptedException {
}
2.1 基本消息队列(BasicQueue)
最基本的队列模型:一个生产者发送消息到一个队列,一个消费者从队列中取消息
实际开发中,我们通常事先在rabbitMq界面创建好队列,然后只要记住队列的名称
一、发布消息
注入RabbitTemplate
来简化操作, RabbitTemplate在执行convertAndSend方法时,会自动开启通道, 往指定名称的队列中发送消息, 并在方法结束后关闭连接和通道
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
二、订阅消息
使用@RabbitListener注解
实现对队列的订阅
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
总结:
基本消息队列模型中,在生产者和消费者之间,只有队列这一个媒介
- 生产者只要知道往哪个队列发送消息
- 消费者只要知道订阅哪个队列中的消息
2.2 工作消息队列(WorkQueue)
在基本消息队列(BasicQueue)中,我们只有一个消费者, 在工作消息队列模型下,我们可以设置多个消费者同时订阅一个队列
一、发布消息
实现和基本消息队列时是一样的,只要知道往哪个队列发送消息, 这里我们演示发送多条信息
@Test
public void testWorkQueue() throws InterruptedException {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, message_";
for (int i = 0; i < 20; i++) {
// 发送消息
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
二、订阅消息
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
总结:
工作消息队列模型中,在生产者和消费者之间,只有队列这一个媒介(跟基本模型一样)
- 生产者只要知道往哪个队列发送消息
- 消费者只要知道订阅哪个队列中的消息(同一个队列)
2.3 发布订阅 – 广播(Fanout)
广播模式下,引入了一个新的概念:交换机
交换机是一个消息的中转站, 它可以实现广播、定向、通配符等不同形式的消息递交方式; 交换机只负责递交消息, 并不具备存储消息的能力, 消息最终存储媒介, 依旧是队列, 因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!
一、发布消息
现在就不是往队列中发消息了, 发送者只要知道往哪个交换机发送消息, 不用去关心交换机将消息转发给哪些队列
@Test
public void testFanoutExchange() {
// 队列名称
String exchangeName = "itcast.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
如果你想了解交换机会往哪些队列中发送消息,可以登录rabbitMq的界面,查看交换机详情, 里面会详细罗列当前交换机绑定的队列
二、订阅消息
消息的最终存储媒介,依旧是队列.消费者只要知道订阅哪个队列中的消息
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者1接收到Fanout消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者2接收到Fanout消息:【" + msg + "】");
}
总结:
- 生产者只要知道往哪个交换机发送消息, 不用去关心交换机将消息转发给哪些队列
- 消费者只要知道订阅哪个队列中的消息
2.4 发布订阅 – 路由(Direct)
之前我们学习了广播(Fanout), 在广播模式下, 只要往交换机发送消息, 那么交换机会将消息转发给所有绑定的队列, 而路由(Direct)又称为定向
交换机绑定了A、B两个队列,我们往交换机中发消息时, 最终希望交换机只把消息转发给B队列,这个过程即路由
一、发布消息
往交换机发消息的时候,需要指定交换机转发给哪个队列(拥有通用routingKey的队列), 此处我们设置的routingKey为red
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "itcast.direct";
// 消息
String message = "红色警报!日本乱排核废水,导致海洋生物变异,惊现哥斯拉!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
我们发现这个交换机绑定了两个队列, 每个队列都设置了两个routingKey, 其中都有red, 所以这两个队列都能收到消息
二、订阅消息
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg){
System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg){
System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}
总结:
- 生产者不仅要知道往哪个交换机发消息, 同时还要通过路由秘钥(routingKey)指定交换机将消息转发给哪些队列(拥有同样的路由秘钥,即可转发)
- 消费者只要知道订阅哪个队列中的消息(一直都没变过)
2.5 发布订阅 – 主题(Topic)
主题(Topic)是对路由(Direct)的一种补充, 我们希望某一个组的队列都能收到消息, 但是在rabbitMq中无法将队列编组, 有了主题(Topic)后,我们可以将这一组的队列的routingKey都设置为china.#
, 那只会Routingkey只要符合通配符规则, 例如china.jiangsu
、china.jiangsu.suzhou
这个组都可以接收到消息
#
:匹配一个或多个词
item.#:能够匹配item.spu.insert 或者 item.spu*
:匹配不多不少恰好1个词
item.*:只能匹配item.spu
一、发布消息
跟路由时一样, 交换机拿到routingKey后会去匹配对应的队列
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "itcast.topic";
// 消息
String message = "喜报!孙悟空大战哥斯拉,胜!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
当下判断两个队列都符合routingKey
二、订阅消息
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){
System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){
System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}
总结:
- 生产者不仅要知道往哪个交换机发消息, 同时还要通过路由秘钥(routingKey)指定交换机将消息转发给哪些队列(拥有同样的路由秘钥,即可转发)
- 交换机会根据routingKey来匹配符合条件的队列(这个过程是交换机来完成,所以我们不用关心)
- 消费者只要知道订阅哪个队列中的消息(一直都没变过)
2.SpringAMQP声明交换机和队列
2.1 使用bean的方式声明交换机和队列
启动项目后, 就会在rabbitMq中创建交换机和队列, 包括两者的绑定关系
package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
/**
* 声明交换机
* @return Fanout类型交换机
*/
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
/**
* 第1个队列
*/
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
/**
* 第2个队列
*/
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
2.2 使用注解的方式声明交换机和队列
Direct定向
基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。
在consumer的SpringRabbitListener中添加两个消费者,同时基于注解来声明队列和交换机:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}
Topic主题
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}