RabbitMQ - 安装和使用
- 一. 安装
- 二. RabbitMQ的简单使用
- 2.1 创建交换机
- 2.1.1 交换机类型
- 2.1.2 持久化方式
- 2.2 创建队列
- 2.3 绑定交换机和队列
- 2.4 SpringBoot整合
- 2.5 另外一种监听写法
一. 安装
一键安装:
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management
相关端口讲解:
- 4396:
EPMD
与节点间相互通信的端口。 - 5671,5672:
AMQP
协议的端口,基于此进行消息传递。 - 15671,15672:15672端口,用于访问
RabbitMQ
的Web
管理界面。 - 25672:用于节点间和
CLI
工具通信的端口。
运行之后,效果如下:找不到之后它会自动去拉镜像并且运行。
再通过docker ps
查看容器的运行情况:
运行完毕之后,访问http://IP:15672
就能访问Web
控制页面,默认的账号密码都是:guest
登录后界面如下:
二. RabbitMQ的简单使用
先来说下RabbitMQ
发送一条消息的一个大致流程:
- 生产者将消息发送到一个指定的交换机。并且可以指定一个路由
routing key
。 - 一个交换机可以有多个队列进行绑定。交换机根据
routing key
,决定将消息发送到哪一个队列上。 - 消费者监听某个队列,如果队列中有消息,则开始消费。
2.1 创建交换机
从上面我们可以得知,RabbitMQ
消息传递的时候,并不是直接将信息发送到队列中,而是发送给交换机。交换机相当于一个交通枢纽,决定了将接收到的消息发送到哪个队列中。
控制台上创建一个交换机的操作顺序如下:bulletPreProcessor-exchange
其中,有几个属性做一个基本介绍:
2.1.1 交换机类型
Type
:交换机的类型,有四种。
第一种:直连交换机:Direct exchange
。
特点:
- 一个队列和交换机绑定的同时,还会绑定一个
routing_key
。 - 生产者发送消息给交换机的同时,指定一个路由
key
,交换机就会将消息发送给指定的routing_key
。
第二种:扇形交换机:Fanout exchange
。
特点:
- 将接收到的消息发送给绑定到自己身上的所有队列。
- 无需处理路由,因此处理消息的速度最快。
第三种:主题交换机:Topic exchange
。相当于直连交换机的一种升华模式。直连交换机里面一般是指定一个唯一的routing_key
。而主题交换机则可以选取带有一定匹配规则的routing_key
。
例如,主题交换机绑定了以下Key
:*.#.*.....
。其中
#
:表示任意数量的单词。*
:表示一个单词。.
:每个部分的一个分割点。
那么满足对应的条件的routing_key
,都会接收对应的消息。例如一个消息的routing_key
为fast.rabbit.white
,那么包含这几个单词的队列都会接收到这条消息:
fast、white、rabbit
。
第四种:首部交换机:Headers exchange
。特点:
- 不再使用
routing_key
作为路由键。而是使用Headers
信息来进行交换。类似于HTTP
请求头。 - 绑定交换机和队列的时候,要求
Hash
结构携带一个名为x-match
的键。 value
可以是any
(任意匹配一个)或者all
(全部匹配)。
2.1.2 持久化方式
持久化方式有两种:
Durable
:持久化到磁盘。Transient
:不持久化。
2.2 创建队列
创建一个名为originBullet-queue
的队列:
2.3 绑定交换机和队列
点击我们刚创建的交换机:
然后点击Bindings
,添加一个绑定关系。指定我们的routing_key
为 bullet.originMessage
.
结果如下:
2.4 SpringBoot整合
1.pom
依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.RabbitMQ
相关配置:
spring:
application:
name: tv-service-bulletCurtain
rabbitmq:
username: guest
password: guest
# 虚拟主机,默认是/
virtual-host: /
# 超时时间
connection-timeout: 30000
listener:
simple:
# 消费模式,手动
acknowledge-mode: manual
# 并发数
concurrency: 1
# 最大并发数
max-concurrency: 1
# 限流
prefetch: 1
addresses: 你的IP地址:5672
3.RabbitMQ
配置类:
@Configuration
public class RabbitMQConfig {
@Bean
public Queue TestDirectQueue() {
return new Queue("originBullet-queue", true);
}
@Bean
DirectExchange TestDirectExchange() {
return new DirectExchange("bulletPreProcessor-exchange", true, false);
}
@Bean
Binding bindingDirect() {
return BindingBuilder.bind(TestDirectQueue()).to(TestDirectExchange()).with("bullet.originMessage");
}
}
4.生产者OriginMessageSender
:(SendMessageEntity
是我外部传入的对象,Demo
可以自己随便传)
@Component
public class OriginMessageSender {
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(SendMessageEntity sendMessageEntity) {
CorrelationData correlationData = new CorrelationData();
correlationData.setId(UUID.randomUUID().toString());// 唯一ID
Map<String, Object> map = new HashMap<>();
map.put("message", JSONObject.toJSONString(sendMessageEntity));
rabbitTemplate.convertAndSend("bulletPreProcessor-exchange",// 交换机名称
"bullet.originMessage",// 路由Key
map, correlationData);
}
}
5.消费者OriginMessageReceiver
:
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.Map;
/**
* @author Zong0915
* @date 2022/12/9 下午8:35
*/
@Component
@Slf4j
public class OriginMessageReceiver {
@RabbitListener(queues = "originBullet-queue")
@RabbitHandler
public void onOriginMessage(@Payload Map testMessage, @Headers Map<String, Object> headers, Channel channel) throws IOException {
log.info("***********消费开始*************");
log.info("消费体:{}", JSONObject.toJSONString(testMessage));
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
}
}
测试如下:(我这里整合了WebSocket
,发送弹幕的时候顺便发了个Q)
生产者发送消息给交换机:
消费者消费:
到这里一个简单的生产消费过程就结束了。
2.5 另外一种监听写法
如果不使用RabbitMQConfig
来配置,我们可以在监听的时候指定对应的交换机和队列以及路由,消费者写法如下:
@RabbitListener(
bindings = @QueueBinding(
value = @Queue(value = "originBullet-queue", durable = "true"),
exchange = @Exchange(name = "bulletPreProcessor-exchange", type = "direct"),
key = "bullet.originMessage"
)
)
@RabbitHandler
public void onOriginMessage(@Payload Map testMessage, @Headers Map<String, Object> headers, Channel channel) throws IOException {
log.info("***********消费开始*************");
log.info("消费体:{}", JSONObject.toJSONString(testMessage));
Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
channel.basicAck(deliveryTag, false);
}