Windows安装和使用kafka

news2024/11/18 11:28:30

一、安装kafka

由于kafka依赖jdk和zookeeper,安装kafka之前需要先安装jdk和zookeeper,也可以使用kafka自带的zookeeper。安装jdk可以参考:Windows和Linux安装jdk,此处使用kafka自带的zookeeper,不单独安装。

下面在Windows系统中安装kafka时使用的ip地址是192.168.10.188,这是我自己电脑的ip。

1、下载kafka

2、修改配置文件

修改zk和kafka的配置文件。

修改zk的配置文件:

配置上面创建的mydata目录,用于保存zookeeper的数据。

修改kafka的配置文件:

在kafka的配置文件中主要是创建或修改port、host.name、listeners、log.dirs和zookeeper.connect这五个参数。其中port就是kafka的端口号,默认是9092。host.name是当前计算机的ip地址。log.dirs就是上面创建的mylog目录,用于保存kafka的数据。zookeeper.connect参数的作用是kafka连接zookeeper,在下面创建topic时也需要用到此处配置的ip:port,默认配置是zookeeper.connect=localhost:2181。

3、启动zk和kafka

启动zk

进入到kafka的安装目录kafka_2.11-1.0.0下,同时按住shift和鼠标右键,选择“打开命令窗口”选项,或者win+R输入cmd,打开命令行窗口。

输入启动zk的命令:

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

这个dos窗口不要关闭,再次进入到kafka的安装目录kafka_2.11-1.0.0下,同时按住shift和鼠标右键,选择“打开命令窗口”选项,又打开一个命令行窗口,输入启动kafka的命令。

启动kafka

bin\windows\kafka-server-start.bat config\server.properties

4、创建topic

重新打开一个dos窗口,创建topic。

bin\windows\kafka-topics.bat --zookeeper 192.168.10.188:2181 --create --replication-factor 1 --partitions 1 --topic kjTest

如果在kafka的配置文件中没有配置zookeeper.connect,或者配置的是zookeeper.connect=localhost:2181,创建topic时--zookeeper参数就要使用localhost:2181,2181是zookeeper的默认端口号。

查看创建的topic列表:

bin\windows\kafka-topics.bat --zookeeper 192.168.10.188:2181 --list

__consumer_offsets这个topickafka自动创建的,当consumer消费数据之后,consumer就会把offset提交到__consumer_offsets中。

删除topic

bin\windows\kafka-topics.bat --zookeeper 192.168.10.188:2181 --delete --topic kjTest

5、启动producer和consumer

在介绍启动producer和consumer的命令之前,先简单了解一下broker-list、bootstrap-servers和zookeeper。

1.broker:kafka服务端,可以是一个服务器也可以是一个集群。producer和consumer都相当于这个服务端的客户端。

2.broker-list:指定kafka集群中的一个或多个服务器,一般在使用kafka-console-producer.sh的时候,这个参数是必备参数,另外一个必备的参数是topic。

3.bootstrap-servers指的是kafka目标集群的服务器地址,这和broker-list功能一样,不过在启动producer时要求用broker-list,在启动consumer时用bootstrap-servers。

4. zookeeper指的是zk服务器或zk集群的地址。旧版本(0.9以前)的kafka,消费的进度(offset)是写在zk中的,所以启动consumer需要知道zk的地址。后来的版本都统一由broker管理,所以在启动consumer时就用bootstrap-server。

重新打开两个dos窗口,分别启动producer和consumer。

启动producer并输入内容:

bin\windows\kafka-console-producer.bat --broker-list 192.168.10.188:9092 --topic kjTest

启动consumer查看消息:

bin\windows\kafka-console-consumer.bat --zookeeper 192.168.10.188:2181 --topic kjTest --from-beginning

上面是旧版本的写法,下面是新版本的写法。

bin\windows\kafka-console-consumer.bat --bootstrap-server 192.168.10.188:9092 --topic kjTest --from-beginning

参数--zookeeper 192.168.10.188:2181中的ip和port是zookeeper节点的ip和zookeeper的port,参数--bootstrap-server 192.168.10.188:9092中的ip和port是kafka节点的ip和kafka的port。

6、查看消费者组以及消息是否积压

查看消费者组的命令:

bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --list

查看消息是否有积压的命令:

bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --describe --group consumer-group-01

上图中GROUP表示消费者组,TOPIC表示消息主题,PARTITION表示分区,CURRENT-OFFSET表示当前消费的消息条数,LOG-END-OFFSET表示kafka中生产的消息条数,LAG表示kafka中有多少条消息还未消费,也就是有多少条积压的消息。

在kafka中,消费者是按批次拉取数据的,每一批次拉取的数据条数是0-n条,每个消费者可以拉取多个分区的数据,但是一个分区的数据只能被同一个消费者组中的一个消费者拉取。如果一个消费者拉取多个分区的数据,那么拉取的这一批次的数据就包含多个分区的数据。消费者处理完这批数据之后,会将offset提交到__consumer_offsets这个topic中,__consumer_offsets(是一个topic)就是用于维护消费者消费到哪条数据offset的,是按照分区粒度维护的,各个分区的offset是互不影响的。例如一个consumer拉取两个分区(p0、p1)的数据,如果p0分区的数据处理完并将offset提交到__consumer_offsets中,而p1分区的数据还未处理完,p1分区的offset还未提交到__consumer_offsets中,此时consumer异常重启,consumer不会再拉取p0分区上次已消费的数据,但是会重新拉取p1分区上次消费但未提交的数据。

7、异常

启动consumer时可能会报下面的异常:

kafka的consumer:java.nio.channels.ClosedChannelException

解决方法:

出现以上异常是由于服务器没有做kafka的主机名与ip的映射,

linux的目录是/etc/hosts

windows的目录是C:\Windows\System32\drivers\etc\hosts

二、创建生产者和消费者

上面在Windows中安装好kafka之后,就可以在idea中操作kafka了。

项目结构:

pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>kafka-study</artifactId>
    <version>1.0-SNAPSHOT</version>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>2.7.0</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.13.2</version>
        </dependency>
    </dependencies>

</project>

1、配置kafka相关参数

package com.example.kafka.config;

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

/**
 * 此处为了简化,直接将kafka配置信息写到代码中,
 * 实际项目中需要从application.yml配置文件中读取
 *
 * @Author: 倚天照海
 */
public class MyKafkaConfig {
    /**
     * kafka集群地址,多个地址用逗号分隔
     */
    private String bootstrapServer = "localhost:9092";

    private String topic = "testKafka";

    /**
     * 消费者组
     */
    private String consumerGroupId = "consumer-group-01";

    /**
     * kafka中保存的是将数据序列化后的字节数组,需要指定key和value的序列化方式
     */
    private String keySerializerClass = StringSerializer.class.getName();

    private String valueSerializerClass = StringSerializer.class.getName();

    /**
     * kafka中key和value的反序列化方式
     */
    private String keyDeserializerClass = StringDeserializer.class.getName();

    private String valueDeserializerClass = StringDeserializer.class.getName();

    public String getBootstrapServer() {
        return bootstrapServer;
    }

    public String getTopic() {
        return topic;
    }

    public String getConsumerGroupId() {
        return consumerGroupId;
    }

    public String getKeySerializerClass() {
        return keySerializerClass;
    }

    public String getValueSerializerClass() {
        return valueSerializerClass;
    }

    public String getKeyDeserializerClass() {
        return keyDeserializerClass;
    }

    public String getValueDeserializerClass() {
        return valueDeserializerClass;
    }
}

2、编写producer

根据公司业务逻辑编写producer,用于生产消息。

package com.example.kafka.producer;

import com.example.kafka.config.MyKafkaConfig;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * kafka生产者
 * 创建kafka生产者并生产消息的步骤:
 * 1.启动zookeeper和kafka
 * 2.创建topic
 * 3.启动producer
 *
 * @Author: 倚天照海
 */
public class MyProducer {
    /**
     * 1.创建topic:进入到kafka安装目录的bin目录下,执行kafka-topics.sh(Linux系统)或windows\kafka-topics.bat(windows系统)脚本
     * Linux系统: bin/kafka-topics.sh --zookeeper 192.168.10.188:2181/kafka --create --replication-factor 2 --partitions 2 --topic testKafka
     * windows系统: bin\windows\kafka-topics.bat --zookeeper 192.168.10.188:2181 --create --replication-factor 2 --partitions 2 --topic testKafka
     * 2.启动producer
     * Linux系统: bin/kafka-console-producer.sh --broker-list 192.168.10.188:9092 --topic testKafka
     * windows系统: bin\windows\kafka-console-producer.bat --broker-list 192.168.10.188:9092 --topic testKafka
     */
    public void produce() throws ExecutionException, InterruptedException {
        MyKafkaConfig kafkaConfig = new MyKafkaConfig();
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServer());
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaConfig.getKeySerializerClass());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaConfig.getValueSerializerClass());
        String topic = kafkaConfig.getTopic();

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        doProduce(producer, topic);
    }

    private void doProduce(KafkaProducer<String, String> producer, String topic) throws ExecutionException, InterruptedException {
        while (true) {
            for (int i = 0; i < 3; i++) {
                for (int j = 0; j < 3; j++) {
                    ProducerRecord<String, String> record = new ProducerRecord<>(topic, "item" + j, "price" + i);
                    Future<RecordMetadata> future = producer.send(record);
                    RecordMetadata recordMetadata = future.get();
                    int partition = recordMetadata.partition();
                    long offset = recordMetadata.offset();
                    System.out.println("key=" + record.key() + ", value=" + record.value()
                            + ", partition=" + partition + ", offset=" + offset);
                }
            }
        }
    }
}

3、编写consumer

编写consumer,用于接受消息。

package com.example.kafka.consumer;

import com.example.kafka.config.MyKafkaConfig;
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.*;

/**
 * @Author: 倚天照海
 */
public class MyConsumer {

    public void consume() {
        MyKafkaConfig kafkaConfig = new MyKafkaConfig();
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServer());
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaConfig.getKeyDeserializerClass());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaConfig.getValueDeserializerClass());
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getConsumerGroupId());
        /**
         * ConsumerConfig.AUTO_OFFSET_RESET_CONFIG表示当kafka中未初始化offset或当前offset不存在时,消费者自动重置offset的方式,默认是latest。
         * What to do when there is no initial offset in Kafka
         * or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
         * <ul>
         *     <li>earliest: automatically reset the offset to the earliest offset</li>
         *     <li>latest: automatically reset the offset to the latest offset</li>
         *     <li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li>
         *     <li>anything else: throw exception to the consumer.</li>
         * </ul>
         */
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // 是否开启自动提交,默认开启。自动提交是异步提交,开启自动提交可能会造成数据丢失或重复消费数据
        // properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        // 自动提交的间隔时间(多长时间会触发自动提交),默认是5秒
        // properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10000");
        // kafka的消费者是按批次拉取数据,该参数是设置一批最多拉取多少条数据
        // properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");

        List<String> topics = Collections.singletonList(kafkaConfig.getTopic());
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        doConsume(consumer, topics);
    }

    private void doConsume(KafkaConsumer<String, String> consumer, List<String> topics) {
        // 消费者订阅主题消息,多个consumer会动态负载均衡多个分区
        // 例如有两个分区,最开始只启动一个consumer,会给这个consumer分配两个分区,它会消费两个分区的数据。
        // 然后在同一个消费者组内又启动一个consumer,此时会把第一个consumer的两个分区都撤销,再随机给这两个consumer分别分配一个分区
        consumer.subscribe(topics, new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
                System.out.println("-----------------onPartitionsRevoked撤销的分区是:---------------");
                Iterator<TopicPartition> iterator = partitions.iterator();
                while (iterator.hasNext()) {
                    TopicPartition next = iterator.next();
                    System.out.println(next.partition());
                }
            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
                System.out.println("-----------------onPartitionsAssigned分配的分区是:---------------");
                Iterator<TopicPartition> iterator = partitions.iterator();
                while (iterator.hasNext()) {
                    TopicPartition next = iterator.next();
                    System.out.println(next.partition());
                }
            }
        });
        while (true) {
            // 消费者拉取消息,设置等待时间,按批次拉取,一批拉取的数据是0-n条,每次poll是同时拉取多个分区的数据
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(0));
            while (!consumerRecords.isEmpty()) {
                // 每次拉取数据的条数
                System.out.println("----------------consumerRecords.count------------------" + consumerRecords.count());
                // 方式一:按分区分别处理每个分区的数据
                Set<TopicPartition> partitions = consumerRecords.partitions();
                // kafka中consumer是按照分区粒度提交维护offset的,将offset提交到__consumer_offsets中
                // 如果关闭自动提交,使用手动提交offset,则有三种粒度同步提交offset:
                // 1.按每条消息粒度同步提交offset
                // 2.按每个分区粒度同步提交offset
                // 3.按poll的一批数据粒度同步提交offset
//                for (TopicPartition topicPartition : partitions) {
//                    // 分别获取每个分区的数据记录,且分区内的数据是有序的,可以用多线程并行处理每个分区的数据
//                    List<ConsumerRecord<String, String>> partitionRecords = consumerRecords.records(topicPartition);
//                    for (ConsumerRecord<String, String> record : partitionRecords) {
//                        // 一个消费者可以消费多个分区,一个分区只能被同一个消费者组中的一个消费者消费
//                        int partition = record.partition();
//                        long offset = record.offset();
//                        // TODO 获取数据处理复杂的业务逻辑,最终将数据持久化到数据库中
//                        System.out.println("key=" + record.key() + ", value=" + record.value()
//                                + ", partition=" + partition + ", offset=" + offset);
//
//                        // 粒度1.按每条消息粒度同步提交offset
//                        /*OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset);
//                        Map<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
//                        map.putIfAbsent(topicPartition, offsetAndMetadata);
//                        consumer.commitSync(map);*/
//                    }
//                    // 粒度2.按每个分区粒度同步提交offset
//                    // 获取分区最后一条消息记录的offset,将offset提交到__consumer_offsets中
//                    long offset = partitionRecords.get(partitionRecords.size() - 1).offset();
//                    OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(offset);
//                    Map<TopicPartition, OffsetAndMetadata> map = new HashMap<>();
//                    map.putIfAbsent(topicPartition, offsetAndMetadata);
//                    consumer.commitSync(map);
//                }
                // 粒度3.按poll的一批数据粒度同步提交offset
                // consumer.commitSync();

                // 方式二:不区分分区,将多个分区的数据放在一起逐条处理
                for (ConsumerRecord<String, String> record : consumerRecords) {
                    // 一个消费者可以消费多个分区,一个分区只能被同一个消费者组中的一个消费者消费
                    int partition = record.partition();
                    long offset = record.offset();
                    // TODO 获取数据处理复杂的业务逻辑,最终将数据持久化到数据库中
                    System.out.println("key=" + record.key() + ", value=" + record.value()
                            + ", partition=" + partition + ", offset=" + offset);
                }
                consumer.commitSync();
            }
        }
    }
}

从kafka中消费数据,经过一系列逻辑处理之后将数据写入到数据库中,开启自动提交可能会造成数据丢失或重复消费数据。自动提交是异步提交,异步提交是指consumer提交offset与将数据持久化到数据库是异步的。
1.数据丢失:若开启自动提交,且自动提交的间隔时间(默认是5秒)到了,消费者会将拉取的这批数据的offset保存到_consumer_offsets中。但是5s内由于业务太过复杂,数据没有完全持久化,消费者就把offset提交了,若此时消费端consumer挂了,等消费端重启之后,会根据自身维护的offset拉取新的数据,不会重新拉取之前已消费的数据,因为这些数据的offset已经被提交了。
2.重复消费:若开启自动提交,且自动提交的间隔时间(默认是5秒)还未到,经过业务逻辑处理后将数据写入到了数据库中,此时消费者还未将拉取的这批数据的offset保存到_consumer_offsets中,若此时消费端consumer挂了,等消费端重启之后,会根据自身维护的offset拉取新的数据,会重新拉取之前已消费的数据,因为这些数据的offset还未被提交。

4、编写测试类

package com.example.kafka;

import com.example.kafka.consumer.MyConsumer;
import com.example.kafka.producer.MyProducer;
import org.junit.Test;

import java.util.concurrent.ExecutionException;

/**
 * @Author: 倚天照海
 */
public class Main {
    MyProducer myProducer = new MyProducer();

    MyConsumer myConsumer = new MyConsumer();
    
    @Test
    public void produceTest() {
        produceData();
    }

    @Test
    public void consumeTest() {
        consumeData();
    }

    private void produceData() {
//        MyProducer myProducer = new MyProducer();
        try {
            myProducer.produce();
        } catch (ExecutionException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void consumeData() {
//        MyConsumer myConsumer = new MyConsumer();
        myConsumer.consume();
    }
}

测试结果:

生产者:

消费者:

Kafka中消息积压情况:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1376039.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python列表(list)

目录 列表列表的创建与删除访问列表元素index() 方法 列表的遍历添加&#xff0c;修改和删除列表元素添加修改删除 对列表统计和计算count() 方法如需确定列表中有**多少元素**&#xff0c;请使用 len() 方法&#xff1a;检查项目是否存在**复制列表****合并两个列表****list()…

Win10安装配置Redis,修改密码

一、下载Redis tporadowski 提供了 支持 Windows平台的 Redis 安装包&#xff0c;目前仍在维护&#xff0c;目前最新版本是 5.0.14&#xff0c;更新速度跟Redis官网也相差好几个大版本。 下载地址&#xff1a;https://github.com/tporadowski/redis/releases 二、Redis 安装 …

极客时间-如何降低用户鉴权的流量压力

背景 内容是极客时间-徐长龙老师的高并发系统实战课的个人学习笔记&#xff0c;欢迎大家学习&#xff01;https://time.geekbang.org/column/article/596644 使用Session方式实现用户的用户鉴权 优点 信息都在服务端储存&#xff0c;对客户端不暴露任何用户敏感的数据信息 缺…

SQL-修改表操作

目录 DDL-表操作-修改 添加字段 &#xff08;方括号内容可选&#xff09; 修改字段 修改指定字段的数据类型 修改字段名和字段类型 删除字段 修改表名 删除表 删除指定表&#xff0c;并重新创建该表 总结 &#x1f389;欢迎您来到我的MySQL基础复习专栏 ☆* o(≧▽≦…

Window平台安装MongoDB

在部署前需要在官网先确定系统对应的Mongo DB版本。 本机电脑为Window10&#xff0c;所以这里以MongoDB 6.0版本。 1 在官网下载安装包 2 安装MongoDB MongoDB Compass 是一个图形界面管理工具&#xff0c;如果勾选了安装会花费长一点时间&#xff0c;可以取消掉勾选&#xff…

[UI5] ODATA V4中的CRUD

文章目录 前言一、Read二、Create三、Update四、Delete 前言 ODATA V4在CRUD方面与V2截然不同。 这篇文章简单介绍V4中是如何进行CRUD操作 一、Read Model不再有read方法&#xff0c; 一般是把Path绑定到View中进行读取&#xff0c; 如果需要额外的读取数据&#xff0c;可使用…

树状结构查询 - 华为OD统一考试

OD统一考试 分值&#xff1a; 200分 题解&#xff1a; Java / Python / C 题目描述 通常使用多行的节点、父节点表示一棵树&#xff0c;比如&#xff1a; 西安 陕西 陕西 中国 江西 中国 中国 亚洲 泰国 亚洲 输入一个节点之后&#xff0c;请打印出来树中他的所有下层节点。 …

Python: Spire.PDF-for-Python

# encoding: utf-8 # 版权所有 2024 ©涂聚文有限公司 # 许可信息查看&#xff1a; # 描述&#xff1a; # Author : geovindu,Geovin Du 涂聚文. # IDE : PyCharm 2023.1 python 3.11 # Datetime : 2024/1/11 10:32 # User : geovindu # Product : PyChar…

TypeScript类型挑战:实现内置的Omit实用类型

掌握 TypeScript Omit 泛型&#xff0c;一起完成 Type 挑战&#xff0c;巩固 TypeScript 知识。 为了帮助读者更好地巩固 TypeScript 的知识&#xff0c;我从 Github 上的 type-challenges 库中选择了几十个挑战&#xff0c;与您一起完成类型挑战。 挑战 实现内置的 Omit&…

分布式系统架构设计之分布式消息队列的水平扩展性、安全可用性以及监控与调优

一、分布式消息队列的水平扩展 随着业务的快速发展和数据的不断增长&#xff0c;单一的消息队列服务器往往难以满足高并发、高可用和高吞吐量的需求&#xff0c;因此&#xff0c;如何实现消息队列的水平扩展成为了一个重要的问题。这部分我将从分区、副本、负载均衡等关键概念…

影响eCPM的因素有哪些?如何提升eCPM?

eCPM&#xff08;千次展示有效收益&#xff09;直接关系广告变现收益的高低&#xff0c;是开发者们最关心的数据之一。要想优化提升eCPM&#xff0c;首先要了解哪些主要因素影响eCPM&#xff0c;再针对性优化广告库存&#xff0c;提高变现收益。 https://www.shenshiads.com …

线性回归实例

1、线性回归&#xff08;linear Regression&#xff09;和逻辑回归&#xff08;logistic Regression&#xff09;的区别 线性回归主要是用来拟合数据&#xff0c;逻辑回归主要是用来区分数据&#xff0c;找到决策边界。 线性回归的代价函数常用平方误差函数&#xff0c;逻辑回…

java每日一题——打印100以内个位和十位相同,尾数为1,3,5,7的数字

前言&#xff1a; 打好基础&#xff0c;daydayup! 题目&#xff1a;打印100以内个位和十位相同&#xff0c;尾数为1,3,5,7的数字 思路&#xff1a;1&#xff0c;个位通过对10求余数可求出1&#xff0c;3&#xff0c;5&#xff0c;7&#xff1b; 2&#xff0c;十位可通过先除10…

【Unity】【Pico】【VR开发】为何PICO打包后真机运行闪退

【背景】 设置步骤&#xff0c;项目配置都没问题&#xff0c;Build也成功&#xff0c;Unity版本是符合要求的2022LTS版本&#xff0c;但是一在真机上运行就闪退。 【分析】 由于并没有开版权验证&#xff0c;而且闪退后也并没有弹框说版权问题&#xff0c;所以还是怀疑环境有…

软件测试|Python成员运算符:使用方法与元素检查

简介 Python是一种功能强大的编程语言&#xff0c;提供了许多方便的工具来处理数据和集合。其中之一就是成员运算符&#xff0c;它允许我们在集合中检查特定元素的存在。在本文中&#xff0c;我们将深入探讨Python中的成员运算符&#xff0c;以及如何使用它来进行元素检查。 …

pyside6 捕捉主窗口关闭后,进行释放相关的资源

import sys from PySide6 import QtGui from PySide6.QtWidgets import QWidget,QApplication,QMessageBoxclass Message(QWidget):def __init__(self):# 如果希望窗口内嵌于其他部件&#xff0c;可添加parent参数super(Message, self).__init__()# 调用初始化方法self.initUI(…

使用Sqoop将数据导入Hadoop的详细教程

在大数据处理中&#xff0c;Sqoop是一个强大的工具&#xff0c;它可以将关系型数据库中的数据导入到Hadoop生态系统中&#xff0c;以便进行进一步的分析和处理。本文将提供一个详细的教程&#xff0c;以帮助大家了解如何使用Sqoop将数据导入Hadoop。 准备工作 在开始之前&…

【uniapp】新课uniapp零基础入门到项目打包(微信小程序/H5/vue/安卓apk)全掌握

一、uniapp和HBuilderX介绍 uni-app官方网站&#xff1a;https://uniapp.dcloud.net.cn/ 为什么要学习uniapp&#xff1f; 1、一套代码可以打包到不同的应用平台&#xff1b;一套代码编到十几个平台&#xff0c;这不是梦想。眼见为实&#xff0c;扫描以下二维码&#xff0c;…

uniapp微信小程序投票系统实战 (SpringBoot2+vue3.2+element plus ) -用户投票实现

锋哥原创的uniapp微信小程序投票系统实战&#xff1a; uniapp微信小程序投票系统实战课程 (SpringBoot2vue3.2element plus ) ( 火爆连载更新中... )_哔哩哔哩_bilibiliuniapp微信小程序投票系统实战课程 (SpringBoot2vue3.2element plus ) ( 火爆连载更新中... )共计21条视频…

爬虫利器一览

前言 爬虫&#xff08;英文&#xff1a;spider&#xff09;&#xff0c;可以理解为简单的机器人&#xff0c;如此一个“不为名利而活&#xff0c;只为数据而生&#xff0c;目标单纯&#xff0c;能量充沛&#xff0c;不怕日晒雨淋&#xff0c;不惧寒冬酷暑”的家伙&#xff0c;…