同步通讯的优缺点:
优点:时效高,数据一致,过程简单
缺点:耦合度高。性能下降。CPU等待资源的浪费。级联失败。
2、异步通讯:异步调用常见的实现就是事件驱动模式
异步的优缺点:
优点:耦合度低 ,吞吐量提升 ,故障隔离(不在联机发生失败) ,流量削峰(Broker缓存事件,让其他服务慢慢来执行)
缺点:依赖于Broker的可靠性、安全性、吞吐量
架构复杂,没有明显的流程线,不好追踪管理
一般的项目都是使用同步通讯,因为需要拿到其他服务的返回结果,而异步通讯只是通知他有事情要做,而不需要他返回结果。
三、RabbitMq:
是一个开源的消息代理软件,广泛用于实现消息传递和异步处理。它基于消息队列的设计理念,允许不同的应用程序或服务之间进行通信,而无需它们直接相互依赖。
主要特点:
-
可靠性:RabbitMQ 提供消息确认机制,确保消息在处理过程中不会丢失。
-
灵活的路由:通过交换机和队列的配置,RabbitMQ 支持多种消息路由模式,包括点对点和发布/订阅。
-
多协议支持:支持多种消息协议,如 AMQP、STOMP 和 MQTT。
-
高可用性:通过集群和镜像队列功能,可以实现高可用性和负载均衡。
-
管理界面:提供用户友好的管理界面,便于监控和管理消息队列。
几种模式:
AMQP:
AMQP(Advanced Message Queuing Protocol)是一种开放标准的消息传递协议,旨在支持消息的可靠传递和异步通信。它允许不同的应用程序通过消息代理进行通信,无论这些应用程序使用的编程语言或平台如何。
AMQP 的主要特点包括:
- 消息队列:支持将消息存储在队列中,允许消费者异步处理消息。
- 发布/订阅模式:支持多种消息传递模式,包括点对点和发布/订阅。
- 可靠性:提供消息确认机制,确保消息不会丢失。
- 灵活性:支持不同的消息传递场景,适用于多种分布式系统。
- 跨平台:由于其开放性,AMQP 可以在不同的操作系统和语言之间进行互操
RabbitMQ的依赖,yml配置:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
rabbitmq:
host: 192.168.136.128 #地址
port: 5672 #端口
username: root #用户账号
password: root #用户密码
virtual-host: /
RabbitMQ:发送和接收:
发送
package org.example;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RabbitMQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendMessage()
{
rabbitTemplate.convertAndSend("simple.cc","hello,RabbitMq");
}
}
接收
package org.example.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class SimpleQueueListener {
@RabbitListener(queues = "simple.cc")
public void getMessage(String message)
{
log.info(message);
}
}
2、workqueue:
代码:workqueue有预取机制,当一个没有设置预取上限,那么两个消费者会均分消息,即使不能立即处理也会将消息拿到。当设置prefetch的值那么就会按这个上限那取一定数量的消息,将消息处理完成之后,再从队列中拿消息。
package org.example;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class RabbitMQTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void sendWorkQueue()
{
for (int i = 0; i < 50; i++) {
rabbitTemplate.convertAndSend("simple.cc","hello,--"+ i);
try {
Thread.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
}
package org.example.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
@Slf4j
public class SimpleQueueListener {
//@RabbitListener(queues = "simple.cc")
//public void getMessage(String message)
//{
// log.info(message);
//}
@RabbitListener(queues = "simple.cc")
public void getWorkQueueMessage(String message)
{
log.info(message);
try {
Thread.sleep(20);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
@RabbitListener(queues = "simple.cc")
public void getWorkQueueMessages2(String message) {
log.error(message);
try {
Thread.sleep(100);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
3、发布订阅模式:
4、Fanout交换机:将消息路由到每一个队列,缺点是exchange只是转发消息,而不保存,如果路由的时候丢失消息那么消息就直接丢失了。
代码:
@RabbitListener(queues = "fanout.queue1")
public void getFanoutMessage(String message)
{
log.info(message);
}
@RabbitListener(queues = "fanout.queue2")
public void getFanoutMessage1(String message)
{
log.info(message);
}
package org.example.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
@Bean
public FanoutExchange fanoutExchange()
{
return new FanoutExchange("cc.fanout");
}
@Bean
public Queue fanoutQueue1()
{
return new Queue("fanout.queue1");
}
@Bean
public Queue fanoutQueue2()
{
return new Queue("fanout.queue2");
}
@Bean
public Binding binding(Queue fanoutQueue1,FanoutExchange fanoutExchange)
{
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
}
@Bean
public Binding binding2(Queue fanoutQueue2,FanoutExchange fanoutExchange)
{
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
}
}
@Test
public void sendFanoutMessage()
{
rabbitTemplate.convertAndSend("cc.fanout","","hello,fanout");
}
5、Direct:可以通过key与相应的queue绑定,绑定之后相应的key只能发送到相应的queue上。当然一个queue可以绑定多个key,那么就可以实现广播(fanout)
代码:
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "cc.direct",type = ExchangeTypes.DIRECT),
key = {"blue","red"}
))
public void getDirectMessage1(String message)
{
log.info(message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "cc.direct",type = ExchangeTypes.DIRECT),
key = {"yellow","red"}
))
public void getDirectMessage2(String message)
{
log.info(message);
}
@Test
public void sendFanoutMessage()
{
rabbitTemplate.convertAndSend("cc.direct","red","hello,direct");
rabbitTemplate.convertAndSend("cc.direct","blue","hello,direct,blue");
rabbitTemplate.convertAndSend("cc.direct","yellow","hello,direct,yellow");
}
6、Topic:通过通配符来匹配某一类消息,通过key的匹配来选择不需要的某一类消息
代码:
@Test
public void sendTopicMessage()
{
rabbitTemplate.convertAndSend("cc.topic","china.cc","hello,china,cc");
rabbitTemplate.convertAndSend("cc.topic","china.news","china,weather");
rabbitTemplate.convertAndSend("cc.topic","japan.news","japan,weather");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "cc.topic",type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void getTopicMessage1(String message)
{
log.info(message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "cc.topic",type = ExchangeTypes.TOPIC),
key = "*.news"
))
public void getTopicMessage2(String message)
{
log.info(message);
}
7、SpringAMQP:消息转换器,默认使用jdk的序列化
引依赖:
<!--json-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.14.0</version>
</dependency>
添加配置,覆盖原本的序列化方式:
@Bean
public MessageConverter messageConverter()
{
return new Jackson2JsonMessageConverter();
}