RabbitMq的基础及springAmqp的使用

news2024/10/6 16:29:52

RabbitMq

官网:RabbitMQ: One broker to queue them all | RabbitMQ

什么是MQ?

mq就是消息队列,消息队列遵循这先入先出原则。一般用来解决应用解耦,异步消息,流量削峰等问题,实现高性能,高可用,可伸缩和最终一致性架构。

rabbitMq的四大核心

image20230424154420240.png

RabbitMq的安装

RabbitMQ是一个开源的遵循 AMQP协议实现的基于 Erlang语言编写,即需要先安装部署Erlang环境再安装RabbitMQ环境。

查看兼容关系:Erlang Version Requirements | RabbitMQ

百度云地址:

链接:百度网盘 请输入提取码 提取码:6666

本篇文章使用版本:3.8.8,liunx7-cenOs7

#在存放位置执行以下指令
 rpm -ivh erlang-21.3-1.el7.x86_64.rpm
 #安装socat 
 yum install socat -y
 #安装mq
 rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm 

启动

#开机自动启动
 chkconfig rabbitmq-server on
#启动服务
/sbin/service rabbitmq-serve start
#查看启动
/sbin/service rabbitmq-serve status
#停止服务
/sbin/service rabbitmq-serve stop

image20230424154409259.png

坑:执行以上指令无效,重新执行下面指令

systemctl start rabbitmq-server.service #启动
systemctl status rabbitmq-server.service#查看状态

安装可视化界面

#尽量停止服务,在安装
#安装可视化界面
rabbitmq-plugins enable rabbitmq_management

访问地址:http://ip:15672/

如果访问不了,查看防火墙是够关闭 systemctl stop firewalld 关闭防火墙,访问成功后走rabbitmq的基本指令

卸载MQ:

systemctl stop rabbitmq-server
yum list | grep rabbitmq
yum -y remove rabbitmq-server.noarch
yum list | grep erlang
yum -y remove erlang-*
rm -rf /usr/lib64/erlang 
rm -rf /var/lib/rabbitmq
rm -rf /usr/local/erlang
rm -rf /usr/local/rabbitmq

docker安装

docker pull rabbitmq:3-management
#运行
docker run \
 -e RABBITMQ_DEFAULT_USER=itcast \
 -e RABBITMQ_DEFAULT_PASS=123321 \
 --name mq \
 --hostname mq1 \
 -p 15672:15672 \   #网页访问端口
 -p 5672:5672 \     #mq连接端口
 -d \
 rabbitmq:3-management

rabbitMq基本指令

#查看用户
rabbitmqctl list_users
#添加用户
rabbitmqctl add_user admin 123456
#设置角色 (超级管理员)
 rabbitmqctl set_user_tags admin administrator
#设置权限 
rabbitmqctl set_permissions -p '/' admin '.*' '.*' '.*'

image20230424160438952.png

登录后也可以在此界面添加用户

image20230424160714982.png

对接java(入门)

image20230506093856370.png

创建一个maven工程:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.chen</groupId>
    <artifactId>mq</artifactId>
    <version>1.0-SNAPSHOT</version>

   <properties>
       <rabbitmq.version>5.8.0</rabbitmq.version>
       <common.version>2.6</common.version>
   </properties>
    <dependencies>
<!--         https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>${rabbitmq.version}</version>
        </dependency>

        <dependency>
            <groupId>commons-io</groupId>
            <artifactId>commons-io</artifactId>
            <version>${common.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>8</source>
                    <target>8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

生产者:

package com.chen.rabbitmq.one;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 生产者
 */
public class production {


    private static final String MQ_KEY="holle";


    public static void main(String[] args)throws Exception {
//        创建rabbitmq的工厂
        ConnectionFactory factory = new ConnectionFactory();
//    连接地址ip
        factory.setHost("172.17.18.162");
//        用户名
        factory.setUsername("admin");
//        密码
        factory.setPassword("123456");

//        创建连接
        Connection connection = factory.newConnection();
//        创建通道
        Channel channel = connection.createChannel();
//   生产队列
//   参数一:队列名称
//   参数二:持久性(默认为false)
//   参数三:该队列是否可以有多个消费者,是否消息共享
//   参数四:是否自动删除
//   参数五:其他参数
        channel.queueDeclare(MQ_KEY,true,false,false,null);

        /**
         * 发送一个消费者
         * 1.发送到那个交换机
         * 2.路由的key值是哪个 本次是队列的名称
         * 3.其他参数
         * 4.发送消息的消息体
         */
        channel.basicPublish("",MQ_KEY,null,"holle word".getBytes());
        System.out.println("消息发送成功!");
    }

}

测试是否发送成功:

image20230506094013364.png

消费者:

package com.chen.rabbitmq.one;

import com.rabbitmq.client.*;

import java.io.IOException;

public class Consumption {


    private static final String MQ_KEY="holle";

    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("172.17.18.162");
        factory.setUsername("admin");
        factory.setPassword("123456");
//        创建一个新的连接
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
 /*
         参数:
         1: 消费哪个队列
         2.消费成功之后是否要自动应答, true 带边自动应答 false 手动
         3.消费者未成功的回调
         4.消费者取录成功的回调
  */
        channel.basicConsume(MQ_KEY, true,(DeliverCallback) (consumerTag, message) -> System.out.println(new String(message.getBody())),
                (CancelCallback) (consumerTag)-> System.out.println(consumerTag));
    }

}

工作队列:

工作队列(又称任务队列)的主要思想是避免立即执行资源密集型任务,而不得不等待它完成。相反我们安排任务在之后执行。我们把任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。

image20230506103939702.png

1.线程轮询

类似nginx的负载均衡(轮询),线1一次,线2一次。

image20230506111137320.png

工具类:

package com.chen.rabbitmq.tow.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class RabbitUtils {
    public static Channel rabbitConnection() throws Exception{
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("172.17.18.162");
        factory.setUsername("admin");
        factory.setPassword("123456");
//        创建一个新的连接
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        return channel;
    }
}
生产者:

package com.chen.rabbitmq.tow.test;

import com.chen.rabbitmq.tow.utils.RabbitUtils;
import com.rabbitmq.client.Channel;

import java.util.Scanner;

public class Production {
    private final static String MQ_KEY="word";
//    生产者
    public static void production() throws Exception{
        Channel channel = RabbitUtils.rabbitConnection();
        Scanner scanner = new Scanner(System.in);
        //生产队列
        channel.queueDeclare(MQ_KEY,true,false,false,null);
        while (scanner.hasNext()){
            String next = scanner.next();
            channel.basicPublish("",MQ_KEY,null,next.getBytes());
            System.out.println("消息发布成功-> "+next);
        }
    }
    public static void main(String[] args) throws Exception{
        production();
    }
}
消费者:

package com.chen.rabbitmq.tow.test;


import com.chen.rabbitmq.tow.utils.RabbitUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

public class Consumption {

    private final static String MQ_KEY="word";

//    消费者
    public static void consumption() throws Exception{
//        获取连接队列
        Channel channel = RabbitUtils.rabbitConnection();
        channel.basicConsume(MQ_KEY,true,(DeliverCallback)(consumerTag,message)->{
            System.out.println(new String(message.getBody()));
        },(CancelCallback)(tag)->{
            System.out.println(tag);
            System.out.println("中断了");
            });
    }

    public static void main(String[] args) throws Exception{
        consumption();
    }
}

image20230506111456297.png

idea开启两个线程。

消息应答

1.自动应答

RabbitMQ 是一个广泛使用的开源消息代理,它支持多种消息协议,例如 AMQP、MQTT、STOMP 等。在 RabbitMQ 中,自动应答(Automatic Acknowledgement,Auto-ack)是一种消息确认机制,用于标记消息是否已被成功接收和处理。了解自动应答的概念,对于构建可靠、高效的消息传递系统非常重要。

当消费者接收并处理来自 RabbitMQ 的消息时,通常会使用消息确认(acknowledgements)机制来告知 RabbitMQ 该消息已经成功处理。这样一来,RabbitMQ 就可以确保消息不会意外丢失。然而,这种确认过程可能会导致一定的延迟和额外开销。为了解决这个问题,RabbitMQ 提供了自动应答机制。

在自动应答模式下,消费者接收到消息后,RabbitMQ 会立即将该消息标记为已处理。这意味着消费者不需要显式地发送确认(ack)消息给 RabbitMQ。这种机制可以降低延迟,提高消息传递的速度,但是也存在一定的风险。因为消息一旦被发送出去,RabbitMQ 就认为它已经成功处理,而实际上消费者可能还没有完成对消息的处理。如果消费者在处理消息时发生故障,那么这个消息可能会丢失。

2.手动应答

方法:

Channel.basicAck (用于肯定确认)

RabbitMQ已知道该消息并且成功的处理消息,可以将其丢弃了

Channel.basicNack(用于否定确认)

Channel.basicReject (用于否定确认)

Channel.basicNack 相比少一个参数 不处理该消息了直接拒绝,可以将其丢弃了

Multiple

//源码
public void basicAck(long deliveryTag, boolean multiple) throws IOException {
        this.delegate.basicAck(deliveryTag, multiple);
}

multiple 的 true 和 false 代表不同意思:

  1. true 代表批量应答 channel 上未应答的消息

    比如说 channel 上有传送 tag 的消息 5,6,7,8 当前 tag 是 8 那么此时5-8 的这些还未应答的消息都会被确认收到消息应答

    image20230506145530499.png

    2.false 只会应答 tag=8 的消息 5,6,7 这三个消息依然不会被确认收到消息应答

    image20230506145536088.png

消息重新入队

为了解决消息丢失问题。

image20230506150713801.png

具体代码:

生产者:

package com.chen.rabbitmq.three;
import com.chen.rabbitmq.tow.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
public class Pro {

    private static final String MQ_KEY="mqkey";

    public static void pro() throws Exception{
        Channel channel = RabbitUtils.rabbitConnection();
        channel.queueDeclare(MQ_KEY,true,false,false,null);

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String next = scanner.next();
            channel.basicPublish("",MQ_KEY,null,scanner.next().getBytes());
            System.out.println("消息发布成功-> "+next);
        }
    }
    public static void main(String[] args) throws Exception {
        pro();
    }
}

消费者1:

package com.chen.rabbitmq.three;

import com.chen.rabbitmq.tow.utils.RabbitUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

//消费者1
public class Word1 {

  public static final String MQ_KEY="mqkey";


     public static void word() throws Exception{
         Channel channel = RabbitUtils.rabbitConnection();
         channel.basicConsume(MQ_KEY,false,(DeliverCallback) (consumerTag,message)->{
//             睡眠1s
             try {
                 Thread.sleep(1*1000);
                 System.out.println("Word1接收到消息->"+new String(message.getBody()));
//                 参数一:tag标记  参数二:是否批量
                 channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
        },(CancelCallback) e->{
            System.out.println("消息中断"+e);
        } );

     }

    public static void main(String[] args) throws Exception {
        word();
    }

}

消费者2:

package com.chen.rabbitmq.three;

import com.chen.rabbitmq.tow.utils.RabbitUtils;
import com.rabbitmq.client.CancelCallback;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;

//消费者1
public class Word2 {

  public static final String MQ_KEY="mqkey";


     public static void word() throws Exception{
         Channel channel = RabbitUtils.rabbitConnection();
         channel.basicConsume(MQ_KEY,false,(DeliverCallback) (consumerTag,message)->{
//             睡眠10s
             try {
                 Thread.sleep(10*1000);
                 System.out.println("Word2接收到消息->"+new String(message.getBody()));
//                 参数一:tag标记  参数二:是否批量
                 channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
        },(CancelCallback) e->{
            System.out.println("消息中断"+e);
        } );

     }

    public static void main(String[] args) throws Exception{
        word();
    }

}

image20230506160401052.png

经测试会发现,消费者1为第一个接收到消息,接下来当生产者在生产出一条消息,应到消费者2接收到消息,但是此时消费者2突然出现宕机,使用了应答机制,消息则会重新打到消费者1;

持久化设置

1.队列持久化

作用:当rabbitmq宕机后,重启队列依然存在

//创建队列时的第二个参数为设置持久化 
channel.queueDeclare(MQ_KEY,true,false,false,null);

image20230506161649268.png

2.消息持久化

作用:当rabbitmq宕机了重新启动,发送的消息依然存在。

下面的方法不是绝对的能保证消息的持久化

//生产者  
  private static final String MQ_KEY="mqkey";
    public static void pro() throws Exception{
        Channel channel = RabbitUtils.rabbitConnection();
        channel.queueDeclare(MQ_KEY,true,false,false,null);

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String next = scanner.next();
//MessageProperties.PERSISTENT_TEXT_PLAIN 消息持久化
            channel.basicPublish("",MQ_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,scanner.next().getBytes());
            System.out.println("消息发布成功-> "+next);
        }
    }

3.发布确认

完成以上两步还不足以持久化,要把发布确认加上。

//默认是不开启的
Channel channel = RabbitUtils.rabbitConnection();
channel.confirmSelect();//开启发布确认
发布确认的策略:

1.单个确认发布

这个发布确认是同步的,需等待确认一次在发布下一次,一手交钱一手交货原则

缺点:发布速度特别慢

//单个确认
    public static void one() throws Exception{
        Channel channel = RabbitUtils.rabbitConnection();
        //开启发布确认
        channel.confirmSelect();
        String uuid = UUID.randomUUID().toString();

        //创建队列
        channel.queueDeclare(uuid,true,false,false,null);
        //开始时间
        long begin = System.currentTimeMillis();

        for (Integer i = 0; i < COUNT; i++) {
            String message = i + "";
            channel.basicPublish("",uuid,null,message.getBytes());
                //发布确认
            boolean flag = channel.waitForConfirms();
            if(flag){
                System.out.println("消息确认成功!");
            }
        }
        long last = System.currentTimeMillis();
        System.out.println("耗时:"+(last-begin));
    }

2.批量确认发布

发布速度相对单个发布确认要快,但是当其中一条消息出现异常,将无法查找到那个消息丢失 。

 //批量
    public static void batch() throws Exception{
        Channel channel = RabbitUtils.rabbitConnection();
        String uuid = UUID.randomUUID().toString();
        //开启消息确认
        channel.confirmSelect();
        //创建队列
        channel.queueDeclare(uuid,true,false,false,null);
        //这个这个变量用记录发布值
        Integer messageCount=100;
        Integer record =0;
        //开始时间
        long begin = System.currentTimeMillis();

        for (Integer i = 0; i < COUNT; i++) {
            record++;
            String message=i+"";
            //发布消息
            channel.basicPublish("",uuid,null,message.getBytes());
            if(messageCount.equals(record)){
                channel.waitForConfirms();
                record=0;
            }
        }
        long last = System.currentTimeMillis();
        System.out.println("耗时"+(last-begin));
    }

3.异步确认发布(推荐使用)

异步确认虽然比上的两个代码复杂,但同时也解决了上面两种方式遗留下来的问题。

image20230511104153138.png

public static void asyn() throws Exception{

        Channel channel = RabbitUtils.rabbitConnection();
        //开启发布确认
        channel.confirmSelect();
        String uuid = UUID.randomUUID().toString();

        //创建队列
        channel.queueDeclare(uuid,true,false,false,null);
        //开始时间
        long begin = System.currentTimeMillis();

        //        创建一个线程的ListMap用于记录 ----》处理异步未确认的消息
        ConcurrentSkipListMap<Long, String> map = new ConcurrentSkipListMap<>();

//        监听消息
        channel.addConfirmListener((deliveryTag, multiple)->{
            if(multiple){
                ConcurrentNavigableMap<Long, String> longStringConcurrentNavigableMap =
                        map.headMap(deliveryTag);
                longStringConcurrentNavigableMap.clear();
            }else{
                map.remove(deliveryTag);
            }
            System.out.println("确认消息:"+deliveryTag);

        },(deliveryTag, multiple)->{
            String message = map.get(deliveryTag);
            System.out.println("发送失败的数据是:"+message+"未确认消息:"+deliveryTag+"-----失败");
        });
        for (Integer i = 0; i < COUNT; i++) {
            String message=""+i;
            channel.basicPublish("",uuid,null,message.getBytes());
            //获取信道的标识,存入消息
            map.put(channel.getNextPublishSeqNo(),message);
        }
        long last = System.currentTimeMillis();
        System.out.println("耗时:"+(last-begin));
    }

不公平分发原则(能者多劳原则)

在上面中的所有例子都是尊寻这轮询的规则去执行的,问题:当其中的一台服务响应特别慢时就会影响到整体的效率。

channel.basicQos(1);

//消费者
public static void word() throws Exception{
         Channel channel = RabbitUtils.rabbitConnection();
        //设置不公平分发
         channel.basicQos(1);
         channel.basicConsume(MQ_KEY,false,(DeliverCallback) (consumerTag,message)->{
             try {
               //模拟虚拟机延迟
                 Thread.sleep(1*1000);
                 System.out.println("Word2接收到消息->"+new String(message.getBody()));
                 channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
        },(CancelCallback) e->{
            System.out.println("消息中断"+e);
        } );
}

也可以用来设置预期值!

//消费者1
public static void word2() throws Exception{
         Channel channel = RabbitUtils.rabbitConnection();
        //设置预期值
         channel.basicQos(3);
         channel.basicConsume(MQ_KEY,false,(DeliverCallback) (consumerTag,message)->{
             try {
               //模拟虚拟机延迟
                 Thread.sleep(1*1000);
                 System.out.println("Word2接收到消息->"+new String(message.getBody()));
                 channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
        },(CancelCallback) e->{
            System.out.println("消息中断"+e);
        } );
}
//消费者2
public static void word2() throws Exception{
         Channel channel = RabbitUtils.rabbitConnection();
        //设置预期值
         channel.basicQos(5);  
         channel.basicConsume(MQ_KEY,false,(DeliverCallback) (consumerTag,message)->{
             try {
               //模拟虚拟机延迟
                 Thread.sleep(10*1000);
                 System.out.println("Word2接收到消息->"+new String(message.getBody()));
                 channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
             } catch (InterruptedException e) {
                 e.printStackTrace();
             }
        },(CancelCallback) e->{
            System.out.println("消息中断"+e);
        } );
}

交换机

在RabbitMQ中,生产者发送消息不会直接将消息投递到队列中,而是先将消息投递到交换机中, 在由交换机转发到具体的队列, 队列再将消息以推送或者拉取方式给消费者进行消费

绑定(bindings)

与交换机产生关系,并且能有routekey控制发送消息给哪个队列。

fanout交换机(扇形)

扇形交换机是最基本的交换机类型,它所能做的事清非常简单广播消息。扇形交换机会把能接收到的消息全部发送给绑定在自己身上的队列。因为广播不需要'思考”,所以扇形交换机处理消息的速度也是所有的交换机类型里面最快的。

//消费者
public class Word {
//    交换机名称
    private static String EXCHANGE_NAME="logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitUtils.rabbitConnection();
//        声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//        声明一个队列 临时队列
        String queue = channel.queueDeclare().getQueue();
//        绑定交换机与队列
        channel.queueBind(queue,EXCHANGE_NAME,"");
        System.out.println("等待消息~");

//消费者取消消息时回调接口
        channel.basicConsume(queue,true, (consumerTag,message)->{
            System.out.println("word1控制台打印接收消息:"+new String(message.getBody(),"UTF-8"));
        },cancelCallback->{});
    }
}

public class Word2 {
//    交换机名称
    private static String EXCHANGE_NAME="logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitUtils.rabbitConnection();
//        声明一个交换机
        channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
//        声明一个队列 临时队列
        String queue = channel.queueDeclare().getQueue();
//        绑定交换机与队列
        channel.queueBind(queue,EXCHANGE_NAME,"");
        System.out.println("等待消息~");
//消费者取消消息时回调接口
        channel.basicConsume(queue,true, (consumerTag,message)->{
            System.out.println("word2控制台打印接收消息:"+new String(message.getBody(),"UTF-8"));
        },cancelCallback->{});
    }
}
//生产者
public class send {
    //    交换机名称
    private static String EXCHANGE_NAME="logs";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitUtils.rabbitConnection();

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()){
            String next = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,"",null,next.getBytes("UTF-8"));
            System.out.println("生产者发送消息:"+next);
        }
    }
}

直连交换机: Direct exchange

直连交换机的路由算法非常简单: 将消息推送到binding key与该消息的routing key相同的队列。

代码几乎类型fanout交换机,只需要指定routerkey即可。

主题交换机: Topic exchange

发送到主题交换机的 消息不能有任意的 routing key, 必须是由点号分开的一串单词,这些单词可以是任意的,但通常是与消息相关的一些特征。

如以下是几个有效的routing key:

"stock.usd.nyse", "nyse.vmw", "quick.orange.rabb 代", routing key的单词可以 有很多,最大限制是255 bytes。

Topic 交换机的 逻辑与 direct 交换机有点 相似 使用特定路由键发送的消息 将被发送到所有使用匹配绑定键绑定的队列 ,然而 ,绑定键有两个特殊的情况:

*表示匹配任意一个单词

#表示匹配任意—个或多个单词

image20230525104709697.png

比如上图:

发送routerkey为:ws.orange.rabbit那么对应的就是Q1,Q2

发送routerkey为:lazy.orange.elephant那么对应的就是Q1,Q2

//消费者
public class word1 {

    private static final String EXCHANGE_NAME="topic_logs";

    private static final String QUEUE_NAME="Q1";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitUtils.rabbitConnection();
//        创建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//       创建队列
        channel.queueDeclare(QUEUE_NAME,true,true,false,null);
//        绑定队列
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.orange.*");
//        接收消息
        channel.basicConsume(QUEUE_NAME,true,(consumerTag,message)->{
            System.out.println("接收到的消息:"+new String(message.getBody()));
        },cancelCallback->{});
        System.out.println("等下消息~");
    }
}
public class word2 {

    private static final String EXCHANGE_NAME="topic_logs";

    private static final String QUEUE_NAME="Q2";

    public static void main(String[] args) throws Exception {
        Channel channel = RabbitUtils.rabbitConnection();
//        创建交换机
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
//       创建队列
        channel.queueDeclare(QUEUE_NAME,true,true,false,null);
//        绑定队列
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"*.*.rabbit");
        channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"lazy.#");
//        接收消息
        channel.basicConsume(QUEUE_NAME,true,(consumerTag,message)->{
            System.out.println("接收到的消息:"+new String(message.getBody()));
        },cancelCallback->{});
        System.out.println("等下消息~");
    }
}
//生产者
public class send {
    private static final String EXCHANGE_NAME="topic_logs";
    public static void main(String[] args) throws Exception{
        Channel channel = RabbitUtils.rabbitConnection();
        Scanner scanner = new Scanner(System.in);
        while (true){
            System.out.println("请输入routerkey:");
            String key = scanner.next();
            System.out.println("请输入消息内容:");
            String message = scanner.next();
            channel.basicPublish(EXCHANGE_NAME,key,null,message.getBytes());
        }
    }
}

image20230525112943356.png

image20230525112959575.png

死信队列

顾名思义:无法被消费的消息,一般来说,producer将消息投递broker或者直接到queue里了,consumer(消费者)从queue取出消息进行消费,但某些时间由特定原因导致queue中的某些消息无法被消费,这样如果没有后续的处理,就变成了死信。

应用场景:为了确保订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息被消息时发生了异常,这是就将消息存到死信中,还比如说:用户商城下单成功,并且点击支付后在指定时间支付时自动失效。

image20230525143950674.png

消息TTL过期时间测试:

//生产者
public class send {
    private static final String NORMAL_EXCHANGE="normal_exchange";
    public static final String NORMAL_QUEUE="normal_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitUtils.rabbitConnection();
//        设置死信时间
        AMQP.BasicProperties basicProperties =
                new AMQP.BasicProperties().builder()
                        .expiration("10000").build();
        for (int i = 0; i < 11; i++) {
            String msg="info"+i;
           channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",basicProperties,msg.getBytes());
        }
    }
}
//消费者1
public class C1 {
    private static final String NORMAL_EXCHANGE="normal_exchange";
    private static final String DEAD_EXCHANGE="dead_exchange";
    public static final String NORMAL_QUEUE="normal_queue";
    public static final String DEAD_QUEUE="dead_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitUtils.rabbitConnection();
//      创建c1交换机
       channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.TOPIC);
//       声明普通队列
        HashMap<String, Object>  map = new HashMap<>();
//        设置过期时间 10s  单位ms 这里有消费整去做控制
//        map.put("x-message-ttl",100000);
//        正常队列设置死信交换机
        map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//       设置死信的routerKey
        map.put("x-dead-letter-routing-key","lisi");
        //    创建普通队列
      channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
      //创建死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
      //        绑定
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
        channel.basicConsume(NORMAL_QUEUE,true,(consumerTag, message) -> {
            System.out.println("C1消息为:"+message.getBody());
        },cancelCallback->{
        });
    }
}
//消费者2
public class C2 {
    public static final String DEAD_QUEUE="dead_queue";
    private static final String DEAD_EXCHANGE="dead_exchange";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitUtils.rabbitConnection();
        channel.basicConsume(DEAD_QUEUE,true,(consumerTag,message)->{
            System.out.println("消息为:"+new String(message.getBody()));
        },cancelCallback->{});
    }
}

正常队列长度的限制:

根据c1做修改,测试报错先删除原来的队列与交换机

//设置正常队列长度的限制
map.put("x-max-length",6);

拒接消息:

添加手动应答拒接。

public class C1 {
    private static final String NORMAL_EXCHANGE="normal_exchange";
    private static final String DEAD_EXCHANGE="dead_exchange";
    public static final String NORMAL_QUEUE="normal_queue";
    public static final String DEAD_QUEUE="dead_queue";
    public static void main(String[] args) throws Exception {
        Channel channel = RabbitUtils.rabbitConnection();
//      创建c1交换机
       channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.TOPIC);
//       声明普通队列
        HashMap<String, Object>  map = new HashMap<>();
//        设置过期时间 10s  单位ms
//        map.put("x-message-ttl",100000);
//        正常队列设置死信交换机
        map.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//       设置死信的routerKey
        map.put("x-dead-letter-routing-key","lisi");
//        设置正常队列长度的限制
//        map.put("x-max-length",6);
        //    创建普通队列
      channel.queueDeclare(NORMAL_QUEUE,false,false,false,map);
      //创建死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
      //        绑定
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
        channel.basicConsume(NORMAL_QUEUE,false,(consumerTag, message) -> {
            String msg = new String(message.getBody());
            System.out.println("C1消息为:"+msg);
//            拒接对应消息
            if(msg.equals("info2")){
//                deliveryTag
                channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
            }else{
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            }
        },cancelCallback->{
        });
    }
}

SpringAMQP

官网地址:Spring AMQP

Spring AMQP 是 Spring 框架中的一个模块,它提供了基于 AMQP(Advanced Message Queuing Protocol,高级消息队列协议)标准的抽象层,用于简化在 Spring 应用程序中使用消息队列的过程。Spring AMQP 不仅简化了与消息代理(如 RabbitMQ)的集成,还提供了一套高度可配置的模板类来生产、消费消息,并管理AMQP基础设施组件,如队(Queue)、交换机(Exchange)和绑定(Binding)。

使用

      <!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

生产者

logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 38.6.217.70
    port: 5672
    username: itcast
    password: 123321
    virtual-host: /
package cn.itcast.mq.spring;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {
    @Resource
    private RabbitTemplate rabbitTemplate;
    @Test
    public void testSendMessage2SimpleQueue() throws InterruptedException {
        // 1.发送消息
        String message = "Hello, Spring Amqp!";
        rabbitTemplate.convertAndSend("simple.queue", message);
    }
}

 

消费者

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
//监听机制
@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) {
        System.out.println("spring接收到的消息是:" + msg);
    }
}

预取限制

案例:将50条消息在一秒内分类交给两个消费者消费。

//生成者   
@Test
    public void testSendWordSimpleQueue() throws InterruptedException {
        // 1.发送消息
        String key ="simple.queue";
        String message = "Hello, Spring Amqp____";
        for (int i = 0; i < 49; i++) {
            rabbitTemplate.convertAndSend(key, message+i);
            Thread.sleep(20);
        }
    }
//消费者
@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring接收到的消息是:" + msg+"___"+ LocalDateTime.now());
        Thread.sleep(20);
    }

    @RabbitListener(queues = "simple.queue")
    public void listenFanoutQueue1(String msg) throws InterruptedException {
        System.err.println("FanoutQueue1接收到的消息是:" + msg+"___"+ LocalDateTime.now());
        Thread.sleep(200); //模拟性能
    }
}

通过执行结果我们可以看出listenFanoutQueue1这个监听器执行的是奇数,而listenSimpleQueueMessage则是偶数。且时间超出了1秒。为什么呢?

因为在生产者发送到队列中时,消费者会预取消息,在默认情况下进行平分机制,在上面代码中我们可以看到我们使用了线程睡眠的方式模拟了性能,在平分的情况下,睡眠200的执行了25条,所以导致了超出了1s。 如何调整呢?

logging:
  pattern:
    dateformat: MM-dd HH:mm:ss:SSS
spring:
  rabbitmq:
    host: 38.6.217.70
    port: 5672
    username: itcast
    password: 123321
    virtual-host: /
    listener: #设置预取
      simple:
        prefetch: 1 #每次只取一条
        #这段配置的作用是在使用 RabbitMQ 的时候,配置消费者监听器的简单模式,并设置消息预取值为 1。这意味着每次只会从队列中取出一条消息进行处理,处理完后再去取下一条消息。这种方式可以保证消息的顺序处理。

发布与订阅

 

fanoutExchange

这种交换机需要进行绑定对应的队列,绑定对应的队列后,生产者将消息推送给交换机,交换机会将消息分别都发给绑定的消息队列。

实现

//消费者配置
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutExchangeConfig {

//    创建队列1  fanout.queue1
    @Bean
    public Queue queue1(){
        return new Queue("fanout.queue1");
    }
//    创建交换机 fanoutExchange
    @Bean
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("fanoutExchange");
    }
//   队列1绑定交换机
    @Bean
    public Binding bindingExchange1(){
        return BindingBuilder.bind(queue1()).to(fanoutExchange());
    }
    //    创建队列1  fanout.queue2
    @Bean
    public Queue queue2(){
        return new Queue("fanout.queue2");
    }
    //   队列2绑定交换机
    @Bean
    public Binding bindingExchange2(){
        return BindingBuilder.bind(queue2()).to(fanoutExchange());
    }
}

 

消费者

@Component
public class SpringRabbitListener {
    @RabbitListener(queues = "fanout.queue1")
    public void listenFanoutQueue1QueueMessage(String msg) throws InterruptedException {
        System.out.println("fanout.queue1接收到的消息是:" + msg+"___"+ LocalDateTime.now());
    }
    @RabbitListener(queues = "fanout.queue2")
    public void listenFanoutQueue2QueueMessage(String msg) throws InterruptedException {
        System.out.println("fanout.queue2接收到的消息是:" + msg+"___"+ LocalDateTime.now());
    }
}

生产者

    @Test
    public void testSendMessageFanoutQueue()  {
        // 1.发送消息
        String message = "Hello, testSendMessageFanoutQueue !";
//       交换机名称
        String exchange = "fanoutExchange";
        rabbitTemplate.convertAndSend(exchange,"",message);
    }
DirectExchange

这种交换机需要指定一个key进行发送,通过可以区别发送到那个队列,同时这些队列也可以绑定相同的key,那么也就是实现了fanout的效果。 

实现

//消费者
@Component
public class DirectExchangeListener {

//    可以通过@bena的方式进注入,这里我们采用@RabbitListenner的方式

    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(name = "direct.queue1"),//绑定的队列
                    exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),//绑定的交换机
                    key = {"red", "blue"} //绑定的key
            )
    )
    public void listenDirectQueue1(String msg) throws InterruptedException {
        System.out.println("listenDirectQueue1接收到的消息是:" + msg);
    }


    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(name = "direct.queue2"),//绑定的队列
                    exchange = @Exchange(name = "direct.exchange", type = ExchangeTypes.DIRECT),//绑定的交换机
                    key = {"red", "yellow"} //绑定的key
            )
    )
    public void listenDirectQueue2(String msg) throws InterruptedException {
        System.out.println("listenDirectQueue2接收到的消息是:" + msg);
    }
}
//生产者
    @Test
    public void testSendMessageDirectQueue()  {
        String routingKey = "yellow";
        // 1.发送消息
        String message = "Hello, testSendMessageFanoutQueue !"+"__"+routingKey;
//       交换机名称
        String exchange = "direct.exchange";
        rabbitTemplate.convertAndSend(exchange,routingKey,message);
    }

 

TopicExchange

这种交换机其实和direct类型的交换机差不错,只不过它是使用通配符的方式。

使用

//消费者
@Component
public class TopicExchangeListener {
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
            key = {"china.#"}
    ))
    public void listenTopicQueue1(String msg) throws InterruptedException {
        System.out.println("topic.queue1接收到消息:" + msg);
    }
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "topic.exchange", type = ExchangeTypes.TOPIC),
            key = {"#.news"}
    ))
    public void listenTopicQueue2(String msg) throws InterruptedException {
        System.out.println("topic.queue2接收到消息:" + msg);
    }
}
//生产者   
@Test
    public void testSendMessageTopicQueue()  {
        String routingKey = "news";
        // 1.发送消息
        String message = "Hello, testSendMessageTopicQueue !"+"__"+routingKey;
//       交换机名称
        String exchange = "topic.exchange";
        rabbitTemplate.convertAndSend(exchange,routingKey,message);
    }

消息转换器

例子:

//我们声明一个objQueue
@Bean
    public Queue objQueue(){
        return new Queue("obj.queue");
    }

//发送消息
  @Test
    public void testSendMessageobjQueue()  {
        Map<String, Object> map = new HashMap<>();
        map.put("name","test");
        map.put("age",18);
        rabbitTemplate.convertAndSend("obj.queue",map);
    }

我们重rabbitmq的ui界面中我们可以发现消息是基于JDK完成的序列化。

缺点:这样不能很直接的看出消息的结果,并且占用大量内存,所以下面我们使用jdckson进行json序列化。

发送者

依赖:

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
</dependency>

配置bean

//生产者配置
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }

 

消费者

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
</dependency>
//销售者配置
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
    @Bean
    public MessageConverter messageConverter() {
        return new Jackson2JsonMessageConverter();
    }
   @RabbitListener(queues = "obj.queue")
    public void listenObjQueueMessage( Map<String, Object> msg) throws InterruptedException {
        System.out.println("obj.queue接收到的消息是:" + msg);
    }

后续会更新使用MQ做的具体案例:秒杀、订单业务处理等。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1878443.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

DDR自学笔记

DDR的技术发展 标准名称 内核时钟(MHz) I/O时钟(MHz) 工作电压(v) 预取位数 突发长度 数据速率(MT/s) 数据带宽(GB/s) 拓扑 SDRAM 100-166 100-166 3.3 1 / 100-166 0.8-1.3 T DDR 133-200 133-200 2.5 2n 2 266-400 2.1-3.2 T DDR2 133-200 266-…

MySQL之如何处理超大分页

如何处理MySQL超发分页&#xff1f; 可以使用覆盖索引解决 【点击进入】 MySQL超大分页处理 在数据量较大时&#xff0c;如果使用limit分页查询&#xff0c;在查询时&#xff0c;越往后&#xff0c;分页查询效率会越低。 示例&#xff1a; select * from user limit 900000…

RDMA建链的3次握手和断链的4次挥手流程?

文章目录 基础信息建链 3次握手断链4次挥手建联状态active端passive端 报文结构函数关系其他后记 基础信息 CM: Communication Management 通信管理 连接管理SIDR: Service ID Resolution Protocol. 作用&#xff1a; enables users of Unreliable Datagram service to locate …

JAVA课程复习

简答题65分(理解)❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀看本章小结 读程序写结果45分 填空102分(lambda) 编程310分(20~30行) ❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀-❀ 1~13章,11、13章重…

C语言单链表的算法之逆序

一&#xff1a;什么是链表的逆序 &#xff08;1&#xff09;链表的逆序又叫反向&#xff0c;意思就是把链表中所有的有效节点在链表中的顺序给反过来 二&#xff1a;单链表逆序算法分析 &#xff08;1&#xff09;当需要对一个数据结构进行操作时&#xff0c;就有必要有一套算…

FreeSWITCH 1.10.10 简单图形化界面23-sipml5的demo测试

FreeSWITCH 1.10.10 简单图形化界面23-sipml5的demo测试 00 FreeSWITCH GUI界面预览01、安装FreeSWITCH GUI先看使用手册02. 使用手册在这里0、设置FreeSWITCH账号1、sipml5的demo网站2、注册3、呼叫4、掉线问题 在FreeSWITCH中使用jssip的demo&#xff0c;需要对FreeSWITCH进行…

islower()方法——判断字符串是否全由小写字母组成

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 语法参考 islower()方法用于判断字符串是否由小写字母组成。islower()方法的语法格式如下&#xff1a; str.islower() 如果字符串中包含至少一个区…

C++自定义智能指针

template <class T> class counted_ptr;// 智能指针引用计数类 template <class T> class Ref_Ptr {friend class counted_ptr<T>; private:T* m_pTtr; // 实际的指针size_t counted_ptr; // 引用计数Ref_Ptr(T* p);virtual ~Ref_Ptr(); };template <clas…

如何焊铜管 量测射频前端模块

先说结论 要做Port Extension待测物要上电 且根据逻辑表给Enable pin上电网分输入功率 不要太大 -20dBm即可铜管的接地 要足够 以及足够近铜管与待测物之间 必要时 隔一颗电容不要将匹配元件 也包含在量测范围讯号针不要直接焊在焊盘上 首先 铜管要做Port…

【后端面试题】【中间件】【NoSQL】ElasticSearch面试基本思路和高可用方案(限流、消息队列、协调节点、双集群)

基本思路 业务开发面试Elasticsearch的时候基本问的是基础知识以及倒排索引。 Elasticsearch最基本的可用性保障就是分片&#xff0c;而且是主从分片&#xff0c;所以遇到Elasticsearch如何做到高可用这个问题的时候&#xff0c;首先要提到这一点。 Elasticsearch高可用的核心…

昇思25天学习打卡营第12天|文本解码原理--以MindNLP为例

学AI还能赢奖品&#xff1f;每天30分钟&#xff0c;25天打通AI任督二脉 (qq.com) 文本解码原理--以MindNLP为例 回顾&#xff1a;自回归语言模型 根据前文预测下一个单词 一个文本序列的概率分布可以分解为每个词基于其上文的条件概率的乘积 &#x1d44a;_0:初始上下文单词序…

Unity之HTC VIVE Cosmos环境安装(适合新手小白)(一)

提示&#xff1a;能力有限&#xff0c;错误之处&#xff0c;还望指出&#xff0c;不胜感激&#xff01; 文章目录 前言一、unity版本电脑配置相关关于unity版本下载建议&#xff1a;0.先下载unity Hub1.不要用过于旧的版本2.不要下载最新版本或者其他非长期支持版本 二、官网下…

鸿蒙项目实战-月木学途:2.自定义底部导航

效果预览 Tabs组件简介 Tabs组件的页面组成包含两个部分&#xff0c;分别是TabContent和TabBar。TabContent是内容页&#xff0c;TabBar是导航页签栏&#xff0c;页面结构如下图所示&#xff0c;根据不同的导航类型&#xff0c;布局会有区别&#xff0c;可以分为底部导航、顶部…

AD PCB板子裁剪与泪滴设置

在剪裁板子时。首先&#xff0c;选择选择板子的机械层&#xff0c;之后选择画线。在原来的板子上画上自己想要裁剪的图形。如下下图 之后&#xff0c;选择按照所画的线裁剪板子即可&#xff0c;如下 在焊接PCB时&#xff0c;为了防止多次焊接导至焊盘脱落可以加大焊点的接触面积…

读AI新生:破解人机共存密码笔记16对人工智能的治理

1. 愚蠢的、情绪化的人类 1.1. 与完美理性所设定的不可企及的标准相比&#xff0c;我们都是极其愚蠢的&#xff0c;我们受制于各种情绪的起伏&#xff0c;这些情绪在很大程度上支配着我们的行为 1.2. 为了充分了解人类的认知&#xff0c;我们&#xff08;或者更确切地说&…

python中lxml库的使用简介

目录 1&#xff0e;ElementTree 类 2&#xff0e;Element 类 3&#xff0e;ElementTree 类或 Element 类的查找方法 为方便开发人员在程序中使用 XPath 的路径表达式提取节点对应的内容&#xff0c; Python 提供了 第三方库 lxml 。开发人员通过 lxml 库可以轻松地对 HTM…

25考研:今年初试时间比去年更早了?

过去5年考研初试时间安排如下&#xff1a; 24考研&#xff1a;2023年12月23-24日&#xff08;倒数第二个周末&#xff09; 23考研&#xff1a;2022年12月24-25日&#xff08;倒数第二个周末&#xff09; 22考研&#xff1a;2021年12月25-26日&#xff08;最后一个周末&#xf…

JVM原理(四):JVM垃圾收集算法与分代收集理论

从如何判定消亡的角度出发&#xff0c;垃圾收集算法可以划分为“引用计数式垃圾收集”和“追踪式垃圾收集”两大类。 本文主要介绍的是追踪式垃圾收集。 1. 分代收集理论 当代垃圾收集器大多遵循“分代收集”的理论进行设计&#xff0c;它建立在两个假说之上&#xff1a; 弱分…

宇宙第一大厂亚马逊云科技AWS人工智能/机器学习证书即将上线,一篇文章教你轻松拿下

据麦肯锡《在华企业如何填补AI人才缺口》研究表明&#xff0c;到2030年人工智能为中国带来的潜在价值有望超过1万亿美元&#xff0c;而随着各大企业进入人工智能化&#xff0c;对该领域的人才需求将从目前的100万增长到2030年的600万。然而到保守估计&#xff0c;到2030可以满足…

DP(动态规划)【3】 最长公共子序列 最长回文子串

目录 1.最长公共子序列 状态转移方程需要二维数组&#xff0c;1-dim已经不太够了 又是这个问题&#xff1a;如何读入字符串 2.最长回文子串 1.最长公共子序列 状态转移方程需要二维数组&#xff0c;1-dim已经不太够了 这里dp[i][j]是说S的前i位与T的前j位公共序列&#xff…