一、回顾RabbitMQ基础概念
二、RabbitMQ基础编程模型
使用RabbitMQ提供的原生客户端API进行交互。这是使用RabbitMQ的基础。
1.1、maven依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.9.0</version>
</dependency>
1.2、基础编程模型
1.首先创建连接,获取Channel
private static final String HOST_NAME="192.168.56.10";
private static final int HOST_PORT=5672;
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(HOST_NAME);
factory.setPort(HOST_PORT);
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
connection = factory.newConnection();
2.声明Exchange-可选
channel.exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,Map<String, Object> arguments) throws IOException
3、声明queue
channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments);
4、声明Exchange与Queue的绑定关系-可选
channel.queueBind(String queue, String exchange, String routingKey) throws IOException;
总结:说白了,就是声明一个交换机和队列,然后进行绑定,至于Channel和连接connection实际是一种物理概念,连接rabbitmq使用的,
当然Channel也可以声明多个,但是需要注意命名!
- 在创建channel时,可以在createChannel方法中传入一个分配的int参数channelNumber。这个ChannelNumber就会作为Channel的唯一标识。而RabbitMQ防止ChannelNumber重复的方式是:如果对应的Channel没有创建过,就会创建一个新的Channel。但是如果ChannelNumber已经创建过一个Channel了,这时就会返回一个null。
5、Producer根据应用场景发送消息到queue
channel.basicPublish(String exchange, String routingKey, BasicProperties props,message.getBytes("UTF-8")) ;
6、Consumer消费消息
-
1、被动消费模式
Consumer等待rabbitMQ 服务器将message推送过来再消费。一般是启一个一直挂起的线程来等待。
channel.basicConsume(String queue, boolean autoAck, Consumer callback);
- 另一种是主动消费模式
Comsumer主动到rabbitMQ服务器上去拉取messge进行消费。
GetResponse response = channel.basicGet(QUEUE_NAME, boolean autoAck)
其中需要注意点的是autoAck。autoAck为true则表示消息被Consumer消费成功后,后续就无法再消费了。而如果autoAck设置为false,就需要在处理过程中手动去调用channel的basicAck方法进行应答。如果
不应答的话,这个消息同样会继续被Consumer重复处理。所以这里要注意,如果消费者一直不对消息进行应答,那么消息就会不断的发起重试,这就会不断的消耗系统资源,最终造成服务宕机。
7、完成以后关闭连接,释放资源
channel.close();
conection.clouse();
三、RabbitMQ常用的消息场景
- hello world体验
最直接的方式,P端发送一个消息到一个指定的queue,中间不需要任何exchange规则。C端按queue方式进行消费
//生产者
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//声明队列会在服务端自动创建。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!333";
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode());
builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority());
//携带消息ID
builder.messageId(""+channel.getNextPublishSeqNo());
Map<String, Object> headers = new HashMap<>();
//携带订单号
headers.put("order", "123");
builder.headers(headers);
channel.basicPublish("", QUEUE_NAME, builder.build(), message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();
}
//消费者
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//Consumer接口还一个实现QueueConsuemr 但是代码注释过期了。
Consumer myconsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
System.out.println("========================");
String routingKey = envelope.getRoutingKey();
System.out.println("routingKey >"+routingKey);
String contentType = properties.getContentType();
System.out.println("contentType >"+contentType);
long deliveryTag = envelope.getDeliveryTag();
System.out.println("deliveryTag >"+deliveryTag);
System.out.println("content:"+new String(body,"UTF-8"));
System.out.println("messageId:"+properties.getMessageId());
properties.getHeaders().forEach((key,value)-> System.out.println("key: "+key +"; value: "+value));
// (process the message components here ...)
//消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。
//没有答复过的消息,服务器会一直不停转发。
channel.basicAck(deliveryTag, false);
}
};
channel.basicConsume(QUEUE_NAME, false, myconsumer);
}
- Work queues 工作序列
工作任务模式,领导部署一个任务,由下面的一个员工来处理,Producer消息发送给queue,多个Consumer同时往队列上消费消息。
/**
* 发布一个task,交由多个Worker去处理。 每个task只要由一个Worker完成就行。
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RabbitMQUtil.QUEUE_WORK, true, false, false, null);
for (int i = 0;i<5;i++){
String message = "task 1";
channel.basicPublish("", RabbitMQUtil.QUEUE_WORK,
MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
}
channel.close();
connection.close();
}
//消费者1
public static void main(String[] args) throws Exception{
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//这个任务场景一般任务不能因为rabbitmq崩溃而消失,所以把第二个是否持久化设置成true。
//这样,即使rabbitmq服务重启,任务不会丢失
channel.queueDeclare(RabbitMQUtil.QUEUE_WORK, true, false, false, null);
//每个worker同时最多只处理一个消息
channel.basicQos(1);
Consumer myconsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
System.out.println("========================");
String routingKey = envelope.getRoutingKey();
System.out.println("routingKey >"+routingKey);
String contentType = properties.getContentType();
System.out.println("contentType >"+contentType);
long deliveryTag = envelope.getDeliveryTag();
System.out.println("deliveryTag >"+deliveryTag);
System.out.println("content:"+new String(body,"UTF-8"));
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
};
channel.basicConsume(RabbitMQUtil.QUEUE_WORK, myconsumer);
}
//消费者2
public static void main(String[] args) throws Exception{
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//这个任务场景一般任务不能因为rabbitmq崩溃而消失,所以把第二个是否持久化设置成true。
//这样,即使rabbitmq服务重启,任务不会丢失
channel.queueDeclare(RabbitMQUtil.QUEUE_WORK, true, false, false, null);
//每个worker同时最多只处理一个消息
channel.basicQos(1);
Consumer myconsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
System.out.println("========================");
String routingKey = envelope.getRoutingKey();
System.out.println("routingKey >"+routingKey);
String contentType = properties.getContentType();
System.out.println("contentType >"+contentType);
long deliveryTag = envelope.getDeliveryTag();
System.out.println("deliveryTag >"+deliveryTag);
System.out.println("content:"+new String(body,"UTF-8"));
// (process the message components here ...)
channel.basicAck(deliveryTag, false);
}
};
channel.basicConsume(RabbitMQUtil.QUEUE_WORK, myconsumer);
}
这个模式应该是最常用的模式,也是官网讨论比较详细的一种模式,所以官网上也对这种模式做了重点讲述。
- 首先。Consumer端的autoAck字段设置的是false,这表示consumer在接收到消息后不会自动反馈服务器已消费了message,而要改在对message处理完成了之后,再调用channel.basicAck来通知服务器已经消费了该message.这样即使Consumer在执行message过程中出问题了,也不会造成message被忽略,因为没有ack的message会被服务器重新进行投递。
但是,这其中也要注意一个很常见的BUG,就是如果所有的consumer都忘记调用basicAck()了,就会造成message被不停的分发,也就造成不断的消耗系统资源。这也就是 Poison Message(毒消息) - 其次,官方特意提到的message的持久性。关键的message不能因为服务出现问题而被忽略。还要注意,官方特意提到,所有的queue是不能被多次定义的。如果一个queue在开始时被声明为durable,那在后面再次声明这个queue时,即使声明为 not durable,那这个queue的结果也还是durable的。
- 然后,是中间件最为关键的分发方式。这里,RabbitMQ默认是采用的fair dispatch,也叫round-robin模式,就是把消息轮询,在所有consumer中轮流发送。这种方式,没有考虑消息处理的复杂度以及consumer的处理能力。而他们改进后的方案,是consumer可以向服务器声明一个prefetchCount,我把他叫做预处理能力值。channel.basicQos(prefetchCount);表示当前这个consumer可以同时处理几个message。这样服务器在进行消息发送前,会检查这个consumer当前正在处理中的message(message已经发送,但是未收到consumer的basicAck)有几个,如果超过了这个consumer节点的能力值,就不再往这个consumer发布。
3.Publish/Subscribe 订阅 发布 机制
这个机制是对上面的一种补充。也就是把preducer与Consumer进行进一步的解耦。producer只负责发送消息,至于消息进入哪个queue,由exchange来分配。如上图,就是把producer发送的消息,交由exchange同时发送到两个queue里,然后由不同的Consumer去进行消费。
/**
* exchange有四种类型, fanout topic headers direct
* fanout类型的exchange会往其上绑定的所有queue转发消息。
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception{
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//发送者只管往exchange里发消息,而不用关心具体发到哪些queue里。
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "LOG INFO 222";
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
channel.close();
connection.close();
}
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
System.out.println(queueName);
channel.queueBind(queueName, EXCHANGE_NAME, "");
Consumer myconsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
System.out.println("========================");
String routingKey = envelope.getRoutingKey();
System.out.println("routingKey >"+routingKey);
String contentType = properties.getContentType();
System.out.println("contentType >"+contentType);
long deliveryTag = envelope.getDeliveryTag();
System.out.println("deliveryTag >"+deliveryTag);
System.out.println("content:"+new String(body,"UTF-8"));
// (process the message components here ...)
//消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。
//没有答复过的消息,服务器会一直不停转发。
// channel.basicAck(deliveryTag, false);
}
};
channel.basicConsume(queueName,true, myconsumer);
// channel.close();
// connection.close();
}
关键处就是type为”fanout” 的exchange,这种类型的exchange只负责往所有已绑定的队列上发送消息。
- Routing 基于内容的路由
type为”direct” 的exchange
这种模式一看图就清晰了。 在上一章 exchange 往所有队列发送消息的基础上,增加一个路由配置,指定exchange如何将不同类别的消息分发到不同的queue上。
/**
* exchange有四种类型, fanout topic headers direct
* direct类型的exchange会根据routingkey,将消息转发到该exchange上绑定了该routingkey的所有queue
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception{
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//发送者只管往exchange里发消息,而不用关心具体发到哪些queue里。
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String message = "LOG INFO 44444";
channel.basicPublish(EXCHANGE_NAME, "info", null, message.getBytes());
channel.basicPublish(EXCHANGE_NAME, "debug", null, message.getBytes());
channel.basicPublish(EXCHANGE_NAME, "warn", null, message.getBytes());
channel.close();
connection.close();
}
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
// String queueName = channel.queueDeclare().getQueue();
String queueName="direct_queue";
channel.queueDeclare(queueName,false,false,false,null);
// channel.queueBind(queueName, EXCHANGE_NAME, "info");
// channel.queueBind(queueName, EXCHANGE_NAME, "debug");
channel.queueBind(queueName, EXCHANGE_NAME, "warn");
Consumer myconsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
System.out.println("========================");
String routingKey = envelope.getRoutingKey();
System.out.println("routingKey >" + routingKey);
String contentType = properties.getContentType();
System.out.println("contentType >" + contentType);
long deliveryTag = envelope.getDeliveryTag();
System.out.println("deliveryTag >" + deliveryTag);
System.out.println("content:" + new String(body, "UTF-8"));
// (process the message components here ...)
//消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。
//没有答复过的消息,服务器会一直不停转发。
// channel.basicAck(deliveryTag, false);
}
};
channel.basicConsume(queueName, false, myconsumer);
}
5:Topics 基于话题的路由
type为"topic" 的exchange
这个模式也就在上一个模式的基础上,对routingKey进行了模糊匹配
单词之间用,隔开,* 代表一个具体的单词。# 代表0个或多个单词。
/**
* exchange有四种类型, fanout topic headers direct
* topic类型的exchange在根据routingkey转发消息时,可以对rouytingkey做一定的规则,比如anonymous.info可以被*.info匹配到。
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception{
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//发送者只管往exchange里发消息,而不用关心具体发到哪些queue里。
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String message = "LOG INFO";
channel.basicPublish(EXCHANGE_NAME, "anonymous.info", null, message.getBytes());
channel.basicPublish(EXCHANGE_NAME, "tuling.loulan.debug", null, message.getBytes());
channel.close();
connection.close();
}
public static void main(String[] args) throws Exception {
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
String queueName = channel.queueDeclare().getQueue();
//topic的routingkey,*代表一个具体的单词,#代表0个或多个单词。
channel.queueBind(queueName, EXCHANGE_NAME, "*.info");
channel.queueBind(queueName, EXCHANGE_NAME, "#.debug");
Consumer myconsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
System.out.println("========================");
String routingKey = envelope.getRoutingKey();
System.out.println("routingKey >"+routingKey);
String contentType = properties.getContentType();
System.out.println("contentType >"+contentType);
long deliveryTag = envelope.getDeliveryTag();
System.out.println("deliveryTag >"+deliveryTag);
System.out.println("content:"+new String(body,"UTF-8"));
// (process the message components here ...)
//消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。
//没有答复过的消息,服务器会一直不停转发。
// channel.basicAck(deliveryTag, false);
}
};
channel.basicConsume(queueName,true, myconsumer);
}
6:Publisher Confirms 发送者消息确认
RabbitMQ的消息可靠性是非常高的,但是他以往的机制都是保证消息发送到了MQ之后,可以推送到消费者消费,不会丢失消息。但是发送者发送消息是否成功是没有保证的
Producer.basicPublish方法是没有返回值的,也就是说,一次发送消息是否成功,应用是不知道的,这在业务上就容易造成消息丢失。而这个模块就是通过给发送者提供一些确认机制,来保证这个消息发送的过程是成功的。
发送者确认模式默认是不开启的,所以如果需要开启发送者确认模式,需要手动在channel中进行声明。
channel.confirmSelect();
- 发布单条消息
即发布一条消息就确认一条消息。核心代码:
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
channel.basicPublish("", queue, null, body.getBytes());
channel.waitForConfirmsOrDie(5_000);
}
channel.waitForConfirmsOrDie(5_000);这个方法就会在channel端等待RabbitMQ给出一个响应,用来表明这个消息已经正确发送到了RabbitMQ服务端。但是要注意,这个方法会同步阻塞channel,在等待确认期间,channel将不能再继续发送消息,也就是说会明显降低集群的发送速度即吞吐量。
- 发送批量消息
之前单条确认的机制会对系统的吞吐量造成很大的影响,所以稍微中和一点的方式就是发送一批消息后,再一起确认。
int batchSize = 100;
int outstandingMessageCount = 0;
long start = System.nanoTime();
for (int i = 0; i < MESSAGE_COUNT; i++) {
String body = String.valueOf(i);
ch.basicPublish("", queue, null, body.getBytes());
outstandingMessageCount++;
if (outstandingMessageCount == batchSize) {
ch.waitForConfirmsOrDie(5_000);
outstandingMessageCount = 0;
}
}
if (outstandingMessageCount > 0) {
ch.waitForConfirmsOrDie(5_000);
}
这种方式可以稍微缓解下发送者确认模式对吞吐量的影响。但是也有个固有的问题就是,当确认出现异常时,发送者只能知道是这一批消息出问题了, 而无法确认具体是哪一条消息出了问题。所以接下来就需要增
加一个机制能够具体对每一条发送出错的消息进行处理
- 异步确认消息
实现的方式也比较简单,Producer在channel中注册监听器来对消息进行确认。核心代码就是一个:
channel.addConfirmListener(ConfirmCallback var1, ConfirmCallback var2)
按说监听只要注册一个就可以了,那为什么这里要注册两个呢?成功一个,失败一个然后关于这个ConfirmCallback,这是个监听器接口,里面只有一个方法: void handle(long sequenceNumber, boolean multiple) throws IOException; 这方法中的两个参数,
- sequenceNumer:这个是一个唯一的序列号,代表一个唯一的消息。在RabbitMQ中,他的消息体只是一个二进制数组,默认消息是没有序列号的。那么在回调的时候,Producer怎么知道是哪一条消息成功或者失败呢?RabbitMQ提供了一个方法
int sequenceNumber =channel.getNextPublishSeqNo();
来生成一个全局递增的序列号,这个序列号将会分配给新发送的那一条消息。然后应用程序需要自己来将这个序列号与消息对应起来。 - multiple:这个是一个Boolean型的参数。如果是false,就表示这一次只确认了当前一条消息。如果是true,就表示RabbitMQ这一次确认了一批消息,在sequenceNumber之前的所有消息都已经确认完成了。
这三种确认机制都能够提升Producer发送消息的安全性。通常情况下,第三种异步确认机制的性能是最好的。
7、Headers 头部路由机制
在官网的体验示例中,还有一种路由策略并没有提及,那就是Headers路由。其实官网之所以没有过多介绍,就是因为这种策略在实际中用得比较少,但是在某些比较特殊的业务场景,还是挺好用的。
/**
* exchange有四种类型, fanout topic headers direct
* headers用得比较少,他是根据头信息来判断转发路由规则。头信息可以理解为一个Map
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception{
// header模式不需要routingKey来转发,他是根据header里的信息来转发的。比如消费者可以只订阅logLevel=info的消息。
// 然而,消息发送的API还是需要一个routingKey。
// 如果使用header模式来转发消息,routingKey可以用来存放其他的业务消息,客户端接收时依然能接收到这个routingKey消息。
String routingKey = "ourTestRoutingKey";
// The map for the headers.
Map<String, Object> headers = new HashMap<>();
headers.put("loglevel", "info");
headers.put("buslevel", "product");
headers.put("syslevel", "admin");
String message = "LOG INFO asdfasdf";
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
//发送者只管往exchange里发消息,而不用关心具体发到哪些queue里。
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.deliveryMode(MessageProperties.PERSISTENT_TEXT_PLAIN.getDeliveryMode());
builder.priority(MessageProperties.PERSISTENT_TEXT_PLAIN.getPriority());
builder.headers(headers);
channel.basicPublish(EXCHANGE_NAME, routingKey, builder.build(), message.getBytes("UTF-8"));
// channel.txSelect();
// channel.txCommit();
// channel.txRollback();
channel.close();
connection.close();
}
public static void main(String[] args) throws Exception {
String routingKey= "ourTestRoutingKey";
x-match:特定的参数。all表示必须全部匹配才算成功。
//any表示只要匹配一个就算成功。
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("x-match","any");
// headers.put("loglevel", "info");
headers.put("buslevel", "product");
// headers.put("syslevel", "admin");
Connection connection = RabbitMQUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.HEADERS);
String queueName = channel.queueDeclare("ReceiverHeader",true,false,false,null).getQueue();
channel.queueBind(queueName, EXCHANGE_NAME,routingKey,headers);
Consumer myconsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body)
throws IOException {
System.out.println("========================");
String routingKey = envelope.getRoutingKey();
System.out.println("routingKey >"+routingKey);
String contentType = properties.getContentType();
System.out.println("contentType >"+contentType);
long deliveryTag = envelope.getDeliveryTag();
System.out.println("deliveryTag >"+deliveryTag);
Map<String, Object> headerInfo = properties.getHeaders();
headerInfo.forEach((key,value)-> System.out.println("header key: "+key+"; value: "+value));
System.out.println("content:"+new String(body,"UTF-8"));
// (process the message components here ...)
//消息处理完后,进行答复。答复过的消息,服务器就不会再次转发。
//没有答复过的消息,服务器会一直不停转发。
channel.basicAck(deliveryTag, false);
}
};
String consumerTag = channel.basicConsume(queueName,true, myconsumer);
System.out.println("consumerTag > "+consumerTag);
}
四、SpringBoot集成RabbitMQ
SpringBoot官方就集成了RabbitMQ,所以RabbitMQ与SpringBoot的集成是非常简单的。不过,SpringBoot集成RabbitMQ的方式是按照Spring的一套统一的MQ模型创建的,因此SpringBoot集成插件中对于生产者、消息、消费者等重要的对象模型,与RabbitMQ原生的各个组件有对应关系,但是并不完全相同。
- 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
- 配置关键参数
server.port=8080
spring.rabbitmq.host=192.168.56.10
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
# 单词推送消息数量
spring.rabbitmq.listener.simple.prefetch=1
# 消费者的消费线程数量
spring.rabbitmq.listener.simple.concurrency=5
# 消费者的最大线程数量
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.simple.acknowledge-mode=none
# 2.6以上版本SpringBoot集成Swagger2要修改配置。坑爹
spring.mvc.pathmatch.matching-strategy=ant_path_matcher
3: 声明Exchange,Queue和Binding
所有的exchange, queue, binding的配置,都需要以对象的方式声明。默认情况下,这些业务对象一经声明,应用就会自动到RabbitMQ上常见对应的业务对象。但是也是可以配置成绑定已有业务对象的。
/**
* 直连模式只需要声明队列,所有消息都通过队列转发。
* @author roykingw
*/
@Configuration
public class DirectConfig {
@Bean
public Queue directQueue() {
return new Queue(MyConstants.QUEUE_DIRECT);
}
}
- 使用RabbitmqTemplate对象发送消息
@ApiOperation(value="direct发送接口",notes="直接发送到队列。task模式")
@GetMapping(value="/directSend")
public Object directSend(String message) throws AmqpException, UnsupportedEncodingException {
//设置部分请求参数
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
messageProperties.setPriority(2);
//设置消息转换器,如json
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
//将对象转换成json再发送。
// rabbitTemplate.convertandsend("",Object);
//发消息
rabbitTemplate.send("directqueue",new Message(message.getBytes("UTF-8"),messageProperties));
return "message sended : "+message;
}
- 使用@RabbitListener注解声明消费者
消费者都是通过@RabbitListener注解来声明。在@RabbitMQListener注解中包含了非常多对Queue进行定制的属性,大部分的属性都是有默认值的。例如ackMode默认是null,就表示自动应答。在日常开发过程中,通常都会简化业务模型,让消费者只要绑定队列消费即可。
@RabbitListener(queues=MyConstants.QUEUE_DIRECT)
public void directReceive2(String message) {
System.out.println("consumer2 received message : " +message);
}