基于API的方式
1.使用AmqpAdmin定制消息发送组件
@Autowired private AmqpAdmin amqpAdmin; @Test public void amqpAdmin(){ //1.定义fanout类型的交换器 amqpAdmin.declareExchange(new FanoutExchange("fanout_exchange")); //2.定义两个默认持久化队列,分别处理email和sms amqpAdmin.declareQueue(new Queue("fanout_queue_email")); amqpAdmin.declareQueue(new Queue("fanout_queue_sms")); //3.将队列分别与交换器进行绑定 // 队列名 是队列 交换机的名称 路由 其它参数 amqpAdmin.declareBinding(new Binding("fanout_queue_email",Binding.DestinationType.QUEUE,"fanout_exchange","",null)); amqpAdmin.declareBinding(new Binding("fanout_queue_sms",Binding.DestinationType.QUEUE,"fanout_exchange","",null)); }
2.消息发送者发送消息
创建实体类
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @Data @AllArgsConstructor @NoArgsConstructor public class User { private Integer id; private String name; }
发送消息
@Autowired private RabbitTemplate re; @Test//消息发送者 public void subPublisher(){ User user = new User(1,"小满"); re.convertAndSend("fanout_exchange", "", user); }
如图所以,如果我们直接发送的话就会报这个错,有两种解决方法,第一种是比较常用的让实体类User实现序列化Serializable接口,这里我们不做演示,第二种是写一个配置类,只有在RabbitMQ可以使用
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { //定制JSON格式的消息转化器 @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } }
加上配置类后我们发送就不会报错了,我们也可以在RabbitMQ的可视化端口看到我们发送的消息
3.发送完消息后接下来就是消费消息了,定义接收消息的业务
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class RabbitMQService { //发布订阅模式: @RabbitListener可以指定当前方法监控哪一个队列 @RabbitListener(queues = "fanout_queue_email")//消费者可以消费多个队列的消息 public void subConsumerEmail(Message message){ //当队列中有内容是方法会自动执行 推荐Object来接收 //官网推荐Message byte[] body = message.getBody();//Message将数据存放在body中 String msg = new String(body); System.out.println("邮件业务接收到消息:"+msg); } @RabbitListener(queues = "fanout_queue_sms") public void subConsumerSms(Message message){ byte[] body = message.getBody(); String msg = new String(body); System.out.println("短信业务接收到消息:"+msg); } }
4.重新运行发送端就可以接收到我们发送的数据,接收的数据可能打印在任意一个控制台中,这是idea的机制,我们不需要管
基于配置类的方式
1.在config配置类中定义
import org.springframework.amqp.core.*; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.amqp.support.converter.MessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig { //定制JSON格式的消息转化器 @Bean public MessageConverter messageConverter(){ return new Jackson2JsonMessageConverter(); } // 1.fanout创建一个交换机 @Bean public Exchange fanoutExchange(){ return ExchangeBuilder.fanoutExchange("fanout_exchange").build(); } //2.定义消息队列 @Bean public Queue fanoutQueueEmail(){ return new Queue("fanout_queue_email"); } @Bean public Queue fanoutQueueSms(){ return new Queue("fanout_queue_sms"); } //3.将创建的队列绑定到对应的交换机上 @Bean public Binding bingingEmail(){ return BindingBuilder.bind(fanoutQueueEmail()).to(fanoutExchange()).with("").noargs(); } @Bean public Binding bingingSms(){ return BindingBuilder.bind(fanoutQueueSms()).to(fanoutExchange()).with("").noargs(); } }
2.为了避免api的影响,我们可以在可视化端口将基于api创建的交换机和队列删除
1)删除交换机
2)删除队列,前面也是点击队列的名字
可以看到我已经将交换机和消息队列都已经删除,接下来我们重新启动项目 ,配置类可以在启动的时候自动创建
我们的订阅发布模式也是可以正常运行
基于注解类的方式
1.我们要现将基于配置类的方式注释掉,避免影响我们测试
import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Service; @Service public class RabbitMQService { @RabbitListener(bindings = @QueueBinding( value = @Queue("fanout_queue_email"), exchange=@Exchange(value = "fanout_exchange",type = "fanout") )) public void subConsumerEmail(Message message){ //当队列中有内容是方法会自动执行 推荐Object来接收 //官网推荐Message byte[] body = message.getBody();//Message将数据存放在body中 String msg = new String(body); System.out.println("邮件业务接收到消息:"+msg); } @RabbitListener(bindings = @QueueBinding( value = @Queue("fanout_queue_sms"), exchange=@Exchange(value = "fanout_exchange",type = "fanout") )) public void subConsumerSms(Message message){ byte[] body = message.getBody(); String msg = new String(body); System.out.println("短信业务接收到消息:"+msg); } }
提前将交换机和队列删除,然后运行,就会发现会在启动时会自动生成交换机和队列,测试也不会有影响