分布式的流处理平台Kafka

news2025/1/15 13:56:04

目录:

    • 一、简介
    • 二、基本概念
    • 三、生产者使用详解
    • 四、发送消息
    • 五、消费者代码示例

一、简介

ApacheKafka 是一个分布式的流处理平台。它具有以下特点:

  • 支持消息的发布和订阅,类似于 RabbtMQ、ActiveMQ 等消息队列;
  • 支持数据实时处理;
  • 能保证消息的可靠性投递;
  • 支持消息的持久化存储,并通过多副本分布式的存储方案来保证消息的容错;
  • 高吞吐率,单 Broker 可以轻松处理数千个分区以及每秒百万级的消息量。

二、基本概念

2.1 Messages And Batches
Kafka 的基本数据单元被称为 message(消息),为减少网络开销,提高效率,多个消息会被放入同一批次 (Batch) 中后再写入。

2.2 Topics And Partitions
Kafka 的消息通过 Topics(主题) 进行分类,一个主题可以被分为若干个 Partitions(分区),一个分区就是一个提交日志 (commit log)。消息以追加的方式写入分区,然后以先入先出的顺序读取。Kafka 通过分区来实现数据的冗余和伸缩性,分区可以分布在不同的服务器上,这意味着一个 Topic 可以横跨多个服务器,以提供比单个服务器更强大的性能。

由于一个 Topic 包含多个分区,因此无法在整个 Topic 范围内保证消息的顺序性,但可以保证消息在单个分区内的顺序性。
在这里插入图片描述
2.3 Producers And Consumers

  1. 生产者
    生产者负责创建消息。一般情况下,生产者在把消息均衡地分布到在主题的所有分区上,而并不关心消息会被写到哪个分区。如果我们想要把消息写到指定的分区,可以通过自定义分区器来实现。

  2. 消费者
    消费者是消费者群组的一部分,消费者负责消费消息。消费者可以订阅一个或者多个主题,并按照消息生成的顺序来读取它们。消费者通过检查消息的偏移量 (offset) 来区分读取过的消息。偏移量是一个不断递增的数值,在创建消息时,Kafka 会把它添加到其中,在给定的分区里,每个消息的偏移量都是唯一的。消费者把每个分区最后读取的偏移量保存在 Zookeeper 或 Kafka 上,如果消费者关闭或者重启,它还可以重新获取该偏移量,以保证读取状态不会丢失。
    在这里插入图片描述
    一个分区只能被同一个消费者群组里面的一个消费者读取,但可以被不同消费者群组中所组成的多个消费者共同读取。多个消费者群组中消费者共同读取同一个主题时,彼此之间互不影响。
    在这里插入图片描述
    2.4 Brokers And Clusters
    一个独立的 Kafka 服务器被称为 Broker。Broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。Broker 为消费者提供服务,对读取分区的请求做出响应,返回已经提交到磁盘的消息。

Broker 是集群 (Cluster) 的组成部分。每一个集群都会选举出一个 Broker 作为集群控制器 (Controller),集群控制器负责管理工作,包括将分区分配给 Broker 和监控 Broker。

在集群中,一个分区 (Partition) 从属一个 Broker,该 Broker 被称为分区的首领 (Leader)。一个分区可以分配给多个 Brokers,这个时候会发生分区复制。这种复制机制为分区提供了消息冗余,如果有一个 Broker 失效,其他 Broker 可以接管领导权。
在这里插入图片描述

三、生产者使用详解

2.1 项目依赖

本项目采用 Maven 构建,想要调用 Kafka 生产者 API,需要导入 kafka-clients 依赖,如下:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.2.0</version>
</dependency>

2.2 创建生产者

创建 Kafka 生产者时,以下三个属性是必须指定的:

  • bootstrap.servers :指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的broker 里查找 broker 的信息。不过建议至少要提供两个 broker 的信息作为容错;
  • key.serializer :指定键的序列化器;
  • value.serializer :指定值的序列化器。

创建的示例代码如下:

public class SimpleProducer {

    public static void main(String[] args) {

        String topicName = "Hello-Kafka";

        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop001:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        /*创建生产者*/
        Producer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 10; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "hello" + i, 
                                                                         "world" + i);
            /* 发送消息*/
            producer.send(record);
        }
        /*关闭生产者*/
        producer.close();
    }
}

2.3 测试

  1. 启动Kakfa
    Kafka 的运行依赖于 zookeeper,需要预先启动,可以启动 Kafka 内置的 zookeeper,也可以启动自己安装的:
# zookeeper启动命令
bin/zkServer.sh start

# 内置zookeeper启动命令
bin/zookeeper-server-start.sh config/zookeeper.properties
启动单节点 kafka 用于测试:

# bin/kafka-server-start.sh config/server.properties
# 创建用于测试主题
bin/kafka-topics.sh --create \
                    --bootstrap-server hadoop001:9092 \
                     --replication-factor 1 --partitions 1 \
                     --topic Hello-Kafka

# 查看所有主题
 bin/kafka-topics.sh --list --bootstrap-server hadoop001:9092
  1. 启动消费者
启动一个控制台消费者用于观察写入情况,启动命令如下:

# bin/kafka-console-consumer.sh --bootstrap-server hadoop001:9092 --topic Hello-Kafka --from-beginning

在这里插入图片描述
2.4 可能出现的问题
在这里可能出现的一个问题是:生产者程序在启动后,一直处于等待状态。这通常出现在你使用默认配置启动 Kafka 的情况下,此时需要对 server.properties 文件中的 listeners 配置进行更改:

# hadoop001 为我启动kafka服务的主机名,你可以换成自己的主机名或者ip地址
listeners=PLAINTEXT://hadoop001:9092

四、发送消息

上面的示例程序调用了 send 方法发送消息后没有做任何操作,在这种情况下,我们没有办法知道消息发送的结果。想要知道消息发送的结果,可以使用同步发送或者异步发送来实现。

2.1 同步发送
在调用 send 方法后可以接着调用 get() 方法,send 方法的返回值是一个 Future对象,RecordMetadata 里面包含了发送消息的主题、分区、偏移量等信息。改写后的代码如下:

for (int i = 0; i < 10; i++) {
    try {
        ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);
        /*同步发送消息*/
        RecordMetadata metadata = producer.send(record).get();
        System.out.printf("topic=%s, partition=%d, offset=%s \n",
                metadata.topic(), metadata.partition(), metadata.offset());
    } catch (InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}

此时得到的输出如下:偏移量和调用次数有关,所有记录都分配到了 0 分区,这是因为在创建 Hello-Kafka 主题时候,使用 --partitions 指定其分区数为 1,即只有一个分区。

topic=Hello-Kafka, partition=0, offset=40 
topic=Hello-Kafka, partition=0, offset=41 
topic=Hello-Kafka, partition=0, offset=42 
topic=Hello-Kafka, partition=0, offset=43 
topic=Hello-Kafka, partition=0, offset=44 
topic=Hello-Kafka, partition=0, offset=45 
topic=Hello-Kafka, partition=0, offset=46 
topic=Hello-Kafka, partition=0, offset=47 
topic=Hello-Kafka, partition=0, offset=48 
topic=Hello-Kafka, partition=0, offset=49 

2.2 异步发送
通常我们并不关心发送成功的情况,更多关注的是失败的情况,因此 Kafka 提供了异步发送和回调函数。 代码如下:

for (int i = 0; i < 10; i++) {
    ProducerRecord<String, String> record = new ProducerRecord<>(topicName, "k" + i, "world" + i);
    /*异步发送消息,并监听回调*/
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                System.out.println("进行异常处理");
            } else {
                System.out.printf("topic=%s, partition=%d, offset=%s \n",
                        metadata.topic(), metadata.partition(), metadata.offset());
            }
        }
    });
}

三、自定义分区器
Kafka 有着默认的分区机制:

如果键值为 null, 则使用轮询 (Round Robin) 算法将消息均衡地分布到各个分区上;
如果键值不为 null,那么 Kafka 会使用内置的散列算法对键进行散列,然后分布到各个分区上。
某些情况下,你可能有着自己的分区需求,这时候可以采用自定义分区器实现。这里给出一个自定义分区器的示例:

3.1 自定义分区器

/**
 * 自定义分区器
 */
public class CustomPartitioner implements Partitioner {

    private int passLine;

    @Override
    public void configure(Map<String, ?> configs) {
        /*从生产者配置中获取分数线*/
        passLine = (Integer) configs.get("pass.line");
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, 
                         byte[] valueBytes, Cluster cluster) {
        /*key 值为分数,当分数大于分数线时候,分配到 1 分区,否则分配到 0 分区*/
        return (Integer) key >= passLine ? 1 : 0;
    }

    @Override
    public void close() {
        System.out.println("分区器关闭");
    }
}
需要在创建生产者时指定分区器,和分区器所需要的配置参数:

public class ProducerWithPartitioner {

    public static void main(String[] args) {

        String topicName = "Kafka-Partitioner-Test";

        Properties props = new Properties();
        props.put("bootstrap.servers", "hadoop001:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        /*传递自定义分区器*/
        props.put("partitioner.class", "com.heibaiying.producers.partitioners.CustomPartitioner");
        /*传递分区器所需的参数*/
        props.put("pass.line", 6);

        Producer<Integer, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i <= 10; i++) {
            String score = "score:" + i;
            ProducerRecord<Integer, String> record = new ProducerRecord<>(topicName, i, score);
            /*异步发送消息*/
            producer.send(record, (metadata, exception) ->
                    System.out.printf("%s, partition=%d, \n", score, metadata.partition()));
        }

        producer.close();
    }
}

3.2 测试
需要创建一个至少有两个分区的主题:

 bin/kafka-topics.sh --create \
                    --bootstrap-server hadoop001:9092 \
                     --replication-factor 1 --partitions 2 \
                     --topic Kafka-Partitioner-Test

此时输入如下,可以看到分数大于等于 6 分的都被分到 1 分区,而小于 6 分的都被分到了 0 分区。

score:6, partition=1, 
score:7, partition=1, 
score:8, partition=1, 
score:9, partition=1, 
score:10, partition=1, 
score:0, partition=0, 
score:1, partition=0, 
score:2, partition=0, 
score:3, partition=0, 
score:4, partition=0, 
score:5, partition=0, 

分区器关闭
四、生产者其他属性
上面生产者的创建都仅指定了服务地址,键序列化器、值序列化器,实际上 Kafka 的生产者还有很多可配置属性,如下:

  1. acks

acks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的:

acks=0 : 消息发送出去就认为已经成功了,不会等待任何来自服务器的响应; acks=1 :
只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应; acks=all
:只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。

  1. buffer.memory

设置生产者内存缓冲区的大小。

  1. compression.type

默认情况下,发送的消息不会被压缩。如果想要进行压缩,可以配置此参数,可选值有 snappy,gzip,lz4。

  1. retries

发生错误后,消息重发的次数。如果达到设定值,生产者就会放弃重试并返回错误。

  1. batch.size

当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。

  1. linger.ms

该参数制定了生产者在发送批次之前等待更多消息加入批次的时间。

  1. clent.id

客户端 id,服务器用来识别消息的来源。

  1. max.in.flight.requests.per.connection

指定了生产者在收到服务器响应之前可以发送多少个消息。它的值越高,就会占用越多的内存,不过也会提升吞吐量,把它设置为 1
可以保证消息是按照发送的顺序写入服务器,即使发生了重试。

  1. timeout.ms, request.timeout.ms & metadata.fetch.timeout.ms

timeout.ms 指定了 borker 等待同步副本返回消息的确认时间; request.timeout.ms
指定了生产者在发送数据时等待服务器返回响应的时间; metadata.fetch.timeout.ms
指定了生产者在获取元数据(比如分区首领是谁)时等待服务器返回响应的时间。

  1. max.block.ms

指定了在调用 send() 方法或使用 partitionsFor()方法获取元数据时生产者的阻塞时间。当生产者的发送缓冲区已满,或者没有可用的元数据时,这些方法会阻塞。在阻塞时间达到 max.block.ms 时,生产者会抛出超时异常。

  1. max.request.size

该参数用于控制生产者发送的请求大小。它可以指发送的单个消息的最大值,也可以指单个请求里所有消息总的大小。例如,假设这个值为 1000K,那么可以发送的单个最大消息为 1000K ,或者生产者可以在单个请求里发送一个批次,该批次包含了 1000 个消息,每个消息大小为 1K。

  1. receive.buffer.bytes & send.buffer.byte

这两个参数分别指定 TCP socket 接收和发送数据包缓冲区的大小,-1 代表使用操作系统的默认值。

五、消费者代码示例

在创建消费者的时候以下以下三个选项是必选的:

  • bootstrap.servers :指定 broker 的地址清单,清单里不需要包含所有的 broker 地址,生产者会从给定的
    broker 里查找 broker 的信息。不过建议至少要提供两个 broker 的信息作为容错;
  • key.deserializer :指定键的反序列化器;
  • value.deserializer :指定值的反序列化器。

除此之外你还需要指明你需要想订阅的主题,可以使用如下两个 API :

  • consumer.subscribe(Collection topics) :指明需要订阅的主题的集合;
  • consumer.subscribe(Pattern pattern) :使用正则来匹配需要订阅的集合


最后只需要通过轮询 API(poll) 向服务器定时请求数据。一旦消费者订阅了主题,轮询就会处理所有的细节,包括群组协调、分区再均衡、发送心跳和获取数据,这使得开发者只需要关注从分区返回的数据,然后进行业务处理。 示例如下:

String topic = "Hello-Kafka";
String group = "group1";
Properties props = new Properties();
props.put("bootstrap.servers", "hadoop001:9092");
/*指定分组 ID*/
props.put("group.id", group);
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

/*订阅主题 (s)*/
consumer.subscribe(Collections.singletonList(topic));

try {
    while (true) {
        /*轮询获取数据*/
        ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s,partition = %d, key = %s, value = %s, offset = %d,\n",
           record.topic(), record.partition(), record.key(), record.value(), record.offset());
        }
    }
} finally {
    consumer.close();
}

4.1 同步提交
通过调用 consumer.commitSync() 来进行同步提交,不传递任何参数时提交的是当前轮询的最大偏移量。

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record);
    }
    /*同步提交*/
    consumer.commitSync();
}

如果某个提交失败,同步提交还会进行重试,这可以保证数据能够最大限度提交成功,但是同时也会降低程序的吞吐量。基于这个原因,Kafka 还提供了异步提交的 API。

4.2 异步提交
异步提交可以提高程序的吞吐量,因为此时你可以尽管请求数据,而不用等待 Broker 的响应。代码如下:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record);
    }
    /*异步提交并定义回调*/
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
          if (exception != null) {
             System.out.println("错误处理");
             offsets.forEach((x, y) -> System.out.printf("topic = %s,partition = %d, offset = %s \n",
                                                            x.topic(), x.partition(), y.offset()));
            }
        }
    });
}

异步提交存在的问题是,在提交失败的时候不会进行自动重试,实际上也不能进行自动重试。假设程序同时提交了 200 和 300 的偏移量,此时 200 的偏移量失败的,但是紧随其后的 300 的偏移量成功了,此时如果重试就会存在 200 覆盖 300 偏移量的可能。同步提交就不存在这个问题,因为在同步提交的情况下,300 的提交请求必须等待服务器返回 200 提交请求的成功反馈后才会发出。基于这个原因,某些情况下,需要同时组合同步和异步两种提交方式。

注:虽然程序不能在失败时候进行自动重试,但是我们是可以手动进行重试的,你可以通过一个 Map<TopicPartition, Integer> offsets 来维护你提交的每个分区的偏移量,然后当失败时候,你可以判断失败的偏移量是否小于你维护的同主题同分区的最后提交的偏移量,如果小于则代表你已经提交了更大的偏移量请求,此时不需要重试,否则就可以进行手动重试。

4.3 同步加异步提交
下面这种情况,在正常的轮询中使用异步提交来保证吞吐量,但是因为在最后即将要关闭消费者了,所以此时需要用同步提交来保证最大限度的提交成功。

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS));
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record);
        }
        // 异步提交
        consumer.commitAsync();
    }
} catch (Exception e) {
    e.printStackTrace();
} finally {
    try {
        // 因为即将要关闭消费者,所以要用同步提交保证提交成功
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}

其他内容参考链接

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/471089.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

从零开始实现VAE和CVAE

扩散模型可以看作是一个层次很深的VAE(变分自编码器)&#xff0c;前向&#xff08;forward&#xff0c;或者译为正向&#xff09;的过程&#xff0c;通过在多个尺度上添加噪声来逐步扰乱数据分布&#xff1b;然后是反向的过程&#xff0c;去学习如何恢复数据结构&#xff0c;上…

喜报 | 国家发明专利证书! 再添2项!

​近日&#xff0c;擎创科技自主研发的《一种基于倒序表的实时日志聚类分析方法》以及《一种基于社区检测的运维告警场景生成方法》正式获得国家颁发的发明专利证书&#xff01;擎创的专业性、自主性、创新能力、技术水平以及研发实力在得到了确切的肯定。 作为智能运维领域领先…

DJ4-5 路由算法:LS 和 DV

目录 一、迪杰斯特拉算法 1. 术语定义 2. 算法描述 3. 举例说明 4. 构建从源节点到目的节点的路径 5. 构建最低费用路径树 6. 构建转发表 二、距离向量路由算法 1. 术语定义 2. 举例说明 3. 距离向量表 4. 更新距离向量表 5. 举例说明 三、距离向量路由算法 PLUS…

多维评测指标解读2022MSU世界编码器大赛结果

是极致性能&#xff0c;更是最佳商用。 19项第一之上&#xff0c;是63%的极致带宽降低 近日&#xff0c;2022 MSU世界视频编码器大赛成绩正式揭晓。报告显示&#xff0c;阿里媒体处理服务MPS&#xff08;Alibaba Media Processing Service&#xff09;s264及s265编码器共计斩获…

【黑马旅游案例记录(结合ES)】

黑马旅游案例记录 11.9.黑马旅游案例11.9.1.酒店搜索和分页11.9.1.1.需求分析11.9.1.2.定义实体类11.9.1.3.定义controller11.9.1.4.实现搜索业务 11.9.2.酒店结果过滤11.9.2.1.需求分析11.9.2.2.修改实体类11.9.2.3.修改搜索业务 11.9.3.我周边的酒店11.9.3.1.需求分析11.9.3.…

10 【Sass语法介绍-继承】

1.前言 在我们编写样式的时候&#xff0c;很多情况下我们几个不同的类会有相同的样式代码&#xff0c;同时这几个类又有其自己的样式代码&#xff0c;这使我们就可以通过 Sass 提供的继承 extend 来实现。本节内容我们将讲解 Sass 继承的语法以及继承的多重延伸等等&#xff0…

【无功功率控制】连接到无限电网的小型风电场的无功功率控制(Simulink)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

MongoDB【常用命令】

目录 1&#xff1a;基本常用命令 1.1&#xff1a;演示案例 1.2&#xff1a;数据库操作 1.2.1&#xff1a;选择和创建数据库&#xff0c;查看当前正在使用的数据库命令 1.2.2&#xff1a;数据库的删除 1.3&#xff1a;集合操作 1.3.1&#xff1a;集合的显式创建&#xff0…

安全意识培训:如何提高员工网络安全意识?

随着网络技术的不断发展和应用&#xff0c;网络安全已经成为企业必须关注和重视的问题。尤其是在今天&#xff0c;企业数字化转型的大背景下&#xff0c;网络安全问题日益凸显。对于企业而言&#xff0c;员工是企业安全的第一道防线&#xff0c;提高员工的网络安全意识已经成为…

制作自己的镜像并且推送到docker hub上去。

1、在docker hub(Docker)注册账号&#xff1a;比如我的账号是:zhangyi0833 2、在本机上制作自己已经安装了自己想要的工具的镜像&#xff0c;比如我这里安装了cgdb在centos8上面。通过命令制作自己的镜像&#xff1a; docker commit -m"提交的描述信息" -a"镜像…

如何复刻Midjourney的成功?

AI绘画的大模型和应用非常多&#xff0c;但最有名的非Stable Diffusion和Midjourney莫属&#xff0c;其中&#xff0c;尤其是Midjourney(以下简称MJ)&#xff0c;仅11位成员&#xff0c;8个研发人员中的一半都是尚未毕业的本科生,从未融资&#xff0c;成立3年&#xff0c;千万用…

(原创)Flutter基础入门:手把手教你搭建Flutter混合项目:模块代码依赖方式集成

前言 Flutter是Google开源的构建用户界面&#xff08;UI&#xff09;工具包 支持在不同平台构建一致的ui效果 但在实际业务中&#xff0c;一般不会整个APP都用纯Flutter开发 尤其一些老的项目&#xff0c;会采用接入Flutter的方式来混合开发 那么今天就主要讲一下如何搭建一个…

外卖app开发流程全解析

外卖app开发是现代餐饮业的一个必备部分。在这个数字化时代&#xff0c;人们更愿意使用手机应用程序来订购食品。因此&#xff0c;为了满足客户需求&#xff0c;餐饮企业需要开发自己的外卖app。 第一步&#xff1a;确定目标受众 在开始外卖app的开发之前&#xff0c;需要确定…

Shiro-721---漏洞复现

漏洞原理 Shiro rememberMe 反序列化远程代码执行漏洞 由于 Apache Shiro cookie 中通过 AES-128-CBC 模式加密的 rememberMe 字段存 在问题&#xff0c;用户可通过 Padding Oracle 加密生成的攻击代码来构造恶意的 rememberMe 字段&#xff0c;并重新请求网站&#xff0c;进…

latex论文排版个人向相关问题记录

很久没更新了&#xff0c;小论文基本都见刊了&#xff0c;记录下之前写论文碰上的latex一些排版问题吧&#xff0c;比较琐碎。 伪代码跨页问题 最开始使用algorithms包来写的伪代码&#xff0c;左边会有大方括号&#xff0c;蛮好看的。 不过使用algorithms包进行伪代码撰写&a…

Java语言----动态顺序表(ArrayList)

目录 一.顺序表 二.顺序表的手动实现 2.1顺序表的创建 2.2.基本功能的实现 2.2.1扩容顺序表 2.2.2 判断顺序表是否为满 2.2.3 判断顺序表是否为空 2.2.4打印顺序表 2.2.5清空顺序表 2.3四大功能的实现 2.3.1增加元素 2.3.2删除元素 2.3.3查找元素 2.3.4更改数据 总代码 &a…

记录-有意思的气泡 Loading 效果

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 今日&#xff0c;群友提问&#xff0c;如何实现这么一个 Loading 效果&#xff1a; 这个确实有点意思&#xff0c;但是这是 CSS 能够完成的&#xff1f; 没错&#xff0c;这个效果中的核心气泡效果&am…

【SpringCloud常见面试题】

SpringCloud常见面试题 1.微服务篇1.1.SpringCloud常见组件有哪些&#xff1f;1.2.Nacos的服务注册表结构是怎样的&#xff1f;1.3.Nacos如何支撑阿里内部数十万服务注册压力&#xff1f;1.4.Nacos如何避免并发读写冲突问题&#xff1f;1.5.Nacos与Eureka的区别有哪些&#xff…

易岸公考:公务员有五种类型可以挑选?

公务员分为国试、省试、选拔、乡镇公务员、选拔等不同考试&#xff1b; 不符合选拔选调生条件的&#xff0c;可选择国考、省考、乡镇公务员。 成为公务员后&#xff0c;遴选是你必不可少的晋升渠道。 一、国家考试 国考是指中央和国家机关的公务员考试&#xff0c;其招录机构…

关于TypeVariable的深度理解

在看java源码时&#xff0c;如果涉及到反射&#xff0c;会经常看到TypeVariable。 那么这玩意到底是个什么东西&#xff1f; 这是个必须要搞清楚的概念&#xff0c;否则很难理解源码的意图是什么&#xff1f; 我在这里先给出结论&#xff1a;这个问题的关键是具体类型和类型变…