启动RabbitMQ
1. 在虚拟机中启动RabbitMQ,要先切换到root用户下: su root
2.关闭防火墙: systemctl stop firewalld
3.rabbitmq-server start # 启用服务
4.rabbitmq-server -detached # 后台启动
1.消息确认机制
有两种确认的方式:
自动ACK:RabbitMQ将消息发送给消费者后就会直接将消息删除,前提是消费者程序没有出现异常,有异常会重新发送,直至到达了最大重试次数后抛出异常后不在重试
手动ACK:通过代码控制决定是否返回确认消息1)开启消息确认机制,在核心配置文件中添加以下配置
# 发送者开启simple确认机制 spring.rabbitmq.publisher-confirm-type=simple # 发送者开启return确认机制 spring.rabbitmq.publisher-returns=true
2)编写配置类
import org.springframework.amqp.core.ReturnedMessage; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Configuration; import javax.annotation.PostConstruct; //消息确认机制的接口 RabbitTemplate @Configuration public class PublishConfirmAndReturnConfig implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback { @Autowired private RabbitTemplate rt; @PostConstruct//加载回调方法 public void initMethod(){ rt.setConfirmCallback(this); rt.setReturnsCallback(this); } @Override//RabbitTemplate.ConfirmCallback如果消息被正常发送到交换机,则会调用该方法(自动回调) /* * CorrelationData相关数据,有一个id属性,表示消息的唯一标识 * boolean表示当前消息投放到交换机中的状态,trur表示投放成功 * String表示投送失败的原因 * */ public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ System.out.println("消息发送给交换机成功"); }else { System.err.println("消息发送给交换机失败"); } } /** * 消息往队列发送时成功,不会调用该方法 失败了会调用 * @param returnedMessage 返回消息的内容 * message发送的内容 * replyCode响应码 * replyText回应的内容 * exchange交换机 * reotingKey路由键 * */ @Override//RabbitTemplate.ReturnsCallback如果消息被正常从交换机发送到队列,则回调该方法(自动回调) public void returnedMessage(ReturnedMessage returnedMessage) { System.out.println("消息发送到交换机却没有到队列"); System.out.println("消息内容"+returnedMessage.getMessage()); System.out.println("响应码"+returnedMessage.getReplyCode()); System.out.println("回应的内容"+returnedMessage.getReplyText()); System.out.println("交换机"+returnedMessage.getExchange()); System.out.println("路由键"+returnedMessage.getRoutingKey()); } }
这是没有消息确认配置类时的运行数据
这是添加了消息确认配置类时的运行数据 ,可以看到我们是否将数据成功发送到交换机或队列
持久化
1.队列持久化 没有消费者连接该队列的时候,会被RabbitMQ自动删除 autoDelete = "true" 默认为false,不会被自动删除
@RabbitListener( bindings = @QueueBinding( value = @Queue(value = "log_queue_error",autoDelete = "true"), exchange = @Exchange(value = "durable_exchange",type = "direct"), key = "error_log_key" ) ) public void druggConsumerError(String message){ System.out.println("接收error级别的日志"+message); } @RabbitListener( bindings = @QueueBinding( value = @Queue(value = "log_queue_all",autoDelete = "false"), exchange = @Exchange(value = "durable_exchange",type = "direct"), key = {"error_log_key","info_log_key","debug_log_key","waring_log_key"} ) ) public void drugConsumerAll(String message){ System.out.println("接收all级别的日志"+message); }
2.交换机持久化
@RabbitListener( bindings = @QueueBinding( value = @Queue(value = "email_queue",autoDelete = "true"), exchange = @Exchange(value = "temp_exce",type = "topic",autoDelete="true"), key="em_key" ) ) public void emailConsumer(Object o){ System.out.println("接收到邮件:"+o); }
如果当前的交换机没有被任何的队列所映射,会被RabbitMQ自动删除,关闭项目就相当于没有映射
消费端限流
控制消费端的消费速度,方式数据过大造成服务端宕机,通过编写配置文件,控制一次推送的消息数量,来减少一次数据太大冲垮服务器
1.编写核心配置文件application.properties
# 消费端限流实现 #开启手动签收(手动ACK) spring.rabbitmq.listener.simple.acknowledge-mode=manual #一次接收3条消息(在单个请求中处理的消息个数) spring.rabbitmq.listener.simple.prefetch=3 #消费者最小数量 spring.rabbitmq.listener.simple.concurrency=1 #消费者最大数量 spring.rabbitmq.listener.simple.max-concurrency=10
2.创建交换机和队列
// channel标识信道,封装了RabbitMQ通过的相关配置信息,如果当前的消息被成功消费,通过信道进行标记, //获取到相应ACK的确认信息 public void currentLimitingConsumer(Message message, Channel channel) throws InterruptedException, IOException { Thread.sleep(3000);//睡眠3秒钟 long deliveryTag = message.getMessageProperties().getDeliveryTag();//消息的标记,消息的唯一标识 //手动确认消息是否接收,通过消息的id来指定该条消息被成功处理 channel.basicAck(deliveryTag,true);//true表示对应这条消息被消费了 String s = new String(message.getBody()); System.out.println("消费者1::接收到的消息"+s); } @RabbitListener( bindings = @QueueBinding( value = @Queue("current_limiting_queue"), exchange = @Exchange(value = "cle",type = "topic"), key = "current.limiting.#" ) ) public void currentLimitingConsumer2(Message message, Channel channel) throws InterruptedException, IOException { Thread.sleep(3000);//睡眠3秒钟 long deliveryTag = message.getMessageProperties().getDeliveryTag(); channel.basicAck(deliveryTag,true); String s = new String(message.getBody()); System.out.println("消费者2::接收到的消息"+s); }
3.创建消息发送者,我们将100条消息发送给cle交换机
@Test//消息限流 public void currentLimitingPushLisher(){ for (int i = 1; i <101 ; i++) { re.convertAndSend("cle","current.limiting.xm","消息限流:"+i); } }
正如我们的配置配置文件所写,一次性只能接收三条消息