一,RabbitMQ的工作模式
RabbitMQ 的工作模式是指 RabbitMQ 中不同的消息传递方式,包括简单模式、工作队列模式、发布订阅模式、路由模式和主题模式 。这些工作模式适用于不同的应用场景。详细的文档可参照RabbitMQ的官网:RabbitMQ: easy to use, flexible messaging and streaming — RabbitMQ
依赖信息
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.itheima</groupId>
<artifactId>rabbitmq-consumer</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
1,简单模式
Simple 简单模式:一个生产者、一个队列、一个消费者,这种交换机是不参与的
其中P是生产者,C是消费者,红色的为消息队列即 MQ,后面几种模式一样。
生产者端代码:
public class Producer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.207.72.43");//ip 默认值是localhost
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");//用户名 默认是guest
factory.setPassword("123");
//3.创建连接
Connection connection = factory.newConnection();
//4.创建 channel
Channel channel = connection.createChannel();
//5.创建队列 Queue
//如果没有一个名字叫hello_world的队列就会自动创建一个
/**
* queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
* queue:队列名称
* durable:是否持久化,当mq重启之后还在
* exclusive:是否独占,只能有一个消费者监听这个队列或者当connection关闭时,是否删除队列
* autoDelete:是否自动删除,当没有consumer时,自动删除掉
* arguments:参数
*/
channel.queueDeclare("hello_world",true,false,false,null);
String body = "hello rabbitmq";
//6.发送消息
/**
* basicPublish(String exchange, String routingKey, AMQP.BasicProperties props, byte[] body)
* exchange:交换机名称,简单模式下交换机会使用默认的“”
* routingKey:路由名称,默认交换机使用队列名称
* props:配置信息
* body:发送消息数据
*/
channel.basicPublish("","hello_world",null,body.getBytes());
//7.释放资源
channel.close();
connection.close();
}
}
消费者端代码:
public class Consumer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.207.72.43");//ip 默认值是localhost
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");//用户名 默认是guest
factory.setPassword("123");
//3.创建连接
Connection connection = factory.newConnection();
//4.创建 channel
Channel channel = connection.createChannel();
//5.创建队列 Queue
//如果没有一个名字叫hello_world的队列就会自动创建一个
channel.queueDeclare("hello",true,false,false,null);
//6.接收消息
/**
* basicConsume(String queue, boolean autoAck, Consumer callback)
* queue:队列名称
* autoAck:是否自动确认
* callback:回调对象
*/
Consumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法,当收到消息后会执行这个方法
* consumerTag:标识
* envelope:获取一些信息,交换机,路由key...
* properties:配置信息
* body:数据
* @param consumerTag
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag: " + consumerTag);
System.out.println("Exchange: " + envelope.getExchange());
System.out.println("RoutingKey: " + envelope.getRoutingKey());
System.out.println("properties: " + properties);
System.out.println("body: " + new String(body));
}
};
channel.basicConsume("hello_world",true,consumer);
//消费者不需要关闭资源,需要一直监听
}
}
2,工作/队列模式
Queue 队列模式:生产者将消息发布到一个队列中,消费者从队列中获取消息
生产者端代码:
public class Producer_WorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.207.72.43");//ip 默认值是localhost
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");//用户名 默认是guest
factory.setPassword("123");
//3.创建连接
Connection connection = factory.newConnection();
//4.创建 channel
Channel channel = connection.createChannel();
//5.创建队列 Queue
//如果没有一个名字叫hello_world的队列就会自动创建一个
channel.queueDeclare("work_queues",true,false,false,null);
for (int i = 0; i < 10; i++) {
String body = i + "hello rabbitmq";
//6.发送消息
channel.basicPublish("","work_queues",null,body.getBytes());
}
//7.释放资源
channel.close();
connection.close();
}
}
消费者端代码:
//消费者1
public class Consumer_WorkQueues1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.207.72.43");//ip 默认值是localhost
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");//用户名 默认是guest
factory.setPassword("123");
//3.创建连接
Connection connection = factory.newConnection();
//4.创建 channel
Channel channel = connection.createChannel();
//5.创建队列 Queue
//如果没有一个名字叫hello_world的队列就会自动创建一个
channel.queueDeclare("work_queues",true,false,false,null);
//6.接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法,当收到消息后会执行这个方法
* @param consumerTag
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag: " + consumerTag);
System.out.println("Exchange: " + envelope.getExchange());
System.out.println("RoutingKey: " + envelope.getRoutingKey());
System.out.println("properties: " + properties);
System.out.println("body: " + new String(body));
}
};
channel.basicConsume("work_queues",true,consumer);
}
}
//消费者2
public class Consumer_WorkQueues2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.207.72.43");//ip 默认值是localhost
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");//用户名 默认是guest
factory.setPassword("123");
//3.创建连接
Connection connection = factory.newConnection();
//4.创建 channel
Channel channel = connection.createChannel();
//5.创建队列 Queue
//如果没有一个名字叫hello_world的队列就会自动创建一个
channel.queueDeclare("work_queues",true,false,false,null);
//6.接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法,当收到消息后会执行这个方法
* @param consumerTag
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag: " + consumerTag);
System.out.println("Exchange: " + envelope.getExchange());
System.out.println("RoutingKey: " + envelope.getRoutingKey());
System.out.println("properties: " + properties);
System.out.println("body: " + new String(body));
}
};
channel.basicConsume("work_queues",true,consumer);
}
}
3,直连模式
Direct 直连模式:生产者直接将消息发送到队列中,消费者从队列中获取消息
生产者端代码:
public class Producer_PubSub {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.207.72.43");//ip 默认值是localhost
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");//用户名 默认是guest
factory.setPassword("123");
//3.创建连接
Connection connection = factory.newConnection();
//4.创建 channel
Channel channel = connection.createChannel();
//5.创建交换机
/**
* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
* exchange:交换机名称
* type:交换机类型 是一个枚举类型
* durable:是否持久化
* autoDelete:是否自动删除
* internal:内部使用,一般设为false
* arguments:参数,设为null
*/
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
//6.创建队列
String queueName1 = "queue1";
String queueName2 = "queue2";
channel.queueDeclare(queueName1,true,false,false,null);
channel.queueDeclare(queueName2,true,false,false,null);
//7.绑定队列和交换机
/**
* queueBind(String queue, String exchange, String routingKey)
* queue:队列名称
* exchange:交换机名称
* routingKey:路由键,绑定规则
* 如果交换机类型为FANOUT类型,则routingKey设为“” 说明交换机会绑定每一个queue
*/
channel.queueBind(queueName1,exchangeName,"");
channel.queueBind(queueName2,exchangeName,"");
//8.发送消息
String body = "日志信息:数据库被删除";
channel.basicPublish(exchangeName,"",null,body.getBytes());
//9.释放资源
channel.close();
connection.close();
}
}
消费者端代码:
//消费者1
public class Consumer_PubSub1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.207.72.43");//ip 默认值是localhost
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");//用户名 默认是guest
factory.setPassword("123");
//3.创建连接
Connection connection = factory.newConnection();
//4.创建 channel
Channel channel = connection.createChannel();
String queueName1 = "queue1";
String queueName2 = "queue2";
//6.接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法,当收到消息后会执行这个方法
* @param consumerTag
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag: " + consumerTag);
// System.out.println("Exchange: " + envelope.getExchange());
// System.out.println("RoutingKey: " + envelope.getRoutingKey());
// System.out.println("properties: " + properties);
System.out.println("body: " + new String(body));
System.out.println("将日志信息打印到控制台..");
}
};
channel.basicConsume(queueName1,true,consumer);
}
}
//消费者2
public class Consumer_PubSub2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.207.72.43");//ip 默认值是localhost
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");//用户名 默认是guest
factory.setPassword("123");
//3.创建连接
Connection connection = factory.newConnection();
//4.创建 channel
Channel channel = connection.createChannel();
String queueName1 = "queue1";
String queueName2 = "queue2";
//6.接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法,当收到消息后会执行这个方法
* @param consumerTag
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag: " + consumerTag);
// System.out.println("Exchange: " + envelope.getExchange());
// System.out.println("RoutingKey: " + envelope.getRoutingKey());
// System.out.println("properties: " + properties);
System.out.println("body: " + new String(body));
System.out.println("将日志信息保存数据库..");
}
};
channel.basicConsume(queueName2,true,consumer);
}
}
4,路由模式
Routing 路由模式:生产者将消息发布到一个交换器上,交换器根据规则将消息路由到目标队列中
生产者端代码:
public class Producer_Routing {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.207.72.43");//ip 默认值是localhost
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");//用户名 默认是guest
factory.setPassword("123");
//3.创建连接
Connection connection = factory.newConnection();
//4.创建 channel
Channel channel = connection.createChannel();
//5.创建交换机
/**
* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
* exchange:交换机名称
* type:交换机类型 是一个枚举类型
* durable:是否持久化
* autoDelete:是否自动删除
* internal:内部使用,一般设为false
* arguments:参数,设为null
*/
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
//6.创建队列
String queueName1 = "queue1_direct";
String queueName2 = "queue2_direct";
channel.queueDeclare(queueName1,true,false,false,null);
channel.queueDeclare(queueName2,true,false,false,null);
//7.绑定队列和交换机
/**
* queueBind(String queue, String exchange, String routingKey)
* queue:队列名称
* exchange:交换机名称
* routingKey:路由键,绑定规则
* 如果交换机类型为FANOUT类型,则routingKey设为“” 说明交换机会绑定每一个queue
*/
//队列1的绑定 error
channel.queueBind(queueName1,exchangeName,"error");
//队列2的绑定 error info
channel.queueBind(queueName2,exchangeName,"info");
channel.queueBind(queueName2,exchangeName,"error");
//8.发送消息
String body = "日志信息:数据库被删除";
channel.basicPublish(exchangeName,"error",null,body.getBytes());
//9.释放资源
channel.close();
connection.close();
}
}
5,通配符模式
Topic 通配符模式:生产者将消息发布到一个主题上,消费者订阅该主题并获取消息
生产者端代码:
public class Producer_Topics {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.207.72.43");//ip 默认值是localhost
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");//用户名 默认是guest
factory.setPassword("123");
//3.创建连接
Connection connection = factory.newConnection();
//4.创建 channel
Channel channel = connection.createChannel();
//5.创建交换机
/**
* exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
* exchange:交换机名称
* type:交换机类型 是一个枚举类型
* durable:是否持久化
* autoDelete:是否自动删除
* internal:内部使用,一般设为false
* arguments:参数,设为null
*/
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
//6.创建队列
String queueName1 = "queue1_topic";
String queueName2 = "queue2_topic";
channel.queueDeclare(queueName1,true,false,false,null);
channel.queueDeclare(queueName2,true,false,false,null);
//7.绑定队列和交换机
//需求:所有error级别的日志打印控制台和存入数据库,所有order系统的日志存入数据库
/**
* queueBind(String queue, String exchange, String routingKey)
* queue:队列名称
* exchange:交换机名称
* routingKey:路由键,绑定规则
* 如果交换机类型为FANOUT类型,则routingKey设为“” 说明交换机会绑定每一个queue
*/
//队列1的绑定
channel.queueBind(queueName1,exchangeName,"#.error");
channel.queueBind(queueName1,exchangeName,"order.*");
channel.queueBind(queueName2,exchangeName,"*.*");
//8.发送消息
String body = "日志信息:数据库被删除";
channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
//9.释放资源
channel.close();
connection.close();
}
}
消费者端代码:
//消费者1
public class Consumer_Topic1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.207.72.43");//ip 默认值是localhost
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");//用户名 默认是guest
factory.setPassword("123");
//3.创建连接
Connection connection = factory.newConnection();
//4.创建 channel
Channel channel = connection.createChannel();
String queueName1 = "queue1_topic";
String queueName2 = "queue2_topic";
//6.接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法,当收到消息后会执行这个方法
* @param consumerTag
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag: " + consumerTag);
// System.out.println("Exchange: " + envelope.getExchange());
// System.out.println("RoutingKey: " + envelope.getRoutingKey());
// System.out.println("properties: " + properties);
System.out.println("body: " + new String(body));
System.out.println("将日志信息打印到控制台..");
}
};
channel.basicConsume(queueName1,true,consumer);
}
}
//消费者2
public class Consumer_Topic2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//2.设置参数
factory.setHost("123.207.72.43");//ip 默认值是localhost
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");//用户名 默认是guest
factory.setPassword("123");
//3.创建连接
Connection connection = factory.newConnection();
//4.创建 channel
Channel channel = connection.createChannel();
String queueName1 = "queue1_topic";
String queueName2 = "queue2_topic";
//6.接收消息
Consumer consumer = new DefaultConsumer(channel) {
/**
* 回调方法,当收到消息后会执行这个方法
* @param consumerTag
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// System.out.println("consumerTag: " + consumerTag);
// System.out.println("Exchange: " + envelope.getExchange());
// System.out.println("RoutingKey: " + envelope.getRoutingKey());
// System.out.println("properties: " + properties);
System.out.println("body: " + new String(body));
System.out.println("将日志信息存入数据库..");
}
};
channel.basicConsume(queueName2,true,consumer);
}
}
二,SpringBoot 整合 RabbitMQ
生产者端整合
步骤:
- 创建生产者工程 ;
- 添加依赖(也可以在创建 Spring Boot的时候添加依赖);
- 添加配置信息;
- 编写代码发送消息。
配置rabbitMQ的基本信息:
#配置rabbitmq的基本信息 为了创建连接工厂
spring:
rabbitmq:
host: 123.207.72.43
port: 5672
username: admin
password: 123
virtual-host: /
配置信息(创建一个RabbitMQConfig类):
/**
* 该类用来创建交换机和队列的 同时将交换机和队列进行绑定
*/
@Configuration
public class RabbitMQConfig {
//交换机名称
public static final String EXCHANGE_NAME = "topic_exchange";
//队列名称
public static final String QUEUE_NAME = "topic_queue";
//创建交换机
@Bean("bootExchange")
public Exchange bootExchange() {
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//创建队列
@Bean("bootQueue")
public Queue bootQueue() {
return QueueBuilder.durable(QUEUE_NAME).build();
}
//将交换机和队列进行绑定
/**
* 1.知道哪个队列
* 2.知道哪个交换机
* 3.routing key
* @param queue
* @param exchange
* @return
*/
@Bean
public Binding bindExchangeQueue(@Qualifier("bootQueue") Queue queue,@Qualifier("bootExchange") Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
编写测试代码发送消息:
@SpringBootTest
@RunWith(SpringRunner.class)
public class ProducerTest {
@Resource
private RabbitTemplate rabbitTemplate;
@Test
public void send() {
rabbitTemplate.convertAndSend(RabbitMQConfig.EXCHANGE_NAME,"boot.haha","hello rabbitmq");
System.out.println("消息发送成功!");
}
}
消费者端整合
步骤:
- 创建生产者工程;
- 添加依赖(也可以在创建 Spring Boot的时候添加依赖);
- 配置整合;
- 编写消息监听器。
配置rabbitMQ的基本信息:
#配置rabbitmq的基本信息 为了创建连接工厂
spring:
rabbitmq:
host: 123.207.72.43
port: 5672
username: admin
password: 123
virtual-host: /
编写消息监听器:
@Component
public class RabbitMQListener {
@RabbitListener(queues = "topic_queue")
//这里的message对象就是接收到的消息
public void listenQueue(Message message) {
System.out.println("message: " + message);
}
}