RabbitMQ第一讲

news2024/12/26 21:35:09

目录

一、RabbitMQ-01

1.1 MQ概述

1.2 MQ的优势和劣势

1.2.1 优势

1.2.2 劣势

1.2.3 MQ应用场景

1.2.4 常用的MQ产品

1.3 RabbitMQ的基本介绍

1.3.1 AMQP介绍

1.3.2 RabbitMQ基础架构

1.3.3 RabbitMQ的6种工作模式

​编辑

1.4 AMQP和JMS

1.4.1 AMQP

1.4.2 JMS

1.4.3 AMQP与 JMS 区别

1.4.4 市场常见消息队列

1.5 RabbitMQ的安装和配置

1.6 使用图形化添加一个用户和虚拟主机

1.6.1 添加一个用户

1.6.2 添加一个虚拟主机

1.6.3 配置用户和虚拟主机

1.6.4 删除一个虚拟主机

1.6.5 使用可视化完成消息收发

1.7 java测试RabbitMQ的工作模式

1.7.1 简单模式(Hello world)

1.7.2 工作队列模式(Work queues)

1.7.3 发布订阅模式(Publish/Subscribe)

1.7.4 路由模式(Routing)

1.7.5 通配符模式(Topics)

1.7.6 模式间的异同

1.7.7 使用可视化实现主题模式

1.8 SpringBoot项目整合RabbitMQ

1.8.1 介绍

1.8.2 创建父工程

1.8.3 搭建生产者工程

1.8.4 搭建消费者工程

一、RabbitMQ-01

1.1 MQ概述

MQ全称 Message Queue([kjuː])(消息队列),是在消息的传输过程中保存消息的容器。多用于分布式系统之间进行通信。(队列是一种容器,用于存放数据的都是容器,存放消息的就是消息队列)

分布式系统的调用有两种方式:

1)直接调用

B系统直接调用A系统
被调用者叫生产者 调用者是消费者

2)间接调用

A将数据存放到中间一个系统,通过中间的系统发送到B
MQ是用于存放消息的中间件

1.2 MQ的优势和劣势

1.2.1 优势

应用解耦:提高系统容错性和可维护性。
异步提速:提升用户体验和系统吞吐量。
削峰填谷:提高系统稳定性。

1)应用解耦

使用前:系统的耦合性越高,容错性就越低,可维护性就越低。

例:访问 订单系统 的时候依赖于库存系统、支付系统、物流系统
    ①当库存系统发生异常,就有可能导致订单系统发生异常,下单失败
    ②追加系统(X)就只能修改订单系统更改代码,导致维护性比较低

使用后:使用 MQ 使得应用间解耦,提升容错性和可维护性

①库存系统宕机订单系统影响不大,因为消息已经发送到mq了当库存系统恢复的时候就可以正常使用了。
②追加系统的时候跟订单系统无关

2)异步提速

使用前:

一个下单操作耗时:20 + 300 + 300 + 300 = 920ms
用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!

使用后:

用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。
提升用户体验和系统吞吐量(单位时间内处理请求的数目)。
以前920ms处理一个请求,现在25ms处理一个请求,系统的吞吐量增加

3)削峰填谷(一般是削峰)

使用前

一次性来5000个请求,会导致系统崩溃

使用后

使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ 中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”。
使用MQ后,可以提高系统稳定性。

1.2.2 劣势

1)系统可用性降低

系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?

2)系统复杂度提高

MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?

3)一致性问题

A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

1.2.3 MQ应用场景

1、生产者不需要从消费者处获得反馈。引入消息队列之前的直接调用,其接口的返回值应该为空,这才让明明下层的动作还没做,上层却当成动作做完了继续往后走,即所谓异步成为了可能。
2、容许短暂的不一致性。
3、确实是用了有效果。即解耦、提速、削峰这些方面的收益,超过加入MQ,管理MQ这些成本。

1.2.4 常用的MQ产品

    目前业界有很多的 MQ 产品,例如 RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMq等,也有直接使用 Redis 充当消息队列的案例,而这些消息队列产品,各有侧重,在实际选型时,需要结合自身需求及 MQ 产品特征,综合考虑。 

1.3 RabbitMQ的基本介绍

1.3.1 AMQP介绍

AMQP,即 Advanced Message Queuing Protocol(英[ˈprəʊtəkɒl])(高级消息队列协议),是一个网络协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP 规范发布。类比HTTP。

消息队列中间件
exchange 交换机,用来分发消息到不同的容器  queue通过路由来处理
queue 容器(队列)
routes 路由
​
流程
生产者(procedure)发布消息到交换机(exchange)
交换机(exchange)通过不同的路由规则发布/路由 给不同的queue进行存储
消费者(consumer)通过队列去监听拿到消息进行消费

2007年,Rabbit 技术公司基于 AMQP 标准开发的 RabbitMQ 1.0 发布。RabbitMQ 采用 Erlang 语言开发。Erlang 语言由 Ericson 设计,专门为开发高并发和分布式系统的一种语言,在电信领域使用广泛。

1.3.2 RabbitMQ基础架构

相关概念介绍(如下图)
1、Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker。
​
2、Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost 创建 exchange/queue 等。
​
3、Connection:publisher/consumer 和 broker 之间的 TCP 连接。
​
4、Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个thread创建单独的 channel 进行通讯,AMQP method 包含了channel id 帮助客户端和message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。
​
5、Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到queue 中去。
常用的类型有:
direct (point-to-point)
topic (publish-subscribe) 
fanout (multicast)
​
6、Queue:消息最终被送到这里等待 consumer 取走
​
7、Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding 信息被保存到 exchange 中的查询表中,用于 message 的分发依据。
​
​
​
① procedure 和consumer都是客户端,客户端通过链接和服务端进行通信,所以需要建立起来连接然后进行通信
​
② 一个rabbitmq里面有很多的虚拟机  相当于mysql里面有很多数据库,数据库里面有很多表,都是独立的。
​
③ 每个虚拟机里面有很多的exchange和queue独立分区的作用

1.3.3 RabbitMQ的6种工作模式

RabbitMQ 提供了 6 种工作模式:

1、Hello World (简单模式)
2、work queues (工作队列模式)
3、Publish/Subscribe (发布与订阅模式)
4、Routing (路由模式)
5、Topics (主题模式)
6、RPC (远程调用模式)(远程调用,不太算 MQ;暂不作介绍)

官网对应模式介绍:RabbitMQ Tutorials — RabbitMQ

1.4 AMQP和JMS

MQ是消息通信的模型;

实现MQ的大致有两种主流方式:AMQPJMS

1.4.1 AMQP

AMQP是一种协议,更准确的说是一种binary wire-level protocol(链接协议)。这是其和 JMS的本质差别,AMQP不从API层进行限定,而是直接定义网络交换的数据格式。

1.4.2 JMS

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。(规定了消息客户端的一套api的东西,rabbitmq没有遵循JMS规则

JMS 是 JavaEE 规范中的一种,类比JDBC。

1.4.3 AMQP与 JMS 区别

JMS是定义了统一的接口,来对消息操作进行统一;AMQP是通过规定协议来统一数据交互的格式。
​
JMS限定了必须使用Java语言;AMQP只是协议,不规定实现方式,因此是跨语言的。
​
JMS规定了两种消息模式;而AMQP的消息模式更加丰富

1.4.4 市场常见消息队列

ActiveMQ:基于JMS

ZeroMQ:基于C语言开发

RabbitMQ:基于AMQP协议,erlang语言开发,稳定性好

RocketMQ:基于JMS,阿里巴巴产品

Kafka:类似MQ的产品;分布式消息系统,高吞吐量。

1.5 RabbitMQ的安装和配置

/usr/lwl/soft路径下建立rabbit目录
​
下面要安装的包就是rpm包
注:
RMP 是 LINUX 下的一种软件的可执行程序,你只要安装它就可以了。这种软件安装包通常是一个RPM包(Redhat Linux Packet Manager,就是Redhat的包管理器),后缀是.rpm。
​
常用命令组合:
-ivh:安装显示安装进度--install--verbose--hash
-Uvh:升级软件包--Update;
-qpl:列出RPM软件包内的文件信息[Query Package list];
-qpi:列出RPM软件包的描述信息[Query Package install package(s)];
-qf:查找指定文件属于哪个RPM软件包[Query File];
-Va:校验所有的RPM软件包,查找丢失的文件[View Lost];
-e:删除包

1、安装依赖环境

在rabbitmq目录下进行即可
​
yum -y install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz

2、安装Erlang

rpm -ivh erlang-22.0.7-1.el7.x86_64.rpm

3、安装RabbitMQ

安装依赖的包
rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
​
安装RabbitMQ
rpm -ivh rabbitmq-server-3.7.18-1.el7.noarch.rpm

4、RabbitMQ服务命令

systemctl start rabbitmq-server  # 启动服务
systemctl stop rabbitmq-server  # 停止服务
systemctl restart rabbitmq-server  # 重启服务
systemctl status rabbitmq-server    #查看状态
​
可以先启动服务

5、开启管理界面(图形化)

rabbitmq-plugins enable rabbitmq_management

6、修改默认配置信息

vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/ebin/rabbit.app 
​
# 比如修改密码、配置等等,例如:loopback_users中的 <<"guest">>,只保留guest
修改前:
39             {loopback_users, [<<"guest">>]},
​
修改后:
 39             {loopback_users, "guest"},

修改后可以重启一下RabbitMQ

systemctl restart rabbitmq-server

7、打开客户端

浏览器地址为
http://192.168.111.127:15672/
192.168.111.127是主机的IP地址
15672:1代表着客户端,5672是默认端口号
​
初始用户和密码都是guest
登陆后出现下图即为安装成功

1.6 使用图形化添加一个用户和虚拟主机

1.6.1 添加一个用户

1.6.2 添加一个虚拟主机

1.6.3 配置用户和虚拟主机

这里是将新建的/lwl虚拟主机赋权给lwl用户

1.6.4 删除一个虚拟主机

1.6.5 使用可视化完成消息收发

创建一个队列,在该队列中存放一条消息,再接收一下这条消息

1.7 java测试RabbitMQ的工作模式

这里使用maven项目或者是SpringBoot项目都可以

这里使用的是maven项目,后面有使用SpringBoot项目的示例

创建项目时,一般是父模块用来加载pom文件
子模块用来进行编写代码,子模块会继承父模块的pom文件
​
父模块 testRabbitMQ
子模块1 rabbitprocedure
子模块2 rabbitconsumer

父模块添加依赖

<dependency>
    <groupId>com.rabbitmq</groupId>
    <artifactId>amqp-client</artifactId>
    <version>5.6.0</version>
</dependency>

1.7.1 简单模式(Hello world)

生产者

package com.aaa.simple;
​
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
​
import java.io.IOException;
import java.util.concurrent.TimeoutException;
​
/**
 * 简单模式生产者1
 */
public class Procedure1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建工厂
        ConnectionFactory conn = new ConnectionFactory();
        //2、设置参数
        conn.setHost("192.168.111.127");  //设置ip地址,默认为localhost
        conn.setPort(5672);               //设置端口号,默认为5672
        conn.setUsername("lwl");          //设置rabbit登录用户名,默认为guest
        conn.setPassword("密码");  //设置rabbit登录密码,默认为guest
        conn.setVirtualHost("/lwl");      //设置虚拟主机,默认为 /
​
        //3、需要通过管道建立连接 , 向上抛出异常
        Connection connection = conn.newConnection();
        //4、创建channel
        Channel channel = connection.createChannel();
​
        //5、声明队列
        /** 查看方法中的参数,按着ctrl,点击下面的queueDeclare()会进入到接口中,然后在进入到对应的实现类中
         * (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * queue:队列的名字
         * durable:是否需要持久化
         * exclusive:是否独占(一个消费者对应一个生产者)
         * autoDelete:是否自动删除
         * arguments:对应的参数
         */
        channel.queueDeclare("testrabbit",false,false,true,null);
        //6、发布消息
        /**
         * (String exchange, String routingKey, BasicProperties props, byte[] body)
         * exchange:交换机的名字
         * routingKey:绑定的key的名字,路由名称,在简单模式下需要和队列的名字一致
         * props:交换机的属性
         * body:要发布的消息内容的byte数组
         */
        byte[] msg = "hello rabbit!".getBytes();
        channel.basicPublish("","testrabbit",null,msg);
    }
}

运行完生产者之后,可以在可视化界面看一下相应的队列中是否有对应的消息

消费者

package com.aaa.simple;
​
import com.rabbitmq.client.*;
​
import java.io.IOException;
import java.util.concurrent.TimeoutException;
​
/**
 * 简单模式消费者1
 */
public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建工厂
        ConnectionFactory conn = new ConnectionFactory();
        //2、设置参数
        conn.setHost("192.168.111.127");  //设置ip地址,默认为localhost
        conn.setPort(5672);               //设置端口号,默认为5672
        conn.setUsername("lwl");          //设置rabbit登录用户名,默认为guest
        conn.setPassword("密码");  //设置rabbit登录密码,默认为guest
        conn.setVirtualHost("/lwl");      //设置虚拟主机,默认为 /
​
        //3、需要通过管道建立连接 , 向上抛出异常
        Connection connection = conn.newConnection();
        //4、创建channel
        Channel channel = connection.createChannel();
        //5、创建consumer对象:consumer是接口,所以这里需要使用匿名函数
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 重写里面的handleDelivery方法
             *  consumerTag:备注的一些信息
             *  envelope:封装的信息(比如交换机、路由等)
             *  properties:协议的属性
             *  body:获取到的队列里面的消息(字节数组形式)
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("consumerTag = " + consumerTag);
                System.out.println("envelope = " + envelope.getExchange());
                System.out.println("envelope = " + envelope.getRoutingKey());
                System.out.println("properties = " + properties);
                System.out.println("body = " + new String(body));
            }
        };
​
        //6、消费队列里面的消息
        /**
         * basicConsume有很多中构造方法,这里使用的是有以下三个参数的构造方法
         * (String queue, boolean autoAck, Consumer callback)
         * queue:代表队列的名字 刚才定义的队列名字为test
         * autoAck:是否自动确认
         * callback:consumer对象
         */
        channel.basicConsume("test",true,consumer);
​
​
        /**
         * consumerTag = amq.ctag-r2fL0LOWbFZoXFhzvRUG2w
         * envelope =
         * envelope = test
         * properties = #contentHeader<basic>(content-type=null, content-encoding=null, headers={}, delivery-mode=1, priority=null, correlation-id=null, reply-to=null, expiration=null, message-id=null, timestamp=null, type=null, user-id=null, app-id=null, cluster-id=null)
         * body = 测试发布消息
         */
    }
}

1.7.2 工作队列模式(Work queues)

Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列中的消息。

应用场景:对于 任务过重或任务较多情况使用工作队列可以提高任务处理的速度。

1.7.2.1轮询分发机制案例

生产者

package com.aaa.workqueue;
​
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
​
import java.io.IOException;
import java.util.concurrent.TimeoutException;
​
/**
 * 工作队列模式生产者1
 */
public class Procedure1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建工厂
        ConnectionFactory conn = new ConnectionFactory();
        //2、设置参数
        conn.setHost("192.168.111.127");  //设置ip地址,默认为localhost
        conn.setPort(5672);               //设置端口号,默认为5672
        conn.setUsername("lwl");          //设置rabbit登录用户名,默认为guest
        conn.setPassword("密码");  //设置rabbit登录密码,默认为guest
        conn.setVirtualHost("/lwl");      //设置虚拟主机,默认为 /
​
        //3、需要通过管道建立连接 , 向上抛出异常
        Connection connection = conn.newConnection();
        //4、创建channel
        Channel channel = connection.createChannel();
​
        //5、声明队列
        /** 查看方法中的参数,按着ctrl,点击下面的queueDeclare()会进入到接口中,然后在进入到对应的实现类中
         * (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * queue:队列的名字
         * durable:是否需要持久化
         * exclusive:是否独占(一个消费者对应一个生产者)
         * autoDelete:是否自动删除
         * arguments:对应的参数
         */
        channel.queueDeclare("workqueue",false,false,true,null);
        //6、发布消息
        /**
         * (String exchange, String routingKey, BasicProperties props, byte[] body)
         * exchange:交换机的名字
         * routingKey:绑定的key的名字,路由名称,在简单模式下需要和队列的名字一致
         * props:交换机的属性
         * body:要发布的消息内容的byte数组
         */
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("","workqueue",null,("work"+i).getBytes());
        }
​
    }
}

消费者1

package com.aaa.workqueue;
​
import com.rabbitmq.client.*;
​
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
​
/**
 * 工作队列模式消费者1
 */
public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建工厂
        ConnectionFactory conn = new ConnectionFactory();
        //2、设置参数
        conn.setHost("192.168.111.127");  //设置ip地址,默认为localhost
        conn.setPort(5672);               //设置端口号,默认为5672
        conn.setUsername("lwl");          //设置rabbit登录用户名,默认为guest
        conn.setPassword("密码");  //设置rabbit登录密码,默认为guest
        conn.setVirtualHost("/lwl");      //设置虚拟主机,默认为 /
​
        //3、需要通过管道建立连接 , 向上抛出异常
        Connection connection = conn.newConnection();
        //4、创建channel
        channel.basicQos(1);
        //5、创建consumer对象:consumer是接口,所以这里需要使用匿名函数
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 重写里面的handleDelivery方法
             *  consumerTag:备注的一些信息
             *  envelope:封装的信息(比如交换机、路由等)
             *  properties:协议的属性
             *  body:获取到的队列里面的消息(字节数组形式)
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
​
            }
        };
​
        //6、消费队列里面的消息
        /**
         * basicConsume有很多中构造方法,这里使用的是有以下三个参数的构造方法
         * (String queue, boolean autoAck, Consumer callback)
         * queue:代表队列的名字 刚才定义的队列名字为test
         * autoAck:是否自动确认
         * callback:consumer对象
         */
        channel.basicConsume("workqueue",true,consumer);
    }
}

消费者2

package com.aaa.workqueue;
​
import com.rabbitmq.client.*;
​
import java.io.IOException;
import java.util.concurrent.TimeoutException;
​
/**
 * 工作队列模式消费者2
 */
public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建工厂
        ConnectionFactory conn = new ConnectionFactory();
        //2、设置参数
        conn.setHost("192.168.111.127");  //设置ip地址,默认为localhost
        conn.setPort(5672);               //设置端口号,默认为5672
        conn.setUsername("lwl");          //设置rabbit登录用户名,默认为guest
        conn.setPassword("密码");  //设置rabbit登录密码,默认为guest
        conn.setVirtualHost("/lwl");      //设置虚拟主机,默认为 /
​
        //3、需要通过管道建立连接 , 向上抛出异常
        Connection connection = conn.newConnection();
        //4、创建channel
        Channel channel = connection.createChannel();
        //5、创建consumer对象:consumer是接口,所以这里需要使用匿名函数
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 重写里面的handleDelivery方法
             *  consumerTag:备注的一些信息
             *  envelope:封装的信息(比如交换机、路由等)
             *  properties:协议的属性
             *  body:获取到的队列里面的消息(字节数组形式)
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
​
            }
        };
        //6、消费队列里面的消息
        /**
         * basicConsume有很多中构造方法,这里使用的是有以下三个参数的构造方法
         * (String queue, boolean autoAck, Consumer callback)
         * queue:代表队列的名字 刚才定义的队列名字为test
         * autoAck:是否自动确认
         * callback:consumer对象
         */
        channel.basicConsume("workqueue",true,consumer);
        
    }
}

上面这种模式,多个消费者是轮询分发的机制,多个消费者一定会自动平均分配消息队列中的消息

而为什么会自动平均分配呢?
​
消息自动确认
  因为我们在消费者消费的时候有一个参数设置为 autoAck:true,我们设置消费者接收消息自动确认,而一般都是队列中分配好了那个消费者要传递什么信息,直接一次全部传递过去,不是消费一个确认一下。消费者接收到所有消息之后自动确认,队列中会标记删除所有的信息,他不关心你接收完信息之后的后续业务操作。就是只关心你是否收到了数据。
   消息自动确认机制,完成一项任务可能需要几秒钟甚至几分钟,如果一个消费者开始了一项长期的任务,却只完成了一部分就挂了,那么RabbitMQ一旦将消息传递给消费者,就会立刻标记删除,那么因为消费者挂了,接收数据的时候已经确认应答了,队列中的数据也删除了,所以剩余接收到的信息也没了。 
​
如果消息手动确认
  如果生产者发送10条消息,消费者1拿到5条,消费者2拿到5条,不进行自动应答,服务器队列的数据即使消费了,我们没有应答就不会被标记删除,保证服务器队列中的数据一直还在。如果消费者处理完了这条数据,那么手动确认,队列中知道已经确认了进行删除
  
1、消费消息的时候,参数设置为false

channel.basicConsume("workqueue",false,consumer);
​
2、我们在 handlerDeliver 方法中也需要设置手动确认
    
 // 手动确认
       channel.basicAck(envelope.getDeliveryTag(), false);
​
  我们设置一个信道每次只能消费一个消息,如果其中一个消费者服务器挂掉了,连接就断掉了,剩余的未被手动确认的数据还在队列中保存。也能及时得把剩余的消息继续交给消费者2进行处理,不耽误业务的持续进行。
  这就是能者多劳的机制。就是说处理快的消费者处理完业务会很快的手动确认,然后再次进行接收新的消息,处理慢的消费者经过一段时间处理之后再进行确认,就会能者多劳,业务处理快的接受的消息多,处理满的接受的少
​
3、在消费消息之前设置信道中接收消息只能是1个
chanel.basicQos(1); // 设置信道中一次只能消费一个信息

这样就能实现能者多劳机制

1.7.2.2能者多劳机制案例

生产者

package com.aaa.workqueue;
​
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
​
import java.io.IOException;
import java.util.concurrent.TimeoutException;
​
/**
 * 工作队列模式生产者1
 */
public class Procedure1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建工厂
        ConnectionFactory conn = new ConnectionFactory();
        //2、设置参数
        conn.setHost("192.168.111.127");  //设置ip地址,默认为localhost
        conn.setPort(5672);               //设置端口号,默认为5672
        conn.setUsername("lwl");          //设置rabbit登录用户名,默认为guest
        conn.setPassword("密码");  //设置rabbit登录密码,默认为guest
        conn.setVirtualHost("/lwl");      //设置虚拟主机,默认为 /
​
        //3、需要通过管道建立连接 , 向上抛出异常
        Connection connection = conn.newConnection();
        //4、创建channel
        Channel channel = connection.createChannel();
​
        //5、声明队列
        /** 查看方法中的参数,按着ctrl,点击下面的queueDeclare()会进入到接口中,然后在进入到对应的实现类中
         * (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * queue:队列的名字
         * durable:是否需要持久化
         * exclusive:是否独占(一个消费者对应一个生产者)
         * autoDelete:是否自动删除
         * arguments:对应的参数
         */
        channel.queueDeclare("workqueue",false,false,true,null);
        //6、发布消息
        /**
         * (String exchange, String routingKey, BasicProperties props, byte[] body)
         * exchange:交换机的名字
         * routingKey:绑定的key的名字,路由名称,在简单模式下需要和队列的名字一致
         * props:交换机的属性
         * body:要发布的消息内容的byte数组
         */
        for (int i = 0; i < 10; i++) {
            channel.basicPublish("","workqueue",null,("work"+i).getBytes());
        }
​
    }
}

消费者1

package com.aaa.workqueue;
​
import com.rabbitmq.client.*;
​
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
​
/**
 * 工作队列模式消费者1
 */
public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建工厂
        ConnectionFactory conn = new ConnectionFactory();
        //2、设置参数
        conn.setHost("192.168.111.127");  //设置ip地址,默认为localhost
        conn.setPort(5672);               //设置端口号,默认为5672
        conn.setUsername("lwl");          //设置rabbit登录用户名,默认为guest
        conn.setPassword("密码");  //设置rabbit登录密码,默认为guest
        conn.setVirtualHost("/lwl");      //设置虚拟主机,默认为 /
​
        //3、需要通过管道建立连接 , 向上抛出异常
        Connection connection = conn.newConnection();
        //4、创建channel
        Channel channel = connection.createChannel();
        // 设置信道中一次只能消费一个信息
        channel.basicQos(1);
        //5、创建consumer对象:consumer是接口,所以这里需要使用匿名函数
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 重写里面的handleDelivery方法
             *  consumerTag:备注的一些信息
             *  envelope:封装的信息(比如交换机、路由等)
             *  properties:协议的属性
             *  body:获取到的队列里面的消息(字节数组形式)
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
                //让消费者1休眠
                try {
                    TimeUnit.SECONDS.sleep(1);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // 消息id,mq 在 channel 中用来标识消息的 id,可用于确认消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                /**
                 * @param1:deliveryTag:用来标识消息的id
                 * @param2:multiple:是否批量。true:将一次性 ACK 所有小于 deliveryTag 的消息
                 */
                // 手动确认
                channel.basicAck(deliveryTag, false);
            }
        };
​
        //6、消费队列里面的消息
        /**
         * basicConsume有很多中构造方法,这里使用的是有以下三个参数的构造方法
         * (String queue, boolean autoAck, Consumer callback)
         * queue:代表队列的名字 刚才定义的队列名字为test
         * autoAck:是否自动确认
         * callback:consumer对象
         */
        channel.basicConsume("workqueue",false,consumer);
    }
}

消费者2

package com.aaa.workqueue;
​
import com.rabbitmq.client.*;
​
import java.io.IOException;
import java.util.concurrent.TimeoutException;
​
/**
 * 工作队列模式消费者2
 */
public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建工厂
        ConnectionFactory conn = new ConnectionFactory();
        //2、设置参数
        conn.setHost("192.168.111.127");  //设置ip地址,默认为localhost
        conn.setPort(5672);               //设置端口号,默认为5672
        conn.setUsername("lwl");          //设置rabbit登录用户名,默认为guest
        conn.setPassword("密码");  //设置rabbit登录密码,默认为guest
        conn.setVirtualHost("/lwl");      //设置虚拟主机,默认为 /
​
        //3、需要通过管道建立连接 , 向上抛出异常
        Connection connection = conn.newConnection();
        //4、创建channel
        Channel channel = connection.createChannel();
        // 设置信道中一次只能消费一个信息
        channel.basicQos(1);
        //5、创建consumer对象:consumer是接口,所以这里需要使用匿名函数
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 重写里面的handleDelivery方法
             *  consumerTag:备注的一些信息
             *  envelope:封装的信息(比如交换机、路由等)
             *  properties:协议的属性
             *  body:获取到的队列里面的消息(字节数组形式)
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
​
                // 消息id,mq 在 channel 中用来标识消息的 id,可用于确认消息已接收
                long deliveryTag = envelope.getDeliveryTag();
                /**
                 * @param1:deliveryTag:用来标识消息的id
                 * @param2:multiple:是否批量。true:将一次性 ACK 所有小于 deliveryTag 的消息
                 */
                // 手动确认
                channel.basicAck(deliveryTag, false);
            }
        };
        //6、消费队列里面的消息
        /**
         * basicConsume有很多中构造方法,这里使用的是有以下三个参数的构造方法
         * (String queue, boolean autoAck, Consumer callback)
         * queue:代表队列的名字 刚才定义的队列名字为test
         * autoAck:是否自动确认
         * callback:consumer对象
         */
        channel.basicConsume("workqueue",false,consumer);
​
    }
}
因为使用了手动提交,而且每次队列中只有一条消息,所以消息不再是平分,
没有睡眠的消费者2效率会更高,消费的消息会比消费者1多

1.7.3 发布订阅模式(Publish/Subscribe)

前面案例中,只有3个角色:

- P:生产者,也就是要发送消息的程序
- C:消费者:消息的接受者,会一直等待消息到来。
- queue:消息队列,图中红色部分

而在订阅模型中,多了一个exchange角色,而且过程略有变化:
生产者发消息给交换机,交换机将消息路由分发给队列,消费者监听队列接收信息
• P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
• C:消费者,消息的接受者,会一直等待消息到来。
• Queue:消息队列,接收消息、缓存消息。
• Exchange:交换机,图中的X。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
    ◦ Fanout:广播,将消息交给所有绑定到交换机的队列
    ◦ Direct:定向,把消息交给符合指定routing key 的队列
    ◦ Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
​
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

生产者

package com.aaa.workqueue;
​
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
​
import java.io.IOException;
import java.util.concurrent.TimeoutException;
​
/**
 * 发布订阅模式生产者1
 */
public class Procedure1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建工厂
        ConnectionFactory conn = new ConnectionFactory();
        //2、设置参数
        conn.setHost("192.168.111.127");  //设置ip地址,默认为localhost
        conn.setPort(5672);               //设置端口号,默认为5672
        conn.setUsername("lwl");          //设置rabbit登录用户名,默认为guest
        conn.setPassword("密码");  //设置rabbit登录密码,默认为guest
        conn.setVirtualHost("/lwl");      //设置虚拟主机,默认为 /
​
        //3、需要通过管道建立连接 , 向上抛出异常
        Connection connection = conn.newConnection();
        //4、创建channel
        Channel channel = connection.createChannel();
​
        //5、声明一个交换机
        String exchangeName = "ex_fan_out";
        /**
         * (String exchange, BuiltinExchangeType type, boolean durable)
         * exchange:交换机的名字
         * type:交换机的名字
         * durable:是否需要持久化
         */
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,false);
​
        //6、声明两个队列
        /**
         * (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * queue:队列的名字
         * durable:是否需要持久化
         * exclusive:是否独占(一个消费者对应一个生产者)
         * autoDelete:是否自动删除
         * arguments:对应的参数
         */
        String queueName1 = "ex_fan_out_queue1";
        String queueName2 = "ex_fan_out_queue2";
        channel.queueDeclare(queueName1,false,false,true,null);
        channel.queueDeclare(queueName2,false,false,true,null);
​
        //7、将队列和交换机进行绑定
        /** queueBind 和 exchangeBind是一样的使用
         *(String queue, String exchange, String routingKey)
         * queue:队列的名字
         * exchange:交换机的名字
         * routingKey:路由名称,广播模式下不需要,设置为""即可
         */
        channel.queueBind(queueName1,exchangeName,"");
        channel.queueBind(queueName2,exchangeName,"");
​
        //6、发布消息
        /**
         * (String exchange, String routingKey, BasicProperties props, byte[] body)
         * exchange:交换机的名字
         * routingKey:绑定的key的名字,路由名称,在简单模式下需要和队列的名字一致
         * props:交换机的属性
         * body:要发布的消息内容的byte数组
         */
        channel.basicPublish(exchangeName,"",null,"exchange_queue".getBytes());
    }
}

消费者1

package com.aaa.publish;
​
import com.rabbitmq.client.*;
​
import java.io.IOException;
import java.util.concurrent.TimeoutException;
​
/**
 * 发布订阅模式消费者1
 */
public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建工厂
        ConnectionFactory conn = new ConnectionFactory();
        //2、设置参数
        conn.setHost("192.168.111.127");  //设置ip地址,默认为localhost
        conn.setPort(5672);               //设置端口号,默认为5672
        conn.setUsername("lwl");          //设置rabbit登录用户名,默认为guest
        conn.setPassword("密码");  //设置rabbit登录密码,默认为guest
        conn.setVirtualHost("/lwl");      //设置虚拟主机,默认为 /
​
        //3、需要通过管道建立连接 , 向上抛出异常
        Connection connection = conn.newConnection();
        //4、创建channel
        Channel channel = connection.createChannel();
        //5、创建consumer对象:consumer是接口,所以这里需要使用匿名函数
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 重写里面的handleDelivery方法
             *  consumerTag:备注的一些信息
             *  envelope:封装的信息(比如交换机、路由等)
             *  properties:协议的属性
             *  body:获取到的队列里面的消息(字节数组形式)
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
            }
        };
​
        //6、消费队列里面的消息
        /**
         * basicConsume有很多中构造方法,这里使用的是有以下三个参数的构造方法
         * (String queue, boolean autoAck, Consumer callback)
         * queue:代表队列的名字 刚才定义的队列名字为test
         * autoAck:是否自动确认
         * callback:consumer对象
         */
        String queueName1 = "ex_fan_out_queue1";
        channel.basicConsume(queueName1,true,consumer);
    }
}

消费者2

package com.aaa.publish;
​
import com.rabbitmq.client.*;
​
import java.io.IOException;
import java.util.concurrent.TimeoutException;
​
/**
 * 发布订阅模式消费者2
 */
public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建工厂
        ConnectionFactory conn = new ConnectionFactory();
        //2、设置参数
        conn.setHost("192.168.111.127");  //设置ip地址,默认为localhost
        conn.setPort(5672);               //设置端口号,默认为5672
        conn.setUsername("lwl");          //设置rabbit登录用户名,默认为guest
        conn.setPassword("密码");  //设置rabbit登录密码,默认为guest
        conn.setVirtualHost("/lwl");      //设置虚拟主机,默认为 /
​
        //3、需要通过管道建立连接 , 向上抛出异常
        Connection connection = conn.newConnection();
        //4、创建channel
        Channel channel = connection.createChannel();
        //5、创建consumer对象:consumer是接口,所以这里需要使用匿名函数
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 重写里面的handleDelivery方法
             *  consumerTag:备注的一些信息
             *  envelope:封装的信息(比如交换机、路由等)
             *  properties:协议的属性
             *  body:获取到的队列里面的消息(字节数组形式)
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
            }
        };
​
        //6、消费队列里面的消息
        /**
         * basicConsume有很多中构造方法,这里使用的是有以下三个参数的构造方法
         * (String queue, boolean autoAck, Consumer callback)
         * queue:代表队列的名字 刚才定义的队列名字为test
         * autoAck:是否自动确认
         * callback:consumer对象
         */
        String queueName2 = "ex_fan_out_queue2";
        channel.basicConsume(queueName2,true,consumer);
    }
}
因为生产者发布的消息中routingkey是"",也就是广播模式
两个队列都能匹配上,
所以同样的消息两个消费者都能消费

1.7.4 路由模式(Routing)

路由模式特点

• 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
​
• 消息的发送方在 向Exchange发送消息时,也必须指定消息的 RoutingKey。
​
• Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

• P:生产者,向Exchange发送消息,发送消息时,会指定一个routing key。
​
• X:Exchange(交换机),接收生产者的消息,然后把消息递交给 与routing key完全匹配的队列
​
• C1:消费者,其所在队列指定了需要routing key 为 error 的消息
​
• C2:消费者,其所在队列指定了需要routing key 为 info、error、warning 的消息

生产者

package com.aaa.routing;
​
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
​
import java.io.IOException;
import java.util.concurrent.TimeoutException;
​
/**
 * 路由模式生产者1
 */
public class Procedure1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建工厂
        ConnectionFactory conn = new ConnectionFactory();
        //2、设置参数
        conn.setHost("192.168.111.127");  //设置ip地址,默认为localhost
        conn.setPort(5672);               //设置端口号,默认为5672
        conn.setUsername("lwl");          //设置rabbit登录用户名,默认为guest
        conn.setPassword("密码");  //设置rabbit登录密码,默认为guest
        conn.setVirtualHost("/lwl");      //设置虚拟主机,默认为 /
​
        //3、需要通过管道建立连接 , 向上抛出异常
        Connection connection = conn.newConnection();
        //4、创建channel
        Channel channel = connection.createChannel();
​
        //5、声明一个交换机
        String exchangeName = "ex_dir";
        /**
         * (String exchange, BuiltinExchangeType type, boolean durable)
         * exchange:交换机的名字
         * type:交换机的名字
         * durable:是否需要持久化
         */
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,false);
​
        //6、声明两个队列
        /**
         * (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * queue:队列的名字
         * durable:是否需要持久化
         * exclusive:是否独占(一个消费者对应一个生产者)
         * autoDelete:是否自动删除
         * arguments:对应的参数
         */
        String queueName1 = "ex_dir_queue1";
        String queueName2 = "ex_dir_queue2";
        channel.queueDeclare(queueName1,false,false,true,null);
        channel.queueDeclare(queueName2,false,false,true,null);
​
        //7、将队列和交换机进行绑定
        /** queueBind 和 exchangeBind是一样的使用
         *(String queue, String exchange, String routingKey)
         * queue:队列的名字
         * exchange:交换机的名字
         * routingKey:路由名称,路由模式下需要定义
         */
        channel.queueBind(queueName1,exchangeName,"error");
​
        channel.queueBind(queueName2,exchangeName,"error");
        channel.queueBind(queueName2,exchangeName,"info");
        channel.queueBind(queueName2,exchangeName,"warning");
​
        //6、发布消息
        /**
         * (String exchange, String routingKey, BasicProperties props, byte[] body)
         * exchange:交换机的名字
         * routingKey:绑定的key的名字,路由名称,在简单模式下需要和队列的名字一致
         * props:交换机的属性
         * body:要发布的消息内容的byte数组
         */
        //如果这里routingKey定义为error,那么按照绑定路由和交换机的规则,两个消费者都能收到消息
        //如果这里routingKey定义为warning,那么按照绑定路由和交换机的规则,只有消费者2能收到消息
        channel.basicPublish(exchangeName,"info",null,"exchange_dir_queue".getBytes());
    }
}

消费者1

package com.aaa.routing;
​
import com.rabbitmq.client.*;
​
import java.io.IOException;
import java.util.concurrent.TimeoutException;
​
/**
 * 路由模式消费者1
 */
public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建工厂
        ConnectionFactory conn = new ConnectionFactory();
        //2、设置参数
        conn.setHost("192.168.111.127");  //设置ip地址,默认为localhost
        conn.setPort(5672);               //设置端口号,默认为5672
        conn.setUsername("lwl");          //设置rabbit登录用户名,默认为guest
        conn.setPassword("密码");  //设置rabbit登录密码,默认为guest
        conn.setVirtualHost("/lwl");      //设置虚拟主机,默认为 /
​
        //3、需要通过管道建立连接 , 向上抛出异常
        Connection connection = conn.newConnection();
        //4、创建channel
        Channel channel = connection.createChannel();
        //5、创建consumer对象:consumer是接口,所以这里需要使用匿名函数
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 重写里面的handleDelivery方法
             *  consumerTag:备注的一些信息
             *  envelope:封装的信息(比如交换机、路由等)
             *  properties:协议的属性
             *  body:获取到的队列里面的消息(字节数组形式)
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
            }
        };
​
        //6、消费队列里面的消息
        /**
         * basicConsume有很多中构造方法,这里使用的是有以下三个参数的构造方法
         * (String queue, boolean autoAck, Consumer callback)
         * queue:代表队列的名字 刚才定义的队列名字为test
         * autoAck:是否自动确认
         * callback:consumer对象
         */
        String queueName1 = "ex_dir_queue1";
        channel.basicConsume(queueName1,true,consumer);
    }
}

消费者2

package com.aaa.routing;
​
import com.rabbitmq.client.*;
​
import java.io.IOException;
import java.util.concurrent.TimeoutException;
​
/**
 * 路由模式消费者2
 */
public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建工厂
        ConnectionFactory conn = new ConnectionFactory();
        //2、设置参数
        conn.setHost("192.168.111.127");  //设置ip地址,默认为localhost
        conn.setPort(5672);               //设置端口号,默认为5672
        conn.setUsername("lwl");          //设置rabbit登录用户名,默认为guest
        conn.setPassword("密码");  //设置rabbit登录密码,默认为guest
        conn.setVirtualHost("/lwl");      //设置虚拟主机,默认为 /
​
        //3、需要通过管道建立连接 , 向上抛出异常
        Connection connection = conn.newConnection();
        //4、创建channel
        Channel channel = connection.createChannel();
        //5、创建consumer对象:consumer是接口,所以这里需要使用匿名函数
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 重写里面的handleDelivery方法
             *  consumerTag:备注的一些信息
             *  envelope:封装的信息(比如交换机、路由等)
             *  properties:协议的属性
             *  body:获取到的队列里面的消息(字节数组形式)
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
            }
        };
​
        //6、消费队列里面的消息
        /**
         * basicConsume有很多中构造方法,这里使用的是有以下三个参数的构造方法
         * (String queue, boolean autoAck, Consumer callback)
         * queue:代表队列的名字 刚才定义的队列名字为test
         * autoAck:是否自动确认
         * callback:consumer对象
         */
        String queueName2 = "ex_dir_queue2";
        channel.basicConsume(queueName2,true,consumer);
    }
}
因为生产者发布的消息中routingkey是info
所以只有队列ex_dir_queue2能匹配上,
所以只有消费者2能消费消息

1.7.5 通配符模式(Topics)

Topic类型与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

Routingkey 一般都是有一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert
​
通配符规则:
#:匹配一个或多个词 
*:匹配一个词
举例:
item.#:能够匹配item.insert.abc 或者 item.insert
item.*:只能匹配item.insert 或者 item.abc

 

 

• 红色Queue:绑定的是usa.# ,因此凡是以 usa.开头的routing key 都会被匹配到
​
• 黄色Queue:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配

生产者

package com.aaa.topics;
​
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
​
import java.io.IOException;
import java.util.concurrent.TimeoutException;
​
/**
 * 主题(通配符)模式生产者1
 */
public class Procedure1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建工厂
        ConnectionFactory conn = new ConnectionFactory();
        //2、设置参数
        conn.setHost("192.168.111.127");  //设置ip地址,默认为localhost
        conn.setPort(5672);               //设置端口号,默认为5672
        conn.setUsername("lwl");          //设置rabbit登录用户名,默认为guest
        conn.setPassword("密码");  //设置rabbit登录密码,默认为guest
        conn.setVirtualHost("/lwl");      //设置虚拟主机,默认为 /
​
        //3、需要通过管道建立连接 , 向上抛出异常
        Connection connection = conn.newConnection();
        //4、创建channel
        Channel channel = connection.createChannel();
​
        //5、声明一个交换机
        String exchangeName = "ex_topic";
        /**
         * (String exchange, BuiltinExchangeType type, boolean durable)
         * exchange:交换机的名字
         * type:交换机的名字
         * durable:是否需要持久化
         */
        channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,false);
​
        //6、声明两个队列
        /**
         * (String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
         * queue:队列的名字
         * durable:是否需要持久化
         * exclusive:是否独占(一个消费者对应一个生产者)
         * autoDelete:是否自动删除
         * arguments:对应的参数
         */
        String queueName1 = "ex_topic_queue1";
        String queueName2 = "ex_topic_queue2";
        channel.queueDeclare(queueName1,false,false,true,null);
        channel.queueDeclare(queueName2,false,false,true,null);
​
        //7、将队列和交换机进行绑定
        /** queueBind 和 exchangeBind是一样的使用
         *(String queue, String exchange, String routingKey)
         * queue:队列的名字
         * exchange:交换机的名字
         * routingKey:路由名称,主题模式下需要定义的有通配符
         */
        channel.queueBind(queueName1,exchangeName,"*.error");
​
        channel.queueBind(queueName2,exchangeName,"#.error");
        channel.queueBind(queueName2,exchangeName,"#.info");
        channel.queueBind(queueName2,exchangeName,"#.warning");
​
        //6、发布消息
        /**
         * (String exchange, String routingKey, BasicProperties props, byte[] body)
         * exchange:交换机的名字
         * routingKey:绑定的key的名字,路由名称,在简单模式下需要和队列的名字一致
         * props:交换机的属性
         * body:要发布的消息内容的byte数组
         */
        //发布消息的时候不能有通配符
        channel.basicPublish(exchangeName,"test.aaa.error",null,"exchange_topic_queue".getBytes());
    }
}

消费者1

package com.aaa.topics;
​
import com.rabbitmq.client.*;
​
import java.io.IOException;
import java.util.concurrent.TimeoutException;
​
/**
 * 主题模式消费者1
 */
public class Consumer1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建工厂
        ConnectionFactory conn = new ConnectionFactory();
        //2、设置参数
        conn.setHost("192.168.111.127");  //设置ip地址,默认为localhost
        conn.setPort(5672);               //设置端口号,默认为5672
        conn.setUsername("lwl");          //设置rabbit登录用户名,默认为guest
        conn.setPassword("密码");  //设置rabbit登录密码,默认为guest
        conn.setVirtualHost("/lwl");      //设置虚拟主机,默认为 /
​
        //3、需要通过管道建立连接 , 向上抛出异常
        Connection connection = conn.newConnection();
        //4、创建channel
        Channel channel = connection.createChannel();
        //5、创建consumer对象:consumer是接口,所以这里需要使用匿名函数
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 重写里面的handleDelivery方法
             *  consumerTag:备注的一些信息
             *  envelope:封装的信息(比如交换机、路由等)
             *  properties:协议的属性
             *  body:获取到的队列里面的消息(字节数组形式)
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
            }
        };
​
        //6、消费队列里面的消息
        /**
         * basicConsume有很多中构造方法,这里使用的是有以下三个参数的构造方法
         * (String queue, boolean autoAck, Consumer callback)
         * queue:代表队列的名字 刚才定义的队列名字为test
         * autoAck:是否自动确认
         * callback:consumer对象
         */
        String queueName1 = "ex_topic_queue1";
        String queueName2 = "ex_topic_queue2";
        channel.basicConsume(queueName1,true,consumer);
    }
}

消费者2

package com.aaa.topics;
​
import com.rabbitmq.client.*;
​
import java.io.IOException;
import java.util.concurrent.TimeoutException;
​
/**
 * 主题模式消费者2
 */
public class Consumer2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        //1、创建工厂
        ConnectionFactory conn = new ConnectionFactory();
        //2、设置参数
        conn.setHost("192.168.111.127");  //设置ip地址,默认为localhost
        conn.setPort(5672);               //设置端口号,默认为5672
        conn.setUsername("lwl");          //设置rabbit登录用户名,默认为guest
        conn.setPassword("密码");  //设置rabbit登录密码,默认为guest
        conn.setVirtualHost("/lwl");      //设置虚拟主机,默认为 /
​
        //3、需要通过管道建立连接 , 向上抛出异常
        Connection connection = conn.newConnection();
        //4、创建channel
        Channel channel = connection.createChannel();
        //5、创建consumer对象:consumer是接口,所以这里需要使用匿名函数
        Consumer consumer = new DefaultConsumer(channel){
            /**
             * 重写里面的handleDelivery方法
             *  consumerTag:备注的一些信息
             *  envelope:封装的信息(比如交换机、路由等)
             *  properties:协议的属性
             *  body:获取到的队列里面的消息(字节数组形式)
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("body = " + new String(body));
            }
        };
​
        //6、消费队列里面的消息
        /**
         * basicConsume有很多中构造方法,这里使用的是有以下三个参数的构造方法
         * (String queue, boolean autoAck, Consumer callback)
         * queue:代表队列的名字 刚才定义的队列名字为test
         * autoAck:是否自动确认
         * callback:consumer对象
         */
        String queueName1 = "ex_topic_queue1";
        String queueName2 = "ex_topic_queue2";
        channel.basicConsume(queueName2,true,consumer);
    }
}
因为生产者发布的消息中routingkey是test.aaa.error
所以只有队列ex_topic_queue2能匹配上,
所以只有消费者2能消费消息

1.7.6 模式间的异同

1、简单模式 HelloWorld
一个生产者、一个消费者,不需要设置交换机(使用默认的交换机)
​
2、工作队列模式 Work Queue
一个生产者、多个消费者(竞争关系),不需要设置交换机(使用默认的交换机)
​
3、发布订阅模式 Publish/subscribe
需要设置类型为fanout的交换机,并且交换机和队列进行绑定,当发送消息到交换机后,交换机会将消息发送到绑定的队列
​
4、路由模式 Routing
需要设置类型为direct的交换机,交换机和队列进行绑定,并且指定routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
​
5、通配符模式 Topic
需要设置类型为topic的交换机,交换机和队列进行绑定,并且指定通配符方式的routing key,当发送消息到交换机后,交换机会根据routing key将消息发送到对应的队列
​
tips:
Topic主题模式可以实现 Publish/Subscribe发布与订阅模式 和 Routing路由模式 的功能;只是Topic在配置routing key 的时候可以使用通配符,显得更加灵活。

1.7.7 使用可视化实现主题模式

1、创建一个交换机

 

2、创建两个队列

 

3、绑定交换机和队列

①使用交换机绑定队列

 

②使用队列绑定交换机

 

 

 

4、使用交换机发布消息

 

5、查看结果

 

1.8 SpringBoot项目整合RabbitMQ

1.8.1 介绍

在Spring项目中,可以使用Spring-Rabbit去操作RabbitMQ

尤其是在spring boot项目中只需要引入对应的amqp启动器依赖即可,方便的使用RabbitTemplate发送消息,使用注解接收消息。

生产者工程:
1. application.yml或者是application.properties文件配置相关信息;
2. 在生产者工程中编写配置类,用于创建交换机和队列,并进行绑定
3. 注入RabbitTemplate对象,通过RabbitTemplate对象发送消息到交换机
​
​
消费者工程:
4. application.yml或者是application.properties文件配置相关信息
5. 创建消息处理类,用于接收队列中的消息并进行处理

1.8.2 创建父工程

父工程一般是为了加载依赖,创建一个SpringBoot项目

修改pom文件

    <!--1. 父工程依赖:先注释原本的父工程依赖 -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.4.RELEASE</version>
        <relativePath/>
    </parent>
    
    
    
        <!-- 操作RabbitMQ的依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

修改配置文件(application.properties)

#主机的IP地址:默认为localhost
spring.rabbitmq.host=192.168.111.127
#虚拟主机的名字:默认为/
spring.rabbitmq.virtual-host=/lwl
#登录的用户名:默认为guest
spring.rabbitmq.username=lwl
#登录的密码:默认为guest
spring.rabbitmq.password=密码
#RabbitMQ的端口号:默认为5672
spring.rabbitmq.port=5672

1.8.3 搭建生产者工程

生产者工程和主工程可以建成父子项目
生产者工程可以maven项目或者是SpringBoot项目
    1.如果生产者工程是maven项目,是没有application.properties文件和启动类文件的,需要自己创建何编写
            其中application.properties是在resources目录下
            启动类文件是在java目录下的最外层包下
    2.如果是SpringBoot项目,那么在对应的配置文件中编写即可

如果父类中pom文件没有amqp的依赖,也可以在子项目中加上,本项目已经在父项目中配置过

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

1、配置文件

#主机的IP地址:默认为localhost
spring.rabbitmq.host=192.168.111.127
#虚拟主机的名字:默认为/
spring.rabbitmq.virtual-host=/lwl
#登录的用户名:默认为guest
spring.rabbitmq.username=lwl
#登录的密码:默认为guest
spring.rabbitmq.password=密码
#RabbitMQ的端口号:默认为5672
spring.rabbitmq.port=5672
​
#配置文件中避免有空格,会被错误识别

2、启动类文件

/**
 * 启动类文件一般是以项目名字加Test进行命名的
 * @SpringBootApplication:启动类注解
 * run方法中写的是当前类的class文件
 */
@SpringBootApplication
public class ProcedureRabbitTest {
    public static void main(String[] args) {
        SpringApplication.run(ProcedureRabbitTest.class);
    }
}

3、配置生产者交换机和队列

package com.aaa.procedure.test;
​
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
​
/**
 * @Configuration:声明为配置类,可以更早加载
 */
@Configuration
public class ProcedureConfig {
    //声明队列的名字
    private static final String ROUTING_QUEUE_NAME1 = "routing_queue_name1";
    private static final String ROUTING_QUEUE_NAME2 = "routing_queue_name2";
​
    //声明一个交换机的名字
    private static final String ROUTING_EXCHANGE_NAME = "routing_exchange_name";
​
    /**
     * 声明一个交换机并作为bean注入到spring容器中
     * durable默认为durable
     * autoDelete默认为true
     * 交换机起名字,在绑定队列时使用
     */
    @Bean(name = "ex")
    public Exchange getExchange(){
        //     交换机构建器.交换机类型(交换机名字).是否持久化.是否自动删除.构建
        return ExchangeBuilder.directExchange(ROUTING_EXCHANGE_NAME).durable(false).autoDelete().build();
    }
​
    /**
     * 声明一个队列并作为bean注入到spring容器中
     * autoDelete默认为true
     * 队列起名字,在绑定交换机时使用
     */
    @Bean(name = "qu1")
    public Queue getQueue1(){
        //      队列构建器.不持久化(队列名字).自动删除.构建
        return QueueBuilder.nonDurable(ROUTING_QUEUE_NAME1).autoDelete().build();
    }
    @Bean(name = "qu2")
    public Queue getQueue2(){
        //      队列构建器.不持久化(队列名字).自动删除.构建
        return QueueBuilder.nonDurable(ROUTING_QUEUE_NAME2).autoDelete().build();
    }
​
​
    /**
     * 绑定队列和交换机后作为bean注入到容器中
     * @Qualifier注解:为了标明对应的交换机和队列
     */
    @Bean
    public Binding bingQu1AndEx(@Qualifier("ex") Exchange exchange,@Qualifier("qu1") Queue queue){
        //  绑定构建器.队列名字.交换机名字.routingkey.参数
        return BindingBuilder.bind(queue).to(exchange).with("error").noargs();
    }
    @Bean
    public Binding bingQu2AndEx(@Qualifier("ex") Exchange exchange,@Qualifier("qu2") Queue queue){
        //  绑定构建器.队列名字.交换机名字.routingkey.参数
        return BindingBuilder.bind(queue).to(exchange).with("warnings").noargs();
    }
}

4、使用测试类发布消息

package com.aaa.procedure;
​
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringRunner;
​
import javax.annotation.Resource;
​
/**
 * @SpringBootTest:声明为测试类
 * 因为SpringBoot的版本较低,所以还需要添加这两个注解来确保项目没有问题
 * @RunWith(SpringRunner.class)
 * @ContextConfiguration(classes = {ProcedureRabbitTest.class})
 */
@SpringBootTest
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = ProcedureRabbitTest.class)
public class TestPublish {
​
    //将操作mq的对象引入进来,是通过jar包amqp进行封装的对象
    @Resource
    private RabbitTemplate rabbitTemplate;
​
    @Test
    public void testRouting(){
        /**
         * 使用convertAndSend发送消息
         * (String exchange, String routingKey, Object object)
         * exchange:交换机的名字
         * routingKey:定向路由
         * object:消息的内容
         */
        rabbitTemplate.convertAndSend("routing_exchange_name","warnings","测试SpringBoot路由模式生产者");
    }
}

1.8.4 搭建消费者工程

消费者工程和主工程可以建成父子项目
消费者工程可以maven项目或者是SpringBoot项目
    1.如果消费者工程是maven项目,是没有application.properties文件和启动类文件的,需要自己创建何编写
            其中application.properties是在resources目录下
            启动类文件是在java目录下的最外层包下
    2.如果是SpringBoot项目,那么在对应的配置文件中编写即可

如果父类中pom文件没有amqp的依赖,也可以在子项目中加上,本项目已经在父项目中配置过

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>

1、配置文件

#主机的IP地址:默认为localhost
spring.rabbitmq.host=192.168.111.127
#虚拟主机的名字:默认为/
spring.rabbitmq.virtual-host=/lwl
#登录的用户名:默认为guest
spring.rabbitmq.username=lwl
#登录的密码:默认为guest
spring.rabbitmq.password=密码
#RabbitMQ的端口号:默认为5672
spring.rabbitmq.port=5672
​
#配置文件中避免有空格,会被错误识别

2、启动类

/**
 * 启动类文件一般是以项目名字加Test进行命名的
 * @SpringBootApplication:启动类注解
 * run方法中写的是当前类的class文件
 */
@SpringBootApplication
public class ConsumerRabbitTest {
    public static void main(String[] args) {
        SpringApplication.run(ConsumerRabbitTest.class);
    }
}

3、监听队列

package com.aaa.consumer.test;
​
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
​
/**
 * @Component:将该类作为注解bean注入
 * @RabbitListener:标注监听队列的名字
 */
@Component
public class ConsumerListener {
​
    //监听队列里面的消息:可以同时监听很多个队列
    @RabbitListener(queues = {"routing_queue_name1","routing_queue_name2"})
    public void listenerQueues(Message message){
        //取出队列里面的消息(bytes类型),并将其转换为String类型
        String msg = new String(message.getBody());
        System.out.println("监听到的消息为:"+msg);
    }
}

先运行上述测试程序(交换机和队列才能先被声明和绑定),然后启动消费者;在消费者工程中控制台查看是否接收到对应消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/385695.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

00后跨专业学软件测试,斩获8.5K高薪逆袭职场

我想说的第一句&#xff1a;既然有梦想&#xff0c;就应该去拼搏还记得&#xff0c;我大学毕业前&#xff0c;就已经暗下决心到xxx培训机构接受培训。那个时候&#xff0c;没有任何海同公司的人主动找我或者联系过我&#xff0c;我是自己在网上发现了xxxx培训机构的&#xff01…

PLC实验—西门子S7 1200 PID控制步进电机转速

PLC实验—西门子S7 1200 PID控制步进电机转速 严格讲并不是PID控制&#xff0c;因为并不是并不研究这个方向&#xff0c;研二又比较忙&#xff0c;时间限制只加了比例系数 这里只是抛砖引玉&#xff0c;希望大家可以进一步完善补充 思路 大体思路如下&#xff0c;根据超声波…

三八节买什么数码好物?三八女神节实用不吃灰的数码好物推荐

三八节快到了&#xff0c;在这个小节日里&#xff0c;有哪些实用性强的数码好物值得入手呢&#xff1f;针对这个问题&#xff0c;我来给大家推荐几款实用性超强的数码好物&#xff0c;一起来看看吧。 一、蓝牙耳机 推荐产品&#xff1a;南卡小音舱 参考价&#xff1a;239 南…

Python中Opencv和PIL.Image读取图片的差异对比

近日&#xff0c;在进行深度学习进行推理的时候&#xff0c;发现不管怎么样都得不出正确的结果&#xff0c;再仔细和正确的代码进行对比了后发现原来是Python中不同的库读取的图片数组是有差异的。 image np.array(Image.open(image_file).convert(RGB)) image cv2.imread(…

【持续学习引导:pansharpening】

A continual learning-guided training framework for pansharpening &#xff08;一种持续学习引导的全色锐化训练框架&#xff09; 基于监督学习的全色锐化方法自出现以来一直受到批评&#xff0c;因为它们依赖于尺度移位假设&#xff0c;即这些方法在降低分辨率时的性能通…

IntelliJ IDEA如何整合Maven图文教程详解

Maven 1.Maven简述 Maven是一个构建工具,服务与构建.使用Maven配置好项目后,输入简单的命令,如:mvn clean install,Maven会帮我们处理那些繁琐的任务. Maven是跨平台的. Maven最大化的消除了构建的重复. Maven可以帮助我们标准化构建过程.所有的项目都是简单一致的,简化了学习…

ChatGPT能完全取代软件开发吗,看看它怎么回答?

最近网上一直疯传&#xff0c;ChatGPT 最可能取代的 10 种工作。具体包括①、技术类工作&#xff1a;程序员、软件工程师、数据分析师②、媒体类工作&#xff1a;广告、内容创作、技术写作、新闻③、法律类工作&#xff1a;法律或律师助理④、市场研究分析师⑤、教师⑥、金融类…

如何提高推广邮件的发送成功率?

随着经济的发展&#xff0c;国际之间的贸易往来越加频繁&#xff0c;很多外贸企业需要发送大量的商业推广邮件&#xff0c;来获得销售订单开拓公司业务市场。 随之而来的问题也是越来越多&#xff0c;给众多的外贸企业带来诸多的困扰。外贸企业在发送推广邮件中究竟会遇到什么问…

2.4G收发一体芯片NRF24L01P跟国产软硬件兼容 SI24R1对比

超低功耗高性能 2.4GHz GFSK 无线收发器芯片Si24R1&#xff0c;软硬件兼容NRF24L01P. Si24R1 是一颗工作在 2.4GHz ISM 频段&#xff0c;专为低功耗无线场合设计&#xff0c;集成嵌入式ARQ 基带协议引擎的无线收发器芯片。工作频率范围为 2400MHz-2525MHz&#xff0c;共有 126个…

Nginx网络服务

目录 1.Nginx基础 1.Nginx和Apache的差异 2.Nginx和Apache的优缺点比较 3.编译安装nginx服务 2.认识Nginx服务的主配置文件 nginx.conf 1.全局配置 2.I/O事件配置 3.HTTP设置 4.访问状态统计配置 5.基于授权密码的访问控制 6.基于客户端的访问控制 7.基于域名的ng…

SpringCloud简单介绍

文章目录1. 开源组件2. CAP原则1. 开源组件 功能springcloud netflixspringcloud alibabaspringcloud官方其他服务注册与发现eurekanacosconsulzookeeper负载均衡ribbondubbo服务调用openFeigndubbo服务容错hystrixsentinel服务网关zuulgateway服务配置的同一管理cofig-server…

图论初入门

目录 一、前言 二、图的概念 三、例题及相关概念 1、全球变暖&#xff08;2018年省赛&#xff0c;lanqiao0J题号178&#xff09; 2、欧拉路径 3、小例题 4、例题&#xff08;洛谷P7771&#xff09; 一、前言 本文主要讲了树与图的基本概念&#xff0c;图的存储、DFS遍历…

pytorch训练第一个项目VOC2007分割

一、环境 condapytorch1.2.0cuda10.0pycharm 二、训练内容 数据集&#xff1a;VOC2007 网络&#xff1a;U-net 功能&#xff1a;分割图像分类 三、步骤 安装软件、框架、包、cuda&#xff08;不安用cpu跑也可以&#xff09;&#xff0c;下载数据集、代码、权重文件等。。…

测试外包干了5年,感觉自己已经废了····

前两天有读者想我资讯&#xff1a; 我是一名软件测试工程师&#xff0c;工作已经四年多快五年了。现在正在找工作&#xff0c;由于一直做的都是外包的项目。技术方面都不是很深入&#xff0c;现在找工作都是会问一些&#xff0c;测试框架&#xff0c;自动化测试&#xff0c;感…

Java Map和Set

目录1. 二叉排序树(二叉搜索树)1.1 二叉搜索树的查找1.2 二叉搜索树的插入1.3 二叉搜索树的删除&#xff08;7种情况&#xff09;1.4 二叉搜索树和TreeMap、TreeSet的关系2. Map和Set的区别与联系2.1 从接口框架的角度分析2.2 从存储的模型角度分析【2种模型】3. 关于Map3.1 Ma…

QML键盘事件

在QML中&#xff0c;当有一个按键按下或释放时&#xff0c;会产生一个键盘事件&#xff0c;将其传递给获得有焦点的QML项目&#xff08;讲focus属性设置为true&#xff0c;则获得焦点&#xff09;。 按键处理的基本流程&#xff1a; Qt接收密钥操作并生成密钥事件。如果 QQuic…

keepalived学习记录:对其vip漂移过程采用gdb跟踪

对其vip漂移过程采用gdb跟踪keepalived工具主要功能产生vip漂移过程两种情况gdb调试常用命令gdb调试时打到的函数栈&#xff08;供学习参考&#xff09;函数栈的图是本人理解下画的&#xff0c;不对请多指正 keepalived主要有三个进程&#xff0c;父进程是core进程&#xff0c;…

python编程:查找某个文件夹下所有的文件,包括子文件加下的所有文件,读取指定类型的文件

目录 一、实现要求 二、代码实现 三、效果测试 一、实现要求 1、在电脑上有一个文件夹&#xff0c;该文件夹下面还有子文件夹&#xff0c;具体层级不清楚&#xff0c;需要实现将该文件夹下所有的文件路径读取出来&#xff1b; 2、在1的基础上&#xff0c;只需读取指定类型的文…

论文复现-2:代码部分

以CONLL03数据集为例 文章目录1 整体框架2 数据结构2.1 原始数据集2.2 处理之后的数据集3 代码部分3.0 模型参数3.1 数据预处理3.2 模型方法3.1.1 定义表示的学习权重项的学习双塔模型3.2.2 forward3.3 损失函数3.4 训练与推理Ablation study训练实例1 整体框架 任务是实体识别…

MVVM模式下如何正确【视图绑定+数据】

概述 我如何&#xff08;不在后面的代码中使用代码&#xff09;自动绑定到我想要的视图&#xff1f;据我了解&#xff0c;如果正确完成&#xff0c;这就是模式应该如何工作。我可以使用主窗口 xaml 中的代码实现这一切&#xff0c;我甚至正确创建了一个资源字典&#xff08;因…