服务通信
分布式系统通信两种方式:
- 直接远程调用(同步)
- 借助第三方间接通信(异步)
同步通讯的问题
Feign就属于同步通讯。存在的如下问题
- 耦合度高,每次添加新的模块就要修改原有模块的代码
- 性能下降,调用者需要等待服务者返回的结果,如果调用链过长,则响应的时间越长
- 资源浪费,在等待的过程中,不会释放CPU与内存资源,在高并发的场景下占用浪费资源过大
- 级联失败,当调用链中一个服务宕机,那么调用者也会出现问题。
异步调用方案
异步调用常见的实现方式为事件驱动模式
事件驱动模式优点:
- 服务解耦,添加模块不需要更改其他服务的代码
- 性能提升,在用户请求的模块可以直接返回结果,不需要等待其他服务执行完毕后再返回结果
- 服务没有强依赖关系,一个服务宕机不会影响到其他服务
- 流量削峰
缺点:
- 依赖了第三方组件,第三方组件需要保证可靠性、安全性、吞吐能力
- 架构复杂,业务没有明显流程线,不好追踪管理
- 一致性问题
MQ
MQ:Message Queue消息队列,是消息在传输的过程中保存消息的容器。多用于分布式系统之间进行通信
Kafka适用于数据量大但对数据安全性不高的场景比如说日志的传输
RabbitMQ与RocketMQ适用于对数据安全要求较高的场景,比如说业务之间的传输信息
满足什么条件才可以使用MQ?
- 生产者不需要从消费者处获取任何信息
- 容许短暂不一致性
- 使用MQ的效果收益大于管理MQ成本
RabbitMQ的下载
在虚拟机上启动dokcer服务后拉去rabbitmq镜像
systemctl start docker
docker pull rabbitmq
RabbitMQ的启动
docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
--name mq \
--hostname mql \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:latest
命令解释:
-e RABBITMQ_DEFAULT_USER=admin :指定登录账号为admin
-e RABBITMQ_DEFAULT_PASS=admin :指定登录密码为admin
--name mq :容器名为mq
--hostname mq1 主机名为mq1(做集群时使用,不添加也可以)
-p 15672:15672 端口映射
-p 5672:5672
-d 后台允许
rabbitmq:latest
访问15672端口输入密码登录
可能会遇到的问题
1、关闭防火墙后访问端口仍然无法访问15672端口
解决方法:
开启防火墙
systemctl start firewalld
开放端口
firewall-cmd --zone=public --add-port=15672/tcp --permanent
重新加载配置文件
firewall-cmd --reload
2、即使开放了端口15672也无法访问页面
解决方法:
如果是docker拉取的rabbitmq镜像,需要手动进入容器下载rabbitmq的管理插件
进入容器
docker exec -it 容器名 bash
下载rabbitmq的管理插件
rabbitmq-plugins enable rabbitmq_management
修改配置文件
cd /etc/rabbitmq/conf.d/
echo management_agent.disable_metrics_collector = false > management_agent.disable_metrics_collector.conf
退出镜像
exit
重启rabbitmq
docker restart 容器名
RabbitMQ的结构与概念
RabbitMQ中的几个概念
- channel:操作MQ的工具
- exchange:路由消息到队列中
- queue:缓存消息
- virtualhost:虚拟主机,是对queue、exchange等资源的逻辑分组
常见消息模型
不使用交换机的
- 基本消息队列
- 工作消息队列
使用交换机的
- Fanout Exchange:广播
- Direct Exchange:路由
- Topic Exchange:主题
简单消息队列的实现
只存在三种角色:
- publisher:消息发布者,将消息发送到队列queue
- queue:消息队列,负责接受并缓存消息
- consumer:订阅队列,处理队列中的消息
示例代码:
引入依赖
<dependencies>
<!--rabbitMQ的Java客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.16.0</version>
</dependency>
</dependencies>
/**
* 发送消息方
*/
public class Producer_Hello {
public static void main(String[] args) throws IOException, TimeoutException {
//1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2、设置参数
connectionFactory.setHost("192.168.116.131");
connectionFactory.setPort(5672);//默认也是5672
connectionFactory.setVirtualHost("/");//设置虚拟机 默认值是/
connectionFactory.setUsername("admin");//默认值是guest
connectionFactory.setPassword("admin");//默认值是guest
//3、创建连接Connection
Connection connection = connectionFactory.newConnection();
//4、创建Channel
Channel channel = connection.createChannel();
//5、创建队列Queue
/**
* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
* 参数
* 1.queue:队列名称
* 2.durable:是否持久化
* 3.exclusive:
* *是否独占。只能有一个消费者监听这队列当
* *Connection关闭时,是否删除队列全
* 4.autoDelete:是否自动删除。当没有Consumer时,自动删除掉
* 5.arguments:参数。
*/
//如果没有一个交helloWorld的队列,那么会自动创建一个
channel.queueDeclare("hello_World",true,false,false,null);
//6、发送消息
/**
* String exchange, String routingKey, BasicProperties props, byte[] body
* 1、exchange交换机(简单模式下不会使用交换机,默认使用"")
* 2、routingKey:路由名称
* 3、props:配置信息
* 4、body:发送消息数据
*/
String body="Hello";
channel.basicPublish("","hello_World",null,body.getBytes());
//7、释放资源
channel.close();
connection.close();
}
}
首先看到目前没有连接
打断点启动
当Connection connection = connectionFactory.newConnection()运行结束后。查看控制台连接信息
接下来启动消费者
public class Consumer_Hello {
public static void main(String[] args) throws Exception {
//1、创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2、设置参数
connectionFactory.setHost("192.168.116.131");
connectionFactory.setPort(5672);//默认也是5672
connectionFactory.setVirtualHost("/");//设置虚拟机 默认值是/
connectionFactory.setUsername("admin");//默认值是guest
connectionFactory.setPassword("admin");//默认值是guest
//3、创建连接Connection
Connection connection = connectionFactory.newConnection();
//4、创建Channel
Channel channel = connection.createChannel();
//5、创建队列Queue
/**
* String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
* 参数
* 1.queue:队列名称
* 2.durable:是否持久化,当mq重启之后,还在
* 3.exclusive:
* *是否独占。只能有一个消费者监听这队列当
* *Connection关闭时,是否删除队列全
* 4.autoDelete:是否自动删除。当没有Consumer时,自动删除掉
* 5.arguments:参数。
*/
//如果没有一个交helloWorld的队列,那么会自动创建一个
channel.queueDeclare("hello_World",true,false,false,null);
/**
* String queue, boolean autoAck, Consumer callback
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
Consumer consumer =new DefaultConsumer(channel){
/*
回调方法,收到消息后,自动执行
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag:"+consumerTag);
System.out.println("envelope:"+envelope.getExchange());
System.out.println("properties:"+properties);
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("hello_World",true,consumer);
//消费者需要监听因此不需要关闭资源
}
}
生产者与消费者都需要声明队列是为了避免队列不存在的情况
SpringAMQP的使用
AMQP是用于在应用程序或之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。而SpringAMQP是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现
生产者实现
引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在application.yml配置如下信息
spring:
rabbitmq:
host: 192.168.116.131
port: 5672
username: admin
password: admin
virtual-host: /
编写测试类
@SpringBootTest
@RunWith(SpringRunner.class)
public class test {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSend2SimpleQueue() throws Exception {
String queueName ="hello";
String message = "hello, spring amqp";
rabbitTemplate.convertAndSend(queueName,message);
}
}
运行测试观察rabbit控制台
消费者实现
引入依赖和配置相关信息与消费者相同,不同的是,编写一个监听器去监听队列
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "hello")
public void listenSimpleQueueMessage(String msg){
System.out.println("接收到消息:"+msg);
}
}
启动引导类观察控制台
Work Queue工作队列
提高消息处理速度, 避免消息的堆积问题
案例实现:
生产者1秒内生产50条消息
@Test
public void testWorkQueueSendMessage() throws Exception {
String queueName ="hello";
String message = "hello, spring amqp__";
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend(queueName,message+i);
Thread.sleep(20);
}
}
而消费者代码如下
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "hello")
public void listenWorkQueueMessage1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:"+msg);
Thread.sleep(30);
}
@RabbitListener(queues = "hello")
public void listenWorkQueueMessage2(String msg) throws InterruptedException {
System.out.println("====消费者2接收到消息:"+msg);
Thread.sleep(50);
}
}
运行结果如下
可以看到每个消费者各处理25条,消费者1处理更快处理结束不会去处理更多的消息而是等待消费者2处理结束。
这种情况是因为Rabbit中存在消息预取的行为,当消息处理前会从Channel中提前拿去一部分消息(类似于轮询平均分配)后再去处理,当我们希望处理更快的设备能够读取更多的消息时,我们可以设置消息预取限制。在application.yml文件中添加如下配置
spring:
rabbitmq:
host: 192.168.116.131
port: 5672
username: admin
password: admin
virtual-host: /
listener:
simple:
prefetch: 1 #每次最多获取一条消息,处理完成后才能获取下一条消息
修改完后再次执行观察控制台
可以看到消费能力更强的处理消息更多。
工作队列模式应用于任务过重或任务过多的场景(比如说发送短信)
发布订阅模式
前两种模式只是将消息发送给一个消费者,而发布订阅模式可以将消息发送给多个消费者。实现方式是加入了exchange(交换机)。exchange只负责路由,不负责存储。路由失败则消息丢失。
Fanout交换机(广播模式)
@Configuration
public class FanoutConfig {
//声明交换机
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("fanoutExchange");
}
//声明队列
@Bean
public Queue queue1(){
return new Queue("fanoutQueue1");
}
@Bean
public Queue queue2(){
return new Queue("fanoutQueue2");
}
//声明绑定关系
@Bean
public Binding binding1(Queue queue1,FanoutExchange exchange){
return BindingBuilder
.bind(queue1)
.to(exchange);
}
@Bean
public Binding binding2(Queue queue2,FanoutExchange exchange){
return BindingBuilder
.bind(queue2)
.to(exchange);
}
}
重启消费者观察Rabbit控制台
编写监听器
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "fanoutQueue1")
public void listenFanoutQueueMessage1(String msg){
System.out.println("从队列queue1中获取到消息:"+msg);
}
@RabbitListener(queues = "fanoutQueue2")
public void listenFanoutQueueMessage2(String msg){
System.out.println("从队列queue2中获取到消息:"+msg);
}
}
编写生产者测试类
@Test
public void testFanoutQueueSendMessage() throws Exception {
String exchangeName = "fanoutExchange";
String message = "hello, fanout";
rabbitTemplate.convertAndSend(exchangeName,"",message);
}
启动观察Rabbit控制台
Direct交换机(路由模式)
- 每一个Queue都与Exchange设置一个BindingKey
- 发布者发送消息时,指定消息的RoutingKey
- Exchange将消息路由到BindingKey与消息RoutingKey一致的队列
案例实现
如果和Fanout模式一样去声明绑定关系的话,会比较麻烦,编写代码较多,我们可以采用注解的方式去声明绑定关系。
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(value = @Queue("directQueue1")
,exchange = @Exchange(value = "directExchange"
,type = ExchangeTypes.DIRECT),
key = {"red","blue"}))
public void listenDirectQueueMessage1(String msg){
System.out.println("从队列queue1中获取到消息:"+msg);
}
@RabbitListener(bindings = @QueueBinding(value = @Queue("directQueue2")
,exchange = @Exchange(value = "directExchange"
,type = ExchangeTypes.DIRECT),
key = {"red","yellow"}))
public void listenDirectQueueMessage2(String msg){
System.out.println("从队列queue1中获取到消息:"+msg);
}
}
运行消费者后观察Rabbit控制台
编写生产者测试类代码
@Test
public void testDirectQueueSendMessage() throws Exception {
String exchangeName = "directExchange";
String message = "hello, direct";
rabbitTemplate.convertAndSend(exchangeName, "blue", message);
rabbitTemplate.convertAndSend(exchangeName, "red", message + " red");
rabbitTemplate.convertAndSend(exchangeName, "yellow", message + " yellow");
}
运行观察控制台
TopicExchange(话题模式)
案例实现
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue("topicQueue1"),
exchange = @Exchange(value = "topicExchange",type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueueMessage1(String msg){
System.out.println("从中国话题队列中获取到消息:"+msg);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue("topicQueue2"),
exchange = @Exchange(value = "topicExchange",type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueueMessage2(String msg){
System.out.println("从新闻话题队列中获取到消息:"+msg);
}
}
运行观察Rabbit控制台
编写生产者测试类代码
@Test
public void testTopicQueueSendMessage() throws Exception {
String exchangeName = "topicExchange";
String message = "hello, topic";
rabbitTemplate.convertAndSend(exchangeName, "china.news", message+" 中国新闻");
rabbitTemplate.convertAndSend(exchangeName, "china.#", message + "晴朗");
rabbitTemplate.convertAndSend(exchangeName, "#.news", message + "战争");
}
运行观察Rabbit控制台
发送三条消息但共有5条消息
消息转换器
在简单消息队列的实现中,我们发送消息发送的是字节数组。但是接收的消息反而是String类型的字符。那是因为。Spring中对消息的处理是由org.springframework.amqp.support.converter.MessageConverter处理默认使用SimpleMessageConverter来实现序列化(基于JDK的ObjectOutputStream实现)
进行一个测试,创建一个object.queue队列,发送一个Map类型的数据
@Test
public void testSendObject() throws Exception {
Map<String, Object> map = new HashMap<>();
map.put("name","zmbwcx");
String queueName = "object.queue";
rabbitTemplate.convertAndSend(queueName,map);
}
观察Rabbit控制台
消息内容被JDK序列化为上图内容,这种序列化方式不安全且占用内存更大。增加了传输成本。
我们可以修改为JSON的序列化方式,具体操作如下
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
</dependency>
@Bean
public MessageConverter jsonMessageConverter(){
return new Jackson2JsonMessageConverter();
}
重新发送一条消息,观察Rabbit控制台
生产者与消费者应该使用同一个消息转换器,因此,消费者也应进行相同操作