【RabbitMQ】 相关概念 + 工作模式

news2025/1/12 12:26:14

本文将介绍一些MQ中常见的概念,同时也会简单实现一下RabbitMQ的工作流程。

MQ概念

Message Queue消息队列。是用来存储消息的队列,多用于分布式系统之间的通信。

系统间调用通常有:同步通信和异步通信。MQ就是在异步通信的时候使用的。

同步通信:

异步通信:

MQ作用

  • 异步解耦:在业务流程中,一些操作可能非常耗时,但并不需要即时返回结果。可以借助消息队列(MQ)将这些操作异步化。例如,当用户注册后,可以将发送注册短信或邮件通知作为异步任务处理,而不必等待这些操作完成后才告知用户注册成功。
  • 流量削峰:在访问量剧增的情况下,应用仍然需要继续发挥作用,但这样的突发流量并不常见。如果以能处理这类峰值为标准而投入资源,必然是巨大的浪费。使用MQ可以使关键组件支撑突发访问压力,不会因为突发流量而崩溃。例如,秒杀或促销活动中,可以使用MQ来控制流量,将请求排队,系统根据自己的处理能力逐步处理这些请求。
  • 消息分发:当多个系统需要对同一数据做出响应时,可以使用MQ进行消息分发。例如,支付成功后,支付系统可以向MQ发送消息,其他系统订阅该消息,而无需轮询数据库。
  • 延迟通知:在需要在特定时间后发送通知的场景中,可以使用MQ的延迟消息功能。例如,在电子商务平台中,如果用户下单后一定时间内未支付,可以使用延迟队列在超时后自动取消订单。

AMQP

AMQP(Advanced Message Queuing Protocol)是一个高级消息队列协议,定义了一系列消息交换功能,包括交换器(Exchange)和队列(Queue)。这些组件共同工作,使得生产者可以将消息发送到交换器,由队列接收并等待消费者取走。AMQP还定义了一个网络协议,允许客户端应用通过该协议与消息代理和AMQP模型进行交互通信。RabbitMQ遵循AMQP协议,使用Erlang实现,并支持其他协议如STOMP和MQTT。可以说AMQP模型和RabbitMQ模型结构一致。


RabbitMQ

功能完备: MQ 功能较为全面,几乎支持所有主流语言。
开源友好: 界面非常友好,易于使用。
性能: 性能良好,吞吐量可达到万级。
社区活跃度: 社区活跃度较高。
适用场景: 适合中小型公司,数据量和并发量较小的场景


Producer和Consumer

Producer(生产者):是RabbitMQ Server的客户端,向RabbitMQ发送消息。消息通常带有业务逻辑结构的数据,例如JSON字符串,并可能包含标签,RabbitMQ根据标签路由消息。

Consumer(消费者):也是RabbitMQ Server的客户端,从RabbitMQ接收消息。在消费过程中,标签会被丢弃,消费者只收到消息,且无需知道消息的生产者。

Broker(消息代理):即RabbitMQ Server,负责接收和收发消息。通常,一个RabbitMQ Broker可以视为一个RabbitMQ服务节点或实例,大多数情况下等同于一台RabbitMQ服务器。


Connection和Channel

Connection(连接):是客户端与RabbitMQ服务器之间的TCP连接。这是建立消息传递的基础,负责传输客户端和服务器之间的所有数据和控制信息。

Channel(通道/信道):是在Connection之上的一个抽象层。在RabbitMQ中,一个TCP连接可以有多个Channel,每个Channel都是独立的虚拟连接。消息的发送和接收都是基于Channel进行的。通道的主要作用是将消息的读写操作复用到同一个TCP连接上,从而减少建立和关闭连接的开销,提高性能


Virtual host

Virtual Host(虚拟主机):这是一个虚拟概念,为消息队列提供逻辑上的隔离机制。在RabbitMQ中,一个Broker Server上可以存在多个Virtual Host。它允许不同的用户在同一RabbitMQ服务器上创建和管理独立的交换机(exchange)、队列(queue)等,类似于MySQL中的数据库(database)。


Queue

Queue: 队列, 是RabbitMQ的内部对象, 用于存储消息。


Exchange

Exchange(交换机):是消息到达RabbitMQ Broker的第一站。它负责接收生产者发送的消息,并根据特定的规则将这些消息路由到一个或多个队列(Queue)中。Exchange的主要作用是消息路由,它根据类型和规则决定如何转发接收到的消息。

交换机类型

RabbitMQ中的交换机有四种主要类型,每种类型有不同的路由策略:

  1. Fanout:将消息广播到所有绑定到该交换机的队列,无视路由键。
  2. Direct:根据消息的路由键将消息精确路由到与交换机绑定的队列。
  3. Topic:使用通配符模式匹配路由键,将消息路由到匹配的队列。
  4. Headers:基于消息头部的属性进行路由,路由键不会被使用。(很少使用)

AMQP协议中还有另外两种交换机类型:

System:用于内部系统交换和管理,不用于常规的消息路由。
自定义(Custom):用户可以定义自己的交换机类型,以满足特定的需求。

根据交换机类型的不同,创建了许多不同的工作模式。

Routing Key(路由键)
定义: 路由键是一个字符串,通过它生产者在将消息发送到交换器时告诉交换器如何处理该消息。
作用: 决定了消息的路由路径,即消息应发送到哪个队列或哪些队列。
Binding Key(绑定键)
定义: 在 RabbitMQ 中,通过 Binding(绑定)将交换器与队列关联时,通常会指定一个 Binding Key。
作用: 引导 RabbitMQ 知道如何将消息路由到相应的队列。这个键通常与路由键相关联,从而实现消息的过滤和分发。

通常来说路由键和绑定键没什么区别


工作流程

Producer(生产者) 产生了一条消息。
Producer 连接到 RabbitMQ Broker,建立一个连接(Connection),并开启一个信道(Channel)。
Producer 声明一个交换机(Exchange),用于路由消息。
Producer 声明一个队列(Queue),用于存放消息。
Producer 发送消息至 RabbitMQ Broker。
RabbitMQ Broker 接收消息,并将其存入相应的队列(Queue)中。如果未找到相应的队列,则根据生产者的配置,选择丢弃或退回消息给生产者。


工作模式

RabbitMQ提供了七个工作模式。查看文档即可:RabbitMQ Tutorials | RabbitMQ

工作模式图中的字母:

P(生产者):也就是要发送消息的程序。

C(消费者):消费者,消息的接收者。

Queue(消息队列):缓存消息;生产者将消息投递到其中,消费者从中取出消息。

Exchange(交换机X):按照规则分发消息。

相关通用代码提取到Constants类中

public class Constants {
    // 主机ip
    public static final String HOST = "ip地址";
    // rabbitmq服务器端口号 默认是5672
    public static final int PORT = 5672;
    // rabbitmq用户名
    public static final String USER_NAME = "用户名";
    // rabbitmq用户密码
    public static final String PASSWORD = "用户密码";
    // rabbitmq虚拟主机
    public static final String VIRTUAL_HOST = "虚拟主机名";

    // 简单模式
    public static final String SIMPLE_QUEUE = "simple.queue";

    // 工作队列模式
    public static final String WORK_QUEUE = "work.queue";

    // 发布订阅模式
    public static final String FANOUT_EXCHANGE = "fanout.exchange";
    public static final String FANOUT_QUEUE1 = "fanout.queue1";
    public static final String FANOUT_QUEUE2 = "fanout.queue2";

    // 路由模式
    public static final String DIRECT_EXCHANGE = "direct.exchange";
    public static final String DIRECT_QUEUE1 = "direct.queue1";
    public static final String DIRECT_QUEUE2 = "direct.queue2";

    // 通配符模式
    public static final String TOPIC_EXCHANGE = "topic.exchange";
    public static final String TOPIC_QUEUE1 = "topic_queue1";
    public static final String TOPIC_QUEUE2 = "topic_queue2";

    // rpc 模式
    public static final String RPC_REQUEST_QUEUE = "rpc.request.queue";
    public static final String RPC_RESPONSE_QUEUE = "rpc.response.queue";

    // publisher confirms
    public static final String PUBLISHER_CONFIRMS_QUEUE1 = "publisher.confirms.queue1";
    public static final String PUBLISHER_CONFIRMS_QUEUE2 = "publisher.confirms.queue2";
    public static final String PUBLISHER_CONFIRMS_QUEUE3 = "publisher.confirms.queue3";

    // 推拉模式
    public static final String MESSAGE_QUEUE = "message.queue";
}

简单模式

特点: 一个生产者 P 和一个消费者 C,消息只能被消费一次。这种模式也称为点对点(Point-to-Point)模式。

适用场景: 当消息只能被单个消费者处理时。


生产者代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.common.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 创建链接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);
        factory.setPort(Constants.PORT);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = factory.newConnection();

        // 2. 开启信道
        Channel channel = connection.createChannel();
        // 3. 声明交换机 简单模式使用内置交换机 所以不用声明

        // 4. 声明队列
        /*
         * queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
         *                                  Map<String, Object> arguments)
         *  参数说明:
         *  queue: 队列名称
         *  durable: 可持久化
         *  exclusive: 是否独占
         *  autoDelete: 是否自动删除
         *  arguments: 参数
        */
        channel.queueDeclare(Constants.SIMPLE_QUEUE, true, false, false, null);

        // 5. 发送消息
        /*
         * basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
         * 参数说明:
         * exchange: 交换机名称 在简单模式下使用 "" 表示使用默认交换机
         * routingKey: 路由键 在简单模式下routingkey和队列名称保持一致
         * props: 属性配置
         * body: 消息
        */
        for (int i = 0; i < 10; i++) {
            String msg = "hello rabbitmq simple " + i;
            channel.basicPublish("", Constants.SIMPLE_QUEUE, null, msg.getBytes());
        }
        System.out.println("消息发送成功");

        // 6. 关闭资源
        channel.close();
        connection.close();
    }
}

执行代码, 观察结果


消费者代码

import com.rabbitmq.client.*;
import rabbitmq.common.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1. 创建连接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost(Constants.HOST);
        factory.setPort(Constants.PORT);
        factory.setUsername(Constants.USER_NAME);
        factory.setPassword(Constants.PASSWORD);
        factory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = factory.newConnection();

        //2. 创建Channel
        Channel channel = connection.createChannel();

        //3. 声明队列 如果没有该队列,会自动创建
        channel.queueDeclare(Constants.SIMPLE_QUEUE,true, false, false, null);

        //4. 消费消息

        // DefaultConsumer: 默认消费者
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            // 参数说明:
            // 1. 消息的唯一标识
            // 2. 消息的元信息
            // 3. 消息的属性
            // 4. 消息的内容
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        /*
         * basicConsume(String queue, boolean autoAck, Consumer callback)
         * 参数说明:
         * queue: 队列名称
         * autoAck: 是否自动确认
         * callback: 接收到消息后, 执行的逻辑
         */
        channel.basicConsume(Constants.SIMPLE_QUEUE, true, consumer);

        //等待程序执行完成
        Thread.sleep(2000);

        //5. 释放资源
        channel.close();
        connection.close();
    }
}

执行代码, 观察结果


工作模式

每个消息会被分派给不同的消费者。消息不会重复。

适用场景:集群环境中进行异步处理。
例如:12306 短信通知服务。订单消息发送到 RabbitMQ,短信服务从 RabbitMQ 获取订单信息并发送通知,任务在短信服务之间进行分配。


生产者代码

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.common.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT); //需要提前开放端口号
        connectionFactory.setUsername(Constants.USER_NAME);//账号
        connectionFactory.setPassword(Constants.PASSWORD);  //密码
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST); //虚拟主机
        Connection connection = connectionFactory.newConnection();

        //2. 开启信道
        Channel channel = connection.createChannel();

        //3. 声明队列   使用内置的交换机
        //如果队列不存在, 则创建, 如果队列存在, 则不创建
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);

        //4. 发送消息
        for (int i = 0; i < 10; i++) {
            String msg = "hello work queue " + i;
            channel.basicPublish("", Constants.WORK_QUEUE, null, msg.getBytes());
        }

        System.out.println("消息发送成功");

        //6. 资源释放
        channel.close();
        connection.close();
    }
}

执行代码, 观察结果


消费者代码

这里创建两个消费者

import com.rabbitmq.client.*;
import rabbitmq.common.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        //2. 开启信道
        Channel channel = connection.createChannel();

        //3. 声明队列   使用内置的交换机
        //如果队列不存在, 则创建, 如果队列存在, 则不创建
        channel.queueDeclare(Constants.WORK_QUEUE, true, false, false, null);

        //4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        channel.basicConsume(Constants.WORK_QUEUE, true, consumer);

        //6. 资源释放
//        channel.close();
//        connection.close();
    }
}

执行代码, 观察结果(这里让两个消费者跑起来,不要断开连接,然后再让生产者生产)


发布/订阅模式

工作流程:

发布者将消息发送到交换器,并指定一个主题。
交换器根据消息的主题将其路由到一个或多个队列。
订阅者从这些队列中获取消息。

特点:

广播: 消息可以被发送到多个订阅者。
解耦: 发布者和订阅者之间没有直接的联系,增加了系统的灵活性和可扩展性。

适用场景:

实时通知系统、事件驱动架构等,其中多个组件需要接收和处理相同的消息或事件。


生产者代码

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.common.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        //2. 开启信道
        Channel channel = connection.createChannel();

        //3. 声明交换机
        channel.exchangeDeclare(Constants.FANOUT_EXCHANGE, BuiltinExchangeType.FANOUT, true);

        //4. 声明队列
        channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);
        channel.queueDeclare(Constants.FANOUT_QUEUE2,true,false,false,null);

        //5. 交换机和队列绑定
        //                  队列名                 交换机名                 路由键
        // 在 fanout的交换器中,路由键会被忽略。无论路由键是什么,交换器都会将消息广播到所有绑定的队列。因此,传递空字符串在这种情况下是合法的和无影响的。
        channel.queueBind(Constants.FANOUT_QUEUE1,Constants.FANOUT_EXCHANGE,"");
        channel.queueBind(Constants.FANOUT_QUEUE2,Constants.FANOUT_EXCHANGE,"");

        //6. 发布消息
        String msg = "hello fanout";
        channel.basicPublish(Constants.FANOUT_EXCHANGE,"", null, msg.getBytes());
        System.out.println("消息发送成功");

        //7. 释放资源
        channel.close();
        connection.close();
    }
}

消费者代码

import com.rabbitmq.client.*;
import rabbitmq.common.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        //2. 开启信道
        Channel channel = connection.createChannel();

        //3. 声明队列
        channel.queueDeclare(Constants.FANOUT_QUEUE1,true,false,false,null);

        //4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        channel.basicConsume(Constants.FANOUT_QUEUE1, true, consumer);

//        // 5. 关闭资源
//        channel.close();
//        connection.close();
    }
}

这里两个消费者获取消息.


路由模式

描述: 路由模式是在发布/订阅模式的基础上增加了路由键(Routing Key)。在这种模式下,交换器(Exchange)根据消息的路由键规则,将消息筛选后发送到符合条件的消费者队列。

关键点:
交换器类型: 常使用 Direct 交换器。
路由键: 消息的路由键决定消息的去向。
绑定键: 队列与交换器的绑定时使用的键,用于确定队列接收哪些消息。

适用场景: 需要根据特定规则分发消息的场景。例如:
日志系统: 系统日志的等级包括 error、warning、info 和 debug。可以通过路由模式将不同等级的日志消息发送到不同的队列,最终输出到不同的文件。


生产者代码

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.common.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 路由模式生产者
 */
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        //2. 开启信道
        Channel channel = connection.createChannel();

        //3. 声明交换机
        channel.exchangeDeclare(Constants.DIRECT_EXCHANGE, BuiltinExchangeType.DIRECT, true);

        //4. 声明队列
        channel.queueDeclare(Constants.DIRECT_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.DIRECT_QUEUE2, true, false, false, null);

        //5. 绑定交换机和队列 绑定路由键
        // 队列1绑定路由键a
        // 队列2绑定路由键a b c
        channel.queueBind(Constants.DIRECT_QUEUE1, Constants.DIRECT_EXCHANGE, "a");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "a");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "b");
        channel.queueBind(Constants.DIRECT_QUEUE2, Constants.DIRECT_EXCHANGE, "c");

        //6. 发送消息
        String msgA = "hello direct, my routingkey is a...";
        channel.basicPublish(Constants.DIRECT_EXCHANGE,"a", null, msgA.getBytes());

        String msgB = "hello direct, my routingkey is b...";
        channel.basicPublish(Constants.DIRECT_EXCHANGE,"b", null, msgB.getBytes());

        String msgC = "hello direct, my routingkey is c...";
        channel.basicPublish(Constants.DIRECT_EXCHANGE,"c", null, msgC.getBytes());

        System.out.println("消息发送成功");

        //7. 释放资源
        channel.close();
        connection.close();
    }
}

消费者代码

import com.rabbitmq.client.*;
import rabbitmq.common.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        //2. 开启信道
        Channel channel = connection.createChannel();

        //3. 声明队列
        channel.queueDeclare(Constants.DIRECT_QUEUE1,true,false,false,null);

        //4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        channel.basicConsume(Constants.DIRECT_QUEUE1, true, consumer);
    }
}


通配符模式

描述: Topic 模式是对路由模式的升级,增加了通配符功能,使消息路由更加灵活。生产者将消息发送给交换机,交换机根据路由键(Routing Key)将消息转发到匹配的队列。Topic 模式允许使用类似正则表达式的方式来定义路由键的模式,从而实现更复杂的匹配规则。

关键点:

交换器类型: 使用 Topic 交换器。
路由键模式: 支持通配符匹配,使路由更加灵活。通配符可以用来匹配多个值或匹配特定模式。

通配符:
*:匹配一个单词。
#:匹配零个或多个单词。


生产者代码

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.common.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * 通配符模式生产者
 */
public class Producer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        //2. 开启信道
        Channel channel = connection.createChannel();

        //3. 声明交换机
        channel.exchangeDeclare(Constants.TOPIC_EXCHANGE, BuiltinExchangeType.TOPIC, true);

        //4. 声明队列
        channel.queueDeclare(Constants.TOPIC_QUEUE1, true, false, false, null);
        channel.queueDeclare(Constants.TOPIC_QUEUE2, true, false, false, null);

        //5. 绑定交换机和队列
        // 队列1绑定 *.a.*
        // 队列2绑定 *.*.b 和 c.#
        channel.queueBind(Constants.TOPIC_QUEUE1, Constants.TOPIC_EXCHANGE, "*.a.*");
        channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "*.*.b");
        channel.queueBind(Constants.TOPIC_QUEUE2, Constants.TOPIC_EXCHANGE, "c.#");

        //6. 发送消息
        String msg = "hello topic, my routingkey is ae.a.f....";
        channel.basicPublish(Constants.TOPIC_EXCHANGE,"ae.a.f", null, msg.getBytes());  //转发到Q1

        String msg_b = "hello topic, my routingkey is ef.a.b....";
        channel.basicPublish(Constants.TOPIC_EXCHANGE,"ef.a.b", null, msg_b.getBytes()); //转发到Q1和Q2

        String msg_c = "hello topic, my routingkey is c.ef.d....";
        channel.basicPublish(Constants.TOPIC_EXCHANGE,"c.ef.d", null, msg_c.getBytes());//转发Q2
        System.out.println("消息发送成功");

        //7. 释放资源
        channel.close();
        connection.close();
    }
}

消费者代码

import com.rabbitmq.client.*;
import rabbitmq.common.Constants;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();
        //2. 开启信道
        Channel channel = connection.createChannel();
        //3. 声明队列
        channel.queueDeclare(Constants.TOPIC_QUEUE1,true,false,false,null);
        //4. 消费消息
        DefaultConsumer consumer = new DefaultConsumer(channel){
            //从队列中收到消息, 就会执行的方法
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("接收到消息:"+ new String(body));
            }
        };
        channel.basicConsume(Constants.TOPIC_QUEUE1, true, consumer);
    }
}


RPC通信

  1. 客户端发送请求:客户端将消息发送到一个指定的请求队列。在消息属性中设置 replyTo 字段,指定一个回调队列用于接收服务端的响应。
  2. 服务端处理请求:服务端从请求队列中接收并处理请求消息。服务端处理完成后,将响应消息发送到 replyTo 字段指定的回调队列。
  3. 客户端接收响应:客户端在回调队列上等待响应消息。一旦收到响应,客户端会检查消息的 correlationId 属性,以确保该响应是与其请求相对应的。 

服务器代码

import com.rabbitmq.client.*;
import rabbitmq.common.Constants;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeoutException;

/**
 * RPC server
 * 1. 接收请求
 * 2. 发送响应
 */
public class RpcServer {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        //2. 开启信道
        Channel channel = connection.createChannel();
        // 用来接收请求的队列
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
        // 用来发送响应的队列
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);

        //3. 接收请求
        channel.basicQos(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String request = new String(body, StandardCharsets.UTF_8);
                System.out.println("接收到请求:"+ request);
                String response = "针对request:"+ request +", 响应成功";
                AMQP.BasicProperties basicProperties = new AMQP.BasicProperties().builder()
                                .correlationId(properties.getCorrelationId())
                                .build();
                channel.basicPublish("", Constants.RPC_RESPONSE_QUEUE, basicProperties, response.getBytes());
                channel.basicAck(envelope.getDeliveryTag(), false);

            }
        };
        channel.basicConsume(Constants.RPC_REQUEST_QUEUE, false, consumer);
    }
}

客户端代码

import com.rabbitmq.client.*;
import rabbitmq.common.Constants;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

/**
 * rpc 客户端
 * 1. 发送请求
 * 2. 接收响应
 */
public class RpcClient {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        //1. 建立连接
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        Connection connection = connectionFactory.newConnection();

        //2. 开启信道
        Channel channel = connection.createChannel();
        // 用来接收请求的队列
        channel.queueDeclare(Constants.RPC_REQUEST_QUEUE, true, false, false, null);
        // 用来发送相应的队列
        channel.queueDeclare(Constants.RPC_RESPONSE_QUEUE, true, false, false, null);

        //3. 发送请求
        String msg = "hello rpc...";
        //设置请求的唯一标识
        String correlationID = UUID.randomUUID().toString();
        //设置请求的相关属性
        AMQP.BasicProperties props = new AMQP.BasicProperties().builder()
                .correlationId(correlationID)
                .replyTo(Constants.RPC_RESPONSE_QUEUE)
                .build();
        channel.basicPublish("", Constants.RPC_REQUEST_QUEUE, props, msg.getBytes());

        //4. 接收响应
        //使用阻塞队列, 来存储响应信息
        final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
        DefaultConsumer consumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String respMsg = new String(body);
                System.out.println("接收到回调消息: "+ respMsg);
                if (correlationID.equals(properties.getCorrelationId())){
                    //如果correlationID校验一致
                    response.offer(respMsg);
                }
            }
        };
        channel.basicConsume(Constants.RPC_RESPONSE_QUEUE, true, consumer);
        String result = response.take();
        System.out.println("[RPC Client 响应结果]:"+ result);
    }
}


发布确认

Publisher Confirms 模式是 RabbitMQ 提供的一种机制,用于确保消息可靠发送到 RabbitMQ 服务器。以下是这种模式的主要特点和工作流程:

  1. 开启 Confirm 模式:生产者通过调用 channel.confirmSelect() 将 Channel 设置为 Confirm 模式。在此模式下,每条发布的消息都会获得一个唯一的 ID,生产者可以将这些序列号与消息关联,以便跟踪消息状态。
  2. 服务器确认:当消息被 RabbitMQ 服务器接收并处理后,服务器会异步地向生产者发送确认(ACK),确认中包含消息的唯一 ID,表明消息已经成功送达。
  3. 确保消息可靠性:通过 Publisher Confirms 模式,生产者可以确保消息被 RabbitMQ 服务器成功接收,从而避免消息丢失的问题。

适用场景:
对数据安全性要求较高的场景: 例如金融交易、订单处理等需要确保数据不丢失的应用场景。

消息中间件在实际应用中可能会面临消息丢失的问题,这些问题可以大致分为三种情况:

生产者问题:由于应用程序故障、网络抖动等原因,生产者未能成功将消息发送到Broker。

消息中间件自身问题:生产者成功将消息发送到Broker,但Broker未能妥善保存消息,从而导致消息丢失。

消费者问题:Broker将消息发送给消费者后,消费者在处理消息时发生故障或未正确处理,导致Broker将消费失败的消息从队列中删除。

使用发送确认机制时,必须将信道设置为confirm(确认)模式。发布确认是 AMQP 0.9.1 协议的扩展,默认情况下不会启用。生产者可以通过 channel.confirmSelect() 将信道设置为确认模式。

其中确认模式有三种策略:

  • 单独确认:每个消息都需要单独确认,生产者在发布每个消息后等待确认,确认消息会带有该消息的唯一ID。
  • 批量确认:一次性确认多个消息,生产者通过设置multiple参数来确认某个序号之前的所有消息,这减少了确认的次数,提高了效率。
  • 异步确认:生产者不必等待每个消息的确认,而是通过回调或其他机制异步接收确认,允许生产者继续处理其他任务,提高了整体性能。

当生产者将信道设置为confirm(确认)模式时:

生产者发布的所有消息将被分配一个唯一的ID(从1开始)。
一旦消息被投递到所有匹配的队列,RabbitMQ将向生产者发送一个确认,包含该消息的唯一ID。
只有在消息写入磁盘后,RabbitMQ才会发出确认(如果消息和队列是可持久化的)。
确认消息中的deliveryTag表示确认消息的序号。

代码实现

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import rabbitmq.common.Constants;

import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

public class PublisherConfirms {
    private static final Integer MESSAGE_COUNT = 200;
    static Connection createConnection() throws Exception {
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(Constants.HOST);
        connectionFactory.setPort(Constants.PORT);
        connectionFactory.setUsername(Constants.USER_NAME);
        connectionFactory.setPassword(Constants.PASSWORD);
        connectionFactory.setVirtualHost(Constants.VIRTUAL_HOST);
        return connectionFactory.newConnection();
    }

    public static void main(String[] args) throws Exception {
        //单独确认
        publishingMessagesIndividually();

        //批量确认
        publishingMessagesInBatches();

        //异步确认
        handlingPublisherConfirmsAsynchronously();
        
    }

    /**
     * 异步确认
     */
    private static void handlingPublisherConfirmsAsynchronously() throws Exception{
        try (Connection connection = createConnection()){
            //1. 开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为confirm模式
            channel.confirmSelect();
            //3. 声明队列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE3, true, false, false, null);
            //4. 监听confirm
            //集合中存储的是未确认的消息ID
            long start = System.currentTimeMillis();
            SortedSet<Long> confirmSeqNo = Collections.synchronizedSortedSet(new TreeSet<>());

            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    if (multiple){
                        confirmSeqNo.headSet(deliveryTag+1).clear();
                    }else {
                        confirmSeqNo.remove(deliveryTag);
                    }
                }

                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    if (multiple){
                        confirmSeqNo.headSet(deliveryTag+1).clear();
                    }else {
                        confirmSeqNo.remove(deliveryTag);
                    }
                    //业务需要根据实际场景进行处理, 比如重发, 此处代码省略
                }
            });
            //5. 发送消息
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms"+i;
                long seqNo = channel.getNextPublishSeqNo();
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE3, null, msg.getBytes());
                confirmSeqNo.add(seqNo);
            }
            while (!confirmSeqNo.isEmpty()){
                Thread.sleep(10);
            }
            long end = System.currentTimeMillis();
            System.out.printf("异步确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);
        }
    }

    /**
     * 批量确认
     */
    private static void publishingMessagesInBatches() throws Exception{
        try(Connection connection = createConnection()) {
            //1. 开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为confirm模式
            channel.confirmSelect();
            //3. 声明队列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE2, true, false, false, null);
            //4. 发送消息, 并进行确认
            long start = System.currentTimeMillis();
            int batchSize = 100;
            int outstandingMessageCount = 0;
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms"+i;
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE2, null, msg.getBytes());
                outstandingMessageCount++;
                if (outstandingMessageCount==batchSize){
                    channel.waitForConfirmsOrDie(5000);
                    outstandingMessageCount = 0;
                }
            }
            if (outstandingMessageCount>0){
                channel.waitForConfirmsOrDie(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("批量确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);

        }
    }

    /**
     * 单独确认
     */
    private static void publishingMessagesIndividually() throws Exception {
        try(Connection connection = createConnection()) {
            //1. 开启信道
            Channel channel = connection.createChannel();
            //2. 设置信道为confirm模式
            channel.confirmSelect();
            //3. 声明队列
            channel.queueDeclare(Constants.PUBLISHER_CONFIRMS_QUEUE1, true, false, false, null);
            //4. 发送消息, 并等待确认
            long start = System.currentTimeMillis();
            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String msg = "hello publisher confirms"+i;
                channel.basicPublish("",Constants.PUBLISHER_CONFIRMS_QUEUE1, null, msg.getBytes());
                //等待确认
                channel.waitForConfirmsOrDie(5000);
            }
            long end = System.currentTimeMillis();
            System.out.printf("单独确认策略, 消息条数: %d, 耗时: %d ms \n",MESSAGE_COUNT, end-start);
        }
    }
}

对比不同的消息数量

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2041982.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Go 1.19.4 错误处理-Day 11

1. 错误处理机制 1.1 先看一段代码&#xff0c;引出Golang错误处理机制 看到了上面的代码后&#xff0c;思考两个问题&#xff1f; &#xff08;1&#xff09;有没有办法&#xff0c;能让test()执行出错了&#xff0c;它下面的fmt代码依然能够运行。 &#xff08;2&#xff09…

service 管理 web 管理插件

clusterIP 资源清单文件 [rootmaster ~]# kubectl create service clusterip websvc --tcp80:80 --dry-runclient -o yaml 解析域名 创建后端应用 负载均衡 固定 IP 服务 端口别名 nodePort 对外发布服务 Ingress 对外发布服务 查询 ingress 控制器类名称 kubectl g…

一个功能强大、易于使用的开源WEB表单构建工具Formbricks

大家好&#xff0c;今天给大家分享的是一个功能强大、易于使用的表单构建工具Formbricks&#xff0c;能够帮助开发者和非开发者快速构建各种类型的表单。 项目介绍 Formbricks 是一个开源的、基于 Web 的表单构建器&#xff0c;旨在帮助开发者和非开发人员轻松创建复杂的表单&…

大数据技术——Hadoop运行环境搭建

目录 一、 Hadoop运行环境搭建 1.1 模板虚拟机环境准备 1.2 克隆虚拟机 一、 Hadoop运行环境搭建 1.1 模板虚拟机环境准备 0&#xff09;安装模板虚拟机&#xff0c;IP地址192.168.10.100、主机名称hadoop100、内存4G、硬盘50G 具体操作参照下列文档 大数据技术之模板虚…

8.15日学习打卡---Spring Cloud Alibaba(三)

8.15日学习打卡 目录&#xff1a; 8.15日学习打卡为什么需要服务网关Higress是什么安装DockerCompose部署Higress创建网关微服务模块Higress路由配置Higress策略配置-跨域配置Higress解决如何允许跨域Higress策略配置之什么是HTTP认证Higress策略配置-Basic 认证什么是JWT认证J…

腾讯云AI代码助手 —— 编程新体验,智能编码新纪元

阅读导航 引言一、开发环境介绍1. 支持的编程语言2. 支持的集成开发环境&#xff08;IDE&#xff09; 二、腾讯云AI代码助手使用实例1. 开发环境配置2. 代码补全功能使用&#x1f4bb;自动生成单句代码&#x1f4bb;自动生成整个代码块 3. 技术对话3. 规范/修复错误代码4. 智能…

C++ | stack/queue

前言 本篇博客讲解cSTL中的stack/queue &#x1f493; 个人主页&#xff1a;普通young man-CSDN博客 ⏩ 文章专栏&#xff1a;C_普通young man的博客-CSDN博客 ⏩ 本人giee: 普通小青年 (pu-tong-young-man) - Gitee.com 若有问题 评论区见&#x1f4dd; &#x1f389;欢迎大…

LMDeploy 量化部署实践闯关任务-50%的A100跑的过程

基础任务&#xff08;完成此任务即完成闯关&#xff09; 使用结合W4A16量化与kv cache量化的internlm2_5-7b-chat模型封装本地API并与大模型进行一次对话&#xff0c;作业截图需包括显存占用情况与大模型回复&#xff0c;参考4.1 API开发(优秀学员必做)使用Function call功能让…

docker compose部署rabbitmq集群,并使用haproxy负载均衡

一、创建rabbitmq的data目录 mkdir data mkdir data/rabbit1 mkdir data/rabbit2 mkdir data/rabbit3 二、创建.erlang.cookie文件&#xff08;集群cookie用&#xff09; echo "secretcookie" > .erlang.cookie 三、创建haproxy.cfg配置文件 global log stdout fo…

力扣 | 动态规划 | 动态规划在树的应用

文章目录 一、96. 不同的二叉搜索树二、95. 不同的二叉搜索树 II三、337. 打家劫舍 III 一、96. 不同的二叉搜索树 LeetCode&#xff1a;96. 不同的二叉搜索树 只求个数实际上比较简单&#xff0c;定义dp[i]表示结点个数为i的二叉搜索树的种树。&#xff08;其实和记忆化搜索…

SpringBoot 自定义 starter

1. 官方文档 SpringBoot 版本 2.6.13&#xff0c;相关链接 Developing with Spring Boot 1.1 什么是 Starter Starters are a set of convenient dependency descriptors that you can include in your application. You get a one-stop shop for all the Spring and relate…

【Redis】数据结构篇

文章目录 键值对数据库是怎么实现的&#xff1f;动态字符串SDSC 语言字符串的缺陷SDS结构设计 整数集合整数集合结构设计整数集合的升级操作 哈希表哈希表结构设计哈希冲突链式哈希Rehash渐进式rehashrehash触发条件 压缩列表压缩列表结构设计连续更新压缩列表的缺陷 quicklist…

深入InnoDB核心:揭秘B+树在数据库索引中的高效应用

目录 一、索引页与数据行的紧密关联 &#xff08;一&#xff09;数据页的双向链表结构 &#xff08;二&#xff09;记录行的单向链表结构 二、未创建索引情况 &#xff08;一&#xff09;无索引下的单页查找过程 以主键为搜索条件 以非主键列为搜索条件 &#xff08;二…

财务会计与管理会计(六)

文章目录 高端费用查询图表VLOOKUP函数应用一段简单的VBA代码的应用 入库税金的二维分析SUMPRODUCT函数的应用 多姿多彩的数据表MOD函数的应用和万能表的应用 判断取值与查找取值的关系INDEX与ATCH函数在查找取值中的应用 在职期间项目分布统计表IF函数的应用 自动填充序号应用…

安卓中Room持久化库的使用

在Android开发中&#xff0c;Room是Google提供的一个持久化库&#xff0c;旨在为应用提供SQLite的抽象层&#xff0c;以简化数据库的访问和操作。相比直接使用SQLite&#xff0c;Room提供更清晰、更简洁的数据库访问机制。 1. Room的基础知识 1.1 引入Room依赖 首先&#xff…

STM32CubeMX学习记录——配置定时器

文章目录 前言一、学习目的二、CubeMX配置三、代码编写 一、学习目的 在STM32学习中&#xff0c;定时器是一个非常重要的组成部分。本文将详细记录如何通过CubeMX工具配置定时器&#xff0c;以实现1ms的定时功能。&#xff08;附计算公式&#xff09; 二、CubeMX配置 &#xf…

锂电搅拌设备实现产线可视化

锂离子电池生产过程中的搅拌设备是确保电池性能与一致性的重要环节。随着智能制造和工业4.0概念的深入发展&#xff0c;实现锂电搅拌设备的产线可视化与信息化已成为提升生产效率、优化产品质量、降低运营成本的关键路径。这一转变不仅要求技术上的革新&#xff0c;还涉及到管理…

如何在 .NET 中实现 SM3withSM2 签名:详细步骤和示例代码

下面是一个详细的示例&#xff0c;展示如何在 .NET 中实现 SM3withSM2 签名和验证&#xff0c;包括生成密钥对、计算哈希、签名和验证。示例使用了 BouncyCastle 库&#xff0c;你可以根据实际需求对代码进行调整。 1. 安装依赖库 使用 NuGet 安装 BouncyCastle 库&#xff1a…

ThinkPHP5.0.15漏洞解析及SQL注入

第一步&#xff1a; 通过查看5.0.15和5.0.16版本的对比&#xff0c;可以看到16版本对在Builder.php里面对数据库的增减做了修正&#xff0c;所以可以15版本的漏洞就存在在这里。这里的代码用的拼接的方式&#xff0c;就可以尝试使用报错注入来实现。 第二步&#xff1a; 我们…

音视频开发继续学习

RGA模块 RGA模块定义 RGA模块是RV1126用于2D图像的裁剪、缩放、旋转、镜像、图片叠加等格式转换的模块。比方说&#xff1a;要把一个原分辨率1920 * 1080的视频压缩成1280 * 720的视频&#xff0c;此时就要用到RGA模块了。 RGA模块结构体定义 RGA区域属性结构体 imgType&am…