【kafka】Java客户端代码demo:自动异步提交、手动同步提交及提交颗粒度、动态负载均衡

news2024/12/23 20:41:09

一,代码及配置项介绍

kafka版本为3.6,部署在3台linux上。

maven依赖如下:

        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.13</artifactId>
            <version>3.6.0</version>
        </dependency>

生产者、消费者和topic代码如下:

    String topic = "items-01";
    
    @Test
    public void producer() throws ExecutionException, InterruptedException {
        Properties p = new Properties();
        p.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.129:9092,192.168.184.130:9092,192.168.184.131:9092");
        p.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        p.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(p);
        while(true){
            for (int i = 0; i < 3; i++) {
                for (int j = 0; j <3; j++) {
                    ProducerRecord<String, String> record = new ProducerRecord<>(topic, "item"+j,"val" + i);
                    Future<RecordMetadata> send = producer
                            .send(record);

                    RecordMetadata rm = send.get();
                    int partition = rm.partition();
                    long offset = rm.offset();
                    System.out.println("key: "+ record.key()+" val: "+record.value()+" partition: "+partition + " offset: "+offset);

                }
            }
        }
    }

    @Test
    public void consumer(){
    	        //基础配置
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.129:9092,192.168.184.130:9092,192.168.184.131:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //消费的细节
        String group = "user-center";
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,group);
        //KAKFA IS MQ  IS STORAGE
        /**
         *         "What to do when there is no initial offset in Kafka or if the current offset
         *         does not exist any more on the server
         *         (e.g. because that data has been deleted):
         *         <ul>
         *             <li>earliest: automatically reset the offset to the earliest offset
         *             <li>latest: automatically reset the offset to the latest offset</li>
         *             <li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li><li>anything else: throw exception to the consumer.</li>
         *         </ul>";
         */
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");//第一次启动,米有offset

        //自动提交时异步提交,丢数据&&重复数据
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");

//        p.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");
//        p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");


        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(topic));

        while(true){
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(0));
            if (!records.isEmpty()){
                System.out.println();
                System.out.println("-----------------" + records.count() + "------------------------------");

                Iterator<ConsumerRecord<String,String>> iterator = records.iterator();
                while (iterator.hasNext()){
                    ConsumerRecord<String,String> record = iterator.next();
                    int partition = record.partition();
                    long offset = record.offset();
                    System.out.println("key: " + record.key() + " val: " + record.value() + " partition: " + partition + " offset: " + offset);
                }

                }
            }
    }

这里先简单解释一下,kafka的topic只是一个逻辑上的概念,实际上的物理存储是依赖分布在broker中的分区partition来完成的。kafka依赖的zk中有一个__consumer_offsets[1]话题,存储了所有consumer和group消费的进度,包括当前消费到的进度current-offset、kafka写入磁盘的日志中记录的消息的末尾log-end-offset

kafka根据消息的key进行哈希取模的结果来将消息分配到不同的partition,partition才是consumer拉取的对象。每次consumer拉取,都是从一个partition中拉取。(这一点,大家可以自己去验证一下)

在这里插入图片描述

下面代码,是描述的当consumer第一次启动时,在kafka中还没有消费的记录,此时current-offset为"-"时,consumer应如何拉取数据的行为。有3个值可选,latest、earliest、none。

当设置如下配置为latest,没有current-offset时,只拉取consumer启动后的新消息。
earliest,没有current-offset时,从头开始拉取消息消费。
node,没有current-offset时,抛异常。

它们的共同点就是current-offset有值时,自然都会按照current-offset拉取消息。

properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");

下面代码,true表示设置的异步自动提交,false为手动提交。

properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");

下面代码,设置的是自动提交时,要过多少秒去异步自动提交。

properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");

下面代码,是设置kafka批量拉取多少数据,默认的应该是最大500,小于500。 kafka可以批量的拉取数据,这样可以节省网卡资源。

properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");

二、异步自动提交

部分设置项如下:

        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");//自动提交时异步提交,丢数据&&重复数据
        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10"); 

开启生产者,生产出一些消息,可以看到之前拉取完数据的group又有了新的数据。
在这里插入图片描述
开启消费者,可以看到消息被消费掉。
在这里插入图片描述

因为提交是异步的,我们需要需要为了业务代码留出处理时间。所以需要设置异步提交时间。

假设在间隔时间(AUTO_COMMIT_INTERVAL_MS_CONFIG,自动提交间隔毫秒数配置)内,还没有提交的时候,消费过数据假设数据,consumer挂了。那么consumer再次启动时,从kafka拉取数据,就会因为还没有提交offset,而重新拉取消费过的数据,导致重复消费。

假设现在已经过了延间隔时间,提交成功了,但是业务还没有完成,并且在提交后失败了。那么这个消费失败的消息也不会被重新消费了,导致丢失消息。

为了解决上述的问题,可以使用手动同步提交。

三、手动同步提交

假设我们现在是按照批量拉取,下面介绍2种提交粒度的demo,粒度由小到大,分别是按条提交,按partition提交 && 按批次提交。

3.1 按条提交


    public void test1(){

        //基础配置
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.129:9092,192.168.184.130:9092,192.168.184.131:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //消费的细节
        String group = "user-center";
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,group);
        //KAKFA IS MQ  IS STORAGE
        /**
         *         "What to do when there is no initial offset in Kafka or if the current offset
         *         does not exist any more on the server
         *         (e.g. because that data has been deleted):
         *         <ul>
         *             <li>earliest: automatically reset the offset to the earliest offset
         *             <li>latest: automatically reset the offset to the latest offset</li>
         *             <li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li><li>anything else: throw exception to the consumer.</li>
         *         </ul>";
         */
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");//第一次启动,米有offset

        //自动提交时异步提交,丢数据&&重复数据
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");

//        p.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");
//        p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");


        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(topic));
        while(true){
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(0));
            if (!records.isEmpty()){
                System.out.println();
                System.out.println("-----------------" + records.count() + "------------------------------");
                
                Iterator<ConsumerRecord<String,String>> iterator = records.iterator();
                while (iterator.hasNext()){
                    ConsumerRecord<String,String> next = iterator.next();
                    int p = next.partition();
                    long offset = next.offset();
                    String key = next.key();
                    String value = next.value();
                    System.out.println("key: " + key + " val: " + value + " partition: " + p + " offset: " + offset);

                    TopicPartition sp = new TopicPartition(topic,p);
                    OffsetAndMetadata om = new OffsetAndMetadata(offset);
                    HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
                    map.put(sp,om);
                    consumer.commitSync(map);
                    
                }
            }
        }
    }

3.2 按partition提交 && 按批次提交

由于消费者每一次拉取都是从一个partition中拉取,所以其实按partition拉取和按批次拉取,是一回事。整体成功

    @Test
    public void test2(){
        //基础配置
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.129:9092,192.168.184.130:9092,192.168.184.131:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //消费的细节
        String group = "user-center";
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,group);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");

        //自动提交时异步提交,丢数据&&重复数据
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");

//        p.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");
//        p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList(topic));

        while(true){
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(0));
            if (!records.isEmpty()){
                System.out.println();
                System.out.println("-----------------" + records.count() + "------------------------------");

                Set<TopicPartition> partitions = records.partitions();
                for (TopicPartition partition : partitions){
                    List<ConsumerRecord<String,String>> pRecords = records.records(partition);

                    Iterator<ConsumerRecord<String,String>> pIterator = pRecords.iterator();
                    while (pIterator.hasNext()){
                        ConsumerRecord<String,String> next = pIterator.next();
                        int p = next.partition();
                        long offset = next.offset();
                        String key = next.key();
                        String value = next.value();
                        System.out.println("key: " + key + " val: " + value + " partition: " + p + " offset: " + offset);
                    }
                    //按partition提交
                    long offset = pRecords.get(pRecords.size() - 1).offset();
                    OffsetAndMetadata om = new OffsetAndMetadata(offset);
                    HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
                    map.put(partition,om);
                    consumer.commitSync(map);
                }
                //按批次提交
//                consumer.commitSync();
            }
        }
    }

四,动态负载均衡

我们知道,对于一个topic,一个group中,为了保证消息的顺序性,默认只能有一个consumer来消费。假设我们有3台消费者,那么此时,另外2台消费者就会闲着不干活。有没有可能能够既保证消费消息的顺序性,又能够提升性能呢?

答案就是kafka的动态负载均衡

前面提到了,producer会根据消息的key的哈希取模的结果来把消息分配到某个partition,也就是说同一个key的消息,只存在于一个partition中。而且消费者拉取消息,一个批次,只从一个partition中拉取消息

假设我们现在有一个topic,有2个partition。那么我们可不可以在组内3台消费者中,挑2台出来,各自对应这个topic的2个partition,这样消费者和partition一一对应。既能保证消息的顺序性,又能够提升性能。这就是kafka的动态负载均衡。

代码如下:

    //动态负载均衡
    @Test
    public void test3(){
        //基础配置
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.129:9092,192.168.184.130:9092,192.168.184.131:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //消费的细节
        String group = "user-center";
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,group);
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");

        //自动提交时异步提交,丢数据&&重复数据
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");

        properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");
        properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        //kafka 的consumer会动态负载均衡
        consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {
            //Revoked,取消的回调函数
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("---onPartitionsRevoked:");
                Iterator<TopicPartition> iter = partitions.iterator();
                while(iter.hasNext()){
                    System.out.println(iter.next().partition());
                }
                System.out.println();
            }

            //Assigned 指定的回调函数
            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("---onPartitionsAssigned:");
                Iterator<TopicPartition> iter = partitions.iterator();

                while(iter.hasNext()){
                    System.out.println(iter.next().partition());
                }
                System.out.println();
            }
        });

        while(true){
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(0));
            if (!records.isEmpty()){
                System.out.println();
                System.out.println("-----------------" + records.count() + "------------------------------");

                Iterator<ConsumerRecord<String,String>> iterator = records.iterator();
                while (iterator.hasNext()){
                    ConsumerRecord<String,String> next = iterator.next();
                    int p = next.partition();
                    long offset = next.offset();
                    String key = next.key();
                    String value = next.value();
                    System.out.println("key: " + key + " val: " + value + " partition: " + p + " offset: " + offset);
                    TopicPartition sp = new TopicPartition(topic,p);
                    OffsetAndMetadata om = new OffsetAndMetadata(offset);
                    HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
                    map.put(sp,om);
                    consumer.commitSync(map);
                }
            }
        }
    }

上述代码在订阅时,加了一个ConsumerRebalanceListener监听器,实现了2个回调函数onPartitionsRevoked和onPartitionsAssigned,分别是取消组内消费者负载均衡时触发的回调函数,和指定组内消费者加入负载均衡时触发的回调函数。

在使用动态负载均衡时,需要注意的是,在提交时不要批量提交,否则会报错如下,暂时还没有研究问题原因,有了结果会回来更新的。

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

先打开一个消费者A,触发了回调函数onPartitionsAssigned,可以看到partition0 和 partition1都被分配到了A上。在这里插入图片描述

此时打开生产者,可以看到partition0和1的消息都发送到了A上。
在这里插入图片描述

我们再打开一个同一个组内的消费者B。
可以看到A取消了partition0和1的分配,被指定了partition0。消费者B则被指定了partition1.在这里插入图片描述在这里插入图片描述

再次打开生产者去生产消息,这次A只消费partition 0的消息,B只消费partition1的消息。
在这里插入图片描述
在这里插入图片描述

如果我们再启动组内第3台消费者,那么组内消费者会再次负载均衡。由于这个topic只有2个partition,所以即使启动3台组内的消费者,也最多只有2个消费者被分配给某个partition,剩余1个消费者不参与负载均衡
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

参考文章:
[1],【kafka】记一次kafka基于linux的原生命令的使用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1191800.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

QGraphicsView

** QGraphicsView教程及示例代码 ** 1、简介 在Qt界面库中,对于图形的绘制,可以使用 QPainter 实现普通二维图形的绘制,该方法在 paintEvent 事件里编写绘图程序,其本质绘制的图形是位图,这种方法更适合于绘制复杂度不高的固定图形,并且不能实现图项的选择、编辑、拖…

福布斯:Salesforce和ZohoCRM,哪个更适合你?

上周&#xff0c;福布斯发布了《CRM软件指南》&#xff0c;从企业的实际需求出发&#xff0c;通过性价比、功能、可用性、第三方集成、分析工具等多个维度进行比较&#xff0c;最终推选出7家代表厂商。本周&#xff0c;福布斯就其中呼声较高的两家企业Salesforce、Zoho CRM做进…

uniapp和vue3+ts开发小程序,使用vscode提示声明变量冲突解决办法

在uniapp中&#xff0c;我们可能经常会遇到需要在不用的环境中使用不同变量的场景&#xff0c;例如在VUE3中的小程序环境使用下面的方式导入echarts&#xff1a; const echarts require(../../static/echarts.min); 如果不是小程序环境则使用下面的方式导入echarts&#xff…

苹果手机发热发烫是什么原因?看完这篇你就知道了!

苹果手机以其卓越的用户体验和优秀的性能得到了广大用户的喜爱和追捧。在日常使用苹果手机时&#xff0c;我们可能会遇到手机发热发烫的情况。那么&#xff0c;苹果手机发热发烫是什么原因呢&#xff1f;小编将为大家解析这一问题的原因&#xff0c;并为您提供相应的解决方案&a…

mysql隐式转换转换引起的bug

生产环境中遇到一个情况情况 &#xff0c;过滤数据发现过滤不掉相关值情况&#xff0c;具体情况如下 原始数据&#xff1a; CREATE TABLE test (id bigint(11) NOT NULL AUTO_INCREMENT COMMENT 自增id,subject_id bigint(11) NOT NULL DEFAULT 0 COMMENT 主题id,subject_nam…

Java 设计模式——享元模式

目录 1.概述2.结构3.实现3.1.抽象享元3.2.具体享元3.3.享元工厂3.4.测试 4.优缺点5.使用场景6.JDK 源码解析——Integer 类 1.概述 &#xff08;1&#xff09;享元模式 (Flyweight Pattern) 是一种结构型设计模式&#xff0c;主要通过共享对象来减少系统中的对象数量&#xff…

【Cheat Engine7.5】基础教程第三关(步骤4)

文章目录 一、简介二、操作步骤2.1、加载进程2.2、查找健康数据2.2.1、首次扫描(单浮点数100)2.2.2、点击打我&#xff0c;再次扫描数值97.112.2.3、修改数据值为50002.2.4、测试正常 2.3、查找弹药数据2.3.1、双浮点数1002.3.2、点击开火2.3.3、修改数据2.3.4、测试 2.4、通关…

【23真题】太难!千万别考!不值!

今天分享的是23年哈尔滨工程大学810的信号与系统试题及解析。 为什么说不值呢&#xff1f;因为哈工程810据之前的分析来看不保护一志愿&#xff0c;就23年810的专业课来看&#xff0c;又在超纲的边缘疯狂试探&#xff01;&#xff08;如果它默认考DSP&#xff0c;当我没说&…

这份华为以太网接口配置命令太真香了!

【赠送】IT技术视频教程&#xff0c;白拿不谢&#xff01;思科、华为、红帽、数据库、云计算等等_厦门微思网络的博客-CSDN博客文章浏览阅读415次。风和日丽&#xff0c;小微给你送福利~如果你是小微的老粉&#xff0c;这里有一份粉丝福利待领取...如果你是新粉关注到了小微&am…

if...else绝佳替换方案

目录 方法一&#xff1a;提前 return方法二&#xff1a;枚举方案三&#xff1a;Optional 判空方案四&#xff1a;表驱动法方案五&#xff1a;策略模式 工厂方法方案六&#xff1a;责任链模式方案七&#xff1a;Function 方法一&#xff1a;提前 return 假如有如下代码&#x…

python实现微信新版v3的jsapi支付

python实现微信新版v3的jsapi支付 1、需要从公众号、商户号获取的信息 注意&#xff1a;在商户号的支付授权目录中需要设置好发起支付的界面url&#xff0c;比如我的&#xff1a; http://xxx/paypage/# 商户证书私钥&#xff0c;此文件不要放置在下面设置的CERT_DIR目录里。…

ZYNQ通过AXI DMA实现PL发送连续大量数据到PS DDR

硬件&#xff1a;ZYNQ7100 软件&#xff1a;Vivado 2017.4、Xilinx SDK 2017.4   ZYNQ PL 和 PS 的通信方式有 AXI GPIO、BRAM、DDR等。对于数据量较少、地址不连续、长度规则的情况&#xff0c;BROM 比较适用。而对于传输速度要求高、数据量大、地址连续的情况&#xff0c;比…

浏览器标签页之间的通信

前言 在开发管理后台页面的时候&#xff0c;会遇到这样一种需求&#xff1a;有一个列表页面&#xff0c;一个新增按钮&#xff0c;一个新增页面&#xff0c;点击新增按钮&#xff0c;在一个新的标签页中打开新增页面。并且&#xff0c;新增后要自动实时的更新列表页面的数据。…

Mybatis-plus 内部提供的 ServiceImpl<M extends BaseMapper<T>, T> 学习总结

作用 当集成Mybatis-Plus 后&#xff0c;我们的大部分数据库操作都可以通过 XxxxxMapper &#xff0c;同时 Mybatis-plus 在Mapper 提供基本操作方法的同时&#xff0c;也提供类基础的 serviceImpl 来帮助我们完成一些常见的基本操作。 使用 一般情况下&#xff0c;我们首先…

腾讯云真的是良心云!服务器带宽、CPU、硬盘IO性能大揭秘!

本文将通过对腾讯云服务器CVM S5 4核配置的云服务器进行测试&#xff0c;来评估其在带宽、CPU和硬盘IO性能方面的表现。 在云服务器的并发处理中&#xff0c;带宽是一个重要的因素。经过测试&#xff0c;腾讯云的带宽网络表现非常出色&#xff0c;能够跑满带宽&#xff0c;同时…

1.linux极速进阶

目录 概述文件相关vi文件编辑查找字符串查找某一行内容复制粘贴快速删除快速跳到文件首行和末行 进程相关ps/netstatjpstopkill linux三剑客grepsed添加方面操作删除方面替换操作 awk 结束 概述 身为后端开发&#xff0c;大数据平台搭建&#xff0c;对 linux 系统的操作最起码…

React的refs和表单组件总结

React的refs和表单组件 react中refs的使用字符串形式的ref react核心就在于虚拟DOM&#xff0c;也就是React中不总是直接操页面的真实DOM元素&#xff0c;并且结合Diffing算法&#xff0c;可以做到最小化页面重绘&#xff0c;但有些时候不可避免我们需要一种方法可以操作我们定…

pip 安装任意软件包报错

现象 使用 pip 命令时提示 查看源码 可以看到是从 pip 包中导入 main失败&#xff0c;点击查看目录 main 文件不见了&#xff0c;判断是文件缺失&#xff0c;重装 pip 即可 # python3 下载 pip curl https://bootstrap.pypa.io/get-pip.py -o get-pip.py # python2 下载…

精密设备企业适合哪款CRM客户管理体系?

精密设备企业致力于打造现代化管理体系&#xff0c;以精密的仪器、精细的销售、精准的市场、精确的售后为企业核心&#xff0c;提供优质的精密产品和专业服务。随着企业的发展及市场发展需要&#xff0c;建立高效的客户关系管理体系势在必行。那么&#xff0c;精密设备企业适合…

C#医学检验室(LIS)信息管理系统源码

LIS:实验室信息管理系统 (Laboratory Information Management System简称:LIS)。 LIS 是面向医院检验科、检验中心、动物实验所、生物医疗研究所等科研单位研发的集数据采集、传输、存储、分析、处理、发布等功能于一体的信息管理系统。 一、完善的质控&#xff1a; 从样本管理…