Direct消息模型
* 路由模型:
* 一个交换机可以绑定多个队列
* 生产者给交换机发送消息时,需要指定消息的路由键
* 消费者绑定队列到交换机时,需要指定所需要消费的信息的路由键
* 交换机会根据消息的路由键将消息转发到对应的队列
* 缺点:
* 当消息很多的时候,需要指定的路由键也会很多,究极复杂。
生产者
package com.example.demo02.mq.direct;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.io.IOException;
public class DirectSender {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, false);
String msg1 = "{To DirectReceiver1: orderId:1001}";
String msg2 = "{To DirectReceiver2: orderId:1002}";
channel.basicPublish("direct.exchange","order.save",null,msg1.getBytes());
channel.basicPublish("direct.exchange","order.update",null,msg2.getBytes());
channel.close();
connection.close();
}
}
消费者1
package com.example.demo02.mq.direct;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class DirectReceiver1 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, false);
channel.queueDeclare("direct.queue1", false, false, false, null);
channel.queueBind("direct.queue1","direct.exchange","order.save");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("DirectReceiver1接收到的新增订单消息是:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("direct.queue1",false,consumer);
}
}
消费者2
package com.example.demo02.mq.direct;
import com.example.demo02.mq.util.ConnectionUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
public class DirectReceiver2 {
public static void main(String[] args) throws Exception {
Connection connection = ConnectionUtils.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare("direct.exchange", BuiltinExchangeType.DIRECT, false);
channel.queueDeclare("direct.queue2", false, false, false, null);
channel.queueBind("direct.queue2","direct.exchange","order.update");
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("DirectReceiver2接收到的修改订单消息:" + new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume("direct.queue2",false,consumer);
}
}
结果