kafka发送消息-生产者发送消息的分区策略(消息发送到哪个分区中?是什么策略)

news2024/12/24 20:01:12

生产者发送消息的分区策略(消息发送到哪个分区中?是什么策略)

  • 1、默认策略,程序自动计算并指定分区
    • 1.1、指定key,不指定分区
    • 1.2、不指定key,不指定分区
  • 2、轮询分配策略RoundRobinPartitioner
    • 2.1、创建配置类
    • 2.2、application.yml文件
    • 2.3、生产者
    • 2.4、测试类
    • 2.5、执行结果
  • 3、自定义分区分配策略
    • 3.1、创建自定义分配策略类
    • 3.2、修改kafka配置类
    • 3.3、application.yml文件
    • 3.4、生产者
    • 3.5、测试类
    • 3.6、测试结果
    • 3.7、总结

在这里插入图片描述

1、默认策略,程序自动计算并指定分区

1.1、指定key,不指定分区

生产者:在编写代码发送消息时我们先不指定分区,即分区设为null,看看程序最终会把消息发送到哪个分区。

package com.power.producer;

import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate2;
    
    public void send9(){
        User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();
        //分区是null,让kafka自己去决定把消息发送到哪个分区
        kafkaTemplate2.send("heTopic",null,System.currentTimeMillis(),"k9",user);
    }
}

测试类:

package com.power;

import com.power.model.User;
import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.Date;

@SpringBootTest
public class SpringBoot01KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;
    
    @Test
    void send9(){
        eventProducer.send9();
    }
}

程序最终是通过以下代码进行目标分区计算的:

Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;

通过调试发现,程序是通过以下代码进行目标分区计算的:
程序自动读取生产者发送消息时的key(本次发送时值为“key9”),将key生成一个32位的HASH值,将该HASH值与默认分区数(这个topic中有9个分区)取余数(余数结果一定在0-8之间),进而计算得出消息默认发送到的分区值

在这里插入图片描述

在这里插入图片描述

1.2、不指定key,不指定分区

生产者:
在这里插入图片描述

测试类:

在这里插入图片描述
此时时通过随机数与默认分区取余数计算默认分区的

使用随机数 % numPartitions

2、轮询分配策略RoundRobinPartitioner

通过查看kafka源码发现,分区接口有一个轮询分配策略相关实现类。
在这里插入图片描述

在application.yml配置文件中生产者配置项,我发现并生产者并没有相关轮询分配策略的配置,那么该如何试下轮询指定分区的配置呢?
在这里插入图片描述

需要编写代码试下轮询指定分区策略:

在这里插入图片描述

2.1、创建配置类

package com.power.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.key-serializer}")
    private String keySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String valueSerializer;

    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer);
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
        return props;
    }

    public ProducerFactory<String, ?> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public KafkaTemplate<String, ?> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }


    //第二次创建
    @Bean
    public NewTopic newTopic9() {
        return new NewTopic("heTopic", 9, (short) 1);
    }
}

2.2、application.yml文件

spring:
  application:
    #应用名称
    name: spring-boot-01-kafka-base

  #kafka连接地址(ip+port)
  kafka:
    bootstrap-servers: <你的kafka服务器IP>:9092
    #配置生产者(24个配置)
    producer:
      #key默认是StringSerializer序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #value默认是ToStringSerializer序列化
      value-serializer: org.springframework.kafka.support.serializer.ToStringSerializer


    #配置消费者(24个配置)
    consumer:
      auto-offset-reset: earliest

    template:
      default-topic: default-topic

在这里插入图片描述

2.3、生产者

package com.power.producer;

import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate2;

    public void send10(){
        User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();
        //分区是null,让kafka自己去决定把消息发送到哪个分区
        kafkaTemplate2.send("heTopic",user);
    }
}

在这里插入图片描述

2.4、测试类

在这里插入图片描述

package com.power;

import com.power.model.User;
import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.Date;

@SpringBootTest
public class SpringBoot01KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;

    @Test
    void send10(){
        for (int i = 0; i <5 ; i++) {
            eventProducer.send10();
        }

    }

}

2.5、执行结果

执行完测试类,发现5次请求分别发送到了kafka的heTopic主题的5个不同分区中:
在这里插入图片描述

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3、自定义分区分配策略

在这里插入图片描述

3.1、创建自定义分配策略类

package com.power.config;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.utils.Utils;

import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class CustomerPartitioner implements Partitioner {

    private AtomicInteger nextPartition = new AtomicInteger(0);

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if(key==null){
            //使用轮询方式选择分区
            int next = nextPartition.getAndIncrement();
            if(next>=numPartitions){
                nextPartition.compareAndSet(next,0);
            }
            if(next>0){
                next--;
            }
            System.out.println("分区值:"+next);
            return next;
        }else {
            //如果key不为inull,则使用默认的分区策略
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }

    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

3.2、修改kafka配置类

指定使用自定义的分区分配类
在这里插入图片描述

3.3、application.yml文件

spring:
  application:
    #应用名称
    name: spring-boot-01-kafka-base

  #kafka连接地址(ip+port)
  kafka:
    bootstrap-servers: <你的kafka服务器IP>:9092
    #配置生产者(24个配置)
    producer:
      #key默认是StringSerializer序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #value默认是ToStringSerializer序列化
      value-serializer: org.springframework.kafka.support.serializer.ToStringSerializer


    #配置消费者(24个配置)
    consumer:
      auto-offset-reset: earliest

    template:
      default-topic: default-topic

在这里插入图片描述

3.4、生产者

package com.power.producer;

import com.power.model.User;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.support.SendResult;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate2;

    public void send10(){
        User user = User.builder().id(1208).phone("16767667676").birthday(new Date()).build();
        //分区是null,让kafka自己去决定把消息发送到哪个分区
        kafkaTemplate2.send("heTopic",user);
    }
}

在这里插入图片描述

3.5、测试类

在这里插入图片描述

package com.power;

import com.power.model.User;
import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;
import java.util.Date;

@SpringBootTest
public class SpringBoot01KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;

    @Test
    void send10(){
        for (int i = 0; i <5 ; i++) {
            eventProducer.send10();
        }

    }

}

3.6、测试结果

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3.7、总结

使用自定义分区策略类尝试发送消息,发现发送的5次消息,并没有连续发送到5个挨着的分区中,查看kafka源码的org.apache.kafka.clients.producer.KafkaProducer类的doSend方法发现,每一次发送前,调用了两次计算分区的方法,导致第一个得到的分区并不会正在的发送消息。

doSend方法;

private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
    TopicPartition tp = null;
    try {
        throwIfProducerClosed();
        // first make sure the metadata for the topic is available
        long nowMs = time.milliseconds();
        ClusterAndWaitTime clusterAndWaitTime;
        try {
            clusterAndWaitTime = waitOnMetadata(record.topic(), record.partition(), nowMs, maxBlockTimeMs);
        } catch (KafkaException e) {
            if (metadata.isClosed())
                throw new KafkaException("Producer closed while send in progress", e);
            throw e;
        }
        nowMs += clusterAndWaitTime.waitedOnMetadataMs;
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);
        Cluster cluster = clusterAndWaitTime.cluster;
        byte[] serializedKey;
        try {
            serializedKey = keySerializer.serialize(record.topic(), record.headers(), record.key());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert key of class " + record.key().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in key.serializer", cce);
        }
        byte[] serializedValue;
        try {
            serializedValue = valueSerializer.serialize(record.topic(), record.headers(), record.value());
        } catch (ClassCastException cce) {
            throw new SerializationException("Can't convert value of class " + record.value().getClass().getName() +
                    " to class " + producerConfig.getClass(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG).getName() +
                    " specified in value.serializer", cce);
        }
        int partition = partition(record, serializedKey, serializedValue, cluster);
        tp = new TopicPartition(record.topic(), partition);

        setReadOnly(record.headers());
        Header[] headers = record.headers().toArray();

        int serializedSize = AbstractRecords.estimateSizeInBytesUpperBound(apiVersions.maxUsableProduceMagic(),
                compressionType, serializedKey, serializedValue, headers);
        ensureValidRecordSize(serializedSize);
        long timestamp = record.timestamp() == null ? nowMs : record.timestamp();
        if (log.isTraceEnabled()) {
            log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, callback, record.topic(), partition);
        }
        // producer callback will make sure to call both 'callback' and interceptor callback
        Callback interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

        if (transactionManager != null && transactionManager.isTransactional()) {
            transactionManager.failIfNotReadyForSend();
        }
        RecordAccumulator.RecordAppendResult result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, true, nowMs);

        if (result.abortForNewBatch) {
            int prevPartition = partition;
            partitioner.onNewBatch(record.topic(), cluster, prevPartition);
            partition = partition(record, serializedKey, serializedValue, cluster);
            tp = new TopicPartition(record.topic(), partition);
            if (log.isTraceEnabled()) {
                log.trace("Retrying append due to new batch creation for topic {} partition {}. The old partition was {}", record.topic(), partition, prevPartition);
            }
            // producer callback will make sure to call both 'callback' and interceptor callback
            interceptCallback = new InterceptorCallback<>(callback, this.interceptors, tp);

            result = accumulator.append(tp, timestamp, serializedKey,
                serializedValue, headers, interceptCallback, remainingWaitMs, false, nowMs);
        }

        if (transactionManager != null && transactionManager.isTransactional())
            transactionManager.maybeAddPartitionToTransaction(tp);

        if (result.batchIsFull || result.newBatchCreated) {
            log.trace("Waking up the sender since topic {} partition {} is either full or getting a new batch", record.topic(), partition);
            this.sender.wakeup();
        }
        return result.future;
        // handling exceptions and record the errors;
        // for API exceptions return them in the future,
        // for other exceptions throw directly
    } catch (ApiException e) {
        log.debug("Exception occurred during message send:", e);
        if (callback != null)
            callback.onCompletion(null, e);
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        return new FutureFailure(e);
    } catch (InterruptedException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw new InterruptException(e);
    } catch (KafkaException e) {
        this.errors.record();
        this.interceptors.onSendError(record, tp, e);
        throw e;
    } catch (Exception e) {
        // we notify interceptor about all exceptions, since onSend is called before anything else in this method
        this.interceptors.onSendError(record, tp, e);
        throw e;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2072828.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【前端基础篇】CSS基础速通万字介绍(上篇)

文章目录 前言CSS介绍什么是CSS基本语法规范 引入方式内部样式表行内样式表外部样式总结 代码风格样式格式样式大小写空格规范 选择器选择器的功能选择器的种类基础选择器标签选择器类选择器id选择器通配符选择器基础选择器总结 复合选择器后代选择器子代选择器并集选择器 伪类…

杀软对抗 ----> 你真的Bypass火绒了吗?

目录 1.白加黑&#xff1f;syscall&#xff1f; ......绕过火绒&#xff1f;&#xff1f;&#xff1f; 2.内存对抗 ​3.CS已死 &#xff1f;&#xff1f;&#xff1f; 是真的 &#xff01; 玩免杀的都知道&#xff0c;我们说到国产&#xff0c;基本上都是360&#xff0c;对于…

AutoCAD 2010 x64图文安装教程及下载.

AutoCAD 2010 是 Autodesk 于2009年发布的一个版本&#xff0c;是 AutoCAD 系列中的一个重要里程碑。以下是 AutoCAD 2010 x64 的一些关键特性和改进&#xff1a; 参数化绘图&#xff1a;增加了几何约束和尺寸约束功能&#xff0c;使用户能够创建更精确、可调整的设计模型。动…

树章节习题

今天也是小小的把树的章节的内容大体过了一遍&#xff0c;总共有树上dp&#xff0c;LCA&#xff08;最近公共祖先&#xff09;&#xff0c;树的直径&#xff0c;以及树上差分 P1395 会议 很经典的树上dp里面的换根dp问题&#xff0c;现在这里说几个数组 sz数组&#xff0c;用…

多模态协同学习框架 DMCL

https://arxiv.org/pdf/2408.05914 一.discriminative and robust model 早期传统的reid的工作方式&#xff0c;因无法在大规模数据集上产生有竞争力的结果&#xff0c;所以本文中为相关工作&#xff0c;并未成为本文方法。 二.Dynamic Multimodal Feature Fusion Strategy 提…

计算机毕业设计选题推荐-产品订单管理系统-产品销售管理系统-Java/Python项目实战

✨作者主页&#xff1a;IT研究室✨ 个人简介&#xff1a;曾从事计算机专业培训教学&#xff0c;擅长Java、Python、微信小程序、Golang、安卓Android等项目实战。接项目定制开发、代码讲解、答辩教学、文档编写、降重等。 ☑文末获取源码☑ 精彩专栏推荐⬇⬇⬇ Java项目 Python…

YOLOv8目标检测推理流程及Python代码

在这章中将介绍目标检测推理原理,以及基于onnx模型使用Python语言进行推理。在推理原理章节中,将了解onnx模型的输入和输出,对输入的图片需要进行预处理的操作,对输出的结果需要进行后处理的操作等;在Python代码篇,将给出推理代码。 这里注意一下的是,由于在导出onnx模型…

【数学分析笔记】第2章第4节收敛准则(2)

2. 数列极限 2.4 收敛准则 2.4.1 单调有界定理 【例2.4.3】 x 1 2 , x n 1 3 2 x n , n 1 , 2 , 3 , . . . x_{1}\sqrt{2},x_{n1}\sqrt{32x_{n}},n1,2,3,... x1​2 ​,xn1​32xn​ ​,n1,2,3,...&#xff0c;证明 { x n } \{x_{n}\} {xn​}收敛并求极限。 【证】 0 <…

InternVL 多模态模型部署微调实践

一、什么是InternVL nternVL 是一种用于多模态任务的深度学习模型&#xff0c;旨在处理和理解多种类型的数据输入&#xff0c;如图像和文本。它结合了视觉和语言模型&#xff0c;能够执行复杂的跨模态任务&#xff0c;比如图文匹配、图像描述生成等。通过整合视觉特征和语言信…

中仕公考怎么样?事业编联考、统考、单招介绍

一、事业编考试流程 发布公告——注册报名——交报名费——报名确认——打印准考证|——笔试——调剂——面试——体检——录用 二、招聘公告查看渠道&#xff1a; ①事业单位招聘网 事业单位公告都会发布&#xff0c;包括各类招考信息、报名信息等; ②各省人事考试网 是…

Telnet不止于端口测试:探索经典工具的多样化应用

文章目录 Telnet详解与实用指南1. 引言2. Telnet 的安装和启动2.1 在 Windows 上安装 Telnet2.2 在 Linux 上安装 Telnet2.3 在 macOS 上使用 Telnet 3. Telnet 的基本命令与操作3.1 远程登录3.2 测试端口连通性3.3 调试网络服务3.4 网络协议调试3.5 简单的文件传输 4. Telnet …

继承的初始化顺序

B类继承A类后&#xff0c;new B()后执行顺序如下&#xff1a; 1、执行A类的静态方法&#xff08;只执行一次&#xff09; 2、执行B类的静态方法&#xff08;只执行一次&#xff09; 3、执行A类的成员变量的赋值&#xff08;没有赋值操作则忽略此步&#xff09; 4、执行A类的…

Datawhale X 李宏毅苹果书 AI夏令营(深度学习进阶)taks2(2.1+2.2+2.3)

task2.1 自适应学习率 临界点其实不一定是在训练一个网络的时候会遇到的最大的障碍。 一般在训练一个网络的时候&#xff0c;损失原来很大&#xff0c;随着参数不断的更新&#xff0c;损失会越来越小&#xff0c;最后就卡住了&#xff0c;损失不再下降。当我们走到临界点的时…

VLDB 2024 即将来袭!创邻科技将带来精彩分享

8月26-30日&#xff0c;数据库领域最权威、影响力最大的顶级盛会之一&#xff0c;VLDB 2024 来了&#xff01; VLDB&#xff08;International Conference on Very Large Databases&#xff09;是数据管理、可扩展数据科学和数据库研究人员、厂商、应用开发者以及用户广泛参与…

ssrf简介

目录 SSRF漏洞 漏洞原理 形成原因 SSRF用途: 怎么找到SSRF漏洞? 漏洞案例 SSRF漏洞 漏洞原理 SSRF(Server-Side Request Forgery:服务器端请求伪造)是——种由仅专构造形成由服务端发起请求的一个安全漏洞。一般情况下&#xff0c;SSRF是要目标网站的内部系统。(因为他是…

【原创】java+swing+mysql健身房管理系统设计与实现

个人主页&#xff1a;程序员杨工 个人简介&#xff1a;从事软件开发多年&#xff0c;前后端均有涉猎&#xff0c;具有丰富的开发经验 博客内容&#xff1a;全栈开发&#xff0c;分享Java、Python、Php、小程序、前后端、数据库经验和实战 文末有本人名片&#xff0c;希望和大家…

无人机RTK定位定向技术详解

无人机RTK&#xff08;Real-Time Kinematic&#xff0c;实时动态差分技术&#xff09;定位定向技术&#xff0c;是无人机领域的一项高精度导航与定位技术。它结合了全球导航卫星系统&#xff08;如GPS、GLONASS、Galileo、BDS等&#xff09;与实时差分技术&#xff0c;通过地面…

精彩管道不会梦到深沉蓝调

如果上天开了眼 请多给我点蓝调 多给我点沙锤 多给我点甲壳 让我吃鸡&#xff01; 星元自动机&#xff0c;新的版本之神 给宁磕一个 完蛋 你说这不是问题吗 我这篇文章从我写开始&#xff0c;到写完 炉石都换赛季了&#xff01;&#xff01;&#xff01;&#xff01…

HTB-Redeemer(redis)

前言 各位师傅大家好&#xff0c;我是qmx_07&#xff0c;今天给大家讲解Redeemer这台机器&#xff0c;主要是对redis组件进行渗透&#xff0c;了解思路 渗透过程 更改一下 目录结构&#xff0c;先写 渗透过程&#xff0c;再写 题解 信息搜集 通过nmap扫描 发现开启了6379…

sklearn中的线性回归

多元线性回归 指的 是一个样本 有多个特征的 线性回归问题。 w 被统称为 模型的 参数&#xff0c;其中 w0 被称为截距&#xff08;intercept&#xff09;&#xff0c;w1~wn 被称为 回归系数&#xff08;regression coefficient&#xff09;。这个表达式和 yazb 是同样的…