kafka生产者api和数据操作

news2025/1/12 22:57:12

Kafka 生产者

发送流程

image-20220517092114720

消息发送过程中涉及到两个线程——main线程和Sender线程

main线程

  • 使用serializer(并非java默认)序列化数据,使用partitioner确认发送分区

  • 在main线程中创建了一个双端队列RecordAccumulator,main线程将批次数据发送给RecordAccumulator。创建批次数据是从内存池中分配内存,在发送成功后释放到内存池

  • Sender线程不断从RecordAccumulator中拉取消息发送给kafka Broker

  • 一个分区创建一个DQuene,在内存中完成RecordAccumulator(缓冲队列)的创建(总大小默认32M),每批次大小默认16K

sender线程

  • 数据到达batch.size护着linger.ms之后,sender线程开始发送数据

  • sender创建请求队列(每个broker一个),默认最多缓存5个请求

  • 发送完成后等待broker的ack应答

    • 0,表示不需要等待落盘
    • 1,表示等待leader收到数据后应答
    • -1(all),表示等待leader和ISR队列中的所有节点收到数据后应答
  • 发送成功则sender清除队列,并清理RecordAccumulator中每一个分区的数据,失败则重试再次发送

生产者相关参数

  • bootstrap.servers ,连接的broker清单

  • key.serializer 和 value.serializer,指定发送消息的 key 和 value 的序列化类型

  • buffer.memory RecordAccumulator,缓冲区总大小,默认 32m

  • batch.size 缓冲区一批数据最大值,默认 16k。适当增加该值,可 以提高吞吐量,但是如果该值设置太大,会导致数据 传输延迟增加。

  • linger.ms,如果数据迟迟未达到 batch.size,sender 等待 linger.time 之后就会发送数据。单位 ms,默认值是 0ms,表示没 有延迟。生产环境建议该值大小为 5-100ms 之间。

  • acks

    • 0:生产者发送过来的数据,不需要等数据落盘应答。

    • 1:生产者发送过来的数据,Leader 收到数据后应答。

    • -1(all):生产者发送过来的数据,Leader+和 isr 队列 里面的所有节点收齐数据后应答。默认值是-1,-1 和 all 是等价的。

  • max.in.flight.requests.per.connection,允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。

  • retries,当消息发送出现错误的时候,系统会重发消息。

    • retries 表示重试次数。默认是 int 最大值,2147483647。
    • 如果设置了重试,还想保证消息的有序性,需要设置 MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1 否则在重试此失败消息的时候,其他的消息可能发送 成功了。
  • retry.backoff.ms,两次重试之间的时间间隔,默认是 100ms。

  • enable.idempotence,是否开启幂等性,默认 true,开启幂等性。

  • compression.type,生产者发送的所有数据的压缩方式。默认是 none,也 就是不压缩。 支持压缩类型:none、gzip、snappy、lz4 和 zstd。

异步发送

异步发送指的是外部数据向RecordAccumulator发送数据的过程

不带回调的异步发送

maven依赖

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>3.0.0</version>
</dependency>
<dependency>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-simple</artifactId>
      <version>1.7.25</version>
      <scope>compile</scope>
</dependency>

发送demo

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class CustomerProducer {

    public static void main(String[] args) {
        //创建配置对象
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");
        //key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //创建kafka对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "hello" + i));
        }
        kafkaProducer.close();
    }
}

带回调函数的异步发送

回调函数会在producer收到ack时调用,有两个参数

  • 元数据信息(RecordMetadata)和异常信息(Exception)
  • 如果 Exception 为 null,说明消息发 送成功,如果 Exception 不为 null,说明消息发送失败
  • 回调中获取的metadata来自于RecordAccumulator
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class CustomerProducerCallback {

    public static void main(String[] args) {
        //创建配置对象
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");
        //key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // tls 连接
        // properties.put("security.protocol", "SSL");
        // properties.put("sasl.mechanism", "SCRAM-SHA-512");
        //创建kafka对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "hello" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null){
                        System.out.println("topic:"+recordMetadata.topic()+" partition:"+recordMetadata.partition());
                    }
                }
            });
        }
        kafkaProducer.close();
    }
}

output:
[main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values: 
        acks = -1
        batch.size = 16384
        bootstrap.servers = [localhost:9092, localhost:9092]
        buffer.memory = 33554432
        client.dns.lookup = use_all_dns_ips
        client.id = producer-1
        compression.type = none
        connections.max.idle.ms = 540000
        delivery.timeout.ms = 120000
        enable.idempotence = true
        interceptor.classes = []
        key.serializer = class org.apache.kafka.common.serialization.StringSerializer
        linger.ms = 0
        max.block.ms = 60000
        max.in.flight.requests.per.connection = 5
        max.request.size = 1048576
        metadata.max.age.ms = 300000
        metadata.max.idle.ms = 300000
        metric.reporters = []
        metrics.num.samples = 2
        metrics.recording.level = INFO
        metrics.sample.window.ms = 30000
        partitioner.class = class com.example.MyPartitioner
        receive.buffer.bytes = 32768
        reconnect.backoff.max.ms = 1000
        reconnect.backoff.ms = 50
        request.timeout.ms = 30000
        retries = 2147483647
        retry.backoff.ms = 100
        sasl.client.callback.handler.class = null
        sasl.jaas.config = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.min.time.before.relogin = 60000
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        sasl.kerberos.ticket.renew.window.factor = 0.8
        sasl.login.callback.handler.class = null
        sasl.login.class = null
        sasl.login.refresh.buffer.seconds = 300
        sasl.login.refresh.min.period.seconds = 60
        sasl.login.refresh.window.factor = 0.8
        sasl.login.refresh.window.jitter = 0.05
        sasl.mechanism = GSSAPI
        security.protocol = PLAINTEXT
        security.providers = null
        send.buffer.bytes = 131072
        socket.connection.setup.timeout.max.ms = 30000
        socket.connection.setup.timeout.ms = 10000
        ssl.cipher.suites = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.3]
        ssl.endpoint.identification.algorithm = https
        ssl.engine.factory.class = null
        ssl.key.password = null
        ssl.keymanager.algorithm = SunX509
        ssl.keystore.certificate.chain = null
        ssl.keystore.key = null
        ssl.keystore.location = null
        ssl.keystore.password = null
        ssl.keystore.type = JKS
        ssl.protocol = TLSv1.3
        ssl.provider = null
        ssl.secure.random.implementation = null
        ssl.trustmanager.algorithm = PKIX
        ssl.truststore.certificates = null
        ssl.truststore.location = null
        ssl.truststore.password = null
        ssl.truststore.type = JKS
        transaction.timeout.ms = 60000
        transactional.id = null
        value.serializer = class org.apache.kafka.common.serialization.StringSerializer

[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version: 3.0.0
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId: 8cb0a5e9d3441962
[main] INFO org.apache.kafka.common.utils.AppInfoParser - Kafka startTimeMs: 1687969802272
[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: MqCwKFtET36BCuCusypwCg
[main] INFO org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms
topic:first partition:0
topic:first partition:0
topic:first partition:0
topic:first partition:0
topic:first partition:0

同步发送

外部数据发送到RecordAccumulator之后,需要等到全部发送到kafka集群才会发送下一批

public class CustomerProducerSync {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //创建配置对象
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");
        //key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //创建kafka对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        //发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "hello" + i)).get(); //添加get即成为同步发送
        }
        kafkaProducer.close();
    }
}

生产者分区策略

生产者为什么要分区

  • 便于合理使用存储资源, 每个Partition在一个Broker上存储, 可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。 合理控制分区的任务, 可以实现负载均衡的效果。

  • 提高并行度, 生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

DefaultPartitioner,默认分区器的策略

  • if partition is specified in th record, use it

  • if no partition is specified but a key is present choose a partition based on a hash of the key

  • if no partition or key is present choose the sticky partition that changes when the batch is full

对于PreducerRecord类,存在多种构造方法

  • 如果指定partition,则发送到对应分区
  • 如果未指定partition,但是指定key,通过将key的hash和topic的partition数进行取余得到partition的值
  • 未指定partition和key,采用sticky partition(黏性分区),即随机选择一个分区并尽可能保持使用该分区。等到此分区的batch已满(到达16K)或者已完成,再随机选择一个分区(和上一次不同)使用

自定义分区器如下

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String s, Object key, byte[] bytes, Object value, byte[] bytes1, Cluster cluster) {
        int partition;
        String v = value.toString();
        if(v.contains("hello"))
            partition = 1;
        else 
            partition = 0;
        return partition;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> map) {}
}

//在CustomerProducer中添加配置信息
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName());

生产者提升吞吐量

  • batch.size 设置批次大小,默认16k
  • linger.ms 等待时间默认为0(batchsize不起作用),修改为5-100ms
  • compression.type 压缩数据
  • RecordAccumulator 缓冲区大小默认32m,修改为64m,但是太大会导致较高的延迟
//在CustomerProducer中添加配置信息,在输出的日志中都能看到
// batch.size: 批次大小, 默认 16K
properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384); 
// linger.ms: 等待时间,默认 0
properties.put(ProducerConfig.LINGER_MS_CONFIG,1); 
// RecordAccumulator: 缓冲区大小, 默认 32M: buffer.memory
properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432); 
//默认none,可配置gzip,snappy,lz4,zstd
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy"); 

数据的可靠性

image-20220513094016259

ACK应答级别

https://kafka.apache.org/documentation/#producerconfigs_acks

  • ack = 0,leader数据不落盘就应答。此时如果leader故障,内存中的数据会丢失,生产环境不适用

  • ack = 1,leader数据落盘但没有同步到follower就应答,常用于传输日志

  • ack = -1,leader数据落盘并等待同步到所有follower在应答,和钱相关可靠性要求比较高的场景

ISR对列

对于ack为-1的情况,如果leader收到数据,所有follower开始同步数据,但某个follower出现故障迟迟不能回复,如何解决?

  • leader维护了一个动态的in-sync replica set(ISR),即和leader保持同步的follwer-leader集合(leader:0, isr:0,1,2)
  • 如果follower长时间未向leader发送通信请求或同步数据,follower将被提出ISR,时间阈值由replica.lag.time.max.ms参数设定,默认30s

数据可靠性分析

  • 实际上,如果分区副本设置为1,或者ISR应答的最小副本数量为1(min.insync.replicas默认为1),和ack=1的效果一样,仍然有丢失数据的风险

数据完全可靠条件 = A C K 级别设置为 − 1 + 分区副本大于等于 2 + I S R 中应答的最小副本数大于等于 2 数据完全可靠条件 = ACK级别设置为-1+分区副本大于等于2+ISR中应答的最小副本数大于等于2 数据完全可靠条件=ACK级别设置为1+分区副本大于等于2+ISR中应答的最小副本数大于等于2

//在CustomerProducer中添加配置信息,在输出的日志中都能看到
// 设置 acks
properties.put(ProducerConfig.ACKS_CONFIG,1);
// 重试次数 retries,默认是 int 最大值, 2147483647
properties.put(ProducerConfig.RETRIES_CONFIG,3);

数据重复分析

对于ack=-1的配置,如果leader收到数据并同步到follower之后,但是还没有进行应答,突然宕机。此时会选举新的leader,producer会发送同样的数据造成数据重复

image-20220512205239077

数据传递语义

  • 至少一次(at least once)

    • ACK级别设置为-1 + 分区副本 >= 2 + ISR中应答的最小副本数>=2
    • 至少一次(at least once)保证数据不丢失,但不保证数据不重复
  • 最多一次(at most once)

    • ACK为 0
    • 最多一次(at most once)保证数据不重复,但是不保证数据不丢
  • 精确一次(exactly once):开启幂等性+至少一次ACK级别设置为-1+分区副本>=2 + ISR中应答的最小副本数>=2

幂等性

幂等性能够保证无论producer向broker发送多少数据,始终保证broker只持久化一条数据
精确一次 ( e x a c t l y o n c e ) = 幂等性 + 至少一次 ( A C K ( − 1 ) + 分区副本 > = 2 + I S R 中应答的最小副本数 > = 2 ) 精确一次(exactly once) = 幂等性 + 至少一次(ACK(-1)+分区副本>=2+ISR中应答的最小副本数>=2) 精确一次(exactlyonce)=幂等性+至少一次(ACK(1)+分区副本>=2+ISR中应答的最小副本数>=2)
幂等性判断重复数据的标准,具有<PID,partition,seqnumber>相同主键的消息提交时,broker只会持久化一条,其中的PID是kafka每次重启都会分配新的,partition表示分区号,sequence number单调递增

  • 幂等性只能保证单分区单会话内不重复,kafka重启后pid会重置

  • 开启参数enable.Idempotence,默认开启为true,kafka在内存中直接将重复的数据删除

image-20220512210134041

重启之后还是可能产生重复数据,需要使用生产者事务

事务原理

由于幂等性只能保证在单分区和会话保证数据不重复,因此需要事务的来实现

  • 开启事务,必须开启幂等性

image-20220513094116493

kafka事务的api

// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
String consumerGroupId) throws
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

使用事务保证消息的仅一次发送

//重要:手动指定事务id
properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG,"transaction_id_0");
//创建kafka对象
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
kafkaProducer.initTransactions();
kafkaProducer.beginTransaction();
try {
    for (int i = 0; i < 5; i++) {
        kafkaProducer.send(new ProducerRecord<>("first", "world" + i), new Callback() {
            @Override
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if (e == null) {
                    System.out.println("topic:" + recordMetadata.topic() + " partition:" + recordMetadata.partition());
                }
            }
        }).get();
    }
    //发送数据
    kafkaProducer.commitTransaction();
} catch (Exception e) {
    kafkaProducer.abortTransaction();
} finally {
    kafkaProducer.close();
}

数据有序

  • producer在不同分区内产生数据,无法保证有序
  • 单分区内,有序(有条件)
  • 多分区,分区之间无序。多分区有序,需要在comsumer端收到所有数据后进行整体重排序

在这里插入图片描述

数据乱序

kafka在1.x版本之前保证数据单分区有序,条件是max.in.flight.requests.per.connection =1(不需要考虑幂等性)

kafka在1.x版本之后

  • 未开启幂等性,max.in.flight.requests.per.connection =1

  • 开启幂等性,max.in.flight.requests.per.connection <= 5。原因:开启幂等性后kafka服务端会缓存producer发来的最近5个request的元数据,通过重新排序来保证有序

image-20220512212238271

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/700883.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

从0到1搭建spring cloud alibaba +springboot+nacos+dubbo微服务

版本关系&#xff1a; spring cloud alibaba各组件对应关系 创建父工程&#xff0c;pom.xml配置如下&#xff1a; 由以上版本对应关系&#xff1a; springboot版本&#xff1a;2.3.2.RELEASE spring cloud 版本选择&#xff1a;Hoxton.SR9 spring cloud alibaba版本选择&#…

【UE5 Cesium】02-Cesium for Unreal 添加在线数据集

上一篇&#xff1a; 【UE Cesium】01-在虚幻5中使用Cesium 步骤 1. 点击“connected to Cesium ion as xxx” 在弹出的网址中点击“Asset Depot”&#xff08;资产仓库&#xff09; 找到“Melbourne Photogrammetry”点击添加&#xff0c;添加到你的账户中。&#xff08;这里我…

关于我花了一个星期学习微信小程序开发、并且成功开发出一个商城项目系统的心得体会

前言 一直做的PC端的项目开发&#xff0c;想做一下手机端的开发。后端基本上是不用怎么变化&#xff0c;主要变化的是前端&#xff0c;前端网页运行的地方不同&#xff0c;一个运行在手机&#xff0c;一个运行在PC网页上。微信小程序的开发和Vue框架开发有诸多相似之处&#xf…

smardaten用户手册全新发布!5个超实用的使用技巧(建议收藏!)

社区版发布后&#xff0c;很多用户自行下载安装使用&#xff0c;我们收到了一些客官关于产品文档的吐槽和建议~~于是&#xff0c;我们重新编排了用户手册&#xff0c;来帮助大家更快、更好、更简单的上手无代码开发。今天睿睿来跟大家分享用户手册更新点&#xff0c;以及如何使…

常用网络接口自动化测试框架

目录 一、RESTful&#xff08;resource representational state transfer)类型接口测试 (一&#xff09;GUI界面测试工具&#xff1a;jmeter &#xff08;二&#xff09;JAVA语言脚本测试&#xff08;HttpClient) 二、WebService接口测试 &#xff08;一&#xff09;GUI界…

JAVA1

文章目录 计算机的硬件与软件DOS命令 计算机的硬件与软件 DOS命令

Flink-任务槽和并行度的关系

任务槽和并行度都跟程序的并行执行有关&#xff0c;但两者是完全不同的概念。简单来说任务槽是静态的概念&#xff0c;是指TaskManager具有的并发执行能力&#xff0c;可以通过参数taskmanager.numberOfTaskSlots进行配置&#xff1b;而并行度是动态概念&#xff0c;也就是Task…

菜鸟推出新一代资产管理操作系统“WIN”

在6月28日的2023全球智慧物流峰会上&#xff0c;菜鸟地网发布了新一代资产管理操作系统“WIN”。基于菜鸟地网多年积累的全球一体化物流基础设施网络和资产管理经验&#xff0c;依托物联网、大数据、人工智能等物流科技能力&#xff0c;“WIN”将为客户提供全链路的资产开发和运…

学习笔记20230629 -- 《分享在jsp分布式项目支援开发衍生功能时遇到和解决的问题》

1.jsp项目的页面跳转&#xff0c;需要后端的java技术做支撑&#xff0c;在java的接口文件中写跳转接口&#xff0c;使用ajax去请求这个跳转接口&#xff0c;将返回的数据&#xff08;html标签代码&#xff09;&#xff0c;放到当前页面或弹窗的"content"属性中 2…

联合体结合位域的作用

联合体结合位域的作用 例如 这段代码&#xff0c;巧妙运用了位域和联合体的特性&#xff0c;rx370x_cfg_data_t位域控制每个成员的大小 使总大小为32&#xff0c;cfg_u32和位域的大小相等&#xff0c;因为联合体共用一个空间的原因&#xff0c;此时cfg_u32中存放的内容就是位域…

如何实现WinApp的UI自动化测试?自动化工具如何选择人?

WinApp&#xff08;WindowsAPP&#xff09;是运行在Windows操作系统上的应用程序&#xff0c;通常会提供一个可视的界面&#xff0c;用于和用户交互。例如运行在Windows系统上的Microsoft Office、PyCharm、Visual Studio Code、Chrome&#xff0c;都属于WinApp。常见的WinApp&…

遇到客户服务问题,有哪些解决方法?

在当今竞争激烈的商业世界中&#xff0c;客户服务已成为任何成功企业不可或缺的一部分。然而&#xff0c;许多企业仍然难以提供高质量的客户服务。今天&#xff0c;我们简单聊一聊客户服务会遇到哪些问题&#xff1f;怎么解决&#xff1f; 1、客户服务人员培训不足 中小企业在…

12 MFC常用控件(二)

文章目录 滑动条控件初始化滚动条滑动滚动条获取消息 微调控件进度条控件时间控件 滑动条控件 初始化滚动条 CSliderCtrl* sliderCtrl (CSliderCtrl*)GetDlgItem(IDC_SLIDER1);sliderCtrl->SetRange(0,100);//设置范围sliderCtrl->SetPos(50);//当前显示在50//int nPos…

常见的锁策略CAS

目录 一、乐观锁&悲观锁 1.1、悲观锁 1.2、乐观锁 二、重量级锁&轻量级锁 2.1、轻量级锁 2.2、重量级锁 三、自旋锁&挂机等待锁 3.1、自旋锁 3.2、挂起等待锁 四、读写锁&普通互斥锁 4.1、读写锁 4.2、互斥锁 五、公平锁&非公平锁 六、可…

HBase(7):大量数据的计数统计

当HBase中数据量大时&#xff0c;可以使用HBase中提供的MapReduce程序来进行计数统计。语法如下&#xff1a; $HBASE_HOME/bin/hbase org.apache.hadoop.hbase.mapreduce.RowCounter 表名 1 启动YARN集群 启动yarn集群 start-yarn.sh 启动history server mr-jobhistory-da…

计算机视觉:多通道卷积操作

本文重点 前面我们学习了对灰度图的卷积操作(二维图像),本节课程我们学习RGB 彩色图像的卷积操作(三维立体)也就是说现在我们不仅想检测灰度图像的特征,也想检测 RGB 彩色图像的特征。 彩色图片的表示方法 彩色图片通常使用RGB(Red、Green、Blue)三个颜色通道来表示…

【裸机开发】UART 串口通信(一)—— 寄存器解析

目录 一、认识 UART 1、概念 2、帧格式 二、IO 复用为 UART 寄存器解析 1、原理图分析 2、寄存器解析 三、UART 相关寄存器解析 1、UART1_UCR1~4 2、UART1_USR1~2 3、波特率配置 4、UART1_URXD 5、UART1_UTXD 一、认识 UART 1、概念 UART 是一种通用的串行、异步…

web自动化测试如何实现(二)

目录 主流的自动化方案 web自动化测试环境如何搭建 1.安装selenium &#x1f381;更多干货 完整版文档下载方式&#xff1a; 主流的自动化方案 怎么进行选择&#xff1a; 如果有前端开发基础&#xff1a;cypress 如果只打算测试web端&#xff1a;playwright 除此之外&a…

【运维工程师学习】Linux常用命令

Linux常用命令 验证系统版本验证系统类型&#xff08;32位or64位&#xff09;验证分区情况验证CPU配置验证内存配置文件操作1、pwd2、cd3、ls 如何像将ll定义为ls -l的别名&#xff0c;这样去设置定义别名文件操作&#xff08;mkdir、rm、cp、mv&#xff09;文本编辑&#xff0…

Spring专家课程Day03_SpringMVC概述

文章目录 一、SpringMVC 项目搭建1、SpringMVC 简介2、SpringMVC的核心组件和执行流程3、SpringMVC的项目创建&#xff08;Idea&#xff09;3.1 Maven中创建 二、Thymeleaf试图_显示注册试图三、控制器接收表达数据四、接受get请求参数_RequestParm&#xff1b;总结&#xff1a…