✨✨个人主页:沫洺的主页
📚📚系列专栏: 📖 JavaWeb专栏📖 JavaSE专栏 📖 Java基础专栏📖vue3专栏
📖MyBatis专栏📖Spring专栏📖SpringMVC专栏📖SpringBoot专栏
📖Docker专栏📖Reids专栏📖MQ专栏📖SpringCloud专栏
💖💖如果文章对你有所帮助请留下三连✨✨
1️⃣环境搭建
创建SpringBoot项目,引入相关依赖
application.properties配置rabbitmq配置信息
spring.rabbitmq.host=192.168.0.109 spring.rabbitmq.port=5670 #如果使用的是/,可以不用配置,因为默认就是/ spring.rabbitmq.virtual-host=/ spring.rabbitmq.username=guest spring.rabbitmq.password=guest
2️⃣消费者接收消息
@RabbitHandler注解: 当spring扫到该注解,就当成消费者
@RabbitListener注解: 绑定队列Queue与交换机Exchange
durable持久化
autoDelete自动删除
type=ExchangeTypes.DIRECT交换机类型为直连交换机,不写默认直连
key: 就是Routing key@Component public class DirectConsumer { @RabbitHandler @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "211-DirectQueue-01", durable = "true", autoDelete = "false"), exchange = @Exchange(value = "211-DirectExchage-01", type = ExchangeTypes.DIRECT), key = "211-Direct-RoutingKey-01")) public void process2(Message message){ String msg = new String(message.getBody()); System.out.println("消费者1-1收到:"+msg+""); //这里写业务逻辑代码 } }
3️⃣生产者发送消息
注入RabbitTemplate,调用convertAndSend方法发送消息
参数一:交换机 参数二: Routing key 参数三: 消息
@Component public class DirectProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(){ rabbitTemplate.convertAndSend("211-DirectExchage-01","211-Direct-RoutingKey-01","生产者发送的第一条消息"); //System.out.println("生产者第一条消息发送成功"); } }
启动类调用sendMessage发送
@SpringBootApplication public class App { public static void main(String[] args) { ConfigurableApplicationContext context = SpringApplication.run(App.class, args); //调用生产者发送消息 DirectProducer producer = context.getBean(DirectProducer.class); producer.sendMessage(); } }
💦多个消费者消费同一个队列
当生产者发送多条消息时,同一个队列的多个消费者去接收消息
生产者发送多条消息
@Component public class DirectProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(){ rabbitTemplate.convertAndSend("211-DirectExchage-01","211-Direct-RoutingKey-01","生产者发送的第一条消息"); System.out.println("生产者第一条消息发送成功"); rabbitTemplate.convertAndSend("211-DirectExchage-01","211-Direct-RoutingKey-01","生产者发送的第二条消息"); System.out.println("生产者第二条消息发送成功"); } }
同一个队列的多个消费者去接收消息
都是value = "211-DirectQueue-01"的队列
@Component public class DirectConsumer { @RabbitHandler @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "211-DirectQueue-01", durable = "true", autoDelete = "false"), exchange = @Exchange(value = "211-DirectExchage-01", type = ExchangeTypes.DIRECT), key = "211-Direct-RoutingKey-01")) public void process2(Message message){ String msg = new String(message.getBody()); System.out.println("消费者1-1收到:"+msg+""); //这里写业务逻辑代码 } @RabbitHandler @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "211-DirectQueue-01", durable = "true", autoDelete = "false"), exchange = @Exchange(value = "211-DirectExchage-01", type = ExchangeTypes.DIRECT), key = "211-Direct-RoutingKey-01")) public void process2(Message message){ String msg = new String(message.getBody()); System.out.println("消费者1-2收到:"+msg+""); //这里写业务逻辑代码 } }
可以理解为快递员送快递,只关注哪一家(队列)签收的快递,而不关注该家哪个成员(消费者)签收的快递,(只是在签收时的消费者是轮循的)
像这种同一队列多个消费者的好处就是保障了高可用性,只要有一个消费者就能保障接收到消息
💦多个消费者消费不同的队列
当生产者发送多条消息时,不同队列的多个消费者去接收消息
生产者发送多条消息
@Component public class DirectProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(){ rabbitTemplate.convertAndSend("211-DirectExchage-01","211-Direct-RoutingKey-01","生产者发送的第一条消息"); System.out.println("生产者第一条消息发送成功"); rabbitTemplate.convertAndSend("211-DirectExchage-01","211-Direct-RoutingKey-01","生产者发送的第二条消息"); System.out.println("生产者第二条消息发送成功"); } }
不同队列的多个消费者去接收消息
不同队列value = "211-DirectQueue-01"和value = "211-DirectQueue-02"
@Component public class DirectConsumer { @RabbitHandler @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "211-DirectQueue-01", durable = "true", autoDelete = "false"), exchange = @Exchange(value = "211-DirectExchage-01", type = ExchangeTypes.DIRECT), key = "211-Direct-RoutingKey-01")) public void process2(Message message){ String msg = new String(message.getBody()); System.out.println("消费者1收到:"+msg+""); //这里写业务逻辑代码 } @RabbitHandler @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "211-DirectQueue-02", durable = "true", autoDelete = "false"), exchange = @Exchange(value = "211-DirectExchage-01", type = ExchangeTypes.DIRECT), key = "211-Direct-RoutingKey-01")) public void process2(Message message){ String msg = new String(message.getBody()); System.out.println("消费者2收到:"+msg+""); //这里写业务逻辑代码 } }
同样的不是同一家(队列)的成员(消费者),在签收快递时,要签收自己家所有快递(消息)
⛅消费者重试机制/自动应答
application.properties配置
- 配置消费者应答模式
- 配置rabbitmq重试配置信息
#开启消费者应答模式为 auto自动应答 manual手动应答 spring.rabbitmq.listener.direct.acknowledge-mode = auto #spring.rabbitmq.listener.simple.acknowledge-mode = auto #开启消费者自动重试机制,也就是消费者函数只要抛出异常,就会触发重试 false关闭 spring.rabbitmq.listener.simple.retry.enabled=true #设置重试最大次数 spring.rabbitmq.listener.simple.retry.max-attempts=5 #设置重试时间最大间隔 spring.rabbitmq.listener.simple.retry.max-interval= 8000ms #设置重试时间间隔 spring.rabbitmq.listener.simple.retry.initial-interval=1000ms #设置重试时间间隔的倍数 spring.rabbitmq.listener.simple.retry.multiplier=2
当不配置重试机制时,消费者应答出现异常不处理时,就会出现死循环
如下代码段
@Component public class DirectConsumer { @RabbitHandler @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "211-DirectQueue-01", durable = "true", autoDelete = "false"), exchange = @Exchange(value = "211-DirectExchage-01", type = ExchangeTypes.DIRECT), key = "211-Direct-RoutingKey-01")) public void process2(Message message){ String msg = new String(message.getBody()); System.out.println("消费者1-1收到:"+msg+""); //这里写业务逻辑代码 int i = 1/0//会出现死循环 } }
配置后
⛅手动应答模式
配置手动应答,取消重试机制
#开启消费者应答模式为 auto自动应答 manual手动应答 spring.rabbitmq.listener.direct.acknowledge-mode = manual #开启消费者自动重试机制,也就是消费者函数只要抛出异常,就会触发重试 spring.rabbitmq.listener.simple.retry.enabled=false
手动应答模式
@Component public class DirectConsumer { @RabbitHandler @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "211-DirectQueue-01", durable = "true", autoDelete = "false"), exchange = @Exchange(value = "211-DirectExchage-01", type = ExchangeTypes.DIRECT), key = "211-Direct-RoutingKey-01")) //手动应答 public void process1(Message message, Channel channel) throws IOException { String msg = new String(message.getBody()); System.out.println("消费者1-1收到:"+msg+ DateUtil.format(DateUtil.date(),"HH:ss")); //获取应答标签 long deliveryTag = message.getMessageProperties().getDeliveryTag(); try { int i = 1/0;//会出现死循环 //手动应答 参数一:消息标识 参数二:true批量应答,false单个应答 channel.basicAck(deliveryTag,false); } catch (Exception ex) { //把异常消息插入数据库 System.out.println("消费者1-1收到:"+msg+"出现异常信息插入数据库"); System.out.println("异常信息: "+ex.getMessage()); channel.basicAck(deliveryTag,false); } } }
出现异常,将异常消息插入数据库后,这样保障应答后在队列中不堵,然后再让生产者发送消息
channel.basicAck正常应答
channel.basicCancel取消应答
channel.basicReject拒绝应答
没有重试机制,需要自己去写,所以一般不会使用手动应答
🧭投递业务对象
一般生产的都是领域对象dto
例如
@Data @Builder @NoArgsConstructor @AllArgsConstructor public class UserRegisterOk { private String name; private String phone; }
生产者生产对象
@Component public class DirectProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendMessage(){ UserRegisterOk userRegisterOk = UserRegisterOk.builder().name("张三").phone("123456").build(); //要将对象序列化,转成字符串,使用消息转换器MessageConverter rabbitTemplate.convertAndSend("211-DirectExchage-01","211-Direct-RoutingKey-01",userRegisterOk); System.out.println("生产者生产对象发送成功"); } }
消费者接收对象
@Component public class DirectConsumer { @RabbitHandler @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "211-DirectQueue-01", durable = "true", autoDelete = "false"), exchange = @Exchange(value = "211-DirectExchage-01", type = ExchangeTypes.DIRECT), key = "211-Direct-RoutingKey-01")) public void process4(UserRegisterOk userRegisterOk){ System.out.println("消费者收到:"+userRegisterOk.getName()+","+userRegisterOk.getPhone()); } }
运行后报错
配置序列化
@Configuration public class RabbitConfig { @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
💖补充写法
@Component public class DirectConsumer { private static final String ENAME = "211-DirectExchage-01"; private static final String QNAME = "211-DirectQueue-01"; private static final String KEY = "211-Direct-RoutingKey-01"; //定义一个交换机 @Bean public DirectExchange directExchange(){ return new DirectExchange(ENAME, true, false); } //定义一个队列 @Bean public Queue directQueue(){ return QueueBuilder.durable(QNAME).build(); } //创建队列和交换机的绑定关系 @Bean public Binding binding(){ return BindingBuilder.bind(directQueue()).to(directExchange()).with(KEY); } @RabbitHandler @RabbitListener(queues =QNAME) public void process4(UserRegisterOk userRegisterOk){ System.out.println("消费者收到:"+userRegisterOk.getName()+","+userRegisterOk.getPhone()); } }