文章目录
- 1.MQ的基本概念
- 2.常见的MQ产品
- 3.MQ 的优势和劣势
- 3.1 优势
- 3.2 劣势
- 4.RabbitMQ简介
- 4.1RabbitMQ 中的相关概念
1.MQ的基本概念
MQ全称 Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。
一般我们的分布式系统有两种方式进行通信:
2.常见的MQ产品
目前业界有很多的 MQ 产品,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用 Redis 充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及 MQ 产品特征,综合考虑。
3.MQ 的优势和劣势
3.1 优势
- 应用解耦
- 异步提速
- 异步提速
3.2 劣势
- 系统可用性降低
- 系统复杂度提高
- 一致性问题
4.RabbitMQ简介
AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。
4.1RabbitMQ 中的相关概念
根据实际开发代码我们来展开分析,下面是RabbitMQ 生产者Java简单例子:
-
factory.setVirtualHost(“/”)
Virtual host,出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等 -
Channel channel = conn.createChannel()
创建了通道,Channel,如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销 -
EXCHANGE_NAME = “direct_exchanger”
Exchange,message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe) and fanout (multicast) -
routingKey
-
bindingKey
-
RabbitMQ的模式
队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key);
消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey;
Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing
Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息。
-
图解:
P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
C1:消费者,其所在队列指定了需要routing key 为 error 的消息
C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息 -
我们先分析Routing模式(路由模式)
Routing模式中,消息会根据指定的路由键被发送到对应的队列中。接收消息的消费者需要绑定相同的交换机和路由键来接收消息。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 消息生产者
*/
public class MyProducer {
private final static String EXCHANGE_NAME = "direct_exchanger";
// 发送MQ消息
public static void SendMQMsg() throws Exception{
ConnectionFactory factory = new ConnectionFactory();
// 连接IP
factory.setHost("10.0.1.102");
// 连接端口
factory.setPort(5672);
// Virtual host
factory.setVirtualHost("/");
// 用户
factory.setUsername("admin");
factory.setPassword("123456");
// 建立连接
Connection conn = factory.newConnection();
// 创建消息通道
Channel channel = conn.createChannel();
// 发送消息
String msg = "{\"msg\":\"Hello world Rabbit MQ\"}";
// String exchange, String routingKey, BasicProperties props, byte[] body
channel.basicPublish(EXCHANGE_NAME, "MAC2", null, msg.getBytes());
channel.close();
conn.close();
}
}
参考:
https://blog.csdn.net/weixin_44009447/article/details/111224460
https://blog.csdn.net/zyb18507175502/article/details/127504610