【Kafka】开发实战和Springboot集成kafka

news2025/1/10 3:19:16

目录

  • 消息的发送与接收
    • 生产者
    • 消费者
  • SpringBoot 集成kafka
  • 服务端参数配置

消息的发送与接收

生产者

在这里插入图片描述

生产者主要的对象有: KafkaProducer , ProducerRecord 。

其中 KafkaProducer 是用于发送消息的类, ProducerRecord 类用于封装Kafka的消息。

KafkaProducer 的创建需要指定的参数和含义:

  1. bootstrap.servers:配置生产者如何与broker建立连接。该参数设置的是初始化参数。如果生产者需要连接的是Kafka集群,则这里配置集群中几个部分broker的地址,而不是全部,当生产者连接上此处指定的broker之后,在通过该连接发现集群中的其他节点。
  2. key.serializer:要发送信息的key数据的序列化类。设置的时候可以写类名,也可以使用该类的Class对象。
  3. value.serializer:要发送消息的value数据的序列化类。设置的时候可以写类名,也可以使用该类的Class对象。
  4. acks:默认值:all
    • acks=0:生产者不等待broker对消息的确认,只要将消息放到缓冲区,就认为消息已经发送完成。该情形不能保证broker是否真的收到了消息,retries配置也不会生效。发送的消息的返回的消息偏移量永远是-1。
    • acks=1:表示消息只需要写到主分区即可,然后就响应客户端,而不等待副本分区的确认。在该情形下,如果主分区收到消息确认之后就宕机了,而副本分区还没来得及同步该消息,则该消息丢失。
    • acks=all:leader分区会等待所有的ISR副本分区确认记录。该处理保证了只要有一个ISR副本分区存活,消息就不会丢失。这是Kafka最强的可靠性保证,等效于 acks=-1。
  5. retries:retries重试次数。当消息发送出现错误的时候,系统会重发消息。跟客户端收到错误时重发一样。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1,否则在重试此失败消息的时候,其他的消息可能发送成功了。

其他参数可以从 org.apache.kafka.clients.producer.ProducerConfig 中找到。后面的内容会介绍到。

消费者生产消息后,需要broker端的确认,可以同步确认,也可以异步确认。同步确认效率低,异步确认效率高,但是需要设置回调对象。

示例如下:

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class MyProducer1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Map<String, Object> configs = new HashMap<>();
        // 指定初始连接用到的broker地址
        configs.put("bootstrap.servers", "192.168.100.101:9092");
        // 指定key的序列化类
        configs.put("key.serializer", IntegerSerializer.class);
        // 指定value的序列化类
        configs.put("value.serializer", StringSerializer.class);

//        configs.put("acks", "all");
//        configs.put("reties", "3");

        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);

        // 用于设置用户自定义的消息头字段
        List<Header> headers = new ArrayList<>();
        headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));

        ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
                "topic_1",  // topic
                0,  // 分区
                0,  // key
                "hello lagou 0", // value
                headers  // headers
        );

        // 消息的同步确认
        final Future<RecordMetadata> future = producer.send(record);
        final RecordMetadata metadata = future.get();
        System.out.println("消息的主题:" + metadata.topic());
        System.out.println("消息的分区号:" + metadata.partition());
        System.out.println("消息的偏移量:" + metadata.offset());

        // 关闭生产者
        producer.close();
    }
}

如果需要异步发送,如下:

package com.lagou.kafka.demo.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class MyProducer1 {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        Map<String, Object> configs = new HashMap<>();
        // 指定初始连接用到的broker地址
        configs.put("bootstrap.servers", "192.168.100.101:9092");
        // 指定key的序列化类
        configs.put("key.serializer", IntegerSerializer.class);
        // 指定value的序列化类
        configs.put("value.serializer", StringSerializer.class);

//        configs.put("acks", "all");
//        configs.put("reties", "3");

        KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);

        // 用于设置用户自定义的消息头字段
        List<Header> headers = new ArrayList<>();
        headers.add(new RecordHeader("biz.name", "producer.demo".getBytes()));

        ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
                "topic_1",  // topic
                0,  // 分区
                0,  // key
                "hello lagou 0", // value
                headers  // headers
        );

        // 消息的异步确认
        producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if (exception == null) {
                    System.out.println("消息的主题:" + metadata.topic());
                    System.out.println("消息的分区号:" + metadata.partition());
                    System.out.println("消息的偏移量:" + metadata.offset());
                } else {
                    System.out.println("异常消息:" + exception.getMessage());
                }
            }
        });

        // 关闭生产者
        producer.close();
    }
}

消费者

kafka不支持消息的推送(当然可以自己已实现),采用的消息的拉取(poll方法)。

消费者主要的对象是kafkaConsumer,用于消费消息的类。

其主要参数:

  1. bootstrap.servers:与kafka建立初始连接的broker地址列表
  2. key.deserializer:key的反序列化器
  3. value.deserializer:value的反序列化器
  4. group.id:指定消费者组id,用于标识该消费者属于哪个消费者组
  5. auto.offset.reset:当kafka中没有初始化偏移量或当前偏移量在服务器中不存在(如数据被删除了),处理办法
    • earliest:自动重置偏移量到最早的偏移量
    • latest:自动重置偏移量到最新的偏移量
    • none:如果消费者组原来的偏移量(previous)不存在,向消费者抛出异常
    • anything:向消费者抛异常

ConsumerConfig类中包含了所有的可以给kafkaConsumer的参数。

示例:

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

public class MyConsumer1 {
    public static void main(String[] args) {

        Map<String, Object> configs = new HashMap<>();
        // node1对应于192.168.100.101,windows的hosts文件中手动配置域名解析
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node1:9092");
        // 使用常量代替手写的字符串,配置key的反序列化器
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        // 配置value的反序列化器
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 配置消费组ID
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer_demo1");
        // 如果找不到当前消费者的有效偏移量,则自动重置到最开始
        // latest表示直接重置到消息偏移量的最后一个
        configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);

        // 先订阅,再消费
        consumer.subscribe(Arrays.asList("topic_1"));

        // 如果主题中没有可以消费的消息,则该方法可以放到while循环中,每过3秒重新拉取一次
        // 如果还没有拉取到,过3秒再次拉取,防止while循环太密集的poll调用。
        // 批量从主题的分区拉取消息
        final ConsumerRecords<Integer, String> consumerRecords = consumer.poll(3_000);

        // 遍历本次从主题的分区拉取的批量消息
        consumerRecords.forEach(new Consumer<ConsumerRecord<Integer, String>>() {
            @Override
            public void accept(ConsumerRecord<Integer, String> record) {
                System.out.println(record.topic() + "\t"
                        + record.partition() + "\t"
                        + record.offset() + "\t"
                        + record.key() + "\t"
                        + record.value());
            }
        });

        consumer.close();

    }
}

SpringBoot 集成kafka

这里把生产者和消费者放在一个项目中,实际可能是在两个里的。

1、引入依赖

<dependency> 
  <groupId>org.springframework.kafka</groupId> 
  <artifactId>spring-kafka</artifactId>
</dependency>

2、 配置

spring.application.name=springboot-kafka-02 
server.port=8080 

# 用于建立初始连接的broker地址 
spring.kafka.bootstrap-servers=node1:9092 
# producer用到的key和value的序列化类 
spring.kafka.producer.key- serializer=org.apache.kafka.common.serialization.IntegerSerializer
spring.kafka.producer.value- serializer=org.apache.kafka.common.serialization.StringSerializer 
# 默认的批处理记录数 
spring.kafka.producer.batch-size=16384 
# 32MB的总发送缓存 
spring.kafka.producer.buffer-memory=33554432 
# consumer用到的key和value的反序列化类 
spring.kafka.consumer.key- deserializer=org.apache.kafka.common.serialization.IntegerDeserializer 
spring.kafka.consumer.value- deserializer=org.apache.kafka.common.serialization.StringDeserializer
# consumer的消费组id 
spring.kafka.consumer.group-id=spring-kafka-02-consumer 
# 是否自动提交消费者偏移量
spring.kafka.consumer.enable-auto-commit=true
# 每隔100ms向broker提交一次偏移量 
spring.kafka.consumer.auto-commit-interval=100 
# 如果该消费者的偏移量不存在,则自动设置为最早的偏移量 
spring.kafka.consumer.auto-offset-reset=earliest

3、启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class Demo02SpringbootKafkaApplication {

    public static void main(String[] args) {
        SpringApplication.run(Demo02SpringbootKafkaApplication.class, args);
    }

}

4、生产者

这里我们就写在Controller里就好,如下:

import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ExecutionException;

@RestController
public class KafkaSyncProducerController {

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @RequestMapping("send/sync/{message}")
    public String send(@PathVariable String message) {

        final ListenableFuture<SendResult<Integer, String>> future = template.send("topic-spring-01", 0, 0, message);
        // 同步发送消息
        try {
            final SendResult<Integer, String> sendResult = future.get();
            final RecordMetadata metadata = sendResult.getRecordMetadata();

            System.out.println(metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());

        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        return "success";
    }

}

上面是同步发送消息,如果异步发送消息,可改为如下:

import org.apache.kafka.clients.producer.RecordMetadata;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class KafkaAsyncProducerController {

    @Autowired
    private KafkaTemplate<Integer, String> template;


    @RequestMapping("send/async/{message}")
    public String send(@PathVariable String message) {

        final ListenableFuture<SendResult<Integer, String>> future = this.template.send("topic-spring-01", 0, 1, message);

        // 设置回调函数,异步等待broker端的返回结果
        future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.out.println("发送消息失败:" + throwable.getMessage());
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                final RecordMetadata metadata = result.getRecordMetadata();

                System.out.println("发送消息成功:" + metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset());
            }
        });

        return "success";
    }

}

5、消费者

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class MyConsumer {

    @KafkaListener(topics = "topic-spring-01")
    public void onMessage(ConsumerRecord<Integer, String> record) {
        System.out.println("消费者收到的消息:"
                + record.topic() + "\t"
                + record.partition() + "\t"
                + record.offset() + "\t"
                + record.key() + "\t"
                + record.value());
    }

}

6、kafka配置类

上面当我们启动生产者和消费者时,kafka会自动为我们创建好topic和分区等。那是因为kafka的KafkaAutoConfigration里有个KafkaAdmin,他负责自动检测需要创建的topic和分区等。如果我们想自己创建,或者自定义KafkaTemplate(一般不会这么做),可以使用配置类,如下:

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConfig {


    @Bean
    public NewTopic topic1() {
        return new NewTopic("nptc-01", 3, (short) 1);
    }

    @Bean
    public NewTopic topic2() {
        return new NewTopic("nptc-02", 5, (short) 1);
    }

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put("bootstrap.servers", "node1:9092");
        KafkaAdmin admin = new KafkaAdmin(configs);
        return admin;
    }

    @Bean
    @Autowired
    public KafkaTemplate<Integer, String> kafkaTemplate(ProducerFactory<Integer, String> producerFactory) {

        // 覆盖ProducerFactory原有设置
        Map<String, Object> configsOverride = new HashMap<>();
        configsOverride.put(ProducerConfig.BATCH_SIZE_CONFIG, 200);

        KafkaTemplate<Integer, String> template = new KafkaTemplate<Integer, String>(
                producerFactory, configsOverride
        );
        return template;
    }

}

服务端参数配置

$KAFKA_HOME/config/server.properties文件中的一些配置。

1、zookeeper.connect

该参数用于配置Kafka要连接的Zookeeper/集群的地址。

它的值是一个字符串,使用逗号分隔Zookeeper的多个地址。Zookeeper的单个地址是 host:port形式的,可以在最后添加Kafka在Zookeeper中的根节点路径。

如:

zookeeper.connect=node2:2181,node3:2181,node4:2181/myKafka 1

2、listeners

用于指定当前Broker向外发布服务的地址和端口。

配置项为

listeners=PLAINTEXT://:9092

如下:

在这里插入图片描述

PLAINTEXT是一种协议名称;上面ip地址没写,可以配置成listeners=PLAINTEXT://0.0.0.0:9092,则只有本机可以访问。也可以是其他配置。

可以配置多个,逗号分割。但是多个listener的协议名称不能相同,且端口号不能相同。如果想用一个协议,则需要在listener.security.protocol.map维护听器名称和协议的map。

可以与 advertised.listeners 配合,用于做内外网隔离,比如创建topic和分区的等管理方面的使用一个地址,发送和消费消息则使用另一个地址,即管理和使用分开。

内外网隔离配置:

  1. listener.security.protocol.map

监听器名称和安全协议的映射配置。比如,可以将内外网隔离,即使它们都使用SSL。

listener.security.protocol.map=INTERNAL:SSL,EXTERNAL:SSL

冒号前面代表监听器名称,后面代表真正的协议。每个监听器的名称只能在map中出现一次。

  1. listeners

用于配置broker监听的URI以及监听器名称列表,使用逗号隔开多个URI及监听器名称。如果监听器名称代表的不是安全协议,必须配置listener.security.protocol.map。每个监听器必须使用不同的网络端口。

  1. advertised.listeners

需要将该地址发布到zookeeper供客户端使用。

可以在zookeeper的 get /myKafka/brokers/ids/<broker.id> 中找到。

在IaaS环境,该条目的网络接口得与broker绑定的网络接口不同。

如果不设置此条目,就使用listeners的配置。跟listeners不同,该条目不能使用0.0.0.0网络端口。

advertised.listeners的地址必须是listeners中配置的或配置的一部分。

  1. inter.broker.listener.name

用于配置broker之间通信使用的监听器名称,该名称必须在advertised.listeners列表中。

inter.broker.listener.name=EXTERNAL

典型配置如下:

在这里插入图片描述

3、 broker.id

该属性用于唯一标记一个Kafka的Broker,它的值是一个任意integer值。当Kafka以分布式集群运行的时候,尤为重要。

最好该值跟该Broker所在的物理主机有关的,如主机名为 host1.lagou.com ,则 broker.id=1 ;如果主机名为 192.168.100.101 ,则 broker.id=101 等等。

4、 log.dir

通过该属性的值,指定Kafka在磁盘上保存消息的日志片段的目录。它是一组用逗号分隔的本地文件系统路径。

如果指定了多个路径,那么broker 会根据“最少使用”原则,把同一个分区的日志片段保存到同一个路径下。

broker 会往拥有最少数目分区的路径新增分区,而不是往拥有最小磁盘空间的路径新增分区。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1404414.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

机器学习_正则化、欠拟合和过拟合

文章目录 正则化欠拟合和过拟合正则化参数 正则化 机器学习中的正则化是在损失函数里面加惩罚项&#xff0c;增加建模的模糊性&#xff0c;从而把捕捉到的趋势从局部细微趋势&#xff0c;调整到整体大概趋势。虽然一定程度上地放宽了建模要求&#xff0c;但是能有效防止过拟合…

【GitHub项目推荐--一个语音机器人项目】【转载】

推荐一个腾讯大佬开源的语音对话机器人&#xff1a;wukong-robot &#xff0c;悟空机器人在 GitHub 上斩获 3.2K 的 Star。 这是一个简单灵活的中文语音对话机器人项目&#xff0c;目的是让中国的开发者也能快速打造个性化的智能音箱&#xff0c;同时&#xff0c;该项目还是第…

科技云报道:金融大模型落地,还需跨越几重山?

科技云报道原创。 时至今日&#xff0c;大模型的狂欢盛宴仍在持续&#xff0c;而金融行业得益于数据密集且有强劲的数字化基础&#xff0c;从一众场景中脱颖而出。 越来越多的公司开始布局金融行业大模型&#xff0c;无论是乐信、奇富科技、度小满、蚂蚁这样的金融科技公司&a…

【XR806开发板试用】系列之一 - Linux环境下Ubuntu完全开发流程

前言 为了让极术社区开发者体验搭载安谋科技STAR-MC1处理器的面向IoT领域的全志XR806开发板&#xff0c;极术社区联合全志在线开发者社区共同推出XR806开发板免费试用活动。 极术社区特准备了200块XR806开发板作为2022年社区新年活动&#xff0c;申请的人数有600多&#xff0c…

���恒峰|配网行波型故障预警定位装置:电力系统的守护神

&#xfffd;&#xfffd;&#xfffd;在电力系统中&#xff0c;设备的正常运行对于保障供电至关重要。而配网行波型故障预警定位装置就是电力系统的守护神&#xff0c;它能够实时监测设备状态&#xff0c;提前发现故障&#xff0c;确保电力供应的稳定。本文将详细介绍配网行波…

【前端可视化】postcss-px-to-viewport 适配怎么限制最大宽度?使用 postcss-mobile-forever

需求原因 自己用 nuxt3 写官网发现用 postcss-px-to-viewport 这个插件虽然能够实现基于 vw 的响应式&#xff0c;但是无法做到限制宽度&#xff0c;比如设计稿 1920p&#xff0c;我只想让最大缩放比例为 1920p&#xff0c;不能超过&#xff0c;就无法实现了。 方案参考 纯 c…

记一次低级且重大的Presto运维事故

本文纯属虚构&#xff0c;旨在提醒各位别犯类似低级错误。 如有雷同&#xff0c;说的就是你&#xff01; 文章目录 前言事件回顾后续总结 前言 首先&#xff0c;要重视运维工作和离职人员的交接工作&#xff0c;这个不必多说。一将无能&#xff0c;累死三军&#xff01; 接下来…

Xilinx MicroBlaze 告警提示解决方案

告警&#xff1a; [BD 41-2559] AXI interface port /M02_AXI_0 is not associated to any clock port. It may not work correctly. Please update ASSOCIATED_BUSIF parameter of a clock port to include this interface port in an external clock port. If no external cl…

Java 设计者模式以及与Spring关系(五) 策略和观察者模式

目录 简介: 23设计者模式以及重点模式 策略模式&#xff08;Strategy Pattern&#xff09; 示例 spring中应用 观察者模式&#xff08;Observer&#xff09; 示例 spring中应用 简介: 本文是个系列一次会出两个设计者模式作用&#xff0c;如果有关联就三个&#xff0c;…

一键去除图片背景——background-removal-js

一些JavaScript库和工具可以帮助实现背景去除&#xff1a; OpenCV.js&#xff1a;OpenCV的JavaScript版本&#xff0c;提供了许多计算机视觉功能&#xff0c;包括背景去除。Jimp&#xff1a;一个用于处理图像的JavaScript库&#xff0c;提供了许多图像处理功能&#xff0c;包括…

Vue.js 3 项目开发:迈向现代化前端开发的必经之路

文章目录 一、Vue.js 3简介二、Vue.js 3新特性1. Composition API2. 更好的性能3. 更好的TypeScript支持4. 更多的生命周期钩子5. 更好的自定义指令API 三、Vue.js 3项目开发实践1. 搭建开发环境2. 项目结构规划3. 组件开发4. 路由管理5. 状态管理6. 测试与部署 《Vue.js 3企业…

VSCode Python Windows环境下创建虚拟环境,隔离每个项目的依赖pip包,推荐使用!

VSCode Python Windows环境下创建虚拟环境 Visual Studio Code 可以隔离不同项目的pip依赖包&#xff0c;防止不同版本的干扰**&#xff08;推荐使用&#xff09;** 先在python官网https://www.python.org/downloads/下载需要的python版本&#xff08;我选择了3.9.8&#xff09…

基于springboot+vue新能源汽车充电管理系统

摘要 新能源汽车充电管理系统是基于Spring Boot和Vue.js技术栈构建的一款先进而高效的系统&#xff0c;旨在满足不断增长的新能源汽车市场对充电服务的需求。该系统通过整合前后端技术&#xff0c;实现了用户注册、充电桩管理、充电订单管理等核心功能&#xff0c;为用户提供便…

centos 7 增加临时路由及永久路由

centos 7 增加临时路由及永久路由 如果增加临时路由&#xff0c;要先安装net-tools , sudo yum install net-tools route add -net 10.1.0.0 gw 10.1.1.1 netmask 255.255.0.0 意思是增加了一条动态路由&#xff0c;网关10.1.1.1 ,10.1.x.x 的所有ip都走这个网关 此种方式&am…

Wireshark的捕获接口设置

通过Wireshark菜单栏的“捕获”-“选项”和工具栏的“捕获选项”按钮&#xff0c;可以进入接口捕获接口的设置。 打开捕获接口设置界面&#xff0c;首先设置“Input”标签。 进行接口选择&#xff0c;关掉不必要的接口。 选择使用接口模式&#xff0c;选择“混杂模式”&#x…

proxy 代理的接口报错301问题

项目系统里仅仅这个接口报错&#xff0c;反向代理错误导致。 默认情况下&#xff0c;不接受运行在HTTPS上&#xff0c;且使用了无效证书的后端服务器。如果你想要接受&#xff0c;修改配置&#xff1a;secure: false&#xff08;简单意思&#xff1a;如果本地没有进行过https相…

PaddleNLP 如何打包成Windows环境可执行的exe?

当我们使用paddleNLP完成业务开发后&#xff0c;需要将PaddleNLP打包成在Windows操作系统上可执行的exe程序。操作流程&#xff1a; 1.环境准备&#xff1a; python环境&#xff1a;3.7.4 2.安装Pyinstaller pip install pyinstaller 3.目录结构&#xff0c;main.py为可执…

Jmeter 设置全局请求 重点cook

原因 在使用jmeter 过程中为了方便 &#xff0c;会设置很多公众信心 比如请求头 请求cook 还会设置多个线程组 在同一个线程组中 我们只需要设置一个请求请求cook 就可以了 但是 有逆骨 就是喜欢多个线程组所以出现问题了 解决方案 设置一个全局变量 步骤 在测试计划中设…

正点原子RV1126编译环境搭建+rkmedia编译

备注&#xff1a; 1&#xff09;如果想成功编译rkmedia可执行程序&#xff0c;必须一步不差的这么操作。 1.编译环境搭建 1.1设置ubuntu 1&#xff09;右上角 设置->系统设置->软件和更新->下载自&#xff08;“选择阿里云”&#xff09; 2&#xff09;提交之后…

LabVIEW电路板插件焊点自动检测系统

LabVIEW电路板插件焊点自动检测系统 介绍了电路板插件焊点的自动检测装置设计。项目的核心是使用LabVIEW软件&#xff0c;开发出一个能够自动检测电路板上桥接、虚焊、漏焊和多锡等焊点缺陷的系统。 系统包括成像单元、机械传动单元和软件处理单元。首先&#xff0c;利用工业相…