一、使用场景
削峰、解耦、异步。
基于AMQP(高级消息队列协议)协议来统一数据交互,通过channel(网络信道)传递信息。erlang语言开发,并发量12000,支持持久化,稳定性好,集群不支持动态扩展。
二、组成及工作流程
1.主要组成
Broker(消息代理): 消息队列服务进程,一个Broker以开多个虚拟主机(VirtualHost)。 VirtualHost(虚拟主机):虚拟主机,用于进行逻辑隔离,一个虚拟主机可以有若干个Exchange和Queue Exchange(交换机):消息队列交换机,按一定的规则将消息路由转发到某个队列。 Queue:消息队列。
2.工作流程
生产者发送消息流程: 1、和Broker建立TCP连接。 2、和Broker建立通道。 3、通过通道消息发送给Broker,由Exchange将消息进行转发。 4、Exchange将消息转发到指定的Queue(队列)。 消费者接收消息流程: 1、和Broker建立TCP连接 2、和Broker建立通道 3、监听指定的Queue(队列) 4、当有消息到达Queue时Broker默认将消息推送给消费者。 5、接收到消息。 6、ack回复。
三、交换机Exchange(默认direct)
交换机,接受消息,根据路由键发送消息到绑定的队列(不具备消息存储的能力)。
1.交换机种类
Direct: 单播直连交换机,Exchange将消息完全匹配路由键(routing key)的方式绑定消息,获取信息时也要匹配Exchange和路由键。
fanout: 广播式交换机(Publish/subscribe),不管消息的路由键(routing key),Exchange都会将消息转发给所有绑定的Queue。
topic: 主题交换机,工作方式类似于组播,Exchange会将消息转发和路由键(routing key)符合匹配模式的所有队列,如: routing_key为user.stock的Message会转发给绑定匹配模式为 *.stock 、user.stock* 、 #.user.stock.#的队列。(*表是匹配一个任意词组,#表示匹配0个或多个词组)。
headers: 头交换机,无Binding Key;当然也无Routing Key。根据发送的消息内容中的headers属性进行匹配。
2.交换机属性
Name:交换机名称 Durability:持久化标志,表明此交换机是否是持久化的 Auto-delete:删除标志,表明当所有队列在完成使用此exchange时,是否删除 Arguments:依赖代理本身。
3.交换机状态
持久(durable)
暂存(transient)
4.消息确认机制(ACK)
自动ACK:消息一旦被接收,消费者自动发送ACK。 手动ACK:消息接收后,不会发送ACK,需要手动调用。
四、rabbitmq 客户端的使用
1.引入依赖
<!-- rabbitmq 客户端依赖 -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
2.创建连接工具
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class MyRabbitMQUtils {
public static Connection getConnel() throws Exception{
//1 创建 ConnectionFactory
ConnectionFactory factory = new ConnectionFactory() ;
factory.setHost("127.0.0.1");
//端口
factory.setPort(5672);
//设置虚拟机,一个mq服务可以设置多个虚拟机,每个虚拟机就相当于一个独立的mq
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
factory.setAutomaticRecoveryEnabled(true);
factory.setNetworkRecoveryInterval(3000);
Connection connection = factory.newConnection();
// Channel channel = connection.createChannel();
return connection;
}
}
3.生产者
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class Producer {
// 交换机名称
private final static String EXCHANGE_NAME = "simple_exchange";
// 队列名称
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[]args) throws Exception{
Connection produceConnection = MyRabbitMQUtils.getConnel();
Channel produceChannel = produceConnection.createChannel();
// 建立交换机(广播)
produceChannel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT,true);
/*
* 1、queue 队列名称
* 2、durable 是否持久化
* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* 5、arguments 参数,可以设置一个队列的扩展参数,如:可设置存活时间
*/
produceChannel.queueDeclare(QUEUE_NAME, false, false, false, null);
/*
* 1、exchange,交换机,如果不指定将使用mq的默认交换机(设置为"")
* 2、routingKey,路由key,交换机根据路由key来将消息转发到指定的队列,如果使用默认交换机,routingKey设置为队列的名称
* 3、props,消息的属性
* 4、body,消息内容
*/
for(int i=0;i<10;i++){
String message = "生产者发布的消息---!";
message = message+i;
produceChannel.basicPublish(EXCHANGE_NAME, QUEUE_NAME, null, message.getBytes());
System.out.println(" Producer 发布'" + message + "'");
}
//关闭通道和连接(资源关闭最好用try-catch-finally语句处理)
produceChannel.close();
produceConnection.close();
}
}
4.消费者
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
public class Comsumer {
// 队列名称
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
Connection comsumerConnection = MyRabbitMQUtils.getConnel();
Channel comsumerChannel = comsumerConnection.createChannel();
/*
* 参数明细
* 1、queue 队列名称
* 2、durable 是否持久化
* 3、exclusive 是否独占连接,队列只允许在该连接中访问,如果connection连接关闭队列则自动删除,如果将此参数设置true可用于临时队列的创建
* 4、autoDelete 自动删除,队列不再使用时是否自动删除此队列,如果将此参数和exclusive参数设置为true就可以实现临时队列(队列不用了就自动删除)
* 5、arguments 参数,可以设置一个队列的扩展参数,如:可设置存活时间
*/
comsumerChannel.queueDeclare(QUEUE_NAME, false, false, false, null);
//实现消费方法
DefaultConsumer consumer = new DefaultConsumer(comsumerChannel){
/*
* 当接收到消息后此方法将被调用
* @param consumerTag 消费者标签,用来标识消费者的,在监听队列时设置channel.basicConsume
* @param envelope 信封,通过envelope
* @param properties 消息属性
* @param body 消息内容
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//交换机
String exchange = envelope.getExchange();
//消息id,mq在channel中用来标识消息的id,可用于确认消息已接收
long deliveryTag = envelope.getDeliveryTag();
// body 即消息体
String msg = new String(body,"utf-8");
System.out.println("Comsumer 获得: " + msg + "!");
// 手动 ACK
comsumerChannel.basicAck(envelope.getDeliveryTag(),false);
}
};
// 监听队列,第二个参数:是否自动进行消息确认。
//参数:String queue, boolean autoAck, Consumer callback
/**
* 参数明细:
* 1、queue 队列名称
* 2、autoAck 自动回复,当消费者接收到消息后要告诉mq消息已接收,如果将此参数设置为tru表示会自动回复mq,如果设置为false要通过编程实现回复。
* 3、callback,消费方法,当消费者接收到消息要执行的方法。
*/
comsumerChannel.basicConsume(QUEUE_NAME, false, consumer);
}
}
五、Spring中使用RabbitMQ
1.引入依赖
<!-- AMQP 依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.2.7.RELEASE</version>
</dependency>
<!--springboot测试依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.5.6</version>
</dependency>
2.更改配置
spring:
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
3.把交换机、和队列加入IOC容器中
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
// email队列
public static final String QUEUE_EMAIL = "queue_email";
// sms队列
public static final String QUEUE_SMS = "queue_sms";
// topics类型交换机
public static final String EXCHANGE_NAME="topic.exchange";
public static final String ROUTINGKEY_EMAIL="topic.#.email.#";
public static final String ROUTINGKEY_SMS="topic.#.sms.#";
//声明交换机
@Bean(EXCHANGE_NAME)
public Exchange exchange(){
//durable(true) 持久化,mq重启之后交换机还在
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//声明email队列
/**
* new Queue(QUEUE_EMAIL,true,false,false)
* durable="true" 持久化 rabbitmq重启的时候不需要创建新的队列
* auto-delete 表示消息队列没有在使用时将被自动删除 默认是false
* exclusive 表示该消息队列是否只在当前connection生效,默认是false
*/
@Bean(QUEUE_EMAIL)
public Queue emailQueue(){
return new Queue(QUEUE_EMAIL);
}
//声明sms队列
@Bean(QUEUE_SMS)
public Queue smsQueue(){
return new Queue(QUEUE_SMS);
}
//ROUTINGKEY_EMAIL队列绑定交换机,指定routingKey
@Bean
public Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_EMAIL).noargs();
}
//ROUTINGKEY_SMS队列绑定交换机,指定routingKey
@Bean
public Binding bindingSMS(@Qualifier(QUEUE_SMS) Queue queue, @Qualifier(EXCHANGE_NAME) Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with(ROUTINGKEY_SMS).noargs();
}
}
4.模拟业务发送消息
import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@SpringBootTest
public class Send {
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sendMsgByTopics(){
/**
* 参数:
* 1、交换机名称
* 2、routingKey
* 3、消息内容
*/
for (int i=0;i<5;i++){
String message = "恭喜您,注册成功!userid="+i;
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"topic.sms.email",message);
System.out.println(" [x] Sent '" + message + "'");
}
}
}
5.消息的监听及处理
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class Receive {
//监听邮件队列
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue_email", durable = "true"),
exchange = @Exchange(value = "topic.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),
key = {"topic.#.email.#","email.*"}))
public void rece_email(String msg){
System.out.println(" [邮件服务] received : " + msg + "!");
}
//监听短信队列
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "queue_sms", durable = "true"),
exchange = @Exchange(value = "topic.exchange", ignoreDeclarationExceptions = "true", type = ExchangeTypes.TOPIC),
key = {"topic.#.sms.#"}))
public void rece_sms(String msg){
System.out.println(" [短信服务] received : " + msg + "!");
}
}