文章目录
- 一、同步通讯与异步通讯
- 1、同步调用的优缺点
- 2、异步调用的优缺点
- 二、RabbitMQ
- 1、MQ消息队列
- 2、RabbitMQ的安装
- 3、RabbitMQ的结构和概念
- 4、RabbitMQ的消息模型
- 5、入门案例
一、同步通讯与异步通讯
同步通讯就像打视频,两边可以实时得到相关信息。异步通讯则像发消息,对方什么时候处理你这个消息不确定。
两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送消息可以同时与多个人收发邮件,但是往往响应会有延迟。
1、同步调用的优缺点
以购物场景为例,用户支付完成后,正常是要调用订单服务更新订单状态,调用仓储服务等。后来还有可能加短信通知、优惠券、积分功能等等
微服务间基于Feign的调用就属于同步方式。根据上面的示例,微服务之间同步调用存在的问题有:
总结就是:
同步调用的优点:
- 时效性较强,可以立即得到结果
同步调用的缺点:
- 耦合度高
- 性能和吞吐能力下降
- 有额外的资源消耗
- 有级联失败问题
2、异步调用的优缺点
异步调用常见实现就是事件驱动模式。如下图,和同步调用不一样的时,当用户支付完成,支付服务将信息传递给Broker以后,由Broker去通知后面的订单服务、发送短信服务等来干活儿,这几个服务什么时候干完活儿,支付服务已经不再关心了。
异步调用的优点有:
优势一:服务解耦
现在再增加模块,则不用再修改支付服务的代码,而只需在新服务中订阅Broker中的事件即可。不关支付服务什么事,解决了服务和服务的耦合问题。
某一天要砍掉发短信的需求,如果是同步调用,则要去改支付服务的代码,异步下则只需让短信服务取消订阅Broker即可。
优势二:性能提升,吞吐量提高
不同于同步调用,异步模式下,当支付服务将消息成功发送给broker后,即可返回支付成功。总耗时为支付服务自身时间+发送事件的时间
优势三:服务没有强依赖,不担心级联失败问题
异步调用下,假设后面某个服务挂了,前面的支付服务也不受影响,因为二者之间没有调用关系,做到了故障隔离。挂了的服务重启后再去Broker拿消息就是了
优势四:流量削峰
当大量用户请求服务时,Broker就像一个拦在洪水前的大坝,保护着后面的服务不受影响,这些服务仍然按照它们的并发能力从Broker中拿消息并处理请求即可。
异步通信的优点:
- 耦合度低
- 吞吐量提升
- 故障隔离
- 流量削峰
异步通信的缺点:
- 依赖于Broker的可靠性、安全性、吞吐能力
- 架构复杂了,业务没有明显的流程线,不好追踪管理
最后,实际开发中,到底是同步调用还是异步调用,看具体需要什么,需要信息的时效性,则同步调用,需要的是高并发,则异步调用。
二、RabbitMQ
1、MQ消息队列
上面实现微服务的异步调用,中间最关键的就是Broker。而MQ就是对它的一种实现和落地。
MQ (MessageQueue),中文是消息队列,简单说就是存放消息的队列。
而消息队列的实现产品,主流的有以下四种:
2、RabbitMQ的安装
RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:https://www.rabbitmq.com/
接下来使用docker部署RabbitMQ:
# 拉取镜像
docker pull rabbitmq:3-management
# 或者使用tar镜像包,docker load -i也行
docker run \
-e RABBITMQ_DEFAULT_USER=root \
-e RABBITMQ_DEFAULT_PASS=root123 \
--name mq \
--hostname mql \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
–hostname主机名,不做集群部署的话加不加都行。开放的两个端口中,15672是Rabbit MQ的一个可视化管理页面端口,5672是做消息通信的一个端口。
访问虚拟机IP:15672进入控制台
3、RabbitMQ的结构和概念
- publisher:生产者
- consumer:消费者
- exchange:交换机,负责将消息路由到队列
- queue:队列,存储消息
- virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离。每个用户应该有一个自己的虚拟主机
4、RabbitMQ的消息模型
RabbitMQ有五种常见的消息模型。可分为这三大类:
➢ 基本消息队列(BasicQueue)
➢ 工作消息队列(WorkQueue)
➢ 发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种:
- Fanout Exchange:广播
- Direct Exchange:路由
- Topic Exchange:主题
5、入门案例
接下来以最简单的基本消息队列来做一个演示:
这里有三个角色:
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存消息
- consumer:订阅队列,处理队列中的消息
接下来打开一个demo工程:
其中:
- mq-demo:父工程,管理项目依赖
- publisher:消息的发送者
- consumer:消息的消费者
publisher部分主要为:
- 建立连接
- 创建Channel
- 声明队列
- 发送消息
- 关闭连接和channel
测试代码如下:
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.150.101");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("root123");
// 2.建立连接
Connection connection = factory.newConnection();
// 3.创建通道Channel
Channel channel = connection.createChannel();
// 4.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 5.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 6.关闭通道和连接
channel.close();
connection.close();
}
}
此时,查看控制台,可以看到消息发送成功。到此,发送消息的微服务任务已经完成,解耦合的!
consumer部分主要为:
- 建立连接
- 创建Channel
- 声明队列
- 订阅消息
测试代码为:
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.创建建立连接
ConnectionFactory factory = new ConnectionFactory();
// 设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("192.168.150.101");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("root");
factory.setPassword("root123");
// 2.建立连接
Connection connection = factory.newConnection();
// 3.创建通道Channel
Channel channel = connection.createChannel();
// 4.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 5.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){ //内部类
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 6.处理消息
String message = new String(body); //字节再转成字符串
System.out.println("接收到消息:【" + message + "】"); //这里写拿到消息后要做的事
}
});
System.out.println("等待接收消息。。。。");
}
}
此时消息队列中数据没有了:
为什么生产者创建了队列,但在消费者中还要创建?
因为生产者和消费者所在的服务谁先启动不一定,不在消费者中创建一次的话。如果消费者先启动,则找不到队列。而如果生产者先启动,此时哪怕消费者用有创建队列的代码,也不会去重复创建。
总结:
基本消息队列的消息发送流程:
➢ 建立connection
➢ 创建channel
➢ 利用channel声明队列
➢ 利用channel向队列发送消息
基本消息队列的消息接收流程:
➢ 建立connection
➢ 创建channel
➢ 利用channel声明队列
➢ 定义consumer的消费行为handleDelivery()
➢ 利用channel将消费者与队列绑定
生产者发消息不用等待消费者,消费者处理消息也不用等待生产者。consumer要定义消费行为,也就是一个回调函数handleDelivery() 。利用通道将消费者和队列绑定后,将来一旦有消息,就会投递给这个回调函数去执行。
订阅消息也就是把队列和回调函数绑定一下。绑定完成后继续执行,等有消息投递进来了,再回调这个函数来处理。回调函数里写的则是处理消息的逻辑。