前言:
RabbitMQ是一个基于AMQP规范实现的消息队列。它具有性能好、高可用、跨平台性、社区活跃等优点,比较适合中小型公司使用。掌握RabbitMQ相关知识,对工作和学习都有帮助。下面我讲详细介绍一下Rabbit的相关知识。
正文:
一、AMQP规范:
首先,我们先要说明一下AMQP规范,这有利于我们学习RabbitMQ相关知识。
1. 概念:
AMQP(Advanced Message Queuing Protocol)是一个应用层的高级消息队列协议,它与JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。基于此协议不受客户端,开发语言等条件的限制,RabbitMQ就是基于此协议实现的。
2. 核心组件:
- ConnectionFactory(连接工厂):生产Connection的的工厂。
- Connection(连接):应用程序与Broker的网络连接TCP/IP/三次握手和四次挥手。AMQP连接通常是长连接。
- Channel(网络信道):大部分的业务操作是在Channel这个接口中完成的,包括:
- 队列的声明queueDeclare;交换机的声明exchangeDeclare;
- 队列的绑定queueBind;发布消息basicPublish;消费消息basicConsume等。
- Broker(中间件):接受客户端的连接,实现AMQP实体服务,如RabbitMQ。
- Producer(生产者):生产消息。
- Consumer(消费者):消费消息。
- Queue(队列):存储着即将被应用消费掉的消息。
- Message(消息):服务与应用程序之间传送的数据,由Properties(属性)和body(主体)组成。
- VirtualHost(虚拟主机):用于进行逻辑隔离,一个虚拟主机理由可以有若干个Exchange和Queue,同一个虚拟主机里面不能有相同名字的Exchange。
- Exchange(交换机):接受消息,根据路由键发送消息到绑定的队列(不具备消息存储的能力)。
- Binding(绑定):Exchange和Queue之间的虚拟连接。
- Routing Key(路由键):路由规则,虚拟机可以用它来确定如何路由一个特定消息。
3. AMQP工作过程:
- 生成者发布消息到交换机(Exchange)。
- 交换机根据路由规则,将消息分发给与当前交换机绑定的队列中。
- 消费者监听接收到消息之后开始业务处理。
二、4种交换机的使用:
RabbitMQ中一供有四种交换机类型,分别是Direct exchange(直连交换机)、Fanout exchange(扇形交换机)、Topic exchange(主题交换机)、Headers exchange(头交换机)。
1. Direct exchange(直连交换机):
要求消息与一个特定的路由键完全匹配,即一对一的,点对点的发送。
2. Fanout exchange(扇形交换机):
一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上,即发布订阅。
3.Topic exchange(主题交换机):
通配符匹配交换机,使用通配符去匹配,路由到对应的队列。
4. Headers exchange(头交换机):
不适用routingKey进行路由匹配,使用请求头中键值路由匹配。
5. 代码实现,以直连交换机为例:
5.1 直连交换机的Rabbit配置类:
/**
* 〈一句话功能简述〉<br>
* 〈直连交换机的Rabbit配置类〉
*
* @author hanxinghua
* @create 2022/9/19
* @since 1.0.0
*/
@Order(-1)
@Configuration
public class DirectRabbitConfig implements BeanPostProcessor {
@Resource
private RabbitAdmin rabbitAdmin;
//初始化rabbitAdmin对象
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类
rabbitAdmin.setAutoStartup(true);
return rabbitAdmin;
}
@Bean
public Queue rabbitDirectQueue() {
/**
* 1、name: 队列名称
* 2、durable: 是否持久化
* 3、exclusive: 是否独享、排外的。如果设置为true,定义为排他队列。则只有创建者可以使用此队列。也就是private私有的。
* 4、autoDelete: 是否自动删除。也就是临时队列。当最后一个消费者断开连接后,会自动删除。
* */
return new Queue(RabbitConstant.DIRECT_TOPIC, true, false, false);
}
@Bean
public DirectExchange rabbitDirectExchange() {
// Direct交换机
return new DirectExchange(RabbitConstant.DIRECT_EXCHANGE, true, false);
}
@Bean
public Binding bindDirect() {
//链式写法,绑定交换机和队列,并设置匹配键
return BindingBuilder
//绑定队列
.bind(rabbitDirectQueue())
//到交换机
.to(rabbitDirectExchange())
//并设置匹配键
.with(RabbitConstant.DIRECT_ROUTING);
}
/**
* 实例化Bean后的处理器
* Tips:
* 由于队列不存在,启动消费者会报错,最好的解决方法是生产者和消费者都尝试声明队列
*
* @param bean
* @param beanName
* @return
* @throws BeansException
*/
@Override
public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
// 创建交换机
rabbitAdmin.declareExchange(rabbitDirectExchange());
// 创建队列
rabbitAdmin.declareQueue(rabbitDirectQueue());
return null;
}
}
5.2 直连交换机的发送消息服务:
/**
* 〈一句话功能简述〉<br>
* 〈〉
*
* @author hanxinghua
* @create 2022/9/19
* @since 1.0.0
*/
@Service("directRabbitService")
public class DirectRabbitServiceImpl implements RabbitService {
private static SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
@Resource
private RabbitTemplate rabbitTemplate;
@Override
public String sendMsg(String msg) throws Exception {
try {
rabbitTemplate.convertAndSend(RabbitConstant.DIRECT_EXCHANGE, RabbitConstant.DIRECT_ROUTING, msg);
return "ok";
} catch (Exception e) {
e.printStackTrace();
return "error";
}
}
}
5.3 直连交换机的消息消费者:
/**
* 〈一句话功能简述〉<br>
* 〈直连交换机消费者〉
*
* @author hanxinghua
* @create 2022/9/19
* @since 1.0.0
*/
@Component
public class DirectRabbitConsumer {
enum Action {
//处理成功
SUCCESS,
//可以重试的错误,消息重回队列
RETRY,
//无需重试的错误,拒绝消息,并从队列中删除
REJECT
}
@RabbitHandler
@RabbitListener(queuesToDeclare = @Queue(RabbitConstant.DIRECT_TOPIC))
public void process(String msg, Message message, Channel channel) {
long tag = message.getMessageProperties().getDeliveryTag();
Action action = Action.SUCCESS;
try {
System.out.println("消费者RabbitDemoConsumer从RabbitMQ服务端消费消息:" + msg);
if ("bad".equals(msg)) {
throw new IllegalArgumentException("测试:抛出可重回队列的异常");
}
if ("error".equals(msg)) {
throw new Exception("测试:抛出无需重回队列的异常");
}
} catch (IllegalArgumentException e1) {
e1.printStackTrace();
//根据异常的类型判断,设置action是可重试的,还是无需重试的
action = Action.RETRY;
} catch (Exception e2) {
//打印异常
e2.printStackTrace();
//根据异常的类型判断,设置action是可重试的,还是无需重试的
action = Action.REJECT;
} finally {
try {
if (action == Action.SUCCESS) {
//multiple 表示是否批量处理。true表示批量ack处理小于tag的所有消息。false则处理当前消息
channel.basicAck(tag, false);
} else if (action == Action.RETRY) {
//Nack,拒绝策略,消息重回队列
channel.basicNack(tag, false, true);
} else {
//Nack,拒绝策略,并且从队列中删除
channel.basicNack(tag, false, false);
}
channel.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}
}
三、6种通信模型使用:
RabbitMQ中,主要包括6种通信模型,分别是helloworld模型、work模型、pubsub模型、router模型、topic模型、rpc模型。
1. helloworld模型:
一个生产者发送消息,一个接收者接收消息。
相关代码:
/**
* 〈一句话功能简述〉<br>
* 〈接受队列中的消息〉
* <p>
* 一个生成者发送消息,一个接收者接收消息
*
* @author hanxinghua
* @create 2022/9/23
* @since 1.0.0
*/
public class Recv {
/**
* 队列名称
*/
private final static String QUEUE_NAME = "hello.mq";
public static void main(String[] argv) throws Exception {
// 创建链接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ所在主机ip或者主机名
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个频道
Channel channel = connection.createChannel();
// 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("Waiting for messages.");
// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {
/**
* 处理交付
*
* @param consumerTag 这个消息的唯一标记
* @param envelope 信封(请求的消息属性的一个封装)
* @param properties 前面队列带过来的值
* @param body 接受到的消息
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("Received '" + message + "'");
}
};
// 启动一个消费者,并返回服务端生成的消费者标识
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
/**
* 〈一句话功能简述〉<br>
* 〈发送消息到队列中〉
*
* @author hanxinghua
* @create 2022/9/23
* @since 1.0.0
*/
public class Send {
/**
* 队列名称
*/
private final static String QUEUE_NAME = "hello.mq";
public static void main(String[] argv) throws Exception {
// 创建链接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ所在主机ip或者主机名
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个频道
Channel channel = connection.createChannel();
// 指定一个队列
// 第一个参数:队列名称
// 第二个参数:false:重启后,队列没有。true:持久化队列,重启后,队列依然存在
// 第三个参数:声明一个独占队列,仅限于此连接,连接关闭,删除这个队列 true
// 第四个参数:最后一个消费者退出去之后,这个队列是否自动删除
// 第五个参数:队列的其他属性
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 往队列中发出一条消息
String message = "hello world!";
// 第一个参数: 交换机,不能为null,但是可以设置成 ""
// 第二个参数:路由key,不能为null,但是可以设置成 ""
// 第三个参数:设置的队列的属性
// 第四个参数:消息值
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("Sent '" + message + "'");
//关闭频道和连接
channel.close();
connection.close();
}
}
2. work模型:
多个消费者消费的数据之和才是原来队列中的所有数据,适用于流量的消峰。
相关代码:
/**
* 〈一句话功能简述〉<br>
* 〈〉
*
* @author hanxinghua
* @create 2022/9/23
* @since 1.0.0
*/
public class Task {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
for (int i = 0; i < 100 ; i++) {
channel.basicPublish("", TASK_QUEUE_NAME, null,
("我是工作模型:"+i).getBytes("UTF-8"));
}
System.out.println("Sent over!");
channel.close();
connection.close();
}
}
/**
* 〈一句话功能简述〉<br>
* 〈〉
*
* @author hanxinghua
* @create 2022/9/23
* @since 1.0.0
*/
public class Worker1 {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println("Waiting for messages.");
// // 消费端限流策略,同一时刻服务器只会发送一条消息给消费者
// channel.basicQos(1)
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者1接受到的消息是:"+new String(body));
// 进行手动应答 第一个参数:自动应答的这个消息标记 第二个参数:false 就相当于告诉队列受到消息了
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
}
}
/**
* 〈一句话功能简述〉<br>
* 〈〉
*
* @author hanxinghua
* @create 2022/9/23
* @since 1.0.0
*/
public class Worker2 {
private static final String TASK_QUEUE_NAME = "task_queue";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
final Connection connection = factory.newConnection();
final Channel channel = connection.createChannel();
channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println("Waiting for messages.");
final Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消费者2接受到的消息是:"+new String(body));
// 进行手动应答 第一个参数:自动应答的这个消息标记 第二个参数:false 就相当于告诉队列受到消息了
channel.basicAck(envelope.getDeliveryTag(),false);
}
};
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
}
}
3. pubsub模型:
发布订阅模式,使用Fanout交换机。
相关代码:
/**
* 〈一句话功能简述〉<br>
* 〈消息发布者〉
*
* @author hanxinghua
* @create 2022/9/23
* @since 1.0.0
*/
public class Publish {
/**
* 声明交换机的名字
*/
private static final String EXCHANGE_NAME="fanout-01";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明交换机 第一个参数:交换机的名字 第二个参数:交换机的类型,如果使用的是发布订阅模型 只能写 fanout
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);
// 发送消息到交换机
for (int i = 0; i <100 ; i++) {
channel.basicPublish(EXCHANGE_NAME,"",null,("发布订阅模型的值:"+i).getBytes());
}
System.out.println("Sent over!");
// 关闭资源
channel.close();
connection.close();
}
}
/**
* 〈一句话功能简述〉<br>
* 〈消息订阅者〉
*
* @author hanxinghua
* @create 2022/9/23
* @since 1.0.0
*/
public class Subscribe1 {
/**
* 声明交换机的名字
*/
private static final String EXCHANGE_NAME = "fanout-01";
/**
* 队列的名字
*/
private static final String QUEUE_NAME = "fanout-queue1";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 声明换机
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);
// 将队列绑定到交换机 第一个参数:队列的名字 第二个参数:交换机的名字
// 第三个参数:路由的key(现在没有用到这个路由的key)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
System.out.println("Waiting for messages.");
// 声明消费者
Consumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("订阅者1接受到的数据是:" + new String(body));
}
};
// 启动一个消费者
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
/**
* 〈一句话功能简述〉<br>
* 〈消息订阅者〉
*
* @author hanxinghua
* @create 2022/9/23
* @since 1.0.0
*/
public class Subscribe2 {
/**
* 声明交换机的名字
*/
private static final String EXCHANGE_NAME = "fanout-01";
/**
* 队列的名字
*/
private static final String QUEUE_NAME = "fanout-queue2";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 声明换机
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.FANOUT);
// 将队列绑定到交换机 第一个参数:队列的名字 第二个参数:交换机的名字
// 第三个参数:路由的key(现在没有用到这个路由的key)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
System.out.println("Waiting for messages.");
// 声明消费者
Consumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("订阅者1接受到的数据是:" + new String(body));
}
};
// 启动一个消费者
channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
}
}
4. router模型:
路由模型,相当于是分布订阅的升级版,根据路由的key(routing key)来判断是否路由到哪一个队列里面去,使用Direct交换机
相关代码:
/**
* 〈一句话功能简述〉<br>
* 〈〉
*
* @author hanxinghua
* @create 2022/9/23
* @since 1.0.0
*/
public class Producer {
private static final String EXCHANGE_NAME = "direct-01";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明一个交换机,要是路由模式只能是 direct
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);
// 发送信息到交换机
for (int i = 0; i < 100; i++) {
if (i % 2 == 0) {
// 这个路由的key是可以随便设置的
channel.basicPublish(EXCHANGE_NAME, "one", null, ("路由模型的值:" + i).getBytes());
} else {
// 这个路由的key是可以随便设置的
channel.basicPublish(EXCHANGE_NAME, "two", null, ("路由模型的值:" + i).getBytes());
}
}
System.out.println("Sent over!");
channel.close();
connection.close();
}
}
/**
* 〈一句话功能简述〉<br>
* 〈〉
*
* @author hanxinghua
* @create 2022/9/23
* @since 1.0.0
*/
public class Consumer1 {
private static final String EXCHANGE_NAME="direct-01";
private static final String QUEUE_NAME="direct-queue-01";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);
// 绑定队列到交换机 第三个参数:表示的是路由key
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"one");
System.out.println("Waiting for messages.");
// 声明消费者
Consumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//这里就是接受消息的地方
System.out.println("路由key是one的这个队列接受到数据:"+new String(body));
}
};
//绑定消费者
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}
}
/**
* 〈一句话功能简述〉<br>
* 〈〉
*
* @author hanxinghua
* @create 2022/9/23
* @since 1.0.0
*/
public class Consumer2 {
private static final String EXCHANGE_NAME="direct-01";
private static final String QUEUE_NAME="direct-queue-02";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.DIRECT);
// 绑定队列到交换机 第三个参数:表示的是路由key
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"two");
System.out.println("Waiting for messages.");
// 声明消费者
Consumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//这里就是接受消息的地方
System.out.println("路由key是two的这个队列接受到数据:"+new String(body));
}
};
//绑定消费者
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}
}
5. topic模型:
相当于是对路由模式的一个升级,在匹配的规则上可以实现模糊匹配
相关代码:
/**
* 〈一句话功能简述〉<br>
* 〈〉
*
* @author hanxinghua
* @create 2022/9/23
* @since 1.0.0
*/
public class Producer {
private static final String EXCHANGE_NAME = "topic-01";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, ExchangeTypes.TOPIC);
// 发送信息到交换机
for (int i = 0; i < 100; i++) {
if (i % 2 == 0) {
channel.basicPublish(EXCHANGE_NAME, "one.one.one", null, ("路由模型的值:" + i).getBytes());
}else {
channel.basicPublish(EXCHANGE_NAME, "one.one", null, ("路由模型的值:" + i).getBytes());
}
}
System.out.println("Sent over!");
channel.close();
connection.close();
}
}
/**
* 〈一句话功能简述〉<br>
* 〈〉
*
* @author hanxinghua
* @create 2022/9/23
* @since 1.0.0
*/
public class Consumer1 {
private static final String QUEUE_NAME="topic-queue-01";
private static final String EXCHANGE_NAME="topic-01";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
// 绑定队列到交换机 第三个参数:表示的是路由key
// 注意 * :只是代表一个单词 # :这个才代表 一个或者多个单词
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"one.*");
System.out.println("Waiting for messages.");
// 声明消费者
Consumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key是one.*的这个队列接受到数据:"+new String(body));
}
};
System.out.println("Waiting for messages.");
//绑定消费者
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}
}
/**
* 〈一句话功能简述〉<br>
* 〈〉
*
* @author hanxinghua
* @create 2022/9/23
* @since 1.0.0
*/
public class Consumer2 {
private static final String QUEUE_NAME="topic-queue-02";
private static final String EXCHANGE_NAME="topic-01";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
// 声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
// 绑定队列到交换机 第三个参数:表示的是路由key
// 注意 * :只是代表一个单词 # :这个才代表 一个或者多个单词
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"one.#");
System.out.println("Waiting for messages.");
// 声明消费者
Consumer defaultConsumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由key是one.#的这个队列接受到数据:"+new String(body));
}
};
//绑定消费者
channel.basicConsume(QUEUE_NAME,true,defaultConsumer);
}
}
6. rpc模型:
相关代码:
/**
* 〈一句话功能简述〉<br>
* 〈〉
*
* @author hanxinghua
* @create 2022/9/23
* @since 1.0.0
*/
public class Server {
private final static String QUEUE_NAME = "rpc-01";
public static void main(String[] args) throws Exception {
// 创建链接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ所在主机ip或者主机名
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个频道
Channel channel = connection.createChannel();
// 声明一个队列:客户端向服务器发送数据的队列
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 启动消费者,用来处理客户端发送到队列的消息
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 获取参数
String message = new String(body);
int n = Integer.parseInt(message);
// 模拟服务端的一个功能
String fib = handleInterface(n) + "";
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(properties.getCorrelationId())
.build();
// 将结果返回会客户端
// 注意从properties去获取客户端传送过来的信息,再返回回去
channel.basicPublish("", properties.getReplyTo(), replyProps, fib.getBytes());
}
});
}
private static int handleInterface(int n) {
if (n == 0) {
return 0;
}
return n + 2;
}
}
/**
* 〈一句话功能简述〉<br>
* 〈RPC模型〉
*
* @author hanxinghua
* @create 2022/9/23
* @since 1.0.0
*/
public class Client {
private final static String QUEUE_NAME = "rpc-01";
public static void main(String[] args) throws Exception {
// 创建链接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置RabbitMQ所在主机ip或者主机名
factory.setHost("localhost");
// 创建一个连接
Connection connection = factory.newConnection();
// 创建一个频道
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// 声明一个队列(换了一种方式),用于存储服务器返回到客户端的数据
String replyQueueName = channel.queueDeclare().getQueue();
// 使用UUID随机生成一个id
final String correlationId = UUID.randomUUID().toString();
// 客户端发送给服务器添加的额外属性
AMQP.BasicProperties props = new AMQP.BasicProperties()
.builder()
.correlationId(correlationId)
.replyTo(replyQueueName)
.build();
// 客户端将数据发送到发送队列
channel.basicPublish("", QUEUE_NAME, props, "4".getBytes());
// 启动消费者,用来客户端从相应队列获取到处理的结果
channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
// 通过correlationId去保证获取到的是正确的信息
if (properties.getCorrelationId().equals(correlationId)) {
// 处理的结果输出
System.out.println("RPC返回结果:" + new String(body));
}
// 关闭通道,注意一定要等结果返回后再关闭,不然拿不到返回的数据
try {
channel.close();
connection.close();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
});
}
}