Kafka生产者原理 kafka生产者发送流程 kafka消息发送到集群步骤 kafka如何发送消息 kafka详解

news2025/1/9 1:44:19

kafka尚硅谷视频:

10_尚硅谷_Kafka_生产者_原理_哔哩哔哩_bilibili

     1. producer初始化:加载默认配置,以及配置的参数,开启网络线程

     2. 拦截器拦截

     3. 序列化器进行消息key, value序列化

     4. 进行分区

     5. kafka broker集群 获取metaData

     6. 消息缓存到RecordAccumulator收集器,分配到该分区的DQueue(RecordBatch)

     7. batch.size满了,或者linker.ms到达指定时间,唤醒sender线程, 实例化networkClient

         RecordBatch ==>RequestClient 发送消息体,

      8. 与分区相同broker建立网络连接,发送到对应broker

 1. send()方法参数producerRecord对象:

    关于分区:

      a.指定分区,则发送到该分区    

      b.不指定分区,k值没有传入,使用黏性分区(sticky partition

                 第一次调用时随机生成一个整数(后面每次调用在这个整数上自增),将这个值与 topic 可用的 partition 总数取余得到 partition 值,也就是常说的 round-robin 算法   

      c.不指定分区,传入k值,k值先进行hash获取hashCodeValue, 再与topic下的分区数进行求模取余,进行分区。

      如 k hash = 5 topic目前的分区数2  则 分区为:1

          k  hash =6  topic目前的分区数2  则 分区为:0

2. KafkaProducer 异步, 同步发送api:

    异步发送:

                    producer.send(producerRecord对象);

    同步发送则send()方法后面.get()



kafka 的send方法核心逻辑:

    public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
        return this.send(record, (Callback)null);
    }

    public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
        // 拦截器集合。多个拦截对象循环遍历
        ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
        return this.doSend(interceptedRecord, callback);
    }
    
    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
        TopicPartition tp = null;

        // 获取集群信息metadata
        try {
            this.throwIfProducerClosed();
            long nowMs = this.time.milliseconds();

            ClusterAndWaitTime clusterAndWaitTime;
            try {
                clusterAndWaitTime = this.waitOnMetadata(record.topic(), record.partition(), nowMs, this.maxBlockTimeMs);
            } catch (KafkaException var22) {
                if (this.metadata.isClosed()) {
                    throw new KafkaException("Producer closed while send in progress", var22);
                }

                throw var22;
            }

            nowMs += clusterAndWaitTime.waitedOnMetadataMs;
            long remainingWaitMs = Math.max(0L, this.maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
            Cluster cluster = clusterAndWaitTime.cluster;

            // 序列化器 key序列化
            byte[] serializedKey;
            try {
                serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());
            } catch (ClassCastException var21) {
                throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() + " to class " + this.producerConfig.getClass("key.serializer").getName() + " specified in key.serializer", var21);
            }
      
            // 序列化器 value序列化
            byte[] serializedValue;
            try {
                serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());
            } catch (ClassCastException var20) {
                throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() + " to class " + this.producerConfig.getClass("value.serializer").getName() + " specified in value.serializer", var20);
            }

            // 分区
            int partition = this.partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            this.setReadOnly(record.headers());
            Header[] headers = record.headers().toArray();
            int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(this.apiVersions.maxUsableProduceMagic(), this.compressionType, serializedKey, serializedValue, headers);
            this.ensureValidRecordSize(serializedSize);
            long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
            if (this.log.isTraceEnabled()) {
                this.log.trace("Attempting to append record {} with callback {} to topic {} partition {}", new Object[]{record, callback, record.topic(), partition});
            }

            Callback interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);
            // RecordAccumulator.append() 添加数据转 ProducerBatch
            RecordAccumulator.RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);
            if (result.abortForNewBatch) {
                int prevPartition = partition;
                this.partitioner.onNewBatch(record.topic(), cluster, partition);
                partition = this.partition(record, serializedKey, serializedValue, cluster);
                tp = new TopicPartition(record.topic(), partition);
                if (this.log.isTraceEnabled()) {
                    this.log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", new Object[]{record.topic(), partition, prevPartition});
                }

                interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);
                result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
            }

            if (this.transactionManager != null) {
                this.transactionManager.maybeAddPartition(tp);
            }

            // 判断是否满了,满了唤醒sender , sender继承了runnable
            if (result.batchIsFull || result.newBatchCreated) {
                this.log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
                this.sender.wakeup();
            }

            return result.future;
        } catch (ApiException var23) {
            this.log.debug("Exception occurred during message send:", var23);
            if (tp == null) {
                tp = ProducerInterceptors.extractTopicPartition(record);
            }

            Callback interceptCallback = new InterceptorCallback(callback, this.interceptors, tp);
            interceptCallback.onCompletion((RecordMetadata)null, var23);
            this.errors.record();
            this.interceptors.onSendError(record, tp, var23);
            return new FutureFailure(var23);
        } catch (InterruptedException var24) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, var24);
            throw new InterruptException(var24);
        } catch (KafkaException var25) {
            this.errors.record();
            this.interceptors.onSendError(record, tp, var25);
            throw var25;
        } catch (Exception var26) {
            this.interceptors.onSendError(record, tp, var26);
            throw var26;
        }
    }

  Sender类 run()方法:

 public void run() {
        this.log.debug("Starting Kafka producer I/O thread.");

        while(this.running) {
            try {
                this.runOnce();
            } catch (Exception var5) {
                this.log.error("Uncaught error in kafka producer I/O thread: ", var5);
            }
        }

        this.log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");

        while(!this.forceClose && (this.accumulator.hasUndrained() || this.client.inFlightRequestCount() > 0 || this.hasPendingTransactionalRequests())) {
            try {
                this.runOnce();
            } catch (Exception var4) {
                this.log.error("Uncaught error in kafka producer I/O thread: ", var4);
            }
        }

        while(!this.forceClose && this.transactionManager != null && this.transactionManager.hasOngoingTransaction()) {
            if (!this.transactionManager.isCompleting()) {
                this.log.info("Aborting incomplete transaction due to shutdown");
                this.transactionManager.beginAbort();
            }

            try {
                this.runOnce();
            } catch (Exception var3) {
                this.log.error("Uncaught error in kafka producer I/O thread: ", var3);
            }
        }

        if (this.forceClose) {
            if (this.transactionManager != null) {
                this.log.debug("Aborting incomplete transactional requests due to forced shutdown");
                this.transactionManager.close();
            }

            this.log.debug("Aborting incomplete batches due to forced shutdown");
            this.accumulator.abortIncompleteBatches();
        }

        try {
            this.client.close();
        } catch (Exception var2) {
            this.log.error("Failed to close network client", var2);
        }

        this.log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }


    void runOnce() {
        if (this.transactionManager != null) {
            try {
                this.transactionManager.maybeResolveSequences();
                if (this.transactionManager.hasFatalError()) {
                    RuntimeException lastError = this.transactionManager.lastError();
                    if (lastError != null) {
                        this.maybeAbortBatches(lastError);
                    }

                    this.client.poll(this.retryBackoffMs, this.time.milliseconds());
                    return;
                }

                this.transactionManager.bumpIdempotentEpochAndResetIdIfNeeded();
                if (this.maybeSendAndPollTransactionalRequest()) {
                    return;
                }
            } catch (AuthenticationException var5) {
                this.log.trace("Authentication exception while processing transactional request", var5);
                this.transactionManager.authenticationFailed(var5);
            }
        }

        long currentTimeMs = this.time.milliseconds();
        // 发送数据
        long pollTimeout = this.sendProducerData(currentTimeMs);
        this.client.poll(pollTimeout, currentTimeMs);
    }

  sendProducerData() :

      最终转换为ClientRequest对象

         ClientRequest clientRequest = this.client.newClientRequest(nodeId, requestBuilder, now, acks != 0, this.requestTimeoutMs, callback);
         this.client.send(clientRequest, now);
private long sendProducerData(long now) {
        Cluster cluster = this.metadata.fetch();
        RecordAccumulator.ReadyCheckResult result = this.accumulator.ready(cluster, now);
        Iterator iter;
        if (!result.unknownLeaderTopics.isEmpty()) {
            iter = result.unknownLeaderTopics.iterator();

            while(iter.hasNext()) {
                String topic = (String)iter.next();
                this.metadata.add(topic, now);
            }

            this.log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", result.unknownLeaderTopics);
            this.metadata.requestUpdate();
        }

        iter = result.readyNodes.iterator();
        long notReadyTimeout = Long.MAX_VALUE;

        while(iter.hasNext()) {
            Node node = (Node)iter.next();
            if (!this.client.ready(node, now)) {
                iter.remove();
                notReadyTimeout = Math.min(notReadyTimeout, this.client.pollDelayMs(node, now));
            }
        }

        Map<Integer, List<ProducerBatch>> batches = this.accumulator.drain(cluster, result.readyNodes, this.maxRequestSize, now);
        this.addToInflightBatches(batches);
        List expiredBatches;
        Iterator var11;
        ProducerBatch expiredBatch;
        if (this.guaranteeMessageOrder) {
            Iterator var9 = batches.values().iterator();

            while(var9.hasNext()) {
                expiredBatches = (List)var9.next();
                var11 = expiredBatches.iterator();

                while(var11.hasNext()) {
                    expiredBatch = (ProducerBatch)var11.next();
                    this.accumulator.mutePartition(expiredBatch.topicPartition);
                }
            }
        }

        this.accumulator.resetNextBatchExpiryTime();
        List<ProducerBatch> expiredInflightBatches = this.getExpiredInflightBatches(now);
        expiredBatches = this.accumulator.expiredBatches(now);
        expiredBatches.addAll(expiredInflightBatches);
        if (!expiredBatches.isEmpty()) {
            this.log.trace("Expired {} batches in accumulator", expiredBatches.size());
        }

        var11 = expiredBatches.iterator();

        while(var11.hasNext()) {
            expiredBatch = (ProducerBatch)var11.next();
            String errorMessage = "Expiring " + expiredBatch.recordCount + " record(s) for " + expiredBatch.topicPartition + ":" + (now - expiredBatch.createdMs) + " ms has passed since batch creation";
            this.failBatch(expiredBatch, (RuntimeException)(new TimeoutException(errorMessage)), false);
            if (this.transactionManager != null && expiredBatch.inRetry()) {
                this.transactionManager.markSequenceUnresolved(expiredBatch);
            }
        }

        this.sensors.updateProduceRequestMetrics(batches);
        long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout);
        pollTimeout = Math.min(pollTimeout, this.accumulator.nextExpiryTimeMs() - now);
        pollTimeout = Math.max(pollTimeout, 0L);
        if (!result.readyNodes.isEmpty()) {
            this.log.trace("Nodes with data ready to send: {}", result.readyNodes);
            pollTimeout = 0L;
        }

        this.sendProduceRequests(batches, now);
        return pollTimeout;
    }

    private void sendProduceRequests(Map<Integer, List<ProducerBatch>> collated, long now) {
        Iterator var4 = collated.entrySet().iterator();

        while(var4.hasNext()) {
            Map.Entry<Integer, List<ProducerBatch>> entry = (Map.Entry)var4.next();
            this.sendProduceRequest(now, (Integer)entry.getKey(), this.acks, this.requestTimeoutMs, (List)entry.getValue());
        }

    }

    private void sendProduceRequest(long now, int destination, short acks, int timeout, List<ProducerBatch> batches) {
        if (!batches.isEmpty()) {
            Map<TopicPartition, ProducerBatch> recordsByPartition = new HashMap(batches.size());
            byte minUsedMagic = this.apiVersions.maxUsableProduceMagic();
            Iterator var9 = batches.iterator();

            while(var9.hasNext()) {
                ProducerBatch batch = (ProducerBatch)var9.next();
                if (batch.magic() < minUsedMagic) {
                    minUsedMagic = batch.magic();
                }
            }

            ProduceRequestData.TopicProduceDataCollection tpd = new ProduceRequestData.TopicProduceDataCollection();
            Iterator var16 = batches.iterator();

            while(var16.hasNext()) {
                ProducerBatch batch = (ProducerBatch)var16.next();
                TopicPartition tp = batch.topicPartition;
                MemoryRecords records = batch.records();
                if (!records.hasMatchingMagic(minUsedMagic)) {
                    records = (MemoryRecords)batch.records().downConvert(minUsedMagic, 0L, this.time).records();
                }

                ProduceRequestData.TopicProduceData tpData = tpd.find(tp.topic());
                if (tpData == null) {
                    tpData = (new ProduceRequestData.TopicProduceData()).setName(tp.topic());
                    tpd.add(tpData);
                }

                tpData.partitionData().add((new ProduceRequestData.PartitionProduceData()).setIndex(tp.partition()).setRecords(records));
                recordsByPartition.put(tp, batch);
            }

            String transactionalId = null;
            if (this.transactionManager != null && this.transactionManager.isTransactional()) {
                transactionalId = this.transactionManager.transactionalId();
            }

            ProduceRequest.Builder requestBuilder = ProduceRequest.forMagic(minUsedMagic, (new ProduceRequestData()).setAcks(acks).setTimeoutMs(timeout).setTransactionalId(transactionalId).setTopicData(tpd));
            RequestCompletionHandler callback = (response) -> {
                this.handleProduceResponse(response, recordsByPartition, this.time.milliseconds());
            };
            String nodeId = Integer.toString(destination);
            ClientRequest clientRequest = this.client.newClientRequest(nodeId, requestBuilder, now, acks != 0, this.requestTimeoutMs, callback);
            // this.client 为KafkaClient接口 实现类:NetworkClient对象
            this.client.send(clientRequest, now);
            this.log.trace("Sent produce request to {}: {}", nodeId, requestBuilder);
        }
    }

 NetworkClient send()方法:

 public void send(ClientRequest request, long now) {
        this.doSend(request, false, now);
    }

private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now) {
        this.ensureActive();
        String nodeId = clientRequest.destination();
        if (!isInternalRequest && !this.canSendRequest(nodeId, now)) {
            throw new IllegalStateException("Attempt to send a request to node " + nodeId + " which is not ready.");
        } else {
            AbstractRequest.Builder<?> builder = clientRequest.requestBuilder();

            try {
                NodeApiVersions versionInfo = this.apiVersions.get(nodeId);
                short version;
                if (versionInfo == null) {
                    version = builder.latestAllowedVersion();
                    if (this.discoverBrokerVersions && this.log.isTraceEnabled()) {
                        this.log.trace("No version information found when sending {} with correlation id {} to node {}. Assuming version {}.", new Object[]{clientRequest.apiKey(), clientRequest.correlationId(), nodeId, version});
                    }
                } else {
                    version = versionInfo.latestUsableVersion(clientRequest.apiKey(), builder.oldestAllowedVersion(), builder.latestAllowedVersion());
                }

                this.doSend(clientRequest, isInternalRequest, now, builder.build(version));
            } catch (UnsupportedVersionException var9) {
                this.log.debug("Version mismatch when attempting to send {} with correlation id {} to {}", new Object[]{builder, clientRequest.correlationId(), clientRequest.destination(), var9});
                ClientResponse clientResponse = new ClientResponse(clientRequest.makeHeader(builder.latestAllowedVersion()), clientRequest.callback(), clientRequest.destination(), now, now, false, var9, (AuthenticationException)null, (AbstractResponse)null);
                if (!isInternalRequest) {
                    this.abortedSends.add(clientResponse);
                } else if (clientRequest.apiKey() == ApiKeys.METADATA) {
                    this.metadataUpdater.handleFailedRequest(now, Optional.of(var9));
                }
            }

        }
    }

    private void doSend(ClientRequest clientRequest, boolean isInternalRequest, long now, AbstractRequest request) {
        String destination = clientRequest.destination();
        RequestHeader header = clientRequest.makeHeader(request.version());
        if (this.log.isDebugEnabled()) {
            this.log.debug("Sending {} request with header {} and timeout {} to node {}: {}", new Object[]{clientRequest.apiKey(), header, clientRequest.requestTimeoutMs(), destination, request});
        }

        Send send = request.toSend(header);
        // clientRequest convert InFlightRequest 对象
        InFlightRequest inFlightRequest = new InFlightRequest(clientRequest, header, isInternalRequest, request, send, now);
        this.inFlightRequests.add(inFlightRequest);
        // nio channel。。。selector 发送消息信息
        //this.selector is Selectable interface  KafkaChannel is implement
        this.selector.send(new NetworkSend(clientRequest.destination(), send));
    }

总结:直接阅读源码很快就能想明白kafka 生产者发送逻辑,kafka-client.jar。  核心==>

   本文第一张图片

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/928235.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

3D模型转换工具HOOPS Exchange助力打造虚拟现实应用程序

挑战&#xff1a; 支持使用各种 CAD 系统和 CAD 文件格式的客户群向可视化硬件提供快速、准确的数据加载提供对详细模型信息的访问&#xff0c;同时确保高帧率性能 解决方案&#xff1a; HOOPS Exchange领先的CAD数据转换工具包 结果&#xff1a; 确保支持来自领先工程软件…

【Python编程】将同一种图片分类到同一文件夹下,并且将其分类的路径信息写成txt文件进行保存

注&#xff1a;数据结构同上一篇博文类似 一、代码 import os import cv2 import shutilpath0os.getcwd()\\apple\\RGB path1os.getcwd()\\apple\\tof_confidence # path2os.getcwd()\\apple\\tof_depth # path3os.getcwd()\\apple\\tof_depthRGB # path4os.getcwd()\\apple\…

云计算为中小企业带来的 10 大好处

云计算的迅速采用并非巧合。中小型企业 (SMB) 现在有机会摆脱传统 IT 基础设施的限制&#xff0c;享受云提供的众多优势。它的发展使公司能够更智能、更快速、更安全地工作。 因此&#xff0c;如果您发现自己质疑是否需要进行这种转变&#xff0c;请不要害怕&#xff01;让我们…

1.1 VMware Workstation与Kali的安装和配置1

资源见专栏第一篇文章https://blog.csdn.net/algorithmyyds/article/details/132457258 安装VMware 不多加赘述&#xff0c;直接按顺序安装即可。 有以下需注意的地方&#xff1a; 1.建议选择增强型服务&#xff1b; 2.不要加入体验改进计划。是否开启提示更新看你的想法&…

小资金能玩期权吗?为什么玩期权的人这么少?

目前我国有很多个ETF期权品种,实际交易时,小资金也能参与期权交易,开通期权账户只是打开了交易50ETF期权的窗口,第二关键的在于关于怎么买卖50ETF期权的方式,那么先来说说小小资金能玩期权吗&#xff1f;为什么玩期权的人这么少&#xff1f;本文来自&#xff1a;期权酱 【建议收…

回收站恢复软件推荐!轻松找回误删数据!

“不小心清空了回收站怎么办呢&#xff1f;大家有没有什么回收站恢复软件推荐呢&#xff1f;非常需要一个软件来帮助我恢复回收站里的重要数据&#xff0c;请看看我吧&#xff01;” 众所周知&#xff0c;回收站中存放着很多我们删除的文件。如果我们发现文件是被误删了&#x…

期刊的“综合影响因子”和“复合影响因子”你了解多少?

可能在刚开始发论文时还分不清许多概念&#xff0c;但是在此之前&#xff0c;需要先了解影响因子是&#xff1f; 影响因子&#xff1a;美国科技信息研究所&#xff08;ISI&#xff09;原所长尤金加菲尔德博士于1963年提出&#xff0c;已成为国际上通行的期刊评价指标。期刊的影…

⌈算法进阶⌋图论::拓扑排序(Topological Sorting)——快速理解到熟练运用

目录 一、原理 1. 引例&#xff1a;207.课程表 2. 应用场景 3. 代码思路 二、代码模板 三、练习 1、210.课程表Ⅱ&#x1f7e2; 2、2392.给定条件下构造举证&#x1f7e1; 3、310.最小高度树 &#x1f7e1; 一、原理 1. 引例&#xff1a;207.课程表 就如大学课程安排一样&…

WPF基础入门-Class2-样式

WPF基础入门 Class2&#xff1a;样式 1、内联样式&#xff1a;优先度最高 <Grid><StackPanel><!--内联样式优先度高--><Button Background"Red" Height"10" Width"100"FontSize"20"Content"SB">…

《热题100》二分查找、排序、二叉树篇

思路&#xff1a;将数组排序&#xff0c;峰值肯定是比较大的那些数&#xff0c;排序后&#xff0c;从大到小&#xff0c;依次看该值是否比左右大&#xff0c;如果是&#xff0c;就返回该值为峰值。 import random class Solution: def paixu(self,nums): if len(nums) < 1…

人工智能在现代招聘中的崛起:超越传统筛选的未来

引言 在过去的几十年里,招聘一直是企业的核心活动之一。传统的招聘流程依赖于人力资源专家手工筛选简历、面试候选人并进行背景调查。这种方法不仅耗时,而且可能受到人为偏见的影响。随着技术的进步,特别是人工智能(AI)的发展,招聘的面貌正在发生深刻的变化。人工智能在…

RT-Thread 线程管理(学习二)

线程相关操作 线程相关的操作包括&#xff1a;创建/初始化、启动、运行、删除/脱离。 动态线程与静态线程的区别&#xff1a;动态线程是系统自动从动态内存堆上分配栈空间与线程句柄&#xff08;初始化heap之后才能使用create创建动态线程&#xff09;&#xff0c;静态线程是…

【Azure】Virtual Hub vWAN

虚拟 WAN 文档 Azure 虚拟 WAN 是一个网络服务&#xff0c;其中整合了多种网络、安全和路由功能&#xff0c;提供单一操作界面。 我们主要讨论两种连接情况&#xff1a; 通过一个 vWAN 来连接不通的 vNET 和本地网络。以下是一个扩展的拓扑 结合 vhub&#xff0c;可以把两个中…

Bootstrap Blazor 实战动态表单组件

1.新建工程 源码 新建工程b18ValidateForm,使用 nuget.org 进行 BootstrapBlazor 组件安装, Chart 库,字体. 将项目添加到解决方案中 dotnet new blazorserver -o b18ValidateForm dotnet add b06chart package BootstrapBlazor dotnet add b06chart package BootstrapBlazo…

秒杀系统的业务流程以及优化方案(实现异步秒杀)

先看基本的业务流程 那么我们可以看到整个流程都是一个线程来完成的&#xff0c;这样的话耗时还是很长的&#xff0c;那么可不可以采用多线程去实现呢&#xff1f; 首先我们要思考怎么对业务进行拆分&#xff0c;可以想象一个我们去饭店点餐&#xff0c;会有前台接待&#xff…

react antd框架中的徽标获取数据对应状态的数量

实现思路&#xff1a;获取数量的思路是通过filter过滤符合数据来实现。 列表数组.filter(item > item.status 值).length; 例子&#xff1a;以下这个例子是判断data数组中的status中在职的数量。 data.filter((item) > item.status 在职).length 效果展示&#xff…

Python 练习:剔除数字

练习&#xff1a;剔除数字&#xff1a; 要求如下&#xff1a; 1、编写一段程序代码&#xff0c;程序运行后&#xff0c; 需要用户随意输入一段包含有数字和字母的字符串&#xff1b; 2、程序会自动删除字符串中的数字&#xff0c; 然后输出一串没有数字的字符串&#xff08;纯…

yyyy-MM-dd‘T‘HH:mm时间格式探索

yyyy-MM-ddTHH:mm:ss 一直以后这个T是为了避免yyyy-MM-dd HH:mm:ss空格出现解析报错 但是这个T实际是一个标识符&#xff0c;作为小时元素的开始。 T代表后面跟着是时间&#xff0c;Z代表0时区&#xff08;相差北京时间8小时&#xff09; T 即代表 UTC&#xff08;Coodinated U…

MyBatis与spring集成

目录 MyBatis与Spring集成 导入pom依赖 导入generatorConfig.xml 导入spring-mybatis.xml 自动生成mapper文件 编写接口类&#xff1a;BookBiz aop整合PageHelper分页插件 编写分页查询 编写pagebean 编写PagerAspect类 测试结果 MyBatis与Spring集成 导入pom依赖 &l…

最新SQLMap安装与入门技术

SQLMap详解 SQLMap是一个自动化的SQL注入工具&#xff0c;其主要功能是扫描、发现并利用给定URL的SQL注入漏洞。SQLMap内置了很多绕过插件&#xff0c;支持的数据库是MySQL、Oracle、PostgreSQL、Microsoft SQL Server、Microsoft Access、IBM DB2、SQLite、Firebird、Sybase和…