文章目录
- 搭建初始环境
- 引入依赖
- 配置配置文件
- HelloWorld模型使用
- Work模型使用
- Fanout 广播模型
- Route 路由模型
- Topic 订阅模型(动态路由模型)
搭建初始环境
引入依赖
<!--引入与rabbitmq集成依赖-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
这个地方我们也可以在快速构建的时候直接勾选;
配置配置文件
spring:
application:
name: springboot_rabbitmq
rabbitmq:
host: 10.15.0.9
port: 5672
username: ems
password: 123
virtual-host: /ems
这个name没有什么实际的意义,但在微服务项目中至关重要
rabbitmq中的配置是为了与我们RabbitMQ的服务进行连接
RabbitTemplate
用来简化操作 使用时候直接在项目中注入即可使用
HelloWorld模型使用
开发生产者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testHello(){
rabbitTemplate.convertAndSend("hello","hello world");
}
convertAndSend方法:转化与发送。它是用来将消息转化为byte然后再发送
- 第一个参数:队列的名称
- 第二个参数:消息的内容
我们运行生产者之后发现是没有队列的:
因为这个队列的创建并不是在生产者这边创建的而是在消费者那边创建的
如果没有消费者的话创建一个队列是没有任何意义的
开发消费者
@Component
@RabbitListener(queuesToDeclare = @Queue("hello"))
public class HelloCustomer {
@RabbitHandler
public void receive1(String message){
System.out.println("message = " + message);
}
}
消费者必须有一个注解@RabbitListener代表消费者监听。此时我们希望他去接收hello队列中的消息,但是这个队列还没有,所以我们这里使用queuesToDeclare 的方式去声明一个队列。
然后我们怎么去拿到队列里面的消息呢?我们可以任意创建一个方法在它上面使用@RabbitHandler注解,代表从队列中取出消息的回调方法,我们可以通过这个回调方法的参数拿到消息。
然后我们运行发现:
这种模式下创建的队列默认就是持久化的,那么我们怎么设置他的是否独占是否自动删除呢?
我们可以利用 @Queue注解:
默认创建的队列是持久化、非独占,不自动删除的。
Work模型使用
开发生产者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testWork(){
for (int i = 0; i < 10; i++) {
rabbitTemplate.convertAndSend("work","hello work!");
}
}
开发消费者
@Component
public class WorkCustomer {
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive1(String message){
System.out.println("work message1 = " + message);
}
@RabbitListener(queuesToDeclare = @Queue("work"))
public void receive2(String message){
System.out.println("work message2 = " + message);
}
}
我们这里是在一个类中构建多个消费者,前面@RabbitListener是在类上用的,@RabbitListener也可以用在方法上
说明:默认在Spring AMQP实现中Work这种方式就是公平调度,如果需要实现能者多劳需要额外配置
Fanout 广播模型
开发生产者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testFanout() throws InterruptedException {
rabbitTemplate.convertAndSend("logs","","这是日志广播");
}
convertAndSend:
- 第一个参数:交换机的名称
- 第二个参数:routingkey
- 第三个参数:产生的消息
同样交换机并不是在生产者中定义而是在消费者中去定义,只执行上述代码是不会创建logs的交换机的
开发消费者
@Component
public class FanoutCustomer {
@RabbitListener(bindings = @QueueBinding(
value = @Queue, //创建临时队列
exchange = @Exchange(name="logs",type = "fanout") //绑定的交换机
))
public void receive1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue, //创建临时队列
exchange = @Exchange(name="logs",type = "fanout") //绑定交换机类型
))
public void receive2(String message){
System.out.println("message2 = " + message);
}
}
Route 路由模型
开发生产者
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testDirect(){
rabbitTemplate.convertAndSend("directs","error","error 的日志信息");
}
开发消费者
@Component
public class DirectCustomer {
@RabbitListener(bindings ={
@QueueBinding(
value = @Queue(),
key={"info","error"},
exchange = @Exchange(type = "direct",name="directs")
)})
public void receive1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(bindings ={
@QueueBinding(
value = @Queue(),
key={"error"},
exchange = @Exchange(type = "direct",name="directs")
)})
public void receive2(String message){
System.out.println("message2 = " + message);
}
}
Topic 订阅模型(动态路由模型)
开发生产者
@Autowired
private RabbitTemplate rabbitTemplate;
//topic
@Test
public void testTopic(){
rabbitTemplate.convertAndSend("topics","user.save.findAll","user.save.findAll 的消息");
}
开发消费者
@Component
public class TopCustomer {
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
key = {"user.*"},
exchange = @Exchange(type = "topic",name = "topics")
)
})
public void receive1(String message){
System.out.println("message1 = " + message);
}
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue,
key = {"user.#"},
exchange = @Exchange(type = "topic",name = "topics")
)
})
public void receive2(String message){
System.out.println("message2 = " + message);
}
}