RabbitMQ消息队列的笔记

news2024/12/18 6:36:11

Rabbit与Java相结合

  1. 引入依赖
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  1. 在配置文件中编写关于rabbitmq的配置
rabbitmq:
	host: 192.168.190.132   //这个那个Linxu的端口号
	port: 5672		//端口号 ,固定的
	username: admin			//这个是那个用户名
	password: 123546			//密码
	virtual-host: powernode      //配置我们发消息发到那个虚拟主机上
  1. 编写一个配置类

1装RabbitMQ之前对Linux的更改

1.1 按照上传下载工具

yum install lrzsz -y

1.2 查看主机名字

more /etc/hostname

1.3 修改主机名

hostnamectl set-hostname 要修改主机名

4

2 链式编程

  1. 第一步:就是set方法中的返回值全部改为对象本身
public Student setAdderess(String adderess) {
this.adderess = adderess;
return this;
}
  1. 第二步:就可以使用链式编程了,因为当我们执行这个student.setId(20)的时候,我们在set方法中设置的返回值是一个Student对象。所以我们还可以继续.setName()
public class Chainlearn {
    public static void main(String[] args) {
        Student student = new Student();
        student.setId(20).setName("lisi").setAge("20").setAdderess("shanxi0");
        System.out.println(student);

    }
  1. 使用lombok生成链式编程 @Accessors(chain = true) //生成链式编程
    1. 第一步就是添加依赖lomback依赖
 <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.28</version>
        </dependency>
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)  //生成链式编程
public class Teacher implements Serializable {
    private Integer id;

    private String name;

    private String age;

    private String adderess;

}

3 建造者模式

建造者模式就是一步一步的建造一个复杂的是对象 。它一般是将一个复杂的对象分解为多个简单的对象,然后一步一步构造而成

3.1 使用lombok就可以给我们生曾建造者模式

就是使用@Bilder 生成建造者模式代码

建造者模式就是用来创建对象的。

实现过程

  1. 在使用lombok注解的情况下,我们在类上定义一个注解@Builder //生成建造者模式
@Data
@AllArgsConstructor
@NoArgsConstructor
@Accessors(chain = true)  //生成链式编程
@Builder  //生成建造者模式
public class Teacher implements Serializable {
    private Integer id;

    private String name;

    private String age;

    private String adderess;

}
  1. 创建对象并赋值就是使用对象.budiler.属性名
//        用建造者创建对象
         Teacher t = Teacher.builder().id(20).name("wangwu").age("10").adderess("真的").build();
    }

4 系统的启动任务

就是SpringBoot工程启动时,就会执行的任务

实现步骤:

就是在springBoot的启动类上实现ApplicationRunner接口,并实现里面的方法,完后在方法中就可以输出东西。(我们应该用@Slf4j)来输出东西,不要使用System.out.println("你好");

@SpringBootApplication
@Slf4j
public class Application implements ApplicationRunner {
    public static void main(String[] args) {
        SpringApplication.run(Application.class,args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        log.info("系统已启动,我就会运行");
        System.out.println("你好");
    }
}

5 RabbitMQ的定义

其中MQ的全称:message queue 消息队列

5.1 消息中间件

简单来说,消息中间件就是指保存数据的一个容器(服务器),可以用于两个系统之间的数据传递

消息中间件一般有三个主要角色:生产者(producer),消费者(consumer),消息代理(消息队列、消息服务器)

生产者发送消息到消息服务器,然后消费者从消息代理(消息队列)中获取数据并进行处理。

6 使用场景

6.1 异步处理
  1. 不使用MQ时,
    1. 下订单:1、执行下订单业务---->2、再处理加积分业务----->3、发红包业务---->4、发送手机短信业务
  1. 使用MQ时,加快执行速度
    1. 只执行下订单的业务 ,完后直接返回,完后向MQ发送消息---->积分系统处理积分业务后加积分,红包系统处理发红包业务后发红包、手机短信系统执行发送短信的业务后发短信
6.2 系统解耦
  1. 未使用MQ时
    1. 是2A系统调用B系统,再调用C系统......
  1. 使用MQ时
    1. A系统往MQ系统发送一条消息,B系统接受到消息处理,C系统接收到消息处理,其中不同的系统可以使用不同的语言来写。

6.3 流量削峰

就是双十一时,大量订单,发送到系统A,则此时系统A可以把消息发送到MQ中,MQ再匀速的发送给系统B,完后由系统B去操作数据库。

7 RabbitMQ运行环境搭建

7.1 RabbitMQ是由Erlang语言开发的 ,所以需要先下载安装Erlang
  1. 下载Erlang
  2. 安装erlang前先安装Linux依赖库
    1. yum -y install make gcc gcc-c++ kernel-devel m4 ncurses-devel openssl-devel
  1. 解压erlang压缩包文件
    1. tar -zxvf otp_src_25.1.1.tar.gz
  1. 进入otp_src_25.1.1 目录 完后配置
    1. cd otp_src_25.1.1

./configure

  1. 编译
    1. make
  1. 安装,就是把这个放在path目录下,这样在任何的目录就都可以使用了。
    1. make install
  1. 安装好了erlang后可以将解压的文件夹删除:
    1. rm -rf otp_src_25.1.1
  1. 验证erlang是否安装成功
    1. 在命令行输入:erl如果进入了编程命令行则表示安装成功,然后按ctrl + z 退出编程命令行;
7.2 安装RabbitMQ以及启动和停止
  1. 解压RabbitMQ的压缩包,即安装完成,无需再编译
    1. tar -xvf rabbitmq-server-generic-unix-3.10.11.tar.xz -C /usr/local/

说明 -C 选项是指定解压目录,如果不指定会解压到当前目录

此时rabbitmq就安装好了;

  1. 启动RabbitMQ
    1. 首先切换到我们的安装目录下/usr/local ----> cd rabbitmq_server-3.10.11/

--->完后再切换到那个sbin/目录下

    1. 就执行命令启动 ./rabbitmq-server -detached
    2. 当配置完环境变量以后,我们就可以在任何目录下启动 rabbitmq-server -detached

说明:

-detached 将表示在后台启动运行rabbitmq;不加该参数表示前台启动;

rabbitmq的运行日志存放在安装目录的var目录下;

现在的目录是:/usr/local/rabbitmq_server-3.10.11/var/log/rabbitmq

  1. 停止RabbitMQ
    1. 切换到sbin目录下执行: 执行./rabbitmqctl shutdown

7.3 查看MQ的状态
  1. 切换到sbin目录下执行:执行

说明:-n rabbit 是指定节点名称为rabbit,目前只有一个节点,节点名默认为rabbit

此处-n rabbit 也可以省略

7.4 配置path环境变量
  1. 编辑这个文件 vi /etc/profile
  2. 在这个文件中写入以下语句
RABBIT_HOME=/usr/local/rabbitmq_server-3.10.11
PATH=$PATH:$RABBIT_HOME/sbin

export RABBIT_HOME PATH
  1. 刷新环境变量 执行 source /etc/profile
     

8 RabbitMQ的管理命令

8.1 用户管理

用户管理包括增加用户删除用户查看用户列表修改用户密码

这些操作都是通过rabbitmqctl管理命令来实现完成。

  1. 查看当前用户列表
    1. rabbitmqctl list_users
  1. 新增一个用户
    1. 语法:rabbitmqctl add_user Username Password
  1. 设置用户角色
    1. rabbitmqctl set_user_tags 用户名 角色名
  1. 设置用户权限
    1. rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
      1. ".*" ".*" ".*" 这几个分别是读、写、配置的权限
  1. 查看用户权限
    1. rabbitmqctl list_permissions

9 RabbitMQ的 web管理后台

  • 几个步骤:
    • Rabbitmq有一个web管理后台,这个管理后台是以插件的方式提供的,启动后台web管理功能,切换到sbin目录下执行以下几步:
        1. 查看rabbitmq 的插件列表,插件的最前面如果是一个空的[ ]代表没有启用
          1. rabbitmq-plugins list
        1. 启用
          1. rabbitmq-plugins enable rabbitmq_management
        1. 禁用
          1. rabbitmq-plugins disable rabbitmq_management
    • 下一步就是防火墙操作

systemctl status firewalld --检查防火墙状态

systemctl stop firewalld --关闭防火墙,Linux重启之后会失效

systemctl disable firewalld --防火墙置为不可用,Linux重启后,防火墙服务不自动启动,依然是不可用

    • 访问
        1. http://192.168.190.132:15672/

用户名/密码为我们上面创建的admin/123456

注意上面改成你的虚拟主机的ip地址

备注:如果使用默认用户guest、密码guest登录,会提示User can only log in via localhost

说明guest用户只能从localhost本机登录,所以不要使用该用户。

9.1 通过web页面新建虚拟主机
  1. 第一步:点击那个admin
  2. 第二步:点击那个User完后哪里就可以创建用户
  3. 第三步:点击那个Virtual Hosts就可以添加虚拟主机

10 .RabbitMQ工作模型

broker(服务器) 相当于mysql服务器,virtual host相当于数据库(可以有多个数据库)queue相当于表,消息相当于记录。

消息队列有三个核心要素: 消息生产者消息队列消息消费者

  • 生产者(Producer):发送消息的应用;(java程序,也可能是别的语言写的程序)
  • 消费者(Consumer):接收消息的应用;(java程序,也可能是别的语言写的程序)
  • 代理(Broker):就是消息服务器,RabbitMQ Server就是Message Broker;
  • 信道(Channel):连接中的一个虚拟通道,消息队列发送或者接收消息时,都是通过信道进行的;
  • 虚拟主机(Virtual host):一个虚拟分组,在代码中就是一个字符串,当多个不同的用户使用同一个RabbitMQ服务时,可以划分出多个Virtual host,每个用户在自己的Virtual host创建exchange/queue等;(分类比较清晰、相互隔离)
  • 交换机(Exchange):交换机负责从生产者接收消息,并根据交换机类型分发到对应的消息队列中,起到一个路由的作用;
  • 路由键(Routing Key):交换机根据路由键来决定消息分发到哪个队列,路由键是消息的目的地址;
  • 绑定(Binding):绑定是队列和交换机的一个关联连接(关联关系);
  • 队列(Queue):存储消息的缓存;
  • 消息(Message):由生产者通过RabbitMQ发送给消费者的信息;(消息可以任何数据,字符串、user对象,json串等等)

11 .RabbitMQ交换机类型

Exchange(X)可翻译成交换机/交换器/路由器

11.1 RabbitMQ交换器 (Exchange)类型

Fanout Exchange(扇形)

Direct Exchange(直连)

Topic Exchange(主题)

Headers Exchange(头部)

11.2 Fanout Exchange(扇形交换机/器)

Fanout 扇形的,散开的;扇形交换机

投递到所有绑定的队列,不需要路由键,不需要进行路由键的匹配,相当于广播、群发;

上面P代表product(生产者) X代表交换机/器 红色的代表队列

我们的生产者发消息只能发送到交换机上,当消息发送到交换机以后,只要队列和交换机绑定了,那么就把消息分散的发送到这两个队列当中。(如果绑定10个队列,那么就会分散的发送到这10个队列当中)

11.3 例子【Fanout Exchange(扇形交换机/器)】
  • 实现步骤:
    • 第一步:添加依赖
<!--    rabbitMQ的依赖-->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    • 第二步: 添加配置信息
  rabbitmq:		
    host: 192.168.190.132				//地址
    port: 5672					//端口号
    username: admin				//用户名
    password: 123546			//密码
    virtual-host: powernode   //设置发消息发送到那个虚拟主机上
    • 第三步: 创建一个Rabbit的配置类
      • 在这个配置类中有一个三部曲
        • 第一步:定义交换机
@Bean
//FanoutExchange  说明就是定义的一个扇形交换机
public FanoutExchange fanoutExchange(){
    //括号里面就是给交换机起个名字
    return new FanoutExchange("exchange.fanout");  
}
        • 第二步定义队列(多个队列)
@Bean
    public Queue queueA(){		//队列A
        return new Queue("queue.famout.a");
    }

    @Bean
    public Queue queueB(){		//队列B 
        return new Queue("queue.famout.b");
    }
        • 绑定交换机和队列(扇形交换机不需要指定key)
          • 通过binding这个方法,其中参数就是把交换机传进来,以及把要绑定的队列传进来
          • 调用BindingBuilder的.bind()完后把那个队列传进来。再使用to()括号里面是交换机的名字
@Bean
    public Binding bindingA(FanoutExchange fanoutExchange ,Queue queueA){
        return BindingBuilder.bind(queueA).to(fanoutExchange);
    }
 @Bean
 public Binding bindingB(FanoutExchange fanoutExchange ,Queue queueB){
     return BindingBuilder.bind(queueB).to(fanoutExchange);
 }
    • 第四步: 发送消息
      • 第一步:注入rabbitTemplate
      • 第二步:创建一个方法来定义消息,完后再使用Message() 这个类来把消息封装了.
//定义消息
        String msg ="hello world";
        //这里Message中需要的参数是byte ,我们需要getBytes,将String转换为byte
        //需要使用这个将消息封装一下
        Message message = new Message(msg.getBytes());
      • 第三步:就是使用rabbitTemplate 这个类中convertAndSend方法来发送消息.其中括号中参数分别是:
          • 第一个写的是我们在配置类中定义的额交换机的名字
          • 第二个一般都需要路由key吗,但是扇形交换机不需要路由key
          • 第三个参数就是我们封装的消息
rabbitTemplate.convertAndSend("exchange.fanout","",message);

扇形交换机发送消息的完整写法

public class MessageService {
    //先注入rabbitTemplate
    // 这里直接可以使用rabbitTemplate是因为springBoot中有一个自动装配的功能,我们把依赖导入
    @Autowired
    private RabbitTemplate rabbitTemplate;
    public void sendMsg(){
        //定义消息
        String msg ="hello world";
        //这里Message中需要的参数是byte ,我们需要getBytes,将String转换为byte
        //需要使用这个将消息封装一下
        Message message = new Message(msg.getBytes());
        //发送消息,括号里面第一个写的是我们在配置类中定义的额交换机的名字
        //第二个一般都需要路由key吗,但是扇形交换机不需要路由key
        //第三个参数就是我们封装的消息
        rabbitTemplate.convertAndSend("exchange.fanout","",message);
        log.info("消息发送完毕,发送时间为:{}" ,new Date());
    }
}
    • 第五步: 上面编写完方法以后,我们最后一步,调用这个方法,来发送消息,在启动上实现

ApplicationRunner类(这个类表示程序一启动就会执行里面的那个方法),完后并重写里面的方法,完后我们把那个编写消息的那个类注入进来,完后调用那个方法即可.

public class Application implements ApplicationRunner {

    @Autowired
    private MessageService messageService;
    public static void main( String[] args ){
        SpringApplication.run(Application.class,args);
    }

    /**
     * 程序一启动就会运行
     * @param args
     * @throws Exception
     */
    @Override
    public void run(ApplicationArguments args) throws Exception {
        messageService.sendMsg();
    }

述这样就代表消息发送过去了

    • 第六步 : 接收消息
      • 一般情况都是一个模块发送消息,另一个模块接收消息
        • 创建一个新的模块,来接收消息
        • 第一步:还是添加rabbitmq的依赖
        • 第二步:在配置文件中添加rabbitmq的相关配置。
        • 第三步:编写一个接收消息的类,完后来接收消息
          • 这个类编写的步骤:
            • 第一步:创建一个方法,在里面传入Message的参数。
            • 第二步:在这个类上标注解@RabbitListener(),其中括号里面写的就是对队列的名称。
            • 第三步:就是调用message.getBody()方法,来获取消息。
            • 第四步:就是将接收到消息的字节数组,转为字符串
@Slf4j
@Component
public class ReceiveMessage {
    /**
     * 接收两个队列的消息
     * @RabbitListener() 就是接收哪些队列的消息,就写哪些队列
     */
    @RabbitListener(queues = {"queue.famout.a","queue.famout.b"})
    public void receiveMsg(Message message){
        byte[] body = message.getBody();
        //上面那个得到的是字节数据,下面转为字符串
        String msg = new String(body);
        log.info("接收的消息为: {} " ,msg);
    }
}

11.4 Direct Exchange(直连交换机)

根据路由键精确匹配(一模一样)进行路由消息队列;

其中P代表的是生产者 X代表的是交换机 C代表的是消费者 红色的代表队列

那个error代表的就是路由key

那个info error warning 也是路由key

原理就是 :生产者发送消息到交换机时,也会指定一个key(发送key),交换机发送到队列也需要一个key(路由key) ,当发送key和路由key相同时,才会将交换机中的消息发送到队列中

【例子】:当发消息的时候指定的路由key是hello,那么到交换机以后,则消息不会进入任何一个队列

当发消息的时候指定的路由key是error,则到交换机以后,则消息会同时发送到两个队列。

11.5 例子【直连交换机的例子】:

实现步骤:

    • 第一步:添加依赖
    • 第二步:在配置文件中配置rabbitmq的配置
    • 第三步:创建一个rabbitmq的配置类
      • 创建步骤:
        • 还是那三部曲
          • 第一步:定义交换机
@Bean
    public DirectExchange directExchange (){
        return ExchangeBuilder.directExchange(exchangeName).build();
    }
          • 定义队列
@Bean
    public Queue queueA(){
            return QueueBuilder.durable(queueAName).build();
    }

    @Bean
    public Queue queueB(){
           return QueueBuilder.durable(queueBName).build();
    }
          • 交换机与队列绑定
    @Bean
    public Binding bindingA(DirectExchange directExchange,Queue queueA){
        return BindingBuilder.bind(queueA).to(directExchange).with("error");
    }
    @Bean
    public Binding bindingB1(DirectExchange directExchange ,Queue queueB){
        return BindingBuilder.bind(queueB).to(directExchange).with("info");
    }
    @Bean
    public Binding bindingB2(DirectExchange directExchange ,Queue queueB){
        return BindingBuilder.bind(queueB).to(directExchange).with("error");
    }
    @Bean
    public Binding bindingB3(DirectExchange directExchange ,Queue queueB){
        return BindingBuilder.bind(queueB).to(directExchange).with("warning");
    }

完整的代码:

/**
 * 从配置文件中读取指定名字的属性,我们在配置文件中定义exchangeName   queueAName   queueBName
 * 那几个属性的值,完后再这里读取。
 */
@ConfigurationProperties("my")
public class RabbitConfig {
    private String exchangeName;
    private String  queueAName;
    private String queueBName;

    /**
     * 三部曲:
     * 第一步:定义队列
     */
    @Bean
    public DirectExchange directExchange (){
        return ExchangeBuilder.directExchange(exchangeName).build();
    }
    /**
     * 第二步:定义队列
     */
    @Bean
    public Queue queueA(){
            return QueueBuilder.durable(queueAName).build();
    }

    @Bean
    public Queue queueB(){
           return QueueBuilder.durable(queueBName).build();
    }

    /**
     * 交换机与队列绑定
     */
    @Bean
    public Binding bindingA(DirectExchange directExchange,Queue queueA){
        return BindingBuilder.bind(queueA).to(directExchange).with("error");
    }
    @Bean
    public Binding bindingB1(DirectExchange directExchange ,Queue queueB){
        return BindingBuilder.bind(queueB).to(directExchange).with("info");
    }
    @Bean
    public Binding bindingB2(DirectExchange directExchange ,Queue queueB){
        return BindingBuilder.bind(queueB).to(directExchange).with("error");
    }
    @Bean
    public Binding bindingB3(DirectExchange directExchange ,Queue queueB){
        return BindingBuilder.bind(queueB).to(directExchange).with("warning");
    }
}
    • 第四步: 创建发送消息的类
      • 第一步: 创建步骤:首先注入RabbitTemplate
      • 第二步: 创建一个方法 ,并创建一个消息
      • 第三步: 使用 RabbitTemplate来发送消息,
@Slf4j
public class MessageService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(){
        //创建一条消息.使用建造者模式
        Message message = MessageBuilder.withBody("hello world".getBytes()).build();
        //下面就是发送消息,括号里面分别是,参数1 交换机的名字, 参数2 路由key, 参数3 消息
        rabbitTemplate.convertAndSend("exchange.direct", "info",message);
        log.info("消息发送完毕");
    }
11.6 Topic Exchange(主题交换机)

通配符匹配,相当于模糊匹配;

#匹配多个单词,用来表示任意数量(零个或多个)单词

*匹配一个单词(必须有一个,而且只有一个),用 . 隔开的为一个

【例子】:beijing.# == beijing.queue.abc, beijing.queue.xyz.xxx

beijing.* == beijing.queue, beijing.xyz

发送时指定的路由键:lazy.orange.rabbit 则与队列Q1 和队列Q2都匹配。

但是虽然与Q2队列有两个都匹配的,但是,也只会进去以条消息。

11.7 例子【主题交换机】

编写步骤:

  • 第一步:添加mq的依赖
  • 第二步:在配置文件中添加mq的配置
  • 第三步:编写一个配置类
    • 编写配置类的步骤:
      • 第一步:定义交换机
@Bean
    public TopicExchange topicExchange(){
        return ExchangeBuilder.topicExchange(exchangeName).build();
    }
      • 第二步:定义队列
@Bean
    public Queue queueA(){
        return QueueBuilder.durable(queueAName).build();
    }
    @Bean
    public Queue queueB(){
        return QueueBuilder.durable(queueBName).build();
    }
      • 交换机与队列绑定
@Bean
    public Binding bindingA(TopicExchange topicExchange,Queue queueA){
        return BindingBuilder.bind(queueA).to(topicExchange).with("*.orange.*");
    }

    @Bean
    public Binding bindingB1(TopicExchange topicExchange,Queue queueB){
        return BindingBuilder.bind(queueB).to(topicExchange).with("*.*.rabbit");
    }
    @Bean
    public Binding bindingB2(TopicExchange topicExchange,Queue queueB){
        return BindingBuilder.bind(queueB).to(topicExchange).with("lazy.#");
    }
  • 第四步:编写发送消息
@Service
public class MessageService {
    @Autowired
    private AmqpTemplate amqpTemplate;
    /**
     * 发送消息
     */
    @Bean
    public void sendMsg(){
        Message message = MessageBuilder.withBody("hello world".getBytes()).build();
        //有几个参数第一个就是交换机的名字,第二个就是路由key,第三个就是消息
        amqpTemplate.convertAndSend("exchange.topic","lazy.orange.rabbit",message);

    }
  • 第五步:让启动类实现ApplicationRunner 并重写里面的方法(方法的作用就是程序一启动就会调用发消息的那个方法)
public class Application implements ApplicationRunner {
    @Autowired
    private MessageService messageService;

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @Override
    public void run(ApplicationArguments args) throws Exception {
        messageService.sendMsg();
    }
11.8 Headers Exchange(头部交换机)

基于消息内容中的headers属性进行匹配;

其中 P代表的消息的生产者 C代表消息的消费者 X代表的是交换机 红色代表队列

原理: 就是在发消息的时候,我们在消息的头部在加上匹配的值,看与那个队列匹配。

11.9 例子【头部交换机】

实现步骤:

  • 第一步:引入依赖
  • 第二步:在配置文件中加入配置
  • 第三步:创建一个配置类
    • 配置类的编写步骤
      • 第一步:定义头部交换机
@Bean
    public HeadersExchange headersExchange(){
        return ExchangeBuilder.headersExchange(exchangeName).build();
    }
      • 第二步:创建队列
 @Bean
    public Queue queueA(){
        return QueueBuilder.durable(queueAName).build();
    }
    @Bean
    public Queue queueB(){
        return QueueBuilder.durable(queueBName).build();
    }
      • 第三步:交换机与队列绑定,这个需要是头部的信息绑定,所以我们需要创建一个队列,存储每个交换机与队列的绑定。
    @Bean
    public Binding bindingA(HeadersExchange headersExchange,Queue queueA){
        Map<String,Object> headerValues = new HashMap<>();
        headerValues.put("type","m");
        headerValues.put("status",1);
        return BindingBuilder.bind(queueA).to(headersExchange).whereAll(headerValues).match();
    }
    @Bean
    public Binding bindingB(HeadersExchange headersExchange,Queue queueB){
        Map<String,Object> headerValues = new HashMap<>();
        headerValues.put("type","s");
        headerValues.put("status",0);
        return BindingBuilder.bind(queueB).to(headersExchange).whereAll(headerValues).match();
    }
  • 第四步:创建发送消息
public class MessageService {
    @Value("${my.exchangeName}")
    private String exchangeName;
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void  sandMsg(){
        //消息属性
        MessageProperties messageProperties = new MessageProperties();
        Map<String ,Object> headers = new HashMap<>();
        headers.put("type","s");
        headers.put("status",0);
        //设置消息头
            messageProperties.setHeaders(headers);
            //添加消息属性
        Message message = MessageBuilder.withBody("hello world".getBytes())
                .andProperties(messageProperties).build();
        //头部交换机,不需要只当路由key
        rabbitTemplate.convertAndSend(exchangeName,"",message);
    }
}

12 RabbitMQ过期消息

过期消息也叫TTL消息,TTL:Time To Live

消息的过期时间有两种设置方式:(过期消息)

12 .1 设置单条消息的过期时间

就是在MessageProperties()中有一个setExpiration()来设置单个消息的过期时间。

设置步骤:

  • 前面的引入依赖和配置文件的编写都一样,以及配置类的编写没有任何区别,所以我们这里只编写发送消息的步骤
  • 发送消息的步骤:
    • 第一步:先创建一个MessageProperties() ,完后再调用setExpiration()来设置消息的过期时间
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("15000");//设置过期的毫秒数
    • 第二步:通过建造者模式中的MessageBuilder 来创建消息
Message message = MessageBuilder.withBody("hello world".getBytes())
                .andProperties(messageProperties).build();
    • 第三步:通过模板方法rabbitTemplate.convertAndSend来发送消息
rabbitTemplate.convertAndSend("exchange.ttl.a","info",message);

完整的代码编写

@Service
@Slf4j
public class MessageService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("15000");//设置过期的毫秒数
        Message message = MessageBuilder.withBody("hello world".getBytes())
                .andProperties(messageProperties).build();
        rabbitTemplate.convertAndSend("exchange.ttl.a","info",message);
        log.info("消息发送完毕");
    }
}

12.2 通过队列属性设置消息过期时间

设置步骤:

  • 第一步:我们还是编写配置类,并在里面编写三部曲,这里只不过在设置队列的时候,来设置队列的属性
    • 设置队列属性的步骤:
      • 有两种方式:
        • 第一种:就是通过new的方式,就是new Queue的时候,里面有5个参数,其中第一个是队列名称,第一个就是是否持久化 ,还有一个Map集合,其中就是在Map集合中来设置过期时间。
        • 第二步:就是创建一个集合,完后使用Map.put()方式,来向集合中放入队列的过期时间,其中队列过期时间的属性x-message-ttl
 @Bean
    public Queue queue(){
        //方式1,new的方式
        Map<String ,Object> arguments =new HashMap<>();
        arguments.put("x-message-ttl",15000);
       return new Queue(queueName,true,false,false,arguments);
    }
      • 第二种就是:建造者模式: 这种模式,还是要提前创建一个Map集合,完后通过建造者模式,将集合传入进去。
    • 完整的代码
//第二种:建造者模式
@Bean
    public Queue queue(){
        //方式1,new的方式
        Map<String ,Object> arguments =new HashMap<>();
        arguments.put("x-message-ttl",15000); 
    	return QueueBuilder
                .durable(queueName)
                .withArguments(arguments).build();
  • 第二步:发送消息的步骤:
    • 因为这个设置了队列的过期消息,所以,这里只需要,创建消息,完后再使用rabbit模板发送消息
public class MessageService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(){
        Message message = MessageBuilder.withBody("hello world".getBytes())
                .build();
        rabbitTemplate.convertAndSend("exchange.ttl.b","info",message);
        log.info("消息发送完毕");
    }
}

【注意】:如果单个消息和队列都设置了过期时间,那么消息的过期时间以二者之间较小的那个数值为准。

13 RabbitMQ的死信队列

13.1如何消费者的手动确认
    • 第一步:在消费者端的配置文件中加入配置
     listener:
      simple:
        acknowledge-mode: manual
    • 第二步:在接收消息的一段那段配置来通知其删除
      • 实现过程:
        • 第一步:先编写一个方法,参数是Message Channel 俩个参数
        • 第二步:获取消息的属性
//第一步:获取消息的属性
        MessageProperties messageProperties = message.getMessageProperties();
        • 第三步:根据消息的属性来获取消息的唯一标识(如同身份证号)
//获取消息的唯一标识(如同身份证号)
        long tag = messageProperties.getDeliveryTag();
        • 第四步:编写一个try ....catch 代码块,try { }里面编写的正常接收到消息,正常接收消息后,则告诉消费者就可以删除了 channel.basicAck(tag,false);
try {
            byte[] body = message.getBody();
            String str = new String(body);
            log.info("接收到的消息为"+ str);
        //正常接收后,告诉服务器可以删除了。
            //消费者的手动确认 tag代表确认的就是这条消息
            //false代表只确认当前一条
            channel.basicAck(tag,false);
        • 第五步:就是在catch里面编写的,说明报错了,告诉服务器不要删除,我们需要重新接收一遍 channel.basicNack(tag,false,true);
}catch (Exception e){
            //报错了,我们没有接收到消息,告诉服务器不要删除,我们再重新接收一遍
            log.error("接收者出现问题");
            //告诉服务器不要删除  ,第一个参数:当前消息的标识
                //第二个参数,只代表当前消息
            //第三个参数:重新放回到队列里面
            try {
                channel.basicNack(tag,false,true);
            } catch (IOException ex) {
                e.printStackTrace();
            }
            throw new RuntimeException(e);
        }
  • 完整的代码编写
package com.powernode.message;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

@Component
@Slf4j
public class MessageReceive {

    @RabbitListener(queues = {"queue.normal.4"})
    public void receviceMsg(Message message , Channel channel){
        //第一步:获取消息的属性
        MessageProperties messageProperties = message.getMessageProperties();
        //获取消息的唯一标识(如同身份证号)
        long tag = messageProperties.getDeliveryTag();

        try {
            byte[] body = message.getBody();
            String str = new String(body);
            log.info("接收到的消息为"+ str);
        //正常接收后,告诉服务器可以删除了。
            //消费者的手动确认 tag代表确认的就是这条消息
            //false代表只确认当前一条
            channel.basicAck(tag,false);
        }catch (Exception e){
            //报错了,我们没有接收到消息,告诉服务器不要删除,我们再重新接收一遍
            log.error("接收者出现问题");
            //告诉服务器不要删除  ,第一个参数:当前消息的标识
                //第二个参数,只代表当前消息
            //第三个参数:重新放回到队列里面
            try {
                channel.basicNack(tag,false,true);
            } catch (IOException ex) {
                e.printStackTrace();
            }
            throw new RuntimeException(e);
        }
    }
}
13.2 什么条件下会变成死信队列?
    • 1、消息过期
      • 是通过定义消息属性来设置消息的过期时间,在正常队列哪里设置死信交换机
 public void sendMsg(){
        //定义一个消息属性
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration("15000");
        //定义消息
        Message message = MessageBuilder.withBody("hello world".getBytes())
                .andProperties(messageProperties)
                .build();
        //发消息的几个参数
        rabbitTemplate.convertAndSend("exchange.normal.2","order",message);
      
    • 2、队列过期
      • 是在定义正常队列哪里设置队列过期,以及设置死信交换机
 @Bean
    public Queue normalQueue(){
        Map<String, Object> arguments =new HashMap<>();
//        //设置消息过期时间
       arguments.put("x-message-ttl" ,20000);
        //设置死信交换机,一旦消息过了20秒,而且还没有消费者,则就会进入这个死信交换机。
        arguments.put("x-dead-letter-exchange",exchangeDlxName);
        //设置死信路由key,这个key就是死信交换机和死信队列绑定的呢个key
        arguments.put("x-dead-letter-routing-key","error" );
        return QueueBuilder.durable(queueNormalName)
                .withArguments(arguments)
                .build();
    }
    • 3、队列达到最大长度,当达到队列设置的最大长度时,如果此时还有消息进入队列,那么最开始进入队列的消息就会变成死信。
    public Queue normalQueue(){
        Map<String, Object> arguments =new HashMap<>();
        //设置队列的最大长度
        arguments.put("x-max-length",5);
        //设置死信交换机,一旦消息过了20秒,而且还没有消费者,则就会进入这个死信交换机。
        arguments.put("x-dead-letter-exchange",exchangeDlxName);
        //设置死信路由key,这个key就是死信交换机和死信队列绑定的呢个key
        arguments.put("x-dead-letter-routing-key","error" );
        return QueueBuilder.durable(queueNormalName)
                .withArguments(arguments)
                .build();
    }
    • 4、消费者从正常的队列中接收消息,但是对消息不进行确认,并且不对消息进行重新投递(不重新入队),此时消息就进入死信队列。(就是当我们开启了手动确认以后,在发生异常之后,消费者手动不确认,并且不重新入队) channel.basicNack(tag,false,flase);
catch (Exception e){
            //报错了,我们没有接收到消息,告诉服务器不要删除,我们再重新接收一遍
            log.error("接收者出现问题");
            //告诉服务器不要删除 ,
    		//第一个参数:当前消息的标识
            //第二个参数,只代表当前消息
            //第三个参数:是否重新放回到队列里面
            try {
                channel.basicNack(tag,false,flase);
            } catch (IOException ex) {
                e.printStackTrace();
            }
            throw new RuntimeException(e);
    • 5、消费者拒绝消息

开启手动确认模式,并拒绝消息,不重新投递(不重新入队),则进入死信队列

channel.basicReject(tag,false);

   }catch (Exception e){
            log.error("接收者出现问题");
            try {
                //消费者拒绝消息,
                // 第一个参数:当前消息的唯一标识
                //第二个参数:是否重新入队;
                //basicReject与basicNack的区别:basicReject只能处理一条消息
                channel.basicReject(tag,false);
            } catch (IOException ex) {
                e.printStackTrace();
            }
            throw new RuntimeException(e);
        }
    }
13.3 死信队列的详细解释

其中死信队列也叫做死信交换机、死信邮箱等说法。

DLX:Dead - Letter - Exchange 死信交换器、死信邮箱

过程:

第一步:生产者生产消息 ----> 消息发送到交换机 ----> 交换机根据路由key发送到队列----->当没有死信交换机之前 在规定的时间内消息过期无人接收,则消息丢了 -----> 创建一个死信交换机 -----> 当创建一个死信交换机之后,在规定的时间内无人接收的消息则进入死信交换机 。---->死信交换机根据路由key路由到另一条队列(死信队列)---->消费者也可以接收死信队列的信息。

代码的实现过程

  • 第一步:引入依赖
  • 第二步:在配置文件中配置
  • 第三步:创建一个Rabbit的配置类(就是创建交换机等)
    • 配置类中的第一步:就是引入配置类中的交换机和队列的名称
    • 第二步:创建正常的交换机
 @Bean
    public DirectExchange normalExchange(){
        return ExchangeBuilder.directExchange(exchangeNormalName).build();
    }
    • 第三步:创建正常的队列,并在正常的队列中,设置消息的过期时间设置死信交换机以及设置死信路由key (就是死信交换机与死信队列绑定的那个key)
 @Bean
    public Queue normalQueue(){
        Map<String, Object> arguments =new HashMap<>();
        //设置消息过期时间
        arguments.put("x-message-ttl" ,20000);
        //设置死信交换机,一旦消息过了20秒,而且还没有消费者,则就会进入这个死信交换机。
        arguments.put("x-dead-letter-exchange",exchangeDlxName);
        //设置死信路由key,这个key就是死信交换机和死信队列绑定的呢个key
        arguments.put("x-dead-letter-routing-key","error" );
        return QueueBuilder.durable(queueNormalName)
                .withArguments(arguments)
                .build();
    }
    • 第四步:正常的交换机与正常的队列绑定
public Binding bindingNormal(DirectExchange normalExchange ,Queue normalQueue){
        return BindingBuilder.bind(normalQueue).to(normalExchange).with("order");
    }
    • 第五步:声明死信交换机(就和正常的交换机一样)
@Bean
    public DirectExchange dlxExchange(){
        return ExchangeBuilder.directExchange(exchangeDlxName).build();
    }
    • 第六步:声明死信队列
@Bean
    public Queue dlxQueue(){
        return QueueBuilder.durable(queueDlxName).build();
    }
    • 第七步:死信队列与死信交换机绑定
 @Bean
    public Binding bindingDlx(DirectExchange dlxExchange ,Queue dlxQueue){
        return BindingBuilder.bind(dlxQueue).to(dlxExchange).with("error");
    }
    • 第八步:创建消息,并发送消息
@Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(){
            //定义消息
        Message message = MessageBuilder.withBody("hello world".getBytes()).build();
        //发消息的几个参数
        rabbitTemplate.convertAndSend("exchange.normal.a","order",message);
        log.info("消息发送完毕");
    }

14 RabbitMQ的延迟队列

使用场景就是关闭规定时间内未支付的订单

场景:有一个订单,15分钟内如果不支付,就把该订单设置为交易关闭,那么就不能支付了,这类实现延迟任务的场景就可以采用延迟队列来实现,当然除了延迟队列来实现,也可以有一些其他办法实现;

14.1 定时任务方式
  • 每隔3秒扫描一次数据库,查询过期的订单然后进行处理;

优点:

简单,容易实现;

缺点:

1、存在延迟(延迟时间不准确),如果你每隔1分钟扫一次,那么就有可能延迟1分钟;

2、性能较差,每次扫描数据库,如果订单量很大

14.2、 被动取消

当用户查询订单的时候,判断订单是否超时,超时了就取消(交易关闭)

优点:

对服务器而言,压力小;

缺点:

1、用户不查询订单,将永远处于待支付状态,会对数据统计等功能造成影响;

2、用户打开订单页面,有可能比较慢,因为要处理大量订单,用户体验少稍差;

14.3、JDK延迟队列(单体应用,不能分布式下)

DelayedQueue

无界阻塞队列,该队列只有在延迟期满的时候才能从中获取元素

优点:

实现简单,任务延迟低;

缺点:

服务重启、宕机,数据丢失;

只适合单机版,不适合集群;

订单量大,可能内存不足而发生异常; oom

14.4、 采用消息中间件(rabbitmq)

1、RabbitMQ本身不支持延迟队列,可以使用TTL(消息过期)结合DLX(死信队列)的方式来实现消息的延迟投递,即把DLX跟某个队列绑定,到了指定时间,消息过期后,就会从DLX路由到这个队列,消费者可以从这个队列取走消息。

上述 :order.ttl.queue代表的是正常队列

order.dlx.queue 代表的是死信队列

其中交换机只有一个,当生产者根据key找到交换机以后,交换机根据key找到正常队列,过期以后,正常队列指定的死信交换机还是那个交换机(即那条消息就会返回给那个交换机),完后交换机根据死信路由key,发送到死信队列中

存在问题 :如何解决消息过期时间不一致的问题?
如果先发送的消息,消息延迟时间长,会影响后面的 延迟时间段的消息的消费;

就是因为队列是先进先出,因为第一条消息如果设置的时间大于第二个消息设置的时间,那么只有等第一个消息过期以后,才能轮到第二个消息,那么这样的话,第二个消息就和设置的过期时间不一致。

解决方式:

不同延迟时间的消息要发到不同的队列上,同一个队列的消息,它的延迟时间应该一样(即相同的过期时间放到相同的队列里面)

代码实现

  • 第一步:引入依赖
  • 第二步:引入配置文件的信息
  • 第三步:编写交换机以及队列,并绑定,在队列哪里指定死信交换机的名称。
  • 第四步:编写发送消息的类,并创建几条消息,并指定消息的过期时间。分别发送到不同时间的交换机上。
  • 第五步:接收消息(接收的死信队列的消息)。
14 .5 、 使用rabbitmq-delayed-message-exchange 延迟插件
  • 第一步:选择对应的版本下载 rabbitmq-delayed-message-exchange 插件,下载地址:http://www.rabbitmq.com/community-plugins.html
  • 第二步:下载完成以后,解压这个插件

unzip rabbitmq_delayed_message_exchange-3.10.2.ez

如果unzip 没有安装,先安装一下

yum install unzip -y

  • 第三步:先找sbin 目录, 找到这个 plugins目录,启用插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange 开启插件;

原理

安装这个插件以后,就会有一个新的交换机: 延迟交换机类似于直连交换机

消息发送后不会直接投递到队列,而是存储到 Mnesia(嵌入式数据库),检查 x-delay 时间(消息头部);

延迟插件在 RabbitMQ 3.5.7 及以上的版本才支持,依赖 Erlang/OPT 18.0 及以上运行环境;

Mnesia 是一个小型数据库,不适合于大量延迟消息的实现

解决了消息过期时间不一致出现的问题。

代码实现过程:

  • 第一步:引入mq依赖
  • 第二步:在配置文件中引入配置信息
  • 第三步:创建一个配置类,来创建延迟交换机和队列,以及绑定(通过自定义交换机来创建)
 @Configuration
public class RabbitConfig {
    @Value("${my.exchangeName}")
    private String exchangeName;

    @Value("${my.queueDelayName}")
    private String queueDelayName;



    /**
     * 创建交换机
     * 使用自定交换机来创建延迟交换机
     * @return
     */
    @Bean
    public CustomExchange customExchange(){
                //自定义交换机
        Map<String, Object> arguments =new HashMap<>();
        arguments.put("x-delayed-type","direct");
        return  new CustomExchange(exchangeName,"x-delayed-message",true,false,arguments);
    }
    /**
     * 创建队列
     */
    @Bean
    public Queue queue(){
        return QueueBuilder.durable(queueDelayName).build();

    }
    /**
     * 交换机与队列绑定
     */
    @Bean
    public Binding binding(CustomExchange customExchange ,Queue  queue){
        return BindingBuilder.bind(queue).to(customExchange).with("plugin").noargs();
    }
  • 第四步 :发送消息
@Service
@Slf4j
public class MessageService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送消息
     */
    @Bean
    public void sendMsg() {
        {
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setHeader("x-delay", 25000);  //第一条消息,设置延迟时间
            Message message = MessageBuilder.withBody("hello world".getBytes())
                    .andProperties(messageProperties).build();
            rabbitTemplate.convertAndSend("exchange.delay.4", "plugin", message);
            log.info("发送消息完毕");
        }
        {
            MessageProperties messageProperties = new MessageProperties();
            messageProperties.setHeader("x-delay", 15000);  //第二天消息,设置延迟时间
            Message message = MessageBuilder.withBody("hello world111".getBytes())
                    .andProperties(messageProperties).build();
            rabbitTemplate.convertAndSend("exchange.delay.4", "plugin", message);
            log.info("发送消息完毕");
        }
    }
  • 第五步 :接收消息
@Component
@Slf4j
public class ReceiveMessage {
    /**
     * 接收延迟队列的消息,使用的是插件
     * @param message
     */
    @RabbitListener(queues = {"queue.delay.4"})
    public void receiveMsg(Message message){
        String body = new String(message.getBody());
        log.info("接收到消息的为: "+body);
    }

15 消息的可靠性投递

消息的可靠性投递就是要保证消息投递过程中每一个环节都要成功,那么这肯定会牺牲一些性能,性能与可靠性是无法兼得的;

如果业务实时一致性要求不是特别高的场景,可以牺牲一些可靠性来换取性能。

① 代表消息从生产者发送到Exchange; ----使用RabbitMQ消息Confirm模式(生产者确认回调)来解决

② 代表消息从Exchange路由到Queue---使用RabbitMQ消息Return模式 备用交换机模式 来解决
③ 代表消息在Queue中存储;------- 通过设置交换机的持久化,以及队列的持久化、每条消息的持久化

④ 代表消费者监听Queue并消费消息;----自动确认改成手动确认

15 .1 RabbitMQ消息Confirm模式(生产者确认回调)----可能因为网络或者Broker的问题导致①失败,而此时应该让生产者知道消息是否正确发送到了Broker的exchange中;

消息的confirm确认机制,是指生产者投递消息后,到达了消息服务器Broker里面的exchange交换机,则会给生产者一个应答,生产者接收到应

答,用来确定这条消息是否正常的发送到Broker的exchange中,这也是消息可靠性投递的重要保障;

代码的具体实现过程(如何实现生产者确认回调)

  • 第一步: 引入依赖
  • 第二步:在配置文件中配置信息 publisher-confirm-type: correlated 开启确认模式
rabbitmq:
	host: 192.168.190.132
	port: 5672
	username: admin
	password: 123456
	virtual-host: powernode       #pei zhi fa xioa xi fa dao na ge xu ni zhu ji shang
	publisher-confirm-type: correlated   #开启生产者的确认模式,设置成关联模式
  • 第三步:编写一个类实现 RabbitTemplate.ConfirmCallback 类,并实现里面的方法,在方法里面有三个参数,第一个参数是关联数据 第二个参数是判断消息是否到达交换机 ,第三个是原因,这个就是不管消息有没有到达交换机,都会回调这个类,给与我们提示。【这个类也可以直接在发消息的时候直接实现】
@Component
@Slf4j
public class MyConfirmCallBack implements RabbitTemplate.ConfirmCallback {
    /**
     *
     * @param correlationData correlation data for the callback.  关联数据
     * @param ack true for ack, false for nack  真 到达交换机    或者假  没有到达交换机
     * @param cause An optional cause, for nack, when available, otherwise null. 原因
     */
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info(correlationData.getId());  //获得id
        if (ack){
            //说明消息到达交换机
            log.info("消息正确到达交换机");
            return;
        }
        //说明消息没有到达交换机
        log.error("消息没有到达交换机,原因为{}" ,cause);
    }
}
@Service
@Slf4j
public class MessageService implements RabbitTemplate.ConfirmCallback{
    @Autowired
    private RabbitTemplate rabbitTemplate;
    //这个rabbitTemplate 还没有使用我们那个回调接口
    @PostConstruct      //构造方法后执行,如同初始化
    public void init(){
        rabbitTemplate.setConfirmCallback(this);
    }

    public void sendMsg(){
        Message message = MessageBuilder.withBody("hello world".getBytes()).build();
        //定义一个关联数据
        CorrelationData correlationData =new CorrelationData();
        correlationData.setId("order_123456");  //发送订单信息
        rabbitTemplate.convertAndSend("exchange.confrim.1","info",message,correlationData);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        log.info(correlationData.getId());  //获得id
        if (ack){
            //说明消息到达交换机
            log.info("消息正确到达交换机");
            return;
        }
        //说明消息没有到达交换机
        log.error("消息没有到达交换机,原因为{}" ,cause);
    }
}
  • 第四步:在我们发消息的那个类中定义这个关联数据,并调用,完后在我们发送消息到交换机以后,不管有没有到达交换机,都会给我们一个提示
@Service
@Slf4j
public class MessageService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private MyConfirmCallBack myConfirmCallBack;
    //这个rabbitTemplate 还没有使用我们那个回调接口
    @PostConstruct      //构造方法后执行,如同初始化
    public void init(){
        rabbitTemplate.setConfirmCallback(myConfirmCallBack);
    }


    
    public void sendMsg(){
        Message message = MessageBuilder.withBody("hello world".getBytes()).build();
        //定义一个关联数据
        CorrelationData correlationData =new CorrelationData();
        correlationData.setId("order_123456");  //发送订单信息
        rabbitTemplate.convertAndSend("exchange.confrim.1","info",message,correlationData);
    }
}
15.2 RabbitMQ消息Return模式(保证交换机的消息到达队列)-----可能因为路由关键字错误,或者队列不存在,或者队列名称错误导致②失败。

rabbitmq 整个消息投递的路径为:

producer —> exchange —> queue —> consumer

>> 消息从 producer 到 exchange 则会返回一个 confirmCallback

>> 消息从 exchange –> queue 投递失败则会返回一个 returnCallback;

我们可以利用这两个callback控制消息的可靠性投递

代码的实现过程

  • 第一步:引入依赖
  • 第二步:在配置文件配置相关信息,开启return模式
  rabbitmq:
    host: 192.168.190.132
    port: 5672
    username: admin
    password: 123456
    virtual-host: powernode       #pei zhi fa xioa xi fa dao na ge xu ni zhu ji shang
    publisher-returns: true      #开启return模式
  • 第三步:编写一个类来实现RabbitTemplate.ReturnsCallback 并实现里面的方法,(如果这个类被调用,说明没有正确的路由到队列)
@Component
@Slf4j
public class MyReturnCallBack implements RabbitTemplate.ReturnsCallback {
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        //如果这个方法被调用,说明消息没有到达队列
        log.error("消息从交换机没有正确的路由到队列,原因为{}",returned.getMessage());
    }
}
  • 第四步:在发送消息的类中,来设置,使 rabbitTemplate 调用我们编写的那个类
public class MessageService {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    
    @Autowired
    private MyReturnCallBack myReturnCallBack;
    //这个rabbitTemplate 还没有使用我们那个回调接口

    @PostConstruct      //构造方法后执行,如同初始化
    public void init(){
        rabbitTemplate.setReturnsCallback(myReturnCallBack);
    }
  • 第五步 : 编写发送消息
@Service
@Slf4j
public class MessageService {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private MyReturnCallBack myReturnCallBack;
    //这个rabbitTemplate 还没有使用我们那个回调接口

    @PostConstruct      //构造方法后执行,如同初始化
    public void init(){
        rabbitTemplate.setReturnsCallback(myReturnCallBack);
    }

    public void sendMsg(){
        Message message = MessageBuilder.withBody("hello world".getBytes()).build();
        rabbitTemplate.convertAndSend("exchange.return.1","info11111",message);
    }
}
15.3 确保消息在队列正确地存储

可能因为系统宕机、重启、关闭等等情况导致存储在队列的消息丢失,即③出现问题;

(1)、队列持久化

QueueBuilder.durable(QUEUE).build();

(2)、交换机持久化

ExchangeBuilder.directExchange(EXCHANGE).durable(true).build();

(3)、消息持久化

MessageProperties messageProperties = new MessageProperties();
 messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); 
15.4 确保消息从队列正确地投递到消费者

如果消费者收到消息后未来得及处理即发生异常,或者处理过程中发生异常,会导致④失败。

为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement);

实现步骤:

  1. #开启手动ack消息消费确认 ,在配置文件中配置
spring.rabbitmq.listener.simple.acknowledge-mode=manual
  1. #编写配置类(配置交换机、队列、以及绑定)
  2. 编写一个发送消息的类
  3. 编写一个接收消息的类 ,开启手动消息确认后,我们接收完消息后,并不会删除
@Component
@Slf4j
public class ReceiveMessage {
    /**
     * 接收延迟队列的消息,使用的是插件
     * @param message
     */
    @RabbitListener(queues = {"queue.delay.4"})
    public void receiveMsg(Message message , Channel channel){
        //获取消息的唯一标识
        long tag = message.getMessageProperties().getDeliveryTag();
        String body = null;
        try {
            body = new String(message.getBody());
            log.info("接收到消息的为: "+body);
            //TODO 正确的话,我们会有插入数据库等
            //正确的话,就是告诉服务器,可以删除了
            //第一个参数是,当前消息,
            // 第二个参数:是否批量处理
            channel.basicAck(tag,false);
        } catch (Exception e) {
            log.error("消息处理出现问题");
            try {
                //三个参数分别是:当前消息
                //是否是批量处理
                //是否重新入队
                channel.basicNack(tag,false,true);
            } catch (IOException ex) {
                throw new RuntimeException(ex);
            }
            throw new RuntimeException(e);
        }

16 交换机的属性

  • 具体参数(以下属性是在创建交换机的时候设置的)
    • Name:交换机名称;就是一个字符串
    • Type:交换机类型,direct, topic, fanout, headers四种
    • Durability:持久化,声明交换机是否持久化,代表交换机在服务器重启后是否还存在;默认是持久化的, yes为持久化 ,flase为不持久化(没有保存到磁盘上).
    • Auto delete:是否自动删除,曾经有队列绑定到该交换机,后来解绑了,那就会自动删除该交换机;
    • Internal:内部使用的,如果是yes,客户端无法直接发消息到此交换机,它只能用于交换机与交换机的绑定。
    • Arguments:只有一个取值alternate-exchange,表示备用交换机;当队列与交换机因为路由无法发送消息,则会发送到备用交换机上.
  • 例子
@Bean
    public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange(exchangeName)
                .durable(true)
                .autoDelete()   //设置自动删除,默认不是自动删除的,boolean类型的默认为flase
                .build();
    }
16.1 备用交换机

正常的情况下,备用队列是不应该有消息的,当备用队列有消息时,说明出错了.提示我们及时更改代码.

主交换机是direct(直连的) 备用交换机是fanout(扇形的)

描述:就是我们生产者与交换机的key是hello ,完后交换机与队列的key是info,完后就不能进入队列里面,则就会进入备用交换机里面,从而进入备用队列。

当路由没有写错,则就会进入指定的交换机,不会进入备用交换机。

  • 备用交换机的设置:
  • 第一种:使用建造者模式的时候 .alternate()
 public DirectExchange directExchange(){
        return ExchangeBuilder.directExchange(exchangeName)
                .alternate()  //设置备用交换机
                .build();

17 队列的详细属性

队列属性的设置都是在创建队列的时候,来设置队列的属性。

  1. Type:队列类型
  2. Name:队列名称,就是一个字符串,随便一个字符串就可以;
  3. Durability:声明队列是否持久化,代表队列在服务器重启后是否还存在;
  4. Auto delete: 是否自动删除,如果为true,当没有消费者连接到这个队列的时候,队列会自动删除;
  5. Exclusive:exclusive属性的队列只对首次声明它的连接可见,并且在连接断开时自动删除;基本上不设置它,设置成false
  6. Arguments:队列的其他属性,例如指定DLX(死信交换机等)
    1. x-expires:Number

当Queue(队列)在指定的时间未被访问,则队列将被自动删除;

    1. x-message-ttl:Number

发布的消息在队列中存在多长时间后被取消(单位毫秒);

    1. x-overflow:String

设置队列溢出行为,当达到队列的最大长度时,消息会发生什么,

有效值为Drop Head 删除头部 默认的

Reject Publish 拒绝发布 ,一旦队列满了以后,就不再接收新的消息

    1. x-max-length:Number

队列所能容下消息的最大长度,当超出长度后,新消息将会覆盖最前面的消息,类似于Redis的LRU算法;

    1. x-single-active-consumer:默认为false

激活单一的消费者,也就是该队列只能有一个消息者消费消息;

    1. x-max-length-bytes:Number

限定一定的字节装入队列,当超出后就不在装入队列;

    1. x-dead-letter-exchange:String

指定队列关联的死信交换机,有时候我们希望当队列的消息达到上限后溢出的消息不会被删除掉,而是走到另一个队列中保存起来;

    1. x-dead-letter-routing-key:String

指定死信交换机的路由键,一般和6一起定义;

    1. x-max-priority:Number

如果将一个队列加上优先级参数,那么该队列为优先级队列; 数字越大优先级越高,默认优先级为0

(1)、给队列加上优先级参数使其成为优先级队列

x-max-priority=10【0-255取值范围】

(2)、给消息加上优先级属性

通过优先级特性,将一个队列实现插队消费;

实现方式:在创建消息的时候,通过MessageProperties中的setPriority()来设置优先级。

public void sendMsg(){
        MessageProperties messageProperties =new MessageProperties();
        messageProperties.setPriority(6);
        Message message = MessageBuilder.withBody("hello world".getBytes())
    									.andProperties(messageProperties)
    									.build();
        rabbitTemplate.convertAndSend("exchange.return.1","info11111",message);
    }

18 消息的幂等性

消息消费时的幂等性(消息不被重复消费)

同一个消息,第一次接收,正常处理业务,如果该消息第二次再接收,那就不能再处理业务,否则就处理重复了;

幂等性是:对于一个资源,不管你请求一次还是请求多次,对该资源本身造成的影响应该是相同的,不能因为重复的请求而对该资源重复造成影响;

其中select 、update、delete 都是幂等的。insert是非幂等的。

在http中:get、put、delete都是幂等的。post是非幂等的。

18.1 如何避免消息的重复消费问题?

全局唯一ID + Redis

生产者在发送消息时,为每条消息设置一个全局唯一的messageId,消费者拿到消息后,将其放入redis中,并设置这个key,完后再对设置的结果进行判断,如果为true,说明这条消息不存在就可以存入数据库了 ,当一条相同的id的消息再次过来,则返回结果为flase,则不处理。

private ObjectMapper objectMapper 用于序列化和反序列化的 (json格式的)

objectMapper.writeValueAsString(传入的对象) 将对象转为字符串

objectMapper.readValue(消息,要转成那个对象) 将字符串或字节数组,转为对象

序列化----就是将对象转为字符串,或者字节数组

反序列化---就是将字符串或者字节数组转为对象

具体的代码实现过程:

消费者拿到消息以后,放入redis中,并设置这个key,完后再对设置的结果进行判断,如果为true,说明设置成功了(下一步就可以插入数据库了)

//放入redis中,并设置id
Boolean  setRuslt = RedisTemplate.opsForValue().setIfAbsent(要设置的id)
//如果id不存在,说明没有这条消息,就可以放入数据库了
if(setRuslt){
    //就可以放入数据库了
}
//下一条消息再过来,就返回的flase,完后就不处理了。

19 RabbitMQ集群与高可用

RabbitMQ 的集群分两种模式,一种是默认集群模式,一种是镜像集群模式;

在RabbitMQ集群中所有的节点(一个节点就是一个RabbitMQ的broker服务器)被归为两类:一类是磁盘节点,一类是内存节点;

磁盘节点会把集群的所有信息(比如交换机、绑定、队列等信息)持久化到磁盘中,而内存节点只会将这些信息保存到内存中,如果该节点宕机或重启,内存节点的数据会全部丢失,而磁盘节点的数据不会丢失;

19 .1 默认集群模式

默认集群模式也叫 普通集群模式、或者 内置集群模式;

RabbitMQ默认集群模式,只会把交换机、队列、虚拟主机等元数据信息在各个节点同步,而具体队列中的消息内容不会在各个节点中同步;

接发消息的原理

创建队列的时候,只要在任何一个节点创建,都会复制到其他的两个节点上。

接收消息的时候:

但是发消息的时候只会发送到节点1上,接消息的时候也是从节点1接收消息

发送消息的时候:

假如发消息的时候,我们连接到节点2,节点2上本身是不存储消息的,它会把请求转到节点1上,从而发送到节点1上。节点1停了,还是不能发消息的

假如接消息的时候是从节点3上接收,节点3上没有消息,它会把请求转到节点1上,从而接收到消息

元数据(除了消息本身其他的都是元数据)

队列元数据:队列名称和属性(是否可持久化,是否自动删除)

交换器元数据:交换器名称、类型和属性

绑定元数据:交换器和队列的绑定列表

(虚拟主机)vhost元数据:vhost内的相关属性,如安全属性等;

默认集群模式式

优点:

1)节省存储空间;

2)性能提高;

如果消息需要复制到集群中每个节点,网络开销不可避免,持久化消息还需要写磁盘,占用磁盘空间。

缺点:

当其中一个队列宕机以后,则内容就会全部消失。

19.2 集群的搭建
  • 第一步:从已经安装好rabbitmq的机器 clone 三台机器,【注意clone完,先不要启动三台机器,三台机器均要重新生成mac地址,防止clone出的机器ip地址重复
    • 虚拟机的克隆步骤:
      • 将虚拟机关机
      • 点击管理---->点击克隆---->选中虚拟机当前的状态--->选中创建完整的虚拟机-->填写克隆的虚拟机的名称---> 点击完成--->在启动前改变其物理ip--->启动前点击网络适配器---->高级---->召见MAC地址--->点击几遍生成 ------->克隆完成
  • 第二步:使用xshell 连接三台机器
  • 第三步:修改三台机器的/etc/hosts 文件,vim /etc/hosts
192.168.131.128 rabbit130
192.168.131.129 rabbit133
192.168.131.130 rabbit134

  • 第三步:三台机器均重启网络,使节点名生效(可以直接关机重新启动)
systemctl restart network
  • 第四步:三台机器的防火墙处理
systemctl status firewalld ---查看防火墙的状态
systemctl stop firewalld  --关闭防火墙
systemctl disable firewalld  --开机不启动防火墙
  • 第五步:三台机器 .erlang.cookie文件保持一致由于是clone出的三台机器,所以肯定是一样的
如果我们使用解压缩方式安装的RabbitMQ,那么该文件会在${用户名}目录下,
也就是${用户名}/.erlang.cookie;
如果我们使用rpm安装包方式进行安装,那么这个文件会在/var/lib/rabbitmq目录下;
查看隐藏文件用的ls-a

注意 .erlang.cookie的权限为400,目前已经是400

  • 第六步: 分别启动
rabbitmq-server -detached
  • 第七步: 使用以下命令查看集群状态 Disk Nodes 表示磁盘节点
rabbitmqctl cluster_status
  • 第八步:以上的三个机器还是互不相干三个机器,并不是集群,构建集群
    • 1、先把其他的两个rabbitmq停掉
rabbitmqctl stop_app  仅仅是停掉rabbitmq
    • 2、把当前rabbit中的交换机和队列全部重置
rabbitmqctl reset
    • 3、将当前rabbit加入到第一个rabbit中,并成为内存节点

--ram 参数表示让rabbitmq128成为一个内存节点,如果不带参数默认为disk磁盘节点;

rabbit@rabbit128 代表的主rabbitmq

rabbitmqctl join_cluster rabbit@rabbit128 --ram   rabbit@rabbit128主rabbitmq
    • 4、启动rabbitmq
rabbitmqctl start_app
    • 5、查看主rabbit的状态
rabbitmqctl cluster_status
19.3 操作集群中的一个节点,添加用户和权限等
#列出用户
rabbitmqctl list_users
# 添加用户
rabbitmqctl add_user admin 123456
#查看权限
rabbitmqctl list_permissions
#设置权限
rabbitmqctl set_permissions admin ".*" ".*" ".*"
#设置角色
rabbitmqctl set_user_tags admin administrator


【注意】:这个启动插件需要给集群中的每一个rabbit都单独启动
#列出插件
rabbitmq-plugins list
#启动web控制台插件
rabbitmq-plugins enable rabbitmq_management 

使用web浏览器添加一个虚拟主机:powernode

实现原理

RabbitMQ底层是通过Erlang架构来实现的,所以rabbitmqctl会启动Erlang节点,并基于Erlang节点来使用Erlang系统连接RabbitMQ节点,在连接过程中需要正确的Erlang Cookie和节点名称,Erlang节点通过交换Erlang Cookie以获得认证;

19.4 springboot连接集群

只需要改变配置文件即可 ,其他接发消息是一样的。

spring:
  #配置rabbitmq
  rabbitmq:
    # 连接集群,使用逗号分隔
    addresses: 192.168.150.150:5672,192.168.150.151:5672,192.168.150.152:5672
    username: admin
    password: 123456
    virtual-host: powernode

20 镜像集群模式

镜像模式是基于默认集群模式加上一定的配置得来的;

在默认模式下的RabbitMQ集群,它会把所有节点的交换机、绑定、队列的元数据进行复制确保所有节点都有一份相同的元数据信息,但是队列数据分为两种:一种是队列的元数据信息(比如队列的最大容量,队列的名称等配置信息),另一种是队列里面的消息;

镜像模式,则是把所有的队列数据完全同步,包括元数据信息和消息数据信息,当然这对性能肯定会有一定影响,当对数据可靠性要求较高时,可以使用镜像模式;

20.1镜像模式的搭建

实现镜像模式也非常简单,它是在普通集群模式基础之上搭建而成的;

镜像队列配置命令:

./rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
  • -p Vhost(虚拟主机): 可选参数,针对指定vhost下的queue进行设置;
  • Name: policy的名称;(可以自己取个名字就可以)
  • Pattern: queue的匹配模式(正则表达式); ----就是说想对哪些主机进行镜像
  • Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode;---ha代表高可用
  • 【例子】:{“ha-mode”:”exactly”,”ha-params”:2}
    • ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes
      • all:表示在集群中所有的节点上进行镜像
      • exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
      • nodes:表示在指定的节点上进行镜像,节点名称通过ha-params指
    • ha-params:ha-mode模式需要用到的参数
    • ha-sync-mode:进行队列中消息的同步方式,有效值为automatic(自动)和manual(手动)
  • priority:可选参数,policy的优先级;
  • 【例子】:"^policy_表示以policy开头的队列
./rabbitmqctl set_policy -p powernode ha_policy "^policy_" 
'{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}'
  • 【例子2】:如果要在所有节点所有队列上进行镜像,则(在任意节点执行如下命令):

所有节点、所有虚拟主机、所有队列 都进行镜像

./rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}' 
  • 【例子3】:针对某个虚拟主机进行镜像
rabbitmqctl set_policy -p powernode ha-all "^"
'{"ha-mode":"all","ha-sync-mode":"automatic"}'

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2261450.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

VSCode,Anaconda,JupyterNotebook

文章目录 一. 下载VSCode并安装二. 下载Anaconda并安装1. anaconda介绍2. Anaconda的包管理功能3. Anaconda的虚拟环境管理4.Jupyter Notebook5. Jupyter Notebook使用简介6. Jupyter Notebook快捷键7.Jupyter notebook的功能扩展8. Jupyter notebook和Jupyter lab的区别 三. V…

动态导出word文件支持转pdf

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、功能说明二、使用步骤1.controller2.工具类 DocumentUtil 导出样式 前言 提示&#xff1a;这里可以添加本文要记录的大概内容&#xff1a; 例如&#xff…

那些不属性的C语言关键字-const

大家都知道const修饰的变量是不可变的&#xff0c;但是到底是怎么实现的那&#xff0c;有方法修改只读变量的值吗&#xff0c;今天我们结合实验代码&#xff0c;分析下const关键字的实现原理 const变量 1.const修饰局部变量 int main(){const int abc 123;printf("%d\…

【Java 数据结构】List -> 给我一个接口!!!

&#x1f525;博客主页&#x1f525;&#xff1a;【 坊钰_CSDN博客 】 欢迎各位点赞&#x1f44d;评论✍收藏⭐ 目录 1. 什么是 List 2. List 常用的方法 3. List 的使用 1. 什么是 List 其实 List 是一个接口&#xff0c;它继承了 Collection 接口 下列为 List 接口中的各种…

【5G】5G的主要架构选项

最初&#xff0c;在3GPP讨论中考虑了所有可能的聚合和核心网络组合&#xff0c;共有八个架构选项。以下重点介绍option2、3、4和7。 1. 独立组网 (Standalone, SA) 架构选项 2 &#xff1a;Standalone architecture with 5G-core 特点&#xff1a; 5G核心网&#xff08;5GC, …

Ajax简单理解

Ajax 1 什么是ajax AJAXAsynchronous JavaScript and XML (异步的JavaScript和XML)AJAX不是新的编程语言&#xff0c;二十一种使用现有标准的新方法 AJAX 最大的优点是在不重新加载整个页面的情况下&#xff0c;可以与服务器交换数据并更新部分网页内容。 AJAX 不需要任何浏…

【GESP】C++二级考试大纲知识点梳理, (2)计算机网络的基本概念及分类

GESP C二级官方考试大纲中&#xff0c;共有9条考点&#xff0c;本文针对C&#xff08;2&#xff09;号知识点进行总结梳理。 &#xff08;2&#xff09;了解计算机网络的概念&#xff0c;了解计算机网络的分类&#xff08;广域网&#xff08;WAN&#xff09;、城域网&#xff0…

相机与NAS的奇妙组合,如何使用相机拍照自动上传或备份到NAS

相机与NAS的奇妙组合&#xff0c;如何使用相机拍照自动上传或备份到NAS 哈喽小伙伴们好&#xff0c;我是Stark-C~ 对于喜欢使用专业器材拍照摄影的小伙伴来说&#xff0c;想要将相机存储卡中的照片或视频导出到电脑上&#xff0c;要么是使用数据线直接和相机连接&#xff0c;…

window下的qt5.14.2配置vs2022

这里做一个笔记&#xff0c;已知qt5.14.2和vs2022不兼容&#xff0c;无法自动扫描到vs的编译器。但由于团队协作原因&#xff0c;必须使用qt5.14.2&#xff0c;并且第三方库又依赖vs2022。其实qt5.15.2是支持vs2022的&#xff0c;如果能够用qt5.15.2&#xff0c;还是建议使用qt…

QT从入门到精通(一)——Qlabel介绍与使用

1. QT介绍——代码测试 Qt 是一个跨平台的应用程序开发框架&#xff0c;广泛用于开发图形用户界面&#xff08;GUI&#xff09;应用程序&#xff0c;也支持非图形应用程序的开发。Qt 提供了一套工具和库&#xff0c;使得开发者能够高效地构建高性能、可移植的应用程序。以下是…

【协作笔记Trilium Notes Docker部署】开源协作笔记Trilium Notes本地Docker部署远程协作

文章目录 前言1. 安装docker与docker-compose2. 启动容器运行镜像3. 本地访问测试4.安装内网穿透5. 创建公网地址6. 创建固定公网地址 前言 今天分享一款在G站获得了26K的强大的开源在线协作笔记软件&#xff0c;Trilium Notes的中文版如何在Linux环境使用docker本地部署&…

app的测试范围以及web和app的测试区别

目录 图1.App的测试范围1.1功能测试1.2专项测试1.3性能测试 2.Web和App的测试区别2.1相同点2.2不同点 &#x1f44d; 点赞&#xff0c;你的认可是我创作的动力&#xff01; ⭐️ 收藏&#xff0c;你的青睐是我努力的方向&#xff01; ✏️ 评论&#xff0c;你的意见是我进步的…

数据分析实战—鸢尾花数据分类

1.实战内容 (1) 加载鸢尾花数据集(iris.txt)并存到iris_df中,使用seaborn.lmplot寻找class&#xff08;种类&#xff09;项中的异常值&#xff0c;其他异常值也同时处理 。 import pandas as pd from sklearn.datasets import load_iris pd.set_option(display.max_columns, N…

docker 拉取镜像 | 创建容器 | 容器运行

拉取镜像 拉取镜像的命令&#xff1a;docker pull name &#xff08;name换为你要拉取的镜像名&#xff09; docker pull docker.1panel.live/hispark/qiankunbp:1.0.0 docker.1panel.live/hispark/qiankunbp:1.0.0为镜像名 拉取海思的镜像&#xff1a;&#xff08;如果之前拉…

添加标签(vue3)

点击添加按钮&#xff1a; 最多添加5个 注意&#xff1a; 不只可以el-form 进行校验&#xff0c;也可以对单个el-form-item 进行校验 vue elementUI form组件动态添加el-form-item并且动态添加rules必填项校验方法-CSDN博客 el-form 里边有el-form-item &#xff0c;el-fo…

Dash for Mac 代码API文档管理软件安装

Mac分享吧 文章目录 Dash for Mac 代码API文档管理软件 效果图展示一、Dash 代码API文档管理软件 Mac电脑版——v7.3.31️⃣&#xff1a;下载软件2️⃣&#xff1a;安装软件2.1 左侧安装包拖入右侧文件夹中&#xff0c;等待安装完成&#xff0c;运行软件2.2 打开软件&#xff…

Unity添加newtonsoft-json

package name "com.unity.nuget.newtonsoft-json": "3.2.1",打开包管理器 输入包名称和版本 点击添加

分布式全文检索引擎ElasticSearch-数据的写入存储底层原理

一、数据写入的核心流程 当向 ES 索引写入数据时&#xff0c;整体流程如下&#xff1a; 1、客户端发送写入请求 客户端向 ES 集群的任意节点&#xff08;称为协调节点&#xff0c;Coordinating Node&#xff09;发送一个写入请求&#xff0c;比如 index&#xff08;插入或更…

TensorRT C++ API模型加速 —— TensorRT配置、模型转换、CUDA C++加速、TensorRT C++ 加速

文章目录 前言&#xff1a;TensorRT简介0.1、TensorRT算法流程0.2、TensorRT主要优化技术 一、TensorRT配置1.1、TensorRT环境配置1.1.1、CUDA安装1.1.2、TensorRT下载1.1.3、TensorRT CUDA配置1.1.4、TensorRT配置1.1.4.1、TensorRT python配置1.1.4.2、TensorRT C配置&#x…

RPC 服务与 gRPC 的入门案例

RPC 协议 RPC&#xff08;Remote Procedure Call Protocol&#xff09;即远程过程调用协议&#xff0c;它是一种通过网络从远程计算机程序上请求服务的协议&#xff0c;允许一个计算机程序可以像调用本地服务一样调用远程服务 。 RPC的主要作用是不同的服务间方法调用就像本地…