KafKa 3.x(二、Broker,消费者)

news2025/1/12 3:53:54

4. Kafka Broker

4.1 kafka Broker工作流程

4.1.1 Zoopkeeper存储的Kafka信息

  1. 启动Zookeeper客户端
  2. 通过ls命令查看kafka相关信息
    zookeeper中存储的kafka信息
    在Zookeeper的服务端存储的Kafka相关信息
  1. /kafka/brokers/ids [0,1,2] 记录那些服务器
  2. /kafka/brokers/topics/first/partitions/0/state {“leader”:1,“isr”:[1,0,2]} 记录谁是leader,有哪些服务器可用
  3. /kafka/controller {“brokerid”:0} 辅助选择Leader
    可借助prettyZoo可视化工具查看具体信息:
    prettyZoo

4.1.2 Kafka Broker总体工作流程

KafkaLeader选举工作流程

4.2 生产经验-节点服役与退役

4.2.1 服务新节点

  1. 新节点准备
  • 关闭一台旧服务器,执行克隆操作
  • 开启新服务器的kafka,并i需改IP地址
  • 在新服务器上修改主机名。
  • 重启新旧服务器上的ksfka
  • 修改新服务器kafka的broker.id
  • 删除新服务器上的logs和datas文件
  • 启动旧服务器上kafka集群
  • 单独启动新服务器的kafka
  1. 执行负载均衡操作
  • 创建一个要均衡的主题
  • 生成一个负载均衡的计划
  • 创建副本存储计划(所有副本存储在broker0,broker1,broker2,broker3中)。
  • 执行副本计划
  • 验证

4.2.2 退役旧节点

  1. 执行负载均衡操作:先按照退役一台节点,生成执行计划,然后按照服役时操作流程执行负载均衡。
  • 创建一个要均衡的主题
  • 创建执行计划
  • 创建副本存储计划(所有副本存储在broker0,broker1,broker2中)。
  • 关闭新的kafka服务

4.3 Kafka副本

4.3.1 副本基本信息

  1. Kafka副本作用:提高数据可靠性
  2. Kafka默认副本1个,生成环境一般配置2个,保证数据可靠性;多副本会增加磁盘空间,网络速率,降低效率
  3. Kafka副本分为:Leader和Follower。生产者只会把数据发往Leader,然后同步给Follower
  4. Kafka分区中所有副本统称AR(Assigned Repllicas)
    AR = ISR + OSR
    ISR:表示和leader保持同步的follower集合。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR,由replicas.lag.time.max.ms设置,默认30s。Leader发送故障,就会从ISR中选举新Leader。
    OSR:表示Follower与Leader副本同步时,延迟过多的副本。

4.3.2 Leader选举流程

Kafka集群中有一个broker的Controller会被选举未Controller Leader,负责管理集群broker的上下限,所有topic的分区副本分配Leader选举等工作。
Controller的信息同步工作是依赖于Zookeeper的。

4.3.3 Leader和Follower故障处理细节

LEO(Log End Offset):每个副本的最后一个offset,LEO其实就是最新的offset+1
HW(High Watermark):所有副本中最小的LEO

  1. Follower故障
    Follower副本
  • Follower故障发生后会被临时踢出ISR
  • 这个期间LeaderheFollower继续接收数据
  • 待该Follower恢复,Follower会读取本地磁盘记录上次的HW,并将log文件高于HW的部分截取掉,从HW开始向Leader进行同步。
  • 等该Follower的LEO>=该Partition的HW,即Follower追上Leader之后,就可重新加入ISR了。
  1. Leader故障
  • Leader发生故障后,会从ISR中选出新的Leader
  • 为保证多个副本之间的数据一致性,其余的Follower会先将各自的log文件高于HW的部分截掉,然后从新的Leader同步数据。
    注意:这只能保证副本之间的数据一致性,并不能保证数据不丢失或不重复

4.3.4 分区副本配置

如果kafka服务器只有4个节点,那么设置kafka的分区数>服务器数,在kafka中如何分配?

  1. 创建16个分区3个副本:/bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic second --partitions 16 -replication -factor 3
  2. 查看分区和副本:bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic second

4.3.5 生成经验-手动调整分区副本

在生产环境中,每台服务器配置和性能不一样,但是kafka只会根据自己的代码规则创建对应的分区副本,就会导致个别服务器压力大。所以需要手动调整分区副本存储。
需求:创建一个新的topic,4个分区,两个副本,名称为three。将该topic的所有副本都存储到broker0和broker1两台服务器上。

  1. 创建新的topic
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --parititions4 --replication-factor 2 --topic three
  2. 查看分区副本存储情况
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic three
  3. 创建副本存储计划(所有副本都指定存储在broker0,broker1中)
    vim increase-replication-factor.json
    {
    “version”:1
    “partitions”:[{“topic”:“three”,“partition”:0,“replicas”:[0,1]},
    {“topic”:“three”,“partition”:1,“replicas”:[0,1]},
    {“topic”:“three”,“partition”:2,“replicas”:[1,0]},
    {“topic”:“three”,“partition”:3,“replicas”:[1,0]},]
    }
  4. 执行副本存储计划
    bin/kafka-reassign-partitions,sh --bootstrap-server localhost:9092 --reassignment-json-fileincrease-replication-factor.json --execute
  5. 验证副本存储计划
    bin/kafka-reassign-partitions,sh --bootstrap-server localhost:9092 --reassignment-json-fileincrease-replication-factor.json --verify
  6. 查看分区副本存储情况
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic three

4.3.6 生成经验-Leader Partition 负载均衡

如果有broker宕机,会导致LP集中在几台broker上,其他宕机broker重启后都是follower partition,读写请求低,造成集群负载不平衡。
broker宕机

  • auto.Leader.rebalance.enable,默认true。 自动LeaderPartition平衡。
  • leader.imbalance.per.broker.percentage,默认10%。每个broker允许的不平衡的leader的比率。如果每个broker超值,控制器会触发leader的平衡。
  • leader.imbalance.check.interval.seconds,默认300s,检查leader负载是否平衡的间隔时间。
    如下,假设集群只有一个主题:
    Topic:xuyu partition:0 Leader:0 Replicas:3,0,2,1 Isr:3,0,2,1
    Topic:xuyu partition:1 Leader:1 Replicas:1,2,3,0 Isr:1,2,3,0
    Topic:xuyu partition:2 Leader:2 Replicas:0,3,1,2 Isr:0,3,1,2
    Topic:xuyu partition:3 Leader:3 Replicas:2,1,0,3 Isr:2,1,0,3
    针对broker0节点,分区2的AR优先副本是0节点,但是0节点却不是leader,所以不平衡数+1,AR副本总数是4,所以broker0节点不平衡率为1/4>10%,需要再平衡。
    broker1的不平衡为0,不需要再平衡。

4.3.7 生成经验-增加副本因子

生产环境中,由于某个主题重要等级需要提升,需要增加副本,副本数需要先指定计划,然后根据计划执行。

  1. 创建topic
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --partitions 3 --replication-factor 1 --topic four
  2. 手动增加副本存储
    创建副本存储计划
    vim increase-replication-factor.json
    {“version”:1,“partitions”:[{“topic”:“four”,“partition”:0,“replicas”:[0,1,2]},{“topic”:“four”,“partition”:1,“replicas”:[0,1,2]},{“topic”:“four”,“partition”:2,“replicas”:[0,1,2]}]}
    执行:
    bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file-increase-replication-factor.json --execute

4.4 Kafka文件存储

4.4.1 文件存储机制

  1. Topic数据的存储机制
    topic是逻辑上的概念,partition是物理上的概念,每个partition对应一个log文件,该文件中存储的就是Producer生产的数据。Producer生产的数据会被不断追加到该log文件末端,为防止log文件过大导致数据定位效率低,kafka采用分片和索引机制,将每个partition分为多个segment。每个segment包括:“index文件、log文件、timeindex文件等”。这些文件在一个文件下,该文件命名规则:topic名+分区序号。
    Kafka文件存储机制
  2. 思考:topic数据到底存储在什么位置?
  • 启动生产者发送消息
    bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic first
  • 查看localhost的/opt/module/kafka/datas/first路径上的文件,windows看log文件
    topic数据
  • 直接查看log日志,是乱码
  • 通过工具查看index和log信息
    kafka-run-classs.sh kafka.tools.DumpLogSegments --files ./000000.index
  1. index文件和log文件详解
    注意:
  • index为稀疏索引,大约每往log文件写入4kb数据,会往index文件写入一条索引。参数log.index.interval.bytes默认4kb。
  • index文件中保存的offset为相对offset,这样能确保offset的值所占空间不会过大,因此能将offset的值控制在固定大小。
    index和log文件详解

4.4.2 文件清楚策略

kafka中日志默认保存7天,可通过一下参数调整:log.retenion.hours(保存时间),log.retention.check.interval.ms(检测时间)。如果超时,kafka会提供delete和compact两种删除方式。

  1. delete:
  • log.cleanup.policy=delete所有数据启用
    基于时间:默认打开,以segment中所有记录中最大时间戳作为该文件时间戳。
    基于大小:默认关闭,超过设置大小,删除最早的segment,log.retention.bytes。默认-1,表示无穷大。
  1. compact日志压缩:对于相同key的不同value值,只保留最后一个版本。
  • log.cleanup.policy=compact所有数据启用压缩策略。
    compact文件压缩
    压缩后offset不连续,需要拿到大于一个当前offset的对应消息进行消费。
    注意:此种只适合特殊场景,比如key为id,val为值,通过压缩,整个消息集群种保存了所有用户最新的资料。

4.5 Kafka高效读写数据

  1. kafka本身是分布式集群,可采用分区技术,并行度高
  2. 读数据采用稀疏索引,可快读定位要消费的数据
  3. 顺序写磁盘:kafka的producer生产数据,要写入到log文件中,写的过程是追加到文件末端,为顺序写,
  4. 页缓存+零拷贝
    零拷贝:kafka的数据加工处理操作交由kafka生产者和kafka消费者处理。kafkaBroker应用层不关心存储的数据,所以不走应用层,效率高。
    PageCache页缓存:kafka重度依赖底层操作系统提供的PageCache功能。当上层有写操作时,操作系统只是将数据写入PageCache。当读时,从PageCacha中查找,找不到再去磁盘。
    kafka零拷贝

5. Kafka消费者

5.1 Kafka消费方式

pull拉模式(Kafka采用):consumer采用从broker中主动拉取数据,因为每个消费者处理能力不同。
push推模式:由于由broker决定消息频率,消费者难适应,处理不足。
pull缺点:如果没有数,消费者进入循环空数据。

5.2 Kafka消费者工作流程

5.2.1 消费者总体工作流程

消费者总体工作流程

5.2.2 消费者组原理

消费者组Consumer Group(GC):消费者组由多个consumer组成。组中消费者groupid相同。

  • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费
  • 消费者组之间互不影响。所有消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者
    消费者组初始化流程:
  1. coordinator:辅助实现消费者组的初始化和分区分配
    coordinator节点选择=groupid的hashcode值%50(__consumer_offsets的分区数量)。eg:groupid的哈希为1,1%50=1,那么其主题1号分区,在哪个broker上,就选择这个节点的coordinator作为这个消费者组的老大。消费者组下的所有消费者提交offset的时候就往这个分区去提交offset。
    coordinator辅助初始化
    消费者组详细消费流程:
    消费者组流程

5.3.1 独立消费者案例(订阅主题)

  1. 需求:创建独立消费者,消费first主题数据。
    注意:在消费者API中必须配置消费者组id,命令行启动消费者不填写消费者组id会被自动填写随机消费者组id
  2. 实现步骤
public class CustomConsumer {
    public static void main(String[] args) {
        //0 配置
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092,node2:9092,node3:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 配置消费者组id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");
        //1 创建一个消费者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
        //2 定义消费主题first
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        kafkaConsumer.subscribe(topics);
        //3 消费数据
        while (true) {
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }
    }
}

5.3.1 独立消费者案例(订阅分区)

需求:创建一个独立消费者,消费first主题0号分区的数据。
实现步骤:

//2 订阅主题对应的分区
ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
topicPartitions.add(new TopicPartition("first", 0)); //0号分区
kafkaConsumer.assign(topicPartitions);

5.3.1 消费者组案例

需求:测试同一个主题的分区数据,只能由一个消费者组中的一个消费。
不同消费者配置相同消费者组id即可,消费者底层会自动分区。
properties.put(ConsumerConfig.GROUP_ID_CONFIG,“test”);

5.4 生产经验-分区的分配以及再平衡

  1. 一个消费者组有消费者组成,一个topic有多个partition组成,到底由哪个消费者消费哪个partition的数据呢?
  2. kafka有4种主流的分区分配策略:Range、RoundRobin、Sticky、CooperativeSticky。可配置partition.assignment.strategy修改。默认Range+CooperativeSticky。可多选。分区平衡策略

5.4.1 Range以及再平衡

  1. 分区分配策略之range
    首先对同一个topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。通过partition数/consumer数来决定每个消费者应该消费几个分区。如果除不尽,那么全面几个消费者将会多消费1个分区
    注意:如果只是针对一个topic而言,c0消费者多消费1个分区影响不大。但是如果有N个topic,那么每个topic,消费者C0都将多消费1个分区,topic越多,C0消费的分区会比其他消费者明显多消费N个分区。 容易产生数据倾斜
  2. Range分区分配策略案例
  • 修改主题first为7个分区
  • bin/kafka-topics.sh --bootstrap-server node1:9092 --alter --topic first --partitions 7
    注意: 分区数只能增加,不能减少。
  • 复制CustomConsumer类,创建CustomConsumer2,3。这样可由三个消费者CustomConsumer1,2,3组成消费者组,组名都为"test",同时启动三个消费者。
  • 启动CustomProducer,发送500个消息。
    consumer0消费到0,1,2分区的随数据,consumer1消费到3,4分区的数据,consumer2消费到5,6分区数据
  • 关闭CustomConsumer,在心跳(45s)内再次发送数据
    consumer1先消费3,4分区数据,consumer2消费到5,6分区数据,在心跳断开后consumer1继续消费0,1,2分区数据
  • 关闭CustomConsumer心跳(45s)后,再次发送数据
    consumer1先消费0,1,2,3,4分区数据,consumer2消费到5,6分区数据
    说明:消费者0已经被踢出消费者组,所以重新按照range方式分配

5.4.2 RoundRobin以及再平衡

  1. RoundRobin分区策略原理
    RoundRobin针对集群种所有topic而言的。
    RoundRobin轮询分区策略是把所有的consumer都列出来,然后按照hashcode进行排序,最后通过轮询算法来分配partition给到各个消费者。
    2.RoundRobin分区分配策略案例
  • 同上,将CustomConsumer,1,2中消费者代码中修改策略为RoundRobin。
    //properties.put(ConsumerConifg.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, “org.apache.kafka.clients.consumer.RoundRobinAssignor”);
  • 重启3个消费者,发消息
    consumer0消费到0,3,6分区的随数据,consumer1消费到1,4分区的数据,consumer2消费到2,5分区数据
  • 关闭CustomConsumer,在心跳(45s)内再次发送数据
    consumer1先消费1,4分区数据,consumer2消费到2,5分区数据,在断开后consumer1继续消费0,6分区数据,consumer2消费到3分区数据
  • 关闭CustomConsumer心跳(45s)后,再次发送数据
    consumer1先消费1,3,5分区数据,consumer2消费到0,2,4,6分区数据

5.4.3 Sticky以及再平衡

粘性分区定义:即在执行一次新分配之前,考虑上一次分配,尽量少的调整分配变动。
粘性分区从0.11.x引入,首先会尽量均衡的放置分区到消费者上面,在出现统一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变
需求:设置first主题,7个分区,3个消费者,采用粘性分区消费,再停掉一个观察。

  1. 修改分区策略
ArrayList<String> startegys = new ArrayList<>();
startegys。add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,startegys);
  1. 同上
    consumer0消费到0,1分区的随数据,consumer1消费到4,5,6分区的数据,consumer2消费到2,3分区数据,但是重启后会改变。
    关闭CustomConsumer,在心跳(45s)内再次发送数据
    consumer1先消费1,4分区数据,consumer2消费到2,5分区数据,在断开后consumer1继续消费0,6分区数据,consumer2消费到3分区数据
    关闭CustomConsumer心跳(45s)后,再次发送数据
    consumer1先消费1,0,4,6分区数据,consumer2消费到2,3,5分区数据

5.5 offset位移

5.5.1 offset的默认维护位置 **

0.9版本之后,consumer默认将offset保存在卡夫卡一个内置的topic中,该topic为__consumer_offsets
0.9之前,consumer默认将offset保存在Zookeeper

offset默认维护位置
consumer_offsets主题采用k-v存储数据,key为group.id+topic+分区号,value为当前offset的值。每隔一段时间,kafka内部会对这个topic进行compact,即每个group.id+topic+分区号保留最新数据。

  1. 消费offset案例
  • consumer_offsets为kafka的topic,那就可以通过消费者消费
  • 在配置文件中config/consumer.properties中添加exclude.internal.topics=false,默认true,表示不能消费系统主题。为了查看,设为false。
  • 创建新的topic:bin/kafka-topics.sh --bootstrap-server node1:9092 --create --topic offsetTest --partitions 2 --replcation-factor 2
  • 启动生产者往offsetTest发数据。bin/kafka-console-producer.sh --topic offsetTest --bootstrap-server node1:9092
  • 启动消费者消费offsetTest数据:bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic offsetTest --group test(指定消费者组名称,更好观察数据存储位置)
  • 查看消费者消费主题consumer_offsets:bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server node1:9092 --consumer.config config/consumer.properties --formatter “kafka.coordinatior.group.GroupMetadataManager$OffsetsMessageFromatter” --from-beginning

5.5.2 自动提交offset

  • enable.auto.commit:是否开启自动提交offset功能,默认true
  • auto.commit.interval.mx:自动提交offset的时间间隔。默认5s
//自动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);

5.5.3 手动提交offset

  1. commitSync(同步提交):必须等待offset提交完成,再去消费下一批数据
  2. commitAsync(异步提交):发送offset请求就开启下一批数据消费
    相同点:都会将本次提交的一批数据最高的偏移量提交
    不同点:同步阻塞,直到提交成功,有重试机制,异步没有失败重试,会提交失败。
//手动提交
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
//手动提交offset
kafkaConsumer.commitSync();
kafkaConsumer.commitASync();

5.5.4 指定offset进行消费

auto.offset.reset=earliest | latest | none 默认latest
当kafka中没有初始偏移量(第一次)或不存在(被删除)时怎么办?

  1. earliest:自动将偏移量重置为最早的偏移量 --from-beginning
  2. latest:自动重置为最新值
  3. none:没找到先前偏移量,向消费者抛出异常
  4. 任意指定offset位置开始消费(注:每次执行完,要修改消费者组名)
        //指定位置offset消费
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        //保证分区分配方案已经指定完成
        while(assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            assignment = kafkaConsumer.assignment();
        }
        for (TopicPartition topicPartition : assignment) {
            kafkaConsumer.seek(topicPartition, 100);
        }

5.5.5 指定时间进行消费

需求:在生产环境中,会遇到消费数据异常,想重新按照时间消费。

        //指定位置offset消费
        Set<TopicPartition> assignment = kafkaConsumer.assignment();
        //保证分区分配方案已经指定完成
        while (assignment.size() == 0) {
            kafkaConsumer.poll(Duration.ofSeconds(1));
            assignment = kafkaConsumer.assignment();
        }
        //时间转offset
        HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<TopicPartition, Long>();
        for (TopicPartition topicPartition : assignment) {
            topicPartitionLongHashMap.put(topicPartition, System.currentTimeMillis() - 1 * 24 * 3600 * 1000);  //一天前
        }
        Map<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);
        for (TopicPartition topicPartition : assignment) {
            OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);
            kafkaConsumer.seek(topicPartition, offsetAndTimestamp.offset());
        }

5.5.6 漏消费和重复消费

重复消费:已经消费了数据,但是offset没提交(consumer挂了,再次重启会从上次offset消费)
漏消费:先提交offset后消费(手动消费后,当offset被提交,数据还没落库,消费者线程被kill,导致内存数据丢失)

5.6 生产经验-消费者事务

消费者事务需要将kafka消费端将消费过程和提交offset过程做原子绑定。此时需要将kafka的offset保存到支持事务的自定义介质(mysql)
消费者事务

5.7 生产经验-数据积压(消费者提高吞吐量)

  1. 如果消费能力不足,则增加topic分区数,并且提升消费者组的消费者数量,消费者数=分区数。(两者缺一不可)
  2. 如果下游数据处理不及时:提高每批次拉取数量(500+)。批次拉取数据少(拉取数据/处理时间<生产速度),使处理的数据小于生产的数据(50m),也会积压。

回顾总结

1.zk存储的信息:broker.ids、leaders、controller
2.工作理财
3.服役:
4.退役
5.副本:
副本好处:提高可靠性,生产环境一般2个默认1个,有ledaer和follower
isr,ar,controller选举(第一次随机),leader挂了(leo,hw多删少补),follow挂了
副本分配:负载均衡,保证数据分配
手动副本分配:指定计划、执行计划,验证计划
leader partition的负载均衡 10%
手动增加副本因子
6.存储机制
broker topic partitions log segment 稀疏索引(4KB) 时间戳
7.删除数据
默认7天 删除策略(删除,压缩)
8.高效读写
集群 分区 (提高生产和消费同步,海量数据打散) 稀疏索引 顺序读写 零拷贝和页缓存(Linux内核的缓存)
9.消费者
消费流程 消费者组 分区分配策略(range、轮询、粘性) 再平衡(45s) 订阅主题(可多个)
10.offset
存储在系统主题 自动提交5s 手动提交(同异步) 指定offset消费 按照时间消费 漏,重复消费
11.事务
生产-集群-消费-下游(mysql)
12.数据积压
增加分区,消费者 生产-集群(4个参数) 消费者(2个参数)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/728868.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

关于超卖程序问题分析-java

关于超卖程序问题分析 1.并发情况下&#xff0c;GET缓存 判断>0&#xff0c;成立&#xff0c;均执行扣减库存&#xff0c;导致超卖 2.加锁 以库存key&#xff0c;加锁&#xff0c;setNx&#xff0c;finally解锁 deleteKey 存在问题 1.误解锁&#xff08;是不是也是因为…

Android 11 SystemUI 启动流程

SystemUI 有哪内容 从表面上看&#xff0c; 我们看到的状态栏、通知栏、下拉菜单、导航栏、锁屏、最近任务、低电提示等系统页面都是 SystemUI 的。SystemUI&#xff0c;在源码目录中位于&#xff1a; framework/base/packages 目录下&#xff0c; 可见 SystemUI 和 framework…

vue3脚本绑定CodeMirror的使用

代码&#xff1a; <template><CodeMirrorref"codeMirror":value"codeVal":languageSingle"languageSingle":readOnly"!isEdit"submitCode"submitCode"></CodeMirror> </template><script setup…

文华财经期货APP随身行和同花顺期货可以模拟交易的期货软件,那个更好用?

期货app是一种可以在手机上进行期货交易和行情分析的软件&#xff0c;它可以让投资者随时随地掌握期货市场的动态&#xff0c;进行投资决策。随着科技的发展&#xff0c;越来越多的期货投资者选择通过手机端app来进行期货交易&#xff0c;享受随时随地、方便快捷的服务。市面上…

Js获取浏览器地址栏参数

获取浏览器地址参数 //获取浏览器地址栏参数function getQueryString(name) {var reg new RegExp("(^|&)" name "([^&]*)(&|$)");var result window.location.search.substr(1).match(reg);if (result ! null) {return unescape(result[2…

VMware 17虚拟Ubuntu 22.04设置共享目录

之前使用VM 17之前的版本虚拟CentOS&#xff0c;设置共享目录非常方便&#xff0c;在CentOS中安装VMware Tools即可。随着CentOS变成上游版本后&#xff0c;转向使用Ubuntu&#xff0c;VM也升级到了17&#xff0c;Ubuntu也升级到了最新的22.04&#xff0c;但是发现共享目录不能…

阿里大牛新产Java面试速成指南,主打就是躺着拿Ofeer

很多粉丝后台留言&#xff0c;Java程序员面临的竞争太激烈了…… 我自己也有实感&#xff0c;多年身处一线互联网公司&#xff0c;虽没有直面过求职跳槽的残酷&#xff0c;但经常担任技术面试考官&#xff0c;对程序员招聘市场的现状很清楚。导致现在激烈竞争的原因不外乎三方面…

sqlite-manage数据库可视化管理uniqpp

一、sqlite-manage介绍 sqlite-manage 是 SQLite 数据库可视化管理插件&#xff0c;更方前期查看和操作SQLite数据库&#xff0c;给APP开发者提供方便&#xff0c;避免重复造轮子。 内置增删改查工具类&#xff0c;可按需全局引用或单独引用。 二、使用sqlite要打开模块选项 三…

java 计算网段范围 分析网段包含关系

目录 一、网段范围 二、思路说明 三、代码 1、将一个ip转为数字 2、转换子网掩码&#xff08;255.255.255.0 转为 24&#xff09; 3、根据 ip 与 掩码 计算最大值和最小值 4、测试 5、完整代码 四、难点讲解 1、转换子网掩码&#xff0c; 例&#xff1a;255.255.25…

【习题之Python篇】习题24——回文

问题描述 2020年春节期间&#xff0c;有一个特殊的日期引起了大家的注意&#xff1a;2020年2月2日。因为如果将这个日期按 yyyymmdd 的格式写成一个8位数是 &#xff0c;恰好是一个回文数。我们称这样的日期是回文日期。 有人表示 是“千年一遇”的特殊日子。对此小明很不认同…

JavaWeb 速通CSS

目录 一、CSS入门 1.基本介绍 : 2.CSS的作用 : 3.CSS的语法 : 二、CSS样式 1.字体颜色&#xff1a; 1 说明 2 演示 2.边框 : 1 说明 2 演示 3.背景颜色 : 1 说明 2 演示 4.字体样式 : 1 说明 2 演示 5.div块居中 : 1 说明 2 演示 6.div文本居中 : 1 说明 2 演示 7.超…

【软件测试】Git详细-获取Git仓库,全网最全一篇打通...

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 官方提供了两种获…

GCC is no longer supported解决方法Android Studio

先说解决办法&#xff1a; 找到 -DANDROID_TOOLCHAINgcc进行删除。 原因是NDK版本过高了&#xff0c;r13b开始&#xff0c;llvm / Clang成为默认工具链&#xff0c;r18b开始删除了gcc。 删掉-DANDROID_TOOLCHAINgcc后&#xff0c;构建系统会默认使用clang。

最大流?费用流?结合二分图?例题

最大流 给出起点&#xff0c;终点&#xff0c;与边&#xff0c;边有最大流量限制&#xff0c;问从起点在不超过边的流量限制的情况下最大能从起点流多少流量到终点 反悔思想&#xff1a;如果我们每次找到一条路径就把这条路径上流量最小的边删去直到没有路径连接起点和终点&am…

飞行动力学 - 第5节-part3-爬升性能随高度的变化趋势 之 基础点摘要

飞行动力学 - 第5节-part3-爬升性能随高度的变化趋势 之 基础点摘要 1. 动力学方程2. 爬升角、爬升率趋势3. 参考资料 1. 动力学方程 回顾下&#xff0c;根据牛顿第一运动定律给出的动力学方程&#xff1a; 2. 爬升角、爬升率趋势 从推导公式的角度&#xff0c;上述趋势需要考…

按下数实融合的加速键,新华三推动基础设施变革

去年底&#xff0c;生成式AI&#xff08;AIGC&#xff09;开始席卷全球&#xff0c;吸引社会各界的广泛关注。 正所谓AI黄金时代的到来&#xff0c;将重新定义各行各业。AIGC热浪来袭&#xff0c;标志着在数实融合的大趋势下&#xff0c;人工智能大范围应用的奇点已经来临&…

Excel转图片(Java方式)

先看效果、先看效果、先看效果 左侧&#xff1a;Excel的截图 右侧&#xff1a;生成的图片 开发工具&#xff1a; Eclipse 开发环境&#xff1a; JDK1.8 使用技术&#xff1a; Graphics2D&#xff1a;&#xff08;JDK自带&…

使用python调用ChatGPT API 简单示例

如果你已经获得了OpenAI的API密钥&#xff0c;并且想要使用Python发起ChatGPT对话&#xff0c;你可以使用OpenAI的Python SDK来实现。下面是一个简单的示例代码&#xff1a; 首先&#xff0c;你需要确保已安装OpenAI的Python SDK。你可以使用pip来安装&#xff1a; pip insta…

模板(上)

文章目录 泛型编程函数模板类模板 一、泛型编程 泛型编程&#xff1a;编写与类型无关的通用代码&#xff0c;是代码复用的一种手段。模板是泛型编程的基础。 void Swap(int& left, int& right) {int temp left;left right;right temp; } void Swap(double& …

什么是混合云?

混合云&#xff08;Hybrid Cloud&#xff09;是指结合了私有云和公有云的计算环境。私有云是指在企业内部建立的基础设施&#xff0c;由企业自己管理和控制&#xff1b;而公有云是由第三方云服务提供商&#xff08;如亚马逊AWS、微软Azure、谷歌云等&#xff09;提供的计算资源…