直接交换机与上一篇介绍的扇出类型交换机的区别就在于:
扇出交换机的routingKey都是空串,也就是一样的。而直接类型交换机的routingKey都是不一样的。还有就是交换机的类型不一样。
直接类型交换机,也叫做路由模式。通过routingKey可以做到区别的对待,可以只给队列Q1发消息,也可以只给队列Q2发消息。
另外,如果直接类型交换机的routingKey都是一样的,那么虽然类型是直接类型交换机,但是跟扇出类型的交换机的表现类似了。
消费者1
package com.xkj.org.mq.direct;
import com.rabbitmq.client.*;
import com.xkj.org.utils.RabbitMQUtil;
import java.io.IOException;
public class Receiver01 {
public static final String EXCHANGE_NAME = "direct_logs";
public static final String QUEUE_NAME = "console";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMQUtil.getChannel();
//声明交换机为直接类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//队列绑定交换机(两个routingKey info, warming)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");
System.out.println("receive01准备接收消息,消息将打印到控制台...");
//接收消息
DeliverCallback deliverCallback = (consumerTag, message)-> {
System.out.println("receive01接收到的消息:"+ new String(message.getBody(), "UTF-8"));
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("receive01消息消费被取消");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
消费者2
package com.xkj.org.mq.direct;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.xkj.org.utils.RabbitMQUtil;
import java.io.IOException;
public class Receiver02 {
public static final String EXCHANGE_NAME = "direct_logs";
public static final String QUEUE_NAME = "disk";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMQUtil.getChannel();
//声明交换机为直接类型
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
//声明一个队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
//队列绑定交换机
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
System.out.println("receive02准备接收消息,消息将打印到控制台...");
//接收消息
DeliverCallback deliverCallback = (consumerTag, message)-> {
System.out.println("receive02接收到的消息:"+ new String(message.getBody(), "UTF-8"));
};
CancelCallback cancelCallback = consumerTag -> {
System.out.println("receive02消息消费被取消");
};
channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback);
}
}
生产者
package com.xkj.org.mq.direct;
import com.rabbitmq.client.Channel;
import com.xkj.org.utils.RabbitMQUtil;
import java.io.IOException;
import java.util.Scanner;
public class DirectLogs {
public static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException {
Channel channel = RabbitMQUtil.getChannel();
Scanner scanner = new Scanner(System.in);
while(scanner.hasNext()) {
String message = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "error", null, message.getBytes("UTF-8"));
}
}
}