Kafka(七)可靠性

news2024/11/28 14:44:01

目录

  • 1 可靠的数据传递
    • 1.1 Kafka的可靠性保证
    • 1.2 复制
    • 1.3 Broker配置
      • 1.3.1 复制系数
      • 1.3.2 broker的位置分布
      • 1.3.3 不彻底的首领选举
      • 1.3.4 最少同步副本
      • 1.3.5 保持副本同步
      • 1.3.6 持久化到磁盘
    • 1.2 在可靠的系统中使用生产者
      • 1.2.1 根据需求配置恰当的acks
      • 1.2.2 配置重试参数
      • 1.2.3 处理不可重试错误
    • 1.3 在可靠的系统中使用消费者
      • 1.3.1 消费者的可靠性配置
      • 1.3.2 自动提交偏移量
      • 1.3.3 手动提交偏移量
        • 1 总是在处理完消息后提交偏移量
        • 2 提交频率时性能和重复消息数量之间的权衡
        • 3 在正确的时间点提交正确的偏移量
        • 4 消费者再均衡
        • 5 消费者重试
        • 6 消费者可能需要维护状态
    • 1.4 验证系统可靠性
      • 1.4.1 验证配置
      • 1.4.2 验证应用程序
    • 1.5 监控系统可靠性
  • 2 精确一次性语义
    • 2.1 幂等生产者
      • 2.1.1 启用幂等生产者
      • 2.1.2 工作原理
      • 2.1.3 幂等生产者的局限性
    • 2.2 事务
      • 2.2.1 事务的应用场景
      • 2.2.2 事务可以解决哪些问题
      • 2.2.3 事务是如何保证精确一次性的
        • 生产者端
        • transactional.id=null
        • 消费者端
        • isolation.level=read_uncommitted
      • 2.2.4 事务不能解决哪些问题
        • 1 在流式处理中执行外部操作
        • 2 从Kafka中读取数据并写入数据库
        • 3 从一个数据库读取数据写入Kafka,再从Kafka将数据写入另一个数据库
        • 4 将数据从一个集群复制到另一个集群
        • 5 发布订阅模式
      • 2.2.5 如何使用事务

1 可靠的数据传递

1.1 Kafka的可靠性保证

  1. 分区中的消息时有序的
  2. 一条消息只有被写入分区所有的同步副本时才被认为是“已提交”
  3. 只要还有一个副本是活动的,已提交的消息就不会丢失
  4. 消费者只能读取已提交的消息

1.2 复制

同步副本需满足的条件:

  1. 与ZooKeeper之间有一个活跃的会话,在过去的6秒内向ZooKeeper发送过消息
  2. 在过去的10秒内从首领那里复制过消息
  3. 在过去的10秒内从首领那里复制过最新消息

1.3 Broker配置

1.3.1 复制系数

broker级别配置:default.replication.factor=1
topic级别配置:replication.factor=1
建议非关键数据小于3

1.3.2 broker的位置分布

建议把broker分布在多个不同的机器上

1.3.3 不彻底的首领选举

unclean.leader.election.enable=false
指示是否启用非同步副本可以被选为首领,作为首领选举的最后手段,即使这样做可能会导致数据丢失

1.3.4 最少同步副本

min.insync.replicas=1
最小同步副本数。min.insync.replicas(默认值为1)代表了正常写入生产者数据所需要的最少ISR个数, 当ISR中的副本数量小于min.insync.replicas时,Leader停止写入生产者生产的消息,并向生产者抛出NotEnoughReplicas异常,阻塞等待更多的 Follower 赶上并重新进入ISR, 因此能够容忍min.insync.replicas-1个副本同时宕机。当与min.insync.replicas和acks一起使用时,可以实现更大的耐用性保证。一个典型的场景是创建一个复制因子为3的主题,将min.insync.replicas设置为2,并使用acks “all”进行生产。

1.3.5 保持副本同步

replica.lag.time.max.ms=30000 (30 seconds)
如果一个follower这段时间内没有发送任何fetch请求,或者没有消费leader最新偏移量的消息,那么leader将从isr中删除该follower。
zookeeper.session.timeout.ms=18000 (18 seconds)
允许broker不向ZooKeeper发送心跳的时间间隔。如果超过这个时间不向ZK发送心跳,ZK会认为broker已经死亡,会将其移除出集群。

1.3.6 持久化到磁盘

Kafka会在重启之前和关闭日志片段的时候将消息冲刷到磁盘上,或者等Linux系统页面缓存被填满时冲刷。拥有不同机架上的副本的多个磁盘比只写入首领磁盘更加安全。不过,也可以让broker更频繁的写入磁盘。
flush.messages=9223372036854775807
此设置允许指定一个间隔,在该间隔,我们将强制对写入日志的数据进行fsync。例如,如果将其设置为1,我们将在每条消息之后进行fsync;如果是5,我们将在每5条消息之后进行fsync。通常,我们建议您不要设置此项,并使用复制以提高耐用性,并允许操作系统的后台刷新功能,因为它更高效。此设置可以在每个主题的基础上覆盖(请参阅每个主题配置部分)。
flush.ms=9223372036854775807
此设置允许指定一个时间间隔,在该时间间隔内,我们将强制对写入日志的数据进行fsync。例如,如果将其设置为1000,我们将在1000毫秒后进行fsync。通常,我们建议您不要设置此项,并使用复制以提高耐用性,并允许操作系统的后台刷新功能,因为它更高效。

1.2 在可靠的系统中使用生产者

1.2.1 根据需求配置恰当的acks

acks参数指定了生产者在多少个分区副本收到消息的情况下才会认为消息写入成功。允许以下设置:
acks=0。如果设置为零,则生产者根本不会等待来自服务器的任何确认。该记录将立即添加到套接字缓冲区,并被视为已发送。在这种情况下,无法保证服务器已收到记录,重试配置也不会生效(因为客户端通常不会知道任何故障)。为每条记录返回的偏移量将始终设置为-1。
acks=1。表示只要首领收到消息,并将记录成功写入其本地日志,就返回成功响应,不等待所有追随者的确认。在这种情况下,如果首领在确认成功后,追随者复制之前崩溃,则记录将会丢失。
acks=all。表示首领将等待同步复制集合中所有的副本都确认收到了记录。这保证了只要至少有一个同步复制副本保持活动状态,记录就不会丢失。这是最有力的保证。这相当于acks=-1的设置。
请注意,启用幂等性要求此配置值为“all”。如果设置了冲突的配置并且没有显式启用幂等性,则会禁用幂等性。

1.2.2 配置重试参数

设置自动重试,并使用默认重试次数。
将delivery.timeout.ms设置成愿意等待的时长,生产者会在这段时间内重试。

1.2.3 处理不可重试错误

例如:

  1. 不可重试的broker错误:消息大小,身份验证
  2. 在将消息发送给broker之前发生的错误,比如序列化错误
  3. 在生产者道道重试次数上限或重试消息占用的内存达到上限时发生的错误
  4. 超时

1.3 在可靠的系统中使用消费者

1.3.1 消费者的可靠性配置

group.id
auto.offset.reset

1.3.2 自动提交偏移量

如果所有的处理逻辑都是在轮询里进行的,并且不需要维护轮询之间的状态(比如为了聚合数据),那么就很简单,可以使用自动提交,在轮询结束时提交偏移量。
enable.auto.commit
无法控制应用应用程序可能重复处理的消息的数量
如果应用程序把消息交给另一个后台线程处理,那么只能使用手动提交了
auto.commit.interval.ms
自动提交的频率过大,会增加重复的概率;过小,会增加额外开销

1.3.3 手动提交偏移量

手动提交偏移量增加了灵活性,但也增加了复杂度并且有可能对性能产生影响,所以可能需要考虑如下事项:

1 总是在处理完消息后提交偏移量

如果所有的处理逻辑都是在轮询里进行的,就很简单,选择一个合适的提交频率;
如果涉及额外线程,该如何呢?

2 提交频率时性能和重复消息数量之间的权衡
3 在正确的时间点提交正确的偏移量

一定要在处理完后在提交偏移量

4 消费者再均衡

如何在分区被撤销之前提交偏移量;
如何在应用程序被分配到新分区并清理状态时提交偏移量

5 消费者重试

如果遇到批次中的部分消息需要稍后处理。因为消费者不能针对每一条消息提交偏移量,而是提交最后一条成功的偏移量,所以需要借助额外的工具来处理。
有两种模式来解决这个问题:

  1. 在遇到可重试错误时,提交最后一条处理成功的消息的偏移量,然后把还未处理好的消息保存到缓冲区(这样下一个轮询就不会把他们覆盖掉),并调用消费者的pause()方法,确保其他轮询不会返回数据,之后继续处理缓冲区里的消息。
  2. 在遇到可重试错误时,把消息写到另一个重试主题,并继续处理其他消息。另一个消费者群组负责处理重试主题中的消息,或者让一个消费者同时订阅主主题和重试主题。这种模式有点像其他消息系统中的死信队列。
6 消费者可能需要维护状态

1.在提交偏移量的同时,状态存入另一个主题中,可以开启事务来保证一致性。当一个线程重新启动时,就可以读取状态和从偏移量处读取消息。
2. 使用流式处理框架

1.4 验证系统可靠性

1.4.1 验证配置

测试场景:

  • 首领选举
  • 控制器选举
  • 滚动重启
  • 不彻底的首领选举

1.4.2 验证应用程序

测试场景:

  • 客户端与服务器断开连接
  • 客户端与服务器之间存在高延迟
  • 磁盘被填满
  • 磁盘被挂起
  • 首领选举
  • 滚动重启broker
  • 滚动启动消费者
  • 滚动重启生产者

1.5 监控系统可靠性

生产者:

  • 错误率
  • 重试率

消费者:

  • 消费者滞后

broker:

  • kafka.server:type=BrokerTopicMetrics,name=FailedProduce-RequestsPerSec
  • kafka.server:type=BrokerTopicMetrics,name=FailedFetch-RequestsPerSec

2 精确一次性语义

上一章主要讨论如何保证不丢失消息,但不能保证出现重复消息。
在一些简单的应用程序中,处理重复消息比较简单,消息都包含有唯一标识。
但在处理聚合事件的流式处理应用程序中,因为很难取判断结果的正确性,因为结果中可能包含了重复消息。在这种情况下,需要提供更强的保证,这种保证就是精确一次性处理语义。

2.1 幂等生产者

如果一个操作被执行多次的结果与被执行一次相同,那么这个操作就是幂等的。

2.1.1 启用幂等生产者

如果需要启用幂等生产者,需要在生产者端做如下配置:

  • enable.idempotence=true
  • max.in.flight.requests.per.connection=5
  • acks=all

其实这三个参数都已经是默认配置,无需显式添加。

2.1.2 工作原理

如果启用了幂等生产者,那么每条消息都将包含生产者ID(PID)和序列号。所以一条消息的唯一标识由4个字段组成:

  1. 主题
  2. 分区
  3. 生产者ID
  4. 序列号

broke会用组合唯一标识来跟踪(保存在内存和硬盘中)写入每个分区的最后5条消息,重复的消息会被拒绝,记录错误指标(kafka.server:type=RequestMetrics,name=ErrorsPerSec)并返回错误。生产者会记录这个错误(record-error-rate),并反映在指标当中,但不抛出异常,也不触发告警。

乱序错误:broke收到不连续的序列号,将会返回乱序错误。

2.1.3 幂等生产者的局限性

幂等生产者只能防止由生产者内部重试逻辑引起额消息重复。
如果同一个生产者调用多次producer.send()方法发送同一条消息,或多个生产者都发送同一条消息,Kafka中将产生多条消息。

2.2 事务

Kafka的事务机制为了让流式处理应用程序生成正确的结果,要保证每个输入的消息都被精确处理一次,即使是在发生故障的情况下。
事务适用于流式处理应用程序的基础模式,即“消费-处理-生产”。

2.2.1 事务的应用场景

流式处理应用程序,处理过程包包含了聚合或连接操作,需要更新内部状态,事务对他们来说就会非常有用。

2.2.2 事务可以解决哪些问题

  1. 应用程序崩溃导致的重复处理
    假设应用程序处理完一批消息并产生结果之后,在偏移量提交之前崩溃了。后来的其它实例就会重复处理这些数据,并生成结果,那么结果数据就会包含重复处理的数据。
  2. “僵尸”应用程序导致的重复处理
    假设一个应用程序正在处理消息的时候与Kafka暂时断开了连接,那么Kafka就会安排其它的实例继续处这批消息。这时候应用程序恢复运行了,它继续处理之前的消息批次,并写入结果。这样,结果就包含了重复处理的数据。

2.2.3 事务是如何保证精确一次性的

Kafka事务引入了原子多分区写入的概念,用来保证写入多个分区和提交偏移量是一个原子操作。
在这里插入图片描述

生产者端

上图中,Producer A是事务性生产者,用来启动事务和执行原子多分区写入。需要给事务配置属性transactional.id并用initTransactions()方法初始化。
broker维护了producer.id和transactional.id的映射,如果使用同一个transactional.id再次调用initTransactions()方法,那么生产者分配到的producer.id和之前是一样的。
在调用initTransactions()方法初始化时,broker会为producer分配epoc,用来防止隔离僵尸应用。

transactional.id=null

用于事务传递的TransactionalId。这实现了跨越多个生产者会话的可靠性语义,因为它允许客户端保证使用相同TransactionalId的事务在开始任何新事务之前已经完成。如果没有提供TransactionalId,则生产者仅限于幂等传递。
如果配置了TransactionalId,则隐含enable.idempotence。默认情况下,未配置TransactionId,这意味着无法使用事务。请注意,默认情况下,交易需要至少三个broker的集群,这是建议的生产设置;为了进行开发,您可以通过调整broker设置transaction.state.log.replication.factor来更改这一点。

消费者端

对于消费者,我们通过参数isolation.level来消费者的事务隔离级别,用啦控制消费者如何读取以事务方式写入的消息。

isolation.level=read_uncommitted

控制如何读取以事务方式写入的消息。如果设置为read_committed,consumer.poll()将只返回已提交的事务消息。如果设置为read_uncommitted(默认值),consumer.poll()将返回所有消息,甚至是已中止的事务消息。非事务性消息将在任一模式下无条件返回。
消息将始终以偏移顺序返回。因此,在read_committed模式下,consumer.poll()将只返回最后一个稳定偏移量(LSO, last stable offset)之前的消息,该偏移量小于第一个打开事务的偏移量。特别是,在相关交易完成之前,属于正在进行的交易的消息之后出现的任何消息都将被扣留。因此,当存在进行中事务时,read_committed消费者将无法读取高水位线。
此外,在read_committed中,seekToEnd方法将返回LSO。

在上图中,消费者B在事务提交之前,只能读取到message 2;但是消费者C则可以读取到所有消息。

2.2.4 事务不能解决哪些问题

Kafka事务无法实现精确一次性保证的几种场景:

1 在流式处理中执行外部操作

比如处理数据时发送电子邮件,并不能保证只发送一次,而且无法撤销。

2 从Kafka中读取数据并写入数据库

这种场景的流程是“消费-处理-数据库”,状态数据写入数据库。我们可以考虑在数据库中维护状态数据和偏移量,借助数据的事务来保证数据一致性。
在这里插入图片描述
还有一种场景是“消费-处理-主题-数据库”,状态数据既要写入主题,又要写入数据库。这个问题的一种常见的解决方案是“发件箱模式”。
应用将消息发送到一个Kafka主题(也就是发件箱),然后另外一个独立中继服务将会从Kafka读取消息并更新数据库,需要确保数据库更新是幂等的。
这个模式可以保证消息最终到达Kafka,主题消费者和数据库,要么都不到达。
在这里插入图片描述
这个模式的反模式是用数据库作为发件箱,然后另外一个独立中继服务将确保数据库更新也作为消息发送给Kafka。这种模式可以借助数据的内置约束(唯一索引,外键)保证精确一次性。
在这里插入图片描述

3 从一个数据库读取数据写入Kafka,再从Kafka将数据写入另一个数据库
4 将数据从一个集群复制到另一个集群

有一个为MM2增加精确一次性语义的改进提议:KIP665

5 发布订阅模式

前面讨论的精确一次性保证是针对“消费-处理-生产”的模式,而发布订阅在Kafka
中是“生产-消费-处理”的模式。针对这种模式,生产者可以开启事务,但是消费者只能通过设置隔离级别保证看不到已终止事务的消息,无法开启事务保证只消费一次消息。

2.2.5 如何使用事务

package com.qupeng.demo.kafka.kafkaapache.transaction;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.*;

public class KafkaTransaction<K, V> {

    private final static Logger logger = LoggerFactory.getLogger(KafkaTransaction.class);

    public <K, V> KafkaProducer<K, V> createKafkaProducer() {
        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", "your_broker_list");
        producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producerProps.put("transactional.id", "your_transactional_id");

        Producer<String, String> producer = new KafkaProducer<>(producerProps);

        Properties consumerProps = new Properties();
        consumerProps.put("bootstrap.servers", "your_broker_list");
        consumerProps.put("group.id", "your_group_id");
        consumerProps.put("enable.auto.commit", "false");
        consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        consumerProps.put("isolation.level", "read_committed");

        Consumer<String, String> consumer = new KafkaConsumer<>(consumerProps);

        producer.initTransactions();

        consumer.subscribe(Arrays.asList("input_topic"));
        while (true) {
            ConsumerRecords<String, String> records = null;
            try {
                records = consumer.poll(Duration.ofMillis(100));
                if (records.count() > 0) {
                    producer.beginTransaction();
                    for (ConsumerRecord<String, String> record : records) {
                        // 处理消息
                        String processedValue = processMessage(record.value());
                        // 发送处理后的消息到另一个主题
                        producer.send(new ProducerRecord<>("output_topic", record.key(), processedValue));
                    }
                    Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = consumerOffsets(records);
                    producer.sendOffsetsToTransaction(offsetAndMetadataMap, consumer.groupMetadata());
                    producer.commitTransaction();
                }
            } catch (WakeupException e) {
                // 关闭消费者
                consumer.close();
                throw new KafkaException("");
            } catch (ProducerFencedException | InvalidProducerEpochException e) {
                // 程序已变为僵尸,只能退出
                throw new KafkaException(String.format("The transactional.id %s is used by another process.", "your_transactional_id"));
            }  catch (KafkaException e) {
                // 其它异常,中止事务,重置偏移量,并进行重试
                producer.abortTransaction();
                resetToLatestCommittedPositions(consumer, records);
            } finally {
                producer.close();
                consumer.close();
            }
        }

    }

    private void resetToLatestCommittedPositions(Consumer<String, String> consumer, ConsumerRecords<String, String> records) {
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            consumer.seek(partition, partitionRecords.get(0).offset());
        }
    }

    private Map<TopicPartition, OffsetAndMetadata> consumerOffsets(ConsumerRecords<String, String> records) {
        Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
            offsetAndMetadataMap.put(partition, new OffsetAndMetadata(partitionRecords.get(partitionRecords.size() - 1).offset()));
        }

        return offsetAndMetadataMap;
    }

    private String processMessage(String value) {
        return "Message has been handled.";
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1365379.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

PyTorch|构建自己的卷积神经网络--池化操作

在卷积神经网络中&#xff0c;一般在卷积层后&#xff0c;我们往往进行池化操作。实现池化操作很简单&#xff0c;pytorch中早已有相应的实现。 nn.MaxPool2d(kernel_size ,stride ) 这种池化叫做最大池化。 最大池化原理很简单&#xff0c;就是一个filter以一定的stride在原…

深度学习课程实验一浅层神经网络的搭建

一、 实验目的 1、学习如何建立逻辑回归分类器用来识别猫。将引导你逐步了解神经网络的思维方式&#xff0c;同时磨练我们对深度学习的直觉。&#xff08;说明&#xff09;&#xff1a;除非指令中明确要求使用&#xff0c;否则请勿在代码中使用循环&#xff08;for / while&am…

MYSQL篇--sql优化高频面试题

sql优化 1 如何定位及优化SQL语句的性能问题&#xff1f;创建的索引有没有被使用到?或者说怎么才可以知道这条语句运行很慢的原因&#xff1f; 其实对于性能比较低的sql语句定位&#xff0c;最重要的也是最有效的方法其实还是看sql的执行计划&#xff0c;而对于mysql来说 它…

定展中2024上海国际智慧工地展览会

2024第十五届上海国际智慧工地展览会 2024 Shanghai International Smart Site Equipment Expo 时间&#xff1a;2024年03月26日-28日 地点&#xff1a;上海跨国采购会展中心 政策指导: 中华人民共和国国家发展和改革委员会 中华人民共和国工业和信息化部 上海城市数字转型应用…

如何使用css隐藏掉滚动条

1.解决方案 在滚动元素上再包裹一个父元素&#xff0c;然后&#xff0c;该元素添加如下代码&#xff1a; &#xff08;注&#xff1a;PC端浏览器滚动条为8px&#xff09;使元素偏移原来位置8px&#xff0c;目的就是将滚动条区域移动到父元素边框外面&#xff0c;然后&#xff…

LVGL的List控件的触摸按键和实体按键的处理

在LVGL的List控件使用过程中&#xff0c;虽然通过触摸按键选择item&#xff0c;但是有些场景需要实体按键选取item&#xff0c;但是LVGL 的V8.3中没有像Emwin那样有函数选择list item的函数。LVGL中List引入了Group的概念&#xff0c;把列表项都添加到同一个group中。然后通过更…

STM32 实现姿态解算及 MPU6050 相关应用

在STM32上实现姿态解算以及与MPU6050相关的应用&#xff0c;通常涉及使用陀螺仪和加速度计获取姿态数据&#xff0c;并结合姿态解算算法来实现姿态估计。在本文中&#xff0c;我们将讨论如何在STM32上通过MPU6050获取姿态数据&#xff0c;并简要介绍姿态解算算法&#xff0c;并…

Retrieval-Augmented Generation for Large Language Models: A Survey

PS: 梳理该 Survey 的整体框架&#xff0c;后续补充相关参考文献的解析整理。本文的会从两个角度来分析总结&#xff0c;因此对于同一种技术可能在不同章节下都会有提及。第一个角度是从整体框架的迭代来看&#xff08;对应RAG框架章节&#xff09;&#xff0c;第二个是从RAG中…

【架构专题】吃透Spring Cloud Alibaba微服务架构实战派,技术人就可以做分布式架构了!!

什么是中间件&#xff1f; 首先呢&#xff0c;我们得简单明了地来解释一下什么是分布式架构。就像咱们平时吃火锅&#xff0c;每个人都贡献出自己的一份力量加热锅底&#xff0c;最后炖出美味的火锅。同理, 分布式架构也是将计算任务拆分成多个子任务&#xff0c;并发给不同的…

海外跨境独立站和代购系统存在必然联系?独立站建站初期,以及如何运营好独立站。

海外跨境独立站和代购系统在多个方面存在差异&#xff1a; 定位&#xff1a;独立站是拥有独立域名&#xff0c;自主宣传推广媒体与渠道的新型网站&#xff0c;更侧重于培养买家&#xff0c;做品牌建设&#xff0c;相当于个体经营专卖店。而代购系统是利用先进的技术和流程管理…

上海亚商投顾:创业板指再创调整新低 全市场超4700只个股下跌

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 三大指数1月5日集体调整&#xff0c;沪指午后跌超1%&#xff0c;创业板指一度跌逾2%&#xff0c;尾盘跌幅有所…

使用 React 和 MUI 创建多选 Checkbox 树组件

在本篇博客中&#xff0c;我们将使用 React 和 MUI&#xff08;Material-UI&#xff09;库来创建一个多选 Checkbox 树组件。该组件可以用于展示树形结构的数据&#xff0c;并允许用户选择多个节点。 前提 在开始之前&#xff0c;确保你已经安装了以下依赖&#xff1a; Reac…

Transformer从菜鸟到新手(四)

引言 上篇文章完成了Transformer剩下组件的编写&#xff0c;因此本文就可以开始训练。 本文主要介绍训练时要做的一些事情&#xff0c;包括定义损失函数、学习率调整、优化器等。 下篇文章会探讨如何在多GPU上进行并行训练&#xff0c;加速训练过程。 数据集简介 从网上找到…

Java-IO-文件操作-FAQ-listlistFiles

1 需求 2 接口 3 示例 import java.io.File;public class Test {public static void main(String[] args) {System.out.println("***** ***** list() ***** *****");for (String s : new File("C:\\Windows\\System32\\drivers\\etc").list()) {System.ou…

【基础篇】十五、JVM垃圾回收器

文章目录 1、垃圾回收器2、Serial Serial Old3、ParNew回收器4、CMS回收器5、Parallel Scavenge Parallel Old6、G1垃圾回收器7、G1的回收流程&#xff1a;Young GC8、G1的回收流程&#xff1a;Mixed GC9、回收器的选择 1、垃圾回收器 对回收算法落地&#xff0c;出现了多种…

YOLOv5改进 | 损失函数篇 | InnerIoU、InnerSIoU、InnerWIoU、FocusIoU等损失函数

一、本文介绍 本文给大家带来的是YOLOv5最新改进,为大家带来最近新提出的InnerIoU的内容同时用Inner的思想结合SIoU、WIoU、GIoU、DIoU、EIOU、CIoU等损失函数,形成 InnerIoU、InnerSIoU、InnerWIoU等新版本损失函数,同时还结合了Focus和AIpha思想,形成的新的损失函数,其…

让充电器秒供多个快充口,乐得瑞推出1拖2功率分配快充线方案

随着PD3.1协议的市场应用越来越多&#xff0c;一些充电器的Type-C接口的输出功率达到百瓦及以上&#xff0c;如何充分利用好这类充电器设备&#xff0c;乐得瑞电子推出1拖2快充线缆解决方案&#xff0c;支持智能功率分配策略支持私有快充协议。 如上图是乐得瑞1拖2功率分配快充…

绝地求生:【违规处罚工作公示】12月25日-12月31日

12月25日至12月31日期间&#xff0c;共计对63,907个违规账号进行了封禁&#xff0c;其中53,880个账号因使用外挂被永久封禁。 若您游戏中遇到违规行为&#xff0c;建议您优先在游戏内进行举报&#xff1b; 另外您也可以在官方微信公众号【PUBG国际版】中点击“ 服务中心 - 举报…

实例:NodeJS 操作 Kafka

本人是C#出身的程序员&#xff0c;c#很简单就能实现&#xff0c;有需要的可以加我私聊。但是就目前流行的开发语言&#xff0c;尤其是面向web方向应用的&#xff0c;我感觉就是Nodejs最简单了。下面介绍&#xff1a; 本文将会介绍在windows环境下启动Kafka&#xff0c;并通过n…

《操作系统导论》笔记

操作系统三个关键&#xff1a;虚拟化( virtualization) 并发(concurrency) 持久性&#xff08;persistence&#xff09; 1 CPU虚拟化 1.1 进程 虚拟化CPU&#xff1a;许多任务共享物理CPU&#xff0c;让它们看起来像是同时运行。 时分共享&#xff1a;运行一个进程一段时间…