Kafka学习笔记(基础篇)

news2025/1/13 10:22:02

目录

Kafka简介

消息队列

Kafka的应用场景

消息队列的两种模型

Kafka集群搭建

Kafka的生产者/消费者/工具

Kafka的基准测试工具

Kafka Java API开发

生产者程序开发

消费者程序开发

生产者使用异步方式生产消息

Kafka中的重要概念

消费者组

幂等性

事务编程

Kafka中的分区副本机制

生产者的分区写入策略

消费组Consumer Group Rebalance机制

消费者的分区分配策略

副本的ACK机制

高级API(High-Level API)、低级API(Low-Level API)

Kafka原理

leader和follower

AR\ISR\OSR

leader选举

Kafka读写流程

Kafka的物理存储

消息传递的语义性

Kafka的消息不丢失

数据积压

数据清理&配额限速


Kafka简介

消息队列

  • 消息队列——用于存放消息的组件

  • 程序员可以将消息放入到队列中,也可以从消息队列中获取消息

  • 很多时候消息队列不是一个永久性的存储,是作为临时存储存在的(设定一个期限:设置消息在MQ中保存10天)

  • 消息队列中间件:消息队列的组件,例如:Kafka、Active MQ、RabbitMQ、RocketMQ、ZeroMQ

Kafka的应用场景

  • 异步处理

    • 可以将一些比较耗时的操作放在其他系统中,通过消息队列将需要进行处理的消息进行存储,其他系统可以消费消息队列中的数据

    • 比较常见的:发送短信验证码、发送邮件

  • 系统解耦

    • 原先一个微服务是通过接口(HTTP)调用另一个微服务,这时候耦合很严重,只要接口发生变化就会导致系统不可用

    • 使用消息队列可以将系统进行解耦合,现在第一个微服务可以将消息放入到消息队列中,另一个微服务可以从消息队列中把消息取出来进行处理。进行系统解耦

  • 流量削峰

    • 因为消息队列是低延迟、高可靠、高吞吐的,可以应对大量并发

  • 日志处理

    • 可以使用消息队列作为临时存储,或者一种通信管道

消息队列的两种模型

  • 生产者、消费者模型

    • 生产者负责将消息生产到MQ中

    • 消费者负责从MQ中获取消息

    • 生产者和消费者是解耦的,可能是生产者一个程序、消费者是另外一个程序

  • 消息队列的模式

    • 点对点:一个消费者消费一个消息

    • 发布订阅:多个消费者可以消费一个消息

Kafka集群搭建

  • Kafka集群是必须要有ZooKeeper的

注意:

  • 每一个Kafka的节点都需要修改broker.id(每个节点的标识,不能重复)

  • log.dir数据存储目录需要配置

Kafka的生产者/消费者/工具

  • 安装Kafka集群,可以测试以下

    • 创建一个topic主题(消息都是存放在topic中,类似mysql建表的过程)

    • 基于kafka的内置测试生产者脚本来读取标准输入(键盘输入)的数据,并放入到topic中

    • 基于kafka的内置测试消费者脚本来消费topic中的数据

  • 推荐大家开发的使用Kafka Tool

    • 浏览Kafka集群节点、多少个topic、多少个分区

    • 创建topic/删除topic

    • 浏览ZooKeeper中的数据

Kafka的基准测试工具

  • Kafka中提供了内置的性能测试工具

    • 生产者:测试生产每秒传输的数据量(多少条数据、多少M的数据)

    • 消费者:测试消费每条拉取的数据量

  • 对比生产者和消费者:消费者的速度更快

Kafka Java API开发

生产者程序开发

  1. 创建连接

    • bootstrap.servers:Kafka的服务器地址

    • acks:表示当生产者生产数据到Kafka中,Kafka中会以什么样的策略返回

    • key.serializer:Kafka中的消息是以key、value键值对存储的,而且生产者生产的消息是需要在网络上传到的,这里指定的是StringSerializer方式,就是以字符串方式发送(将来还可以使用其他的一些序列化框架:Google ProtoBuf、Avro)

    • value.serializer:同上

  2. 创建一个生产者对象KafkaProducer

  3. 调用send方法发送消息(ProducerRecor,封装是key-value键值对)

  4. 调用Future.get表示等带服务端的响应

  5. 关闭生产者

public class KafkaProducerTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 创建用于连接Kafka的Properties配置
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1.itcast.cn:9092");
        props.put("acks", "all");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. 创建一个生产者对象KafkaProducer
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        // 3. 发送1-100的消息到指定的topic中
        for(int i = 0; i < 100; ++i) {
            // 构建一条消息,直接new ProducerRecord
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i + "");
            Future<RecordMetadata> future = kafkaProducer.send(producerRecord);
            // 调用Future的get方法等待响应
            future.get();
            System.out.println("第" + i + "条消息写入成功!");
        }

        // 4.关闭生产者
        kafkaProducer.close();
    }
}

消费者程序开发

  • group.id:消费者组的概念,可以在一个消费组中包含多个消费者。如果若干个消费者的group.id是一样的,表示它们就在一个组中,一个组中的消费者是共同消费Kafka中topic的数据。

  • Kafka是一种拉消息模式的消息队列,在消费者中会有一个offset,表示从哪条消息开始拉取数据

  • kafkaConsumer.poll:Kafka的消费者API是一批一批数据的拉取

/**
 * 消费者程序
 *
 * 1.创建Kafka消费者配置
 * Properties props = new Properties();
 * props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
 * props.setProperty("group.id", "test");
 * props.setProperty("enable.auto.commit", "true");
 * props.setProperty("auto.commit.interval.ms", "1000");
 * props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 * props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 *
 * 2.创建Kafka消费者
 * 3.订阅要消费的主题
 * 4.使用一个while循环,不断从Kafka的topic中拉取消息
 * 5.将将记录(record)的offset、key、value都打印出来
 */
public class KafkaConsumerTest {

    public static void main(String[] args) {
        // 1.创建Kafka消费者配置
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
        // 消费者组(可以使用消费者组将若干个消费者组织到一起),共同消费Kafka中topic的数据
        // 每一个消费者需要指定一个消费者组,如果消费者的组名是一样的,表示这几个消费者是一个组中的
        props.setProperty("group.id", "test");
        // 自动提交offset
        props.setProperty("enable.auto.commit", "true");
        // 自动提交offset的时间间隔
        props.setProperty("auto.commit.interval.ms", "1000");
        // 拉取的key、value数据的
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 2.创建Kafka消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

        // 3. 订阅要消费的主题
        // 指定消费者从哪个topic中拉取数据
        kafkaConsumer.subscribe(Arrays.asList("test"));

        // 4.使用一个while循环,不断从Kafka的topic中拉取消息
        while(true) {
            // Kafka的消费者一次拉取一批的数据
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(5));
            // 5.将将记录(record)的offset、key、value都打印出来
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                // 主题
                String topic = consumerRecord.topic();
                // offset:这条消息处于Kafka分区中的哪个位置
                long offset = consumerRecord.offset();
                // key\value
                String key = consumerRecord.key();
                String value = consumerRecord.value();

                System.out.println("topic: " + topic + " offset:" + offset + " key:" + key + " value:" + value);
            }
        }
    }
}

生产者使用异步方式生产消息

  • 使用匿名内部类实现Callback接口,该接口中表示Kafka服务器响应给客户端,会自动调用onCompletion方法

    • metadata:消息的元数据(属于哪个topic、属于哪个partition、对应的offset是什么)

    • exception:这个对象Kafka生产消息封装了出现的异常,如果为null,表示发送成功,如果不为null,表示出现异常。

// 二、使用异步回调的方式发送消息
ProducerRecord<String, String> producerRecord = new ProducerRecord<>("test", null, i + "");
kafkaProducer.send(producerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        // 1. 判断发送消息是否成功
        if(exception == null) {
            // 发送成功
            // 主题
            String topic = metadata.topic();
            // 分区id
            int partition = metadata.partition();
            // 偏移量
            long offset = metadata.offset();
            System.out.println("topic:" + topic + " 分区id:" + partition + " 偏移量:" + offset);
        }
        else {
            // 发送出现错误
            System.out.println("生产消息出现异常!");
            // 打印异常消息
            System.out.println(exception.getMessage());
            // 打印调用栈
            System.out.println(exception.getStackTrace());
        }
    }
});

Kafka中的重要概念

  • broker

    • Kafka服务器进程,生产者、消费者都要连接broker

    • 一个集群由多个broker组成,功能实现Kafka集群的负载均衡、容错

  • producer:生产者

  • consumer:消费者

  • topic:主题,一个Kafka集群中,可以包含多个topic。一个topic可以包含多个分区

    • 是一个逻辑结构,生产、消费消息都需要指定topic

  • partition:Kafka集群的分布式就是由分区来实现的。一个topic中的消息可以分布在topic中的不同partition中

  • replica:副本,实现Kafkaf集群的容错,实现partition的容错。一个topic至少应该包含大于1个的副本

  • consumer group:消费者组,一个消费者组中的消费者可以共同消费topic中的分区数据。每一个消费者组都一个唯一的名字。配置group.id一样的消费者是属于同一个组中

  • offset:偏移量。相对消费者、partition来说,可以通过offset来拉取数据

消费者组

  • 一个消费者组中可以包含多个消费者,共同来消费topic中的数据

  • 一个topic中如果只有一个分区,那么这个分区只能被某个组中的一个消费者消费

  • 有多少个分区,那么就可以被同一个组内的多少个消费者消费

幂等性

  • 生产者消息重复问题

    • Kafka生产者生产消息到partition,如果直接发送消息,kafka会将消息保存到分区中,但Kafka会返回一个ack给生产者,表示当前操作是否成功,是否已经保存了这条消息。如果ack响应的过程失败了,此时生产者会重试,继续发送没有发送成功的消息,Kafka又会保存一条一模一样的消息

  • 在Kafka中可以开启幂等性

    • 当Kafka的生产者生产消息时,会增加一个pid(生产者的唯一编号)和sequence number(针对消息的一个递增序列)

    • 发送消息,会连着pid和sequence number一块发送

    • kafka接收到消息,会将消息和pid、sequence number一并保存下来

    • 如果ack响应失败,生产者重试,再次发送消息时,Kafka会根据pid、sequence number是否需要再保存一条消息

    • 判断条件:生产者发送过来的sequence number 是否小于等于 partition中消息对应的sequence

事务编程

  • 开启事务的条件

    • 生产者

      // 开启事务必须要配置事务的ID
      props.put("transactional.id", "dwd_user");
    • 消费者

      // 配置事务的隔离级别
      props.put("isolation.level","read_committed");
      // 关闭自动提交,一会我们需要手动来提交offset,通过事务来维护offset
      props.setProperty("enable.auto.commit", "false");
    • 生产者

      • 初始化事务

      • 开启事务

      • 需要使用producer来将消费者的offset提交到事务中

      • 提交事务

      • 如果出现异常回滚事务

如果使用了事务,不要使用异步发送

public class TransactionProgram {
    public static void main(String[] args) {
        // 1. 调用之前实现的方法,创建消费者、生产者对象
        KafkaConsumer<String, String> consumer = createConsumer();
        KafkaProducer<String, String> producer = createProducer();

        // 2. 生产者调用initTransactions初始化事务
        producer.initTransactions();

        // 3. 编写一个while死循环,在while循环中不断拉取数据,进行处理后,再写入到指定的topic
        while(true) {
            try {
                // (1)	生产者开启事务
                producer.beginTransaction();

                // 这个Map保存了topic对应的partition的偏移量
                Map<TopicPartition, OffsetAndMetadata> offsetMap = new HashMap<>();

                // 从topic中拉取一批的数据
                // (2)	消费者拉取消息
                ConsumerRecords<String, String> concumserRecordArray = consumer.poll(Duration.ofSeconds(5));
                // (3)	遍历拉取到的消息,并进行预处理
                for (ConsumerRecord<String, String> cr : concumserRecordArray) {
                    // 将1转换为男,0转换为女
                    String msg = cr.value();
                    String[] fieldArray = msg.split(",");

                    // 将消息的偏移量保存
                    // 消费的是ods_user中的数据
                    String topic = cr.topic();
                    int partition = cr.partition();
                    long offset = cr.offset();

                	int i = 1 / 0;

                    // offset + 1:offset是当前消费的记录(消息)对应在partition中的offset,而我们希望下一次能继续从下一个消息消息
                    // 必须要+1,从能消费下一条消息
                    offsetMap.put(new TopicPartition(topic, partition), new OffsetAndMetadata(offset + 1));

                    // 将字段进行替换
                    if(fieldArray != null && fieldArray.length > 2) {
                        String sexField = fieldArray[1];
                        if(sexField.equals("1")) {
                            fieldArray[1] = "男";
                        }
                        else if(sexField.equals("0")){
                            fieldArray[1] = "女";
                        }
                    }

                    // 重新拼接字段
                    msg = fieldArray[0] + "," + fieldArray[1] + "," + fieldArray[2];

                    // (4)	生产消息到dwd_user topic中
                    ProducerRecord<String, String> dwdMsg = new ProducerRecord<>("dwd_user", msg);
                    // 发送消息
                    Future<RecordMetadata> future = producer.send(dwdMsg);
                    try {
                        future.get();
                    } catch (Exception e) {
                        e.printStackTrace();
                        producer.abortTransaction();
                    }
//                            new Callback()
//                    {
//                        @Override
//                        public void onCompletion(RecordMetadata metadata, Exception exception) {
//                            // 生产消息没有问题
//                            if(exception == null) {
//                                System.out.println("发送成功:" + dwdMsg);
//                            }
//                            else {
//                                System.out.println("生产消息失败:");
//                                System.out.println(exception.getMessage());
//                                System.out.println(exception.getStackTrace());
//                            }
//                        }
//                    });
                }

                producer.sendOffsetsToTransaction(offsetMap, "ods_user");

                // (6)	提交事务
                producer.commitTransaction();
            }catch (Exception e) {
                e.printStackTrace();
                // (7)	捕获异常,如果出现异常,则取消事务
                producer.abortTransaction();
            }
        }
    }

    // 一、创建一个消费者来消费ods_user中的数据
    private static KafkaConsumer<String, String> createConsumer() {
        // 1. 配置消费者的属性(添加对事务的支持)
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1.itcast.cn:9092");
        props.setProperty("group.id", "ods_user");
        // 配置事务的隔离级别
        props.put("isolation.level","read_committed");
        // 关闭自动提交,一会我们需要手动来提交offset,通过事务来维护offset
        props.setProperty("enable.auto.commit", "false");
        // 反序列化器
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        // 2. 构建消费者对象
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(props);

        // 3. 订阅一个topic
        kafkaConsumer.subscribe(Arrays.asList("ods_user"));

        return kafkaConsumer;

    }

    // 二、编写createProducer方法,用来创建一个带有事务配置的生产者
    private static KafkaProducer<String, String> createProducer() {
        // 1. 配置生产者带有事务配置的属性
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1.itcast.cn:9092");
        // 开启事务必须要配置事务的ID
        props.put("transactional.id", "dwd_user");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // 2. 构建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);

        return kafkaProducer;
    }
}

Kafka中的分区副本机制

生产者的分区写入策略

  • 轮询(按照消息尽量保证每个分区的负载)策略,消息会均匀地分布到每个partition

    • 写入消息的时候,key为null的时候,默认使用的是轮询策略

  • 随机策略(不使用)

  • 按key写入策略,key.hash() % 分区的数量

  • 自定义分区策略(类似于MapReduce指定分区)

乱序问题

  • 在Kafka中生产者是有写入策略,如果topic有多个分区,就会将数据分散在不同的partition中存储

  • 当partition数量大于1的时候,数据(消息)会打散分布在不同的partition中

  • 如果只有一个分区,消息是有序的

消费组Consumer Group Rebalance机制

  • 再均衡:在某些情况下,消费者组中的消费者消费的分区会产生变化,会导致消费者分配不均匀(例如:有两个消费者消费3个,因为某个partition崩溃了,还有一个消费者当前没有分区要削峰),Kafka Consumer Group就会启用rebalance机制,重新平衡这个Consumer Group内的消费者消费的分区分配。

  • 触发时机

    • 消费者数量发生变化

      • 某个消费者crash

      • 新增消费者

    • topic的数量发生变化

      • 某个topic被删除

    • partition的数量发生变化

      • 删除partition

      • 新增partition

  • 不良影响

    • 发生rebalance,所有的consumer将不再工作,共同来参与再均衡,直到每个消费者都已经被成功分配所需要消费的分区为止(rebalance结束)

消费者的分区分配策略

分区分配策略:保障每个消费者尽量能够均衡地消费分区的数据,不能出现某个消费者消费分区的数量特别多,某个消费者消费的分区特别少

  • Range分配策略(范围分配策略):Kafka默认的分配策略

    • n:分区的数量 / 消费者数量

    • m:分区的数量 % 消费者数量

    • 前m个消费者消费n+1个分区

    • 剩余的消费者消费n个分区

  • RoundRobin分配策略(轮询分配策略)

    • 消费者挨个分配消费的分区

  • Striky粘性分配策略

    • 在没有发生rebalance跟轮询分配策略是一致的

    • 发生了rebalance,轮询分配策略,重新走一遍轮询分配的过程。而粘性会保证跟上一次的尽量一致,只是将新的需要分配的分区,均匀的分配到现有可用的消费者中即可

    • 减少上下文的切换

副本的ACK机制

producer是不断地往Kafka中写入数据,写入数据会有一个返回结果,表示是否写入成功。这里对应有一个ACKs的配置。

  • acks = 0:生产者只管写入,不管是否写入成功,可能会数据丢失。性能是最好的

  • acks = 1:生产者会等到leader分区写入成功后,返回成功,接着发送下一条

  • acks = -1/all:确保消息写入到leader分区、还确保消息写入到对应副本都成功后,接着发送下一条,性能是最差的

根据业务情况来选择ack机制,是要求性能最高,一部分数据丢失影响不大,可以选择0/1。如果要求数据一定不能丢失,就得配置为-1/all。

分区中是有leader和follower的概念,为了确保消费者消费的数据是一致的,只能从分区leader去读写消息,follower做的事情就是同步数据,Backup。

高级API(High-Level API)、低级API(Low-Level API)

  • 高级API就是直接让Kafka帮助管理、处理分配、数据

    • offset存储在ZK中

    • 由kafka的rebalance来控制消费者分配的分区

    • 开发起来比较简单,无需开发者关注底层细节

    • 无法做到细粒度的控制

  • 低级API:由编写的程序自己控制逻辑

    • 自己来管理Offset,可以将offset存储在ZK、MySQL、Redis、HBase、Flink的状态存储

    • 指定消费者拉取某个分区的数据

    • 可以做到细粒度的控制

    • 原有的Kafka的策略会失效,需要我们自己来实现消费机制

Kafka原理

Kafka的基础架构

leader和follower

  • Kafka中的leader和follower是相对分区有意义,不是相对broker

  • Kafka在创建topic的时候,会尽量分配分区的leader在不同的broker中,其实就是负载均衡

  • leader职责:读写数据

  • follower职责:同步数据、参与选举(leader crash之后,会选举一个follower重新成为分区的leader

  • 注意和ZooKeeper区分

    • ZK的leader负责读、写,follower可以读取

    • Kafka的leader负责读写、follower不能读写数据(确保每个消费者消费的数据是一致的),Kafka一个topic有多个分区leader,一样可以实现数据操作的负载均衡

AR\ISR\OSR

  • AR表示一个topic下的所有副本

  • ISR:In Sync Replicas,正在同步的副本(可以理解为当前有几个follower是存活的)

  • OSR:Out of Sync Replicas,不再同步的副本

  • AR = ISR + OSR

leader选举

  • Controller:controller是kafka集群的老大,是针对Broker的一个角色

    • Controller是高可用的,是用过ZK来进行选举

  • Leader:是针对partition的一个角色

    • Leader是通过ISR来进行快速选举

  • 如果Kafka是基于ZK来进行选举,ZK的压力可能会比较大。例如:某个节点崩溃,这个节点上不仅仅只有一个leader,是有不少的leader需要选举。通过ISR快速进行选举。

  • leader的负载均衡

    • 如果某个broker crash之后,就可能会导致partition的leader分布不均匀,就是一个broker上存在一个topic下不同partition的leader

    • 通过以下指令,可以将leader分配到优先的leader对应的broker,确保leader是均匀分配的

    bin/kafka-leader-election.sh --bootstrap-server node1.itcast.cn:9092 --topic test --partition=2 --election-type preferred

Kafka读写流程

  • 写流程

    • 通过ZooKeeper找partition对应的leader,leader是负责写的

    • producer开始写入数据

    • ISR里面的follower开始同步数据,并返回给leader ACK

    • 返回给producer ACK

  • 读流程

    • 通过ZooKeeper找partition对应的leader,leader是负责读的

    • 通过ZooKeeper找到消费者对应的offset

    • 然后开始从offset往后顺序拉取数据

    • 提交offset(自动提交——每隔多少秒提交一次offset、手动提交——放入到事务中提交)

Kafka的物理存储

  • Kafka的数据组织结构

    • topic

    • partition

    • segment

      • .log数据文件

      • .index(稀疏索引)

      • .timeindex(根据时间做的索引)

  • 深入了解读数据的流程

    • 消费者的offset是一个针对partition全局offset

    • 可以根据这个offset找到segment段

    • 接着需要将全局的offset转换成segment的局部offset

    • 根据局部的offset,就可以从(.index稀疏索引)找到对应的数据位置

    • 开始顺序读取

消息传递的语义性

Flink里面有对应的每种不同机制的保证,提供Exactly-Once保障(二阶段事务提交方式)

  • At-most once:最多一次(只管把数据消费到,不管有没有成功,可能会有数据丢失)

  • At-least once:最少一次(有可能会出现重复消费)

  • Exactly-Once:仅有一次(事务性性的保障,保证消息有且仅被处理一次)

Kafka的消息不丢失

  • broker消息不丢失:因为有副本relicas的存在,会不断地从leader中同步副本,所以,一个broker crash,不会导致数据丢失,除非是只有一个副本。

  • 生产者消息不丢失:ACK机制(配置成ALL/-1)、配置0或者1有可能会存在丢失

  • 消费者消费不丢失:重点控制offset

    • At-least once:一种数据可能会重复消费

    • Exactly-Once:仅被一次消费

数据积压

  • 数据积压指的是消费者因为有一些外部的IO、一些比较耗时的操作(Full GC——Stop the world),就会造成消息在partition中一直存在得不到消费,就会产生数据积压

  • 在企业中,我们要有监控系统,如果出现这种情况,需要尽快处理。虽然后续的Spark Streaming/Flink可以实现背压机制,但是数据累积太多一定对实时系统它的实时性是有说影响的

数据清理&配额限速

  • 数据清理

    • Log Deletion(日志删除):如果消息达到一定的条件(时间、日志大小、offset大小),Kafka就会自动将日志设置为待删除(segment端的后缀名会以 .delete结尾),日志管理程序会定期清理这些日志

      • 默认是7天过期

    • Log Compaction(日志合并)

      • 如果在一些key-value数据中,一个key可以对应多个不同版本的value

      • 经过日志合并,就会只保留最新的一个版本

  • 配额限速

    • 可以限制Producer、Consumer的速率

    • 防止Kafka的速度过快,占用整个服务器(broker)的所有IO资源

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/720439.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

英文单词的3σ值

最近做log的nlp&#xff0c;发现日志当中有一些很长的但是无意义的词汇&#xff0c;很影响训练模型&#xff0c;这边想通过单次长度去排除那些无意义词汇&#xff0c;去查了gpt英文单次的3σ值&#xff0c;记录下

谷歌浏览器中的谷歌翻译失效了?如何解决谷歌翻译不响应问题?

1 原因分析 因为谷歌把国内的服务器关了。 2 下载软件 &#xff08;1&#xff09;Mac OS https://github.com/Ponderfly/GoogleTranslateIpCheck/releases/download/1.6/GoogleTranslateIpCheck-mac-x64.zip&#xff08;2&#xff09;Windows https://github.com/Ponderfl…

【MySQL体系结构及CetOS7安装MySQL和修改密码】

MySQL体系结构及安装MySQL MySQL体系结构CentOS7安装MySQL四种方法1、离线安装2、在线安装3、通用二级制方式4、容器方式安装 设置及修改密码忘记密码恢复 MySQL体系结构 MySQL是一种常用的关系型数据库管理系统&#xff08;RDBMS&#xff09;&#xff0c;其体系结构包括以下&…

ARM架构(ARM汇编指令练习)

文章目录 前言一、MOV指令二、内存访问指令1.LDR指令2.STR指令 三、数据处理指令1.ADD指令2.SUB指令3.位操作指令4.CMP指令 四、跳转指令总结 前言 本篇文章带大家来学习ARM汇编的一些常用的指令&#xff0c;这里指令都是非常基础的指令。 一、MOV指令 MOV&#xff08;Move&…

一个 Kill 不掉的 MySQL 会话

究竟是什么原因&#xff0c;导致主从切换过程中存在一个无法 kill 的会话&#xff1f; 作者&#xff1a;秦广飞 爱可生 DBA 团队成员&#xff0c;负责项目日常问题处理及公司平台问题排查&#xff0c;对数据库有兴趣&#xff0c;对技术有想法。一入 IT 深似海&#xff0c;从此节…

【钱处理】商业计算怎样才能保证精度不丢失

以项目驱动学习&#xff0c;以实践检验真知 前言 很多系统都有「处理金额」的需求&#xff0c;比如电商系统、财务系统、收银系统&#xff0c;等等。只要和钱扯上关系&#xff0c;就不得不打起十二万分精神来对待&#xff0c;一分一毫都不能出错&#xff0c;否则对系统和用户来…

集群 第一章

目录 1.群集的含义 2.群集分类 3.群集架构 4.负载调度工作模式 5.lvs 虚拟服务器 6.nat 模式 lvs 负载均衡群集部署 7.总结 1.群集的含义 由多台主机构成&#xff0c;但对外只表现为一个整体&#xff0c;只提供一个访问入口&#xff08;域名与IP地址&#xff09;&#…

建筑师们,你们该把三维模型弄到PPT里做汇报了!

➤如何实现项目汇报效率比传统的PPT高&#xff0c;同时汇报效果更直观&#xff1f; ➤如何摆脱方案汇报只能向客户交付数据&#xff0c;安装专业软件查看项目成果&#xff1f; ➤如何将无人机航测数据、CAD图纸、BIM设计成果进行融合&#xff0c;挖掘出更深层次的应用&#x…

mysql体系结构及安装部署mysql

目录 1.体系结构 2.安装mysql 1.yum 源安装 2. 第二种安装方式-通用二进制方式 3.mysql改密方式 第一种&#xff0c;知道密码的情况下 第二种&#xff0c;不知道密码 1.体系结构 MySQL server连接层连接池&#xff08;缓冲池&#xff09;SQL层系统管理和控制工具SQL…

亿级日活业务稳如磐石 华为云发布性能测试服务CodeArts PerfTest

HDC期间可参与华为云PaaS生态抽奖活动&#xff0c;活动链接在文末 计算机软件作为人类逻辑智慧的伟大结晶之一&#xff0c;已经渗透到了人类社会的各个角落。早期的计算机发展对硬件有很强的依赖性&#xff0c;只有少数的个人或者机构才能拥有软件这种“奢侈品”。但随着软件行…

了解和搭建zabbix 6.0(来日方长)

前言&#xff1a; 作为一个运维&#xff0c;需要会使用监控系统查看服务器系统性能、应用服务状态和网站流量指标等&#xff0c;利用监控系统的数据去了解网站上线发布的结果和健康状态。 利用一个优秀的监控软件&#xff0c;我们可以: 通过一个友好的界面进行浏览整个网站所有…

OpenAI呼吁超级智能需要被监管

ChatGPT的创建者OpenAI发出了对超级智能监管的发人深省的呼吁&#xff0c;并与核能的监管进行了类比。在最近的一篇博客文章中&#xff0c;OpenAI强调了人工智能快速发展可能产生的影响以及这个不断变化的领域治理的紧迫性。该公司表示&#xff0c;预计人工智能系统将在未来十年…

你一定要用这个API管理工具,看完你就知道为什么了

以下是经常发生在程序员之间的对话&#xff1a; 小张&#xff1a;你知道为什么程序员不喜欢写文档&#xff1f; 小王&#xff1a;因为代码就是最好的文档啊&#xff01;谁还需要写那些冗长的说明呢&#xff1f; 小张&#xff1a;那你知道为什么程序员也不喜欢别人不写文档吗…

JAVA POI的excel中包含图片进行读取保存,单张图片,多张图片

JAVA POI的excel中包含图片进行读取保存,单张图片,多张图片 ---------------------------------------------效果---------------------------------------------------------- 1.单张图片 2.多张图片 import java.io.FileInputStream; import java.io.FileOutputStream;…

途乐证券|有色金属板块崛起涨超2%,云南锗业两连板

周三(7月5日)&#xff0c;A股三大股指震荡整理。截至上午收盘&#xff0c;上证指数跌幅达0.51%&#xff0c;报3228.68点&#xff1b;深证成指和创业板指跌幅分别为0.53%和0.59%&#xff1b;沪深两市合计成交额5310.1.6亿元&#xff0c;总体来看&#xff0c;两市个股跌多涨少。 …

机器学习25:《数据准备和特征工程-III》采样和分隔

目录 1.采样和分割数据 1.1 抽样简介 1.2 过滤 PII&#xff08;个人身份信息&#xff09; 2.数据不平衡 2.1 下采样和增加权重 3.数据分割示例 3.1 随机分割可能不是最好的方法 4.分割数据 5.随机化 5.1 实际考虑 5.2 散列的注意事项 6.参考文献 1.采样和分割数据-…

2023,中国电商重回元老时代

中国的历史上不缺“太上皇”&#xff0c;但“太上皇”再度站到台前的很少。公元1457年&#xff0c;被囚禁在南宫的“太上皇”朱祁镇复位&#xff0c;上演了中国历史上少见的南宫复辟。而危机时刻被推举为皇帝的朱祁钰&#xff0c;后来的庙号是代宗&#xff0c;阴阳怪气十足。 …

php://input文件包含

实验目的 通过本实验&#xff0c;了解php封装伪协议&#xff0c;掌握php://input文件包含的用法 实验环境 操作机&#xff1a;kali 靶机&#xff1a;Windows 实验地址&#xff1a;http://靶机ip/exp/include2/input/input2/ 工具&#xff1a;burpsuite 用户名&#xff1a…

好用的屏幕录制工具--Bandicam(班迪录屏)

最近准备录个电脑的音频&#xff0c;找了好多个软件&#xff0c;除了收费就是功能有限&#xff0c;经过好一番折腾试用&#xff0c;发现了2个介面友好功能强大的录屏、录音软件&#xff1a; ① Bandicam(班迪录屏)&#xff08;强烈推荐&#xff09; ② 数据蛙录屏 Bandicam(班迪…

【PC】CPU与GPU

文章目录 CPU与主板CPU是什么主板是什么功能 GPU与显卡GPU是什么显卡是什么功能 CPU与GPU的关系 ALU&#xff1a; 算术单元&#xff08;Arithmetic Unit&#xff09;&#xff1a;算术单元执行基本的算术运算&#xff0c;如加法、减法、乘法和除法。它能够对整数、浮点数和定点数…