官网提供的消息模式:
依赖:
<!-- 加入rabbitmq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
hello模型
没有交换机,直接传消息到队列
provider
/**
* hello模型
* 不需要交换机直接把消息发布到队列
* @throws IOException
* @throws TimeoutException
*/
@Test
public void SendMessage() throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接mq的属性,主机,账号,密码,端口,虚拟主机
connectionFactory.setHost("192.168.169.135");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("wzq.host");
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的信道
Channel channel = connection.createChannel();
//同心绑定对应的消息队列
//参数1,队列名称,如果队列不存在自动创建
//参数2,用来定义队列是否需要持久化,true则持久化,rabbitmq关闭后重启还存在
//参数3,是否独占队列,true则其他信道或者客户端无法连接该队列
//参数4,是否在消费完成后自动删除队列
//参数5,其他额外的参数,map
channel.queueDeclare("hello", true,false,false,null);
//发布消息
//参数1,交换机名称,这里不需要
//参数2,路由key,一般跟对应队列名称一致
//参数3,传递消息额外设置,这里是设置消息持久化
//参数4,消息内容,需要字节数组
channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbitmq".getBytes());
channel.close();
connection.close();
}
customer
@Test
public void customerMessage() throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接mq的属性,主机,账号,密码,端口,虚拟主机
connectionFactory.setHost("192.168.169.135");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("wzq.host");
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的信道
Channel channel = connection.createChannel();
//信道绑定对应的消息队列
//参数1,队列名称,如果队列不存在自动创建
//参数2,用来定义队列是否需要持久化,true则持久化,rabbitmq关闭还存在
//参数3,是否独占队列,true则其他信道或者客户端无法连接该队列
//参数4,是否在消费完成后自动删除队列
//参数5,其他额外的参数,map
channel.queueDeclare("hello", true,false,false,null);
//消费消息
//参数1,队列名称
//参数2,开始消息的自动确认机制
//参数3,消息时的回调接口
//参数4,消息内容,需要字节数组
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override //最后一个参数,消息队列中取出的数据
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收的消息:" + new String(body));
}
});
//一直监听
// channel.close();
// connection.close();
}
任务模型
Work queues,任务/工作模型。当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。消息会堆积越来越多,无法及时处理,此时可以使用work模型:让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消息,不会重复执行。
队列平均分配,循环分配给消费者
provider
/**
* 任务模型
* 多消费者消费
* @throws IOException
* @throws TimeoutException
*/
@Test
public void SendWordMessage() throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接mq的属性,主机,账号,密码,端口,虚拟主机
connectionFactory.setHost("192.168.169.135");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("wzq.host");
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的信道
Channel channel = connection.createChannel();
//信道绑定对应的消息队列
//参数1,队列名称,如果队列不存在自动创建
//参数2,用来定义队列是否需要持久化,true则持久化,rabbitmq关闭后重启还存在
//参数3,是否独占队列,true则其他信道或者客户端无法连接该队列
//参数4,是否在消费完成后自动删除队列
//参数5,其他额外的参数,map
channel.queueDeclare("work", true,false,false,null);
//发布消息
//参数1,交换机名称,这里不需要
//参数2,路由key,一般跟对应队列名称一致
//参数3,传递消息额外设置,这里是设置消息持久化
//参数4,消息内容,需要字节数组
for (int i = 0; i < 100; i++) {
channel.basicPublish("","work", MessageProperties.PERSISTENT_TEXT_PLAIN,(i+": hello work").getBytes());
}
channel.close();
connection.close();
}
多个Customer
public class CustomerWork {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接mq的属性,主机,账号,密码,端口,虚拟主机
connectionFactory.setHost("192.168.169.135");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("wzq.host");
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的信道
Channel channel = connection.createChannel();
//同心绑定对应的消息队列
//参数1,队列名称,如果队列不存在自动创建
//参数2,用来定义队列是否需要持久化,true则持久化,rabbitmq关闭还存在
//参数3,是否独占队列,true则其他信道或者客户端无法连接该队列
//参数4,是否在消费完成后自动删除队列
//参数5,其他额外的参数,map
channel.queueDeclare("work", true,false,false,null);
//能者多劳,通道每一次只接收一个,而不是平均分配
channel.basicQos(1);
//消费消息
//参数1,队列名称
//参数2,开始消息的自动确认机制:队列分配消息给消费者后,自动确定删除,消费者可以拿到消息,但是可能没消费完
//参数3,消息时的回调接口
//参数4,消息内容,需要字节数组
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override //最后一个参数,消息队列中取出的数据
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
try {
Thread.sleep(10000); //10秒
//任务模型,是平均分配给消费者,这里测试下消费者快慢下的分配情况
}catch (Exception e){
e.printStackTrace();
}
System.out.println("接收的消息:" + new String(body));
//参数1:确认队列中具体消息,队列确认完删除,确认机制
//参数2:是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
//一直监听
// channel.close();
// connection.close();
}
}
public class CustomerWork2 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接mq的属性,主机,账号,密码,端口,虚拟主机
connectionFactory.setHost("192.168.169.135");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("wzq.host");
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的信道
Channel channel = connection.createChannel();
//同心绑定对应的消息队列
//参数1,队列名称,如果队列不存在自动创建
//参数2,用来定义队列是否需要持久化,true则持久化,rabbitmq关闭还存在
//参数3,是否独占队列,true则其他信道或者客户端无法连接该队列
//参数4,是否在消费完成后自动删除队列
//参数5,其他额外的参数,map
channel.queueDeclare("work", true,false,false,null);
//能者多劳,通道每一次只接收一个,而不是平均分配
channel.basicQos(1);
//消费消息
//参数1,队列名称
//参数2,开始消息的自动确认机制:队列分配消息给消费者后,自动确定删除,消费者可以拿到消息,但是可能没消费完
//参数3,消息时的回调接口
//参数4,消息内容,需要字节数组
channel.basicConsume("work",false, new DefaultConsumer(channel){
@Override //最后一个参数,消息队列中取出的数据
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收的消息:" + new String(body));
//参数1:确认队列中具体消息,队列确认完删除,确认机制
//参数2:是否开启多个消息同时确认
channel.basicAck(envelope.getDeliveryTag(),false);
}
});
//一直监听
// channel.close();
// connection.close();
}
}
平均分配:消费者执行的消息都一样多,自动确认,但是如果某个消费者执行慢,分配还是一样多,处理不均。
能者多劳:每次分配一个,每次确认一个。
fanout模型
fanout又称广播。消息通过交换机分配到绑定的所有的队列中。
- 可以有多个消费者
- 每个消费者都有自己的队列
- 每个队列都要绑定到交换机
- 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定
- 交换机把消息发送给绑定过的所有队列
- 队列的消息者都能拿到消息,实现一条消息被多个消费者消费
Provider
/**
* fanout模型
* 消息通过交换机分配到绑定的所有的队列中
*/
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接mq的属性,主机,账号,密码,端口,虚拟主机
connectionFactory.setHost("192.168.169.135");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("wzq.host");
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的信道
Channel channel = connection.createChannel();
//信道声明指定的交换机
//参数1:交换机名称
//参数2:交换机类型(fanout、direct、topic)
//rabbitmq中没有的话自动创建
channel.exchangeDeclare("order","fanout");
//发送消息
//参数1:交换机名称
//参数2:路由key,fanout模式不需要
//参数3:额外参数
//参数4:消息字节数组
channel.basicPublish("order","",null,"fanout message".getBytes());
channel.close();
connection.close();
}
}
可多个Customer,一个消息每一个Customer都会接收到消息,可用于分发消息给相同功能处理的不同系统。
public class Customer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接mq的属性,主机,账号,密码,端口,虚拟主机
connectionFactory.setHost("192.168.169.135");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("wzq.host");
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的信道
Channel channel = connection.createChannel();
//信道声明指定的交换机
//参数1:交换机名称
//参数2:交换机类型(fanout、direct、topic)
//rabbitmq中没有的话自动创建
channel.exchangeDeclare("order","fanout");
//临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
//参数1:队列名字
//参数2:交换机名称
//参数3:路由key,fanout模式不需要
channel.queueBind(queueName,"order","");
//消费消息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收消息:" + new String(body));
}
});
}
}
direct模型
在fanout模型中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费,这时需要用到direct类型的exchange。生产者向交换机发送消息,发送消息时会指定一个RoutingKey,交换机根据消息的RoutingKey和队列的RoutingKey,把消息发送到对应的队列。
Direct模型:
- 队列与交换机的绑定,不能任意绑定,而是指定一个RoutingKey(路由key)
- 消息的发送方在向exchange发送消息时,也必须指定消息的RoutingKey
- exchange根据RoutingKey进行判断,只有队列的RoutingKey与消息的RoutingKey完全一致,队列才能接收到消息。
Provider
/**
* Title:direct模型
* Description:根据路由key接收消息
* @author WZQ
* @version 1.0.0
* @date 2020/3/29
*/
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接mq的属性,主机,账号,密码,端口,虚拟主机
connectionFactory.setHost("192.168.169.135");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("wzq.host");
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的信道
Channel channel = connection.createChannel();
//信道声明指定的交换机
//参数1:交换机名称
//参数2:交换机类型(fanout、direct、topic)
//rabbitmq中没有的话自动创建
channel.exchangeDeclare("logs.direct","direct");
//发送消息
//参数1:交换机名称
//参数2:路由key,fanout模式不需要
//参数3:额外参数
//参数4:消息字节数组
channel.basicPublish("logs.direct","direct.error",null,"direct error message".getBytes());
channel.basicPublish("logs.direct","direct.info",null,"direct info message".getBytes());
channel.close();
connection.close();
}
}
Customer1:只接收对应路由key的消息
public class Customer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接mq的属性,主机,账号,密码,端口,虚拟主机
connectionFactory.setHost("192.168.169.135");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("wzq.host");
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的信道
Channel channel = connection.createChannel();
//信道声明指定的交换机
//参数1:交换机名称
//参数2:交换机类型(fanout、direct、topic)
//rabbitmq中没有的话自动创建
channel.exchangeDeclare("logs.direct","direct");
//临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
//参数1:队列名字
//参数2:交换机名称
//参数3:路由key
channel.queueBind(queueName,"logs.direct","direct.error");
//消费消息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收消息:" + new String(body));
}
});
}
}
Customer2:只接收对应路由key的消息,key可以绑定多个。
public class Customer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接mq的属性,主机,账号,密码,端口,虚拟主机
connectionFactory.setHost("192.168.169.135");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("wzq.host");
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的信道
Channel channel = connection.createChannel();
//信道声明指定的交换机
//参数1:交换机名称
//参数2:交换机类型(fanout、direct、topic)
//rabbitmq中没有的话自动创建
channel.exchangeDeclare("logs.direct","direct");
//临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
//参数1:队列名字
//参数2:交换机名称
//参数3:路由key
channel.queueBind(queueName,"logs.direct","direct.test");
channel.queueBind(queueName,"logs.direct","direct.warning");
channel.queueBind(queueName,"logs.direct","direct.info");
//消费消息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收消息:" + new String(body));
}
});
}
}
topic模型
相对于Direct模型,topics模型的exchange可以让队列在绑定RoutingKey的时候使用通配符,这种RoutingKey一般都是由一个或多个单词组成,多个单词直接以“.”分割。
*指匹配单个,#指匹配所有。例如:
user.#指user开头都可以匹配到
user.*指user+一个字段的可以匹配到
*.user指一个字段+user的可以匹配到
*.*.user指两个字段+user的可以匹配到
Provider
/**
* Title:topic模型
* Description:符号范围匹配路由消息
* @author WZQ
* @version 1.0.0
* @date 2020/3/29
*/
public class Provider {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接mq的属性,主机,账号,密码,端口,虚拟主机
connectionFactory.setHost("192.168.169.135");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("wzq.host");
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的信道
Channel channel = connection.createChannel();
//信道声明指定的交换机
//参数1:交换机名称
//参数2:交换机类型(fanout、direct、topic)
//rabbitmq中没有的话自动创建
channel.exchangeDeclare("logs.topics","topic");
//发送消息
//参数1:交换机名称
//参数2:路由key,fanout模式不需要
//参数3:额外参数
//参数4:消息字节数组
channel.basicPublish("logs.topics","user.save",null,"topics.user.save".getBytes());
channel.basicPublish("logs.topics","user.select.test",null,"topics.user.select.test".getBytes());
channel.close();
connection.close();
}
}
Customer1:只接收到user.save
public class Customer {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接mq的属性,主机,账号,密码,端口,虚拟主机
connectionFactory.setHost("192.168.169.135");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("wzq.host");
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的信道
Channel channel = connection.createChannel();
//信道声明指定的交换机
//参数1:交换机名称
//参数2:交换机类型(fanout、direct、topic)
//rabbitmq中没有的话自动创建
channel.exchangeDeclare("logs.topics","topic");
//临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
//参数1:队列名字
//参数2:交换机名称
//参数3:路由key
channel.queueBind(queueName,"logs.topics","user.*");
//消费消息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收消息:" + new String(body));
}
});
}
}
Customer2:接收到所有
public class Customer2 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接mq的连接工厂对象
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置连接mq的属性,主机,账号,密码,端口,虚拟主机
connectionFactory.setHost("192.168.169.135");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("wzq.host");
connectionFactory.setUsername("root");
connectionFactory.setPassword("123456");
//获取连接对象
Connection connection = connectionFactory.newConnection();
//获取连接中的信道
Channel channel = connection.createChannel();
//信道声明指定的交换机
//参数1:交换机名称
//参数2:交换机类型(fanout、direct、topic)
//rabbitmq中没有的话自动创建
channel.exchangeDeclare("logs.topics","topic");
//临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定交换机和队列
//参数1:队列名字
//参数2:交换机名称
//参数3:路由key
channel.queueBind(queueName,"logs.topics","user.#");
//消费消息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接收消息:" + new String(body));
}
});
}
}