生产者
package com.qf.mq2302.routing;
import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
public class EmitLog {
public static final String EXCHANGE_NAME="emitlogs";
public static void main(String[] args) throws Exception {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
//创建一个路由模式的交换机,默认创出来,不持久化,不自动删除,不是内部交换机
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
String msg="hello routing!!";
//准备routingKey
String routingKey="info";
//发送消息
channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes("utf-8"));
channel.close();
connection.close();
}
}
消费者1号
package com.qf.mq2302.routing;
import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
public class ReceiveError {
private static final String EXCHANGE_NAME="emitlogs";
public static void main(String[] args) throws Exception {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//该消费者创建一个自己独占的队列,绑定到指定交换机接收消息。
String queueName = channel.queueDeclare().getQueue();
//准备号要绑定时使用的routingkey
String routingKey = "error";
//绑定该队列到交换机
channel.queueBind(queueName,EXCHANGE_NAME,routingKey);
//设置预留消息队列,也就是,RabbitMQ发过来,我可以存几个。当确认一个就会又发过来一个,
// 但是这些相当于线程池里的线程,然后每个线程又去开辟一个新的线程去执行,回调方法,
// 当回调方法确认完事,才会释放当前这个线程,然后去队列里在消费一个过来。
channel.basicQos(1);
//autoAck :false不自动确认,需要手动确认,如果手动不确认,就会按照 channel.basicQos(1);的数量,给多少就消费多少,不会再给你发了。
channel.basicConsume(queueName, false, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
byte[] body = message.getBody();
String msg = new String(body, "utf-8");
System.out.println(msg);
//手动ACK
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
},consumerTag -> {});
}
}
消费者2号
package com.qf.mq2302.routing;
import com.qf.mq2302.utils.MQUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import java.io.IOException;
public class ReceiveIOther {
private static final String EXCHANGE_NAME="emitlogs";
public static void main(String[] args) throws Exception {
Connection connection = MQUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
//该消费者创建一个自己独占的队列,绑定到指定交换机接收消息。
String queueName = channel.queueDeclare().getQueue();
//准备号要绑定时使用的routingkey
String routingKey1 = "error";
String routingKey2 = "info";
String routingKey3 = "warn";
//绑定该队列到交换机
channel.queueBind(queueName,EXCHANGE_NAME,routingKey1);
channel.queueBind(queueName,EXCHANGE_NAME,routingKey2);
channel.queueBind(queueName,EXCHANGE_NAME,routingKey3);
channel.basicQos(1);
channel.basicConsume(queueName, false, new DeliverCallback() {
@Override
public void handle(String consumerTag, Delivery message) throws IOException {
byte[] body = message.getBody();
//获取routingKey
String routingKey = message.getEnvelope().getRoutingKey();
String msg = new String(body, "utf-8");
System.out.println(msg);
//手动ACK
channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
}
},consumerTag -> {});
}
}