kafka入门篇

news2024/9/27 21:20:01

文章目录

  • 前言
  • 介绍
  • 概念与说明
  • 安装
    • 启动
    • 配置
  • 命令操作
    • 创建topic
    • 查看topic列表
    • 发送消息(启动一个生产者)
    • 消费消息(启动一个消费者)
    • 查询topic信息
    • 删除topic
  • 集群
  • 关机
  • 使用
    • 报错
    • java连接示例

前言

作为入门篇,主要是了解Kafka的概念,还有一些基本操作和使用。

介绍

官方为kafka做了比较完善的介绍:Kafka 中文文档 - ApacheCN

kafka是一个分布式流出来平台。

作为流处理平台它有三种特性:

  1. 可以让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。
  2. 可以储存流式的记录,并且有较好的容错性。
  3. 可以在流式记录产生时就进行处理。

它可以用于两大类别的应用:

  1. 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。 (相当于message queue)
  2. 构建实时流式应用程序,对这些流数据进行转换或者影响。 (就是流处理,通过kafka stream topic和topic之间内部进行变化)

概念与说明

kafka作为一个集群,可以运行在一台或多台服务器上

kafka通过topic对存储的数据进行分类

每条记录包含一个key,一个value和一个timestamp

kafka的一个结构如下:

结构

kafka 生产者和消费者均通过TCP协议进行连接。

producer:生产者,也是我们的应用,或是其他要发送的程序,它负责将消息发送到topicpartition中,

consumer:消费者,通过消费者组表示,可以将消费者组看成一个消费者,消费者订阅一个topic,那么Kafka会将消息发送给这个消费者组,此时,这个消费者组会负载均衡的把消息平均分配给组下的所有消费者实例.

topic:主题(数据主题),存储数据记录的地方,可以通过它来区分业务数据,一个topic,可以有一个或多个消费者订阅

partition:分区,每个topic都会有一个或多个分区,分区是将topic接收到的数据按指定的分区和分区规则,拆分到不同分区。消费者订阅topic消费消息时,可以指定分区,或不指定,不指定时Kafka会采用轮询方式将所有分区下的数据发送给消费者;指定分区当然也只会发送指定分区的消息,不同的点也是很明显的,在同一个分区上的消息是有序的。

(官网描述:每一个topic,kafka集群都会维护一个分区日志,每个分区的数据都是有序的,并且不断追加到commit log文件中。分区中的每一条记录,都会分配一个ID号表示顺序,也可以称之为offset(偏移量),也就是说,消费者可以通过这个offset来获取日志,可以从指定的位置获取。

一个分区的存储量受限于主机的文件限制,但是topic可以有多个partition,在集群中,一个topic是在集群节点上的,那么分区也会同步到其他节点的topic下,保持容错性)

replication:副本,每个分区都有一个leader,0个或多个follower,leader只做独写,follower只做备份,当leader宕机,在剩下的follower中就会选举一个leader。

注意:如果创建topic时,副本数时不能大于broker数的,也就是不能大于Kafka节点数,因为副本是同步到其他节点上的,所有如果大于节点数据,就无法创建。

broker:kafka服务

集群:在集群下,总会有一个节点作为leader,零台或者多台作为follwers,只有leader节点处理一切的读写请求,其他节点都是被动的同步leader达到数据,如果这些leader宕机,则会在follwers中选举一个新的leader

消息系统:相对于其他消息系统的模式来说(有队列发布-订阅两个模块),kafka在队列模式中,消费者是一个消费者组,像redis,队列模式只能一个消费者,消费了就没了;发布订阅模式,其他中间件也是多订阅模式的,kafka订阅是针对topic,所有消费者组里的消费者实例个数不能多于分区数量。

流处理:kafka拥有流处理的功能,它会不断的从topic获取数据,然后做数据处理,可以做数据聚合,join等复杂处理,然后再写入topic

安装

官方地址:Kafka 中文文档 - ApacheCN

官方提供的下载的地址,下载很快。

启动

方式一

kafka是使用zookeeper做集群管理的,所以你需要个zookeeper服务器,kafka有提供zookeeper,可以直接使用

./bin/zookeeper-server-start.sh -daemon config/zoeeper.properties

它和zookeeper的还是有点不一样的,kafka这个脚本,是启动脚本,zookeeper那个是命令行方式的。

启动kafka

./bin/kafka-server-start.sh -daemon config/server.properties

方式二

不使用kafka自带的zookeeper,使用之前我们搭建的zookeeper集群

vi config/server.properties

/zookeeper

#修改配置为zookeeper集群地址

zookeeper.connect=localhost:2181,localhost:2182,localhost:2183

启动

./bin/zkServer.sh start zoo.cfg

./bin/zkServer.sh start zoo2.cfg

./bin/zkServer.sh start zoo3.cfg

./bin/kafka-server-start.sh config/server.properties

配置

配置项说明:Kafka 中文文档 - ApacheCN

命令操作

kafka它带了很多命令,我们一个通过这些命令进行创建topic、发送消息和消费消息等。

image-20230103221311638

创建topic

./bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test_topic --partitions 3 --replication-factor 3

–zookeeper:指定zookeeper地址

–partitions:topic分片多少个

–replication-factor:复制因子,控制写入的数据副本个数,这个值是<=broker数量,这里我是单机的,所以这里是1

查看topic列表

./bin/kafka-topics.sh --list --zookeeper localhost:2181

发送消息(启动一个生产者)

#长连接

./bin/kafka-console-producer.sh --broker-list 192.168.17.128:9092 --topic test_topic

客户端收到消息表示成功 了

消费消息(启动一个消费者)

#长连接

./bin/kafka-console-consumer.sh --bootstrap-server 192.168.17.128:9092 --topic test_topic

在生产者客户端发送小,这边能收到就已经成功了

查询topic信息

./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic

image-20230211143956872

虽然概念上面已经说明过,不过这里还是在提一遍。

**topic:**主题名称,因为指定了3分区,3分布,所有这里是3条数据;

**Partition:**分区,表示当前分区在哪一个节点上。每个topic都会有一个或多个分区,每个分区保存的数据不同(均匀分配了topic上的数据)

**Leader与Replicas:**副本信息,每个分区数据都会备份指定数量的副本,保证容灾能力,每个分区有一个leader,0个或多个follower,leader只做独写,follower只做备份,当一个分区的leader宕机,剩余follower会选举一个leader,如上图Replicas: 1,2,3

image-20230211152324585

如果leader1掉线,那么剩下的2,3会便会选举其中一个作为leader。

所以这里Leader是分区副本的主节点,负责一切的读写,Replicas是分布所在的信息。

现在kill一个kafka节点:

image-20230211144054008

它成功的将1剔除,并且选举出了一个新的leader

image-20230211144146960

注意:

  1. 之前提过,副本数不能大于Kafka节点数,而这里是本身节点数是够的,只是在运行过程中的不可抗力因素导致的这个集群主动的删除了删除了放弃了异常的节点。
  2. 这个命令可以查看topic的信息,在Kafka里也作为一个检查集群状态的命令,在Kafka里他没有类似像zkServer.sh status那样的命令,所有可以通过这个命令,来查看是否有分区掉线。

删除topic

这个需要在server.properties里设置:delete.topic.enable=true,不然这个topic只会被标记为删除,并不会真删

./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test_topic

image-20230206222944176

集群

kafka官网提供了配置示例,我们根据它给的添加配置就行,集群的配置,只有3个地方不一样,因为我是在一台服务器上部署,所以要区别其他服务。

  • log.dir
  • broker.id
  • listeners(注意这里的地址配置ip)

zookeeper简单配置示例

# ZooKeeper服务地址
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
# topic分区日志数
num.partitions=8
# 自动创建topic的默认副本数
default.replication.factor=3
# 自动创建topic
auto.create.topics.enable=false
# 指定写入成功的副本数,当写入数据的副本数达不到这个值,则会报错
min.insync.replicas=2
delete.topic.enable=true

# 集群配置
# 日志保存目录
log.dir=./log1
# broker服务ID
broker.id=1
# 监听地址
listeners=PLAINTEXT://192.168.17.128:9092

判断集群状态是否成功:

./bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test_topic

关机

关机和故障时不同的,正常关机,Kafka会执行:

  1. 同步所有日志到磁盘,可以避免重启时还有执行恢复日志过程,可以提供启动速度;
  2. 关闭leader时,会将leader上的分区迁移到其他副本,而且切换leader更快;

使用

  1. 首先确定服务器防火墙端口已经打开

    firewall-cmd --add-port=9200/tcp --permanent
    
  2. kafka需要logback的依赖

     <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.8.0</version>
            </dependency>
            <dependency>
                <groupId>com.fasterxml.jackson.core</groupId>
                <artifactId>jackson-databind</artifactId>
                <version>2.13.1</version>
            </dependency>
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.33</version>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-core</artifactId>
                <version>1.2.10</version>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
                <version>1.2.10</version>
            </dependency>
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-access</artifactId>
                <version>1.2.3</version>
            </dependency>
    

    logback.xml

    <!-- 级别从高到低 OFF 、 FATAL 、 ERROR 、 WARN 、 INFO 、 DEBUG 、 TRACE 、 ALL -->
    <!-- 日志输出规则 根据当前ROOT 级别,日志输出时,级别高于root默认的级别时 会输出 -->
    <!-- 以下 每个配置的 filter 是过滤掉输出文件里面,会出现高级别文件,依然出现低级别的日志信息,通过filter 过滤只记录本级别的日志 -->
    <!-- scan 当此属性设置为true时,配置文件如果发生改变,将会被重新加载,默认值为true。 -->
    <!-- scanPeriod 设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,默认单位是毫秒。当scan为true时,此属性生效。默认的时间间隔为1分钟。 -->
    <!-- debug 当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。默认值为false。 -->
    <configuration scan="true" scanPeriod="60 seconds" debug="false">
        <!-- 动态日志级别 -->
        <jmxConfigurator />
        <!-- 定义日志文件 输出位置 -->
        <!-- <property name="log_dir" value="C:/test" />-->
        <property name="log_dir" value="./logs" />
        <!-- 日志最大的历史 30天 -->
        <property name="maxHistory" value="30" />
    
        <!-- ConsoleAppender 控制台输出日志 -->
        <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>
                    <!-- 设置日志输出格式 -->
                    %d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger - %msg%n
                </pattern>
            </encoder>
        </appender>
    
        <!-- ERROR级别日志 -->
        <!-- 滚动记录文件,先将日志记录到指定文件,当符合某个条件时,将日志记录到其他文件 RollingFileAppender -->
        <appender name="ERROR" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <!-- 过滤器,只记录WARN级别的日志 -->
            <!-- 果日志级别等于配置级别,过滤器会根据onMath 和 onMismatch接收或拒绝日志。 -->
            <filter class="ch.qos.logback.classic.filter.LevelFilter">
                <!-- 设置过滤级别 -->
                <level>ERROR</level>
                <!-- 用于配置符合过滤条件的操作 -->
                <onMatch>ACCEPT</onMatch>
                <!-- 用于配置不符合过滤条件的操作 -->
                <onMismatch>DENY</onMismatch>
            </filter>
            <!-- 最常用的滚动策略,它根据时间来制定滚动策略.既负责滚动也负责出发滚动 -->
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <!--日志输出位置 可相对、和绝对路径 -->
                <fileNamePattern>
                    ${log_dir}/error/%d{yyyy-MM-dd}/error-log.log
                </fileNamePattern>
                <!-- 可选节点,控制保留的归档文件的最大数量,超出数量就删除旧文件假设设置每个月滚动,且<maxHistory>是6, 则只保存最近6个月的文件,删除之前的旧文件。注意,删除旧文件是,那些为了归档而创建的目录也会被删除 -->
                <maxHistory>${maxHistory}</maxHistory>
            </rollingPolicy>
            <encoder>
                <pattern>
                    <!-- 设置日志输出格式 -->
                    %d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger - %msg%n
                </pattern>
            </encoder>
        </appender>
    
        <!-- WARN级别日志 appender -->
        <appender name="WARN" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <!-- 过滤器,只记录WARN级别的日志 -->
            <!-- 果日志级别等于配置级别,过滤器会根据onMath 和 onMismatch接收或拒绝日志。 -->
            <filter class="ch.qos.logback.classic.filter.LevelFilter">
                <!-- 设置过滤级别 -->
                <level>WARN</level>
                <!-- 用于配置符合过滤条件的操作 -->
                <onMatch>ACCEPT</onMatch>
                <!-- 用于配置不符合过滤条件的操作 -->
                <onMismatch>DENY</onMismatch>
            </filter>
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <!--日志输出位置 可相对、和绝对路径 -->
                <fileNamePattern>${log_dir}/warn/%d{yyyy-MM-dd}/warn-log.log</fileNamePattern>
                <maxHistory>${maxHistory}</maxHistory>
            </rollingPolicy>
            <encoder>
                <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger - %msg%n</pattern>
            </encoder>
        </appender>
    
        <!-- INFO级别日志 appender -->
        <appender name="INFO" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <filter class="ch.qos.logback.classic.filter.LevelFilter">
                <level>INFO</level>
                <onMatch>ACCEPT</onMatch>
                <onMismatch>DENY</onMismatch>
            </filter>
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <fileNamePattern>${log_dir}/info/%d{yyyy-MM-dd}/info-log.log</fileNamePattern>
                <maxHistory>${maxHistory}</maxHistory>
            </rollingPolicy>
            <encoder>
                <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger - %msg%n</pattern>
            </encoder>
        </appender>
    
        <!-- DEBUG级别日志 appender -->
        <appender name="DEBUG" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <filter class="ch.qos.logback.classic.filter.LevelFilter">
                <level>DEBUG</level>
                <onMatch>ACCEPT</onMatch>
                <onMismatch>DENY</onMismatch>
            </filter>
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <fileNamePattern>${log_dir}/debug/%d{yyyy-MM-dd}/debug-log.log</fileNamePattern>
                <maxHistory>${maxHistory}</maxHistory>
            </rollingPolicy>
            <encoder>
                <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger - %msg%n</pattern>
            </encoder>
        </appender>
    
        <!-- TRACE级别日志 appender -->
        <appender name="TRACE" class="ch.qos.logback.core.rolling.RollingFileAppender">
            <filter class="ch.qos.logback.classic.filter.LevelFilter">
                <level>TRACE</level>
                <onMatch>ACCEPT</onMatch>
                <onMismatch>DENY</onMismatch>
            </filter>
            <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
                <fileNamePattern>${log_dir}/trace/%d{yyyy-MM-dd}/trace-log.log</fileNamePattern>
                <maxHistory>${maxHistory}</maxHistory>
            </rollingPolicy>
            <encoder>
                <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} %-5level %logger - %msg%n</pattern>
            </encoder>
        </appender>
    
        <!-- root级别 DEBUG -->
        <root>
            <!-- 打印debug级别日志及以上级别日志 -->
            <level value="info" />
            <!-- 控制台输出 -->
            <appender-ref ref="console" />
            <!-- 文件输出 -->
            <appender-ref ref="ERROR" />
            <appender-ref ref="INFO" />
            <appender-ref ref="WARN" />
            <appender-ref ref="DEBUG" />
            <appender-ref ref="TRACE" />
        </root>
    </configuration>
    
    

报错

  1. 连接出现这样的错误
org.apache.kafka.common.network.Selector - [Consumer clientId=con, groupId=con-group] Connection with /192.168.17.128 disconnected
java.net.ConnectException: Connection refused: no further information

org.apache.kafka.clients.NetworkClient - [Consumer clientId=con, groupId=con-group] Connection to node -1 (/192.168.17.128:9092) could not be established. Broker may not be available.

最有可能的就是listeners配置没有配置ip导致的,所有可以先尝试改一下,我这里就是因为没有配置IP出现的;最后改成了listeners=PLAINTEXT://192.168.17.128:9092,然后consumer的报错没有了,

  1. 接下来是生产者的报错:
Error while fetching metadata with correlation id 53 : {test_topic=UNKNOWN_TOPIC_OR_PARTITION}

看着像是未知的topic,我看官网是默认TRUE不是

image-20230110221509106

最后配置里又加了一句auto.create.topics.enable=true,完整配置如下:

# ZooKeeper服务地址
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
# topic分区日志数
num.partitions=2
# 自动创建topic的默认副本数
default.replication.factor=3
# 自动创建topic
auto.create.topics.enable=false
# 指定写入成功的副本数,当写入数据的副本数达不到这个值,则会报错
min.insync.replicas=2
# 设置自动创建topic
auto.create.topics.enable=true
delete.topic.enable=true

#集群配置
# 日志保存目录
log.dir=./log1
# broker服务ID
broker.id=1
# 监听地址
listeners=PLAINTEXT://192.168.17.128:9092

之后就正常了

弄一个一键启动脚本:

/data/kafka/kafka_2.11-1.0.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.11-1.0.0/config/server.properties

/data/kafka/kafka_2.11-1.0.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.11-1.0.0/config/server2.properties

/data/kafka/kafka_2.11-1.0.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.11-1.0.0/config/server3.properties

sleep 3

echo 'kafka启动完成'

java连接示例

消费者代码:

这里有几个必填配置,对应shell命令里的参数

bootstrap.servers - Kafka服务地址

client.id - 客户端id

key.deserializer - key序列化配置

value.deserializer - value序列化配置

group.id - 消费者组id

auto.offset.reset - offset方式

    public static void consumer() {
        Properties properties = new Properties();
        // 必填参数
        // kafka服务地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, IP);
        // 客户端消费者ID
        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, "con");
        // key序列化器
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // value序列化器
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 消费者组ID,如果需要重新读取,可以切换groupId为新的ID就可以了,然后设置自动提交为true
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "con-group");
            // 消费偏移量
        // earliest:分区中有offset时,从offset位置消费,没有从头消费
        // latest:分区中有offset时,从offset位置下佛,没有时,消费新产生的
        // none:分区有offset时,从offset后开始消费,如果有一个分区缺少已提交的offset时,抛异常
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // 回话超时时间
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 6000);
        // 心跳间隔
        properties.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "2000");
        // 自动提交间隔
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        // 每次拉取的最大数据量
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
        // 如果需要重复消费,可以设置自动提交为FALSE,这样每次拉取都是从之前的offset开始拉取
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    

        new Thread(() -> {
            while (true) {
                try(KafkaConsumer<String, Object> con = new KafkaConsumer<>(properties);){
                    con.subscribe(Collections.singleton(TOPIC));
                    ConsumerRecords<String, Object> poll = con.poll(Duration.ofSeconds(5000));
                    for (ConsumerRecord<String, Object> record : poll) {
                        System.out.println("topic:" + record.topic() + ",offset:" + record.offset() + ",数据:" + record.value().toString());
                    }
                }
            }
        }).start();
    }

生产者代码:

同样也是必填参数

bootstrap.servers

client.id

key.deserializer - key序列号配置

value.deserializer - value序列号配置

public static void producer() {
        Properties properties = new Properties();
        // 必要条件
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, IP);
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "pro");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        KafkaProducer<String, String> pro = new KafkaProducer<>(properties);

        new Thread(() ->{
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(3);
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
                Future<RecordMetadata> result = pro.send(new ProducerRecord<>(TOPIC, "1","dddd"), (meta, error) -> {
                    if (error != null) {
                        error.printStackTrace();
                        return;
                    }
                    System.out.println("发送回调:" + meta.toString());
                });
                System.out.println("发送的结果:" + result );
            }
        }).start();

    }

备注:

  1. Kafka会自动将大数据分组给消费者组,同时,在客户端也可以设置max.poll.records(ConsumerConfig.MAX_POLL_RECORDS_CONFIG)来控制每次拉取的数据量;

    因为是通过消费者组进行拉取的,所有便可以由多个客户端设置同一个groupId取订阅同一个topic;

  2. 如果说需要有序的消费数据,那么也要保证生产者的消息都发送到一个分区中,就可以利用Kafka的分区机制进行顺序消费;

  3. 如果需要全部拉取数据,可以设置groupId,和分批数据量,还有自动提交为true,k

        // 消费者组ID,如果需要重新读取,可以切换groupId为新的ID就可以了,然后设置自动提交为true
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "con-group");
        // 每次拉取的最大数据量
        properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);
        // 如果需要重复消费,可以设置自动提交为FALSE,这样每次拉取都是从之前的offset开始拉取
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // 消费偏移量

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/339850.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在windows下载安装netcat(nc)命令

参考文章 一、netcat(nc)下载 网盘下载 netcat(nc)下载地址&#xff1a;netcat 1.11 for Win32/Win64 二、配置环境变量 在Path里添加netcat的存放路径 参数 说明 -C 类似-L选项&#xff0c;一直不断连接[1.13版本新加的功能] -d 后台执行 -e prog 程序重定向&am…

能取代90%人工作的ChatGPT到底牛在哪?

&#x1f4e3;&#x1f4e3;&#x1f4e3;&#x1f4e3;&#x1f4e3;&#x1f4e3;&#x1f4e3; &#x1f38d;大家好&#xff0c;我是慕枫 &#x1f38d;前阿里巴巴高级工程师&#xff0c;InfoQ签约作者、阿里云专家博主&#xff0c;一直致力于用大白话讲解技术知识 &#x…

Web 框架 Flask 快速入门(二)表单

课程地址&#xff1a;Python Web 框架 Flask 快速入门 文章目录&#x1f334; 表单1、表单介绍2、表单的简单实现1. 代码2. 代码的执行逻辑3、使用wtf扩展实现4、bug记录&#xff1a;表单验证总是失败&#x1f334; 表单 1、表单介绍 当我们在网页上填写账号密码进行登录的时…

Spring 面试题(一):Spring 如何处理全局异常?

❤️ 博客首页&#xff1a;水滴技术 &#x1f680; 支持水滴&#xff1a;点赞&#x1f44d; 收藏⭐ 留言&#x1f4ac; &#x1f338; 订阅专栏&#xff1a;Spring 教程&#xff1a;从入门到精通 文章目录1、如何处理全局异常2、代码示例2.1、定义统一的“响应结果对象”2.2、…

Leetcode 回溯详解

回溯法 回溯法有“通用解题法”之称&#xff0c;用它可以系统地搜索问题的所有解。回溯法是一个既带有系统性又带有跳跃性的搜索算法。 在包含问题的所有解的解空间树中&#xff0c;按照深度优先搜索(DFS)&#xff09;的策略&#xff0c;从根结点出发深度探索解空间树。当探索…

MWORKS--同元软控MWORKS介绍、安装与使用

MWORKS--同元软控MWORKS介绍、安装与使用1 同元软控介绍1.1 同元软控简介1.2 同元软控发展历史2 MWORKS介绍2.1 MWORKS简介2.2 MWORKS产品描述3 装备数字化3.1 发展3.2 内涵3.3 系统模型发展成为产品的一部分3.4 MWORKS系统模型数据管理3.4 MWORKS为装备数字化提供的套件参考1 …

springcloud集成seata(AT)分布式事务

目录 一、 下载seata server和seata源码 二、配置启动seata 2.1 在nacos控制台&#xff0c;新建一个seata的名称空间&#xff0c;用于存放seata的专用配置 2.2 创建seata server的mysql库 2.3 在nacos上配置seata相关配置 &#xff08;seata名称空间&#xff09; 2.4 启动…

家政服务小程序实战教程08-宫格导航

小程序一般会在首页显示商品的分类&#xff0c;这类需求我们在微搭中是使用宫格导航组件来实现。 01 组件说明 宫格导航组件可以在导航配置里设置菜单&#xff0c;可以手动添加&#xff0c;也可以变量绑定 因为我们一般的分类是动态变化的&#xff0c;品类会不断的调整&#…

阿里代码规范插件中,Apache Beanutils为什么被禁止使用?

在实际的项目开发中&#xff0c;对象间赋值普遍存在&#xff0c;随着双十一、秒杀等电商过程愈加复杂&#xff0c;数据量也在不断攀升&#xff0c;效率问题&#xff0c;浮出水面。 问&#xff1a;如果是你来写对象间赋值的代码&#xff0c;你会怎么做&#xff1f; 答&#xf…

[Java 进阶面试题] CAS 和 Synchronized 优化过程

最有用的东西,是你手里的钱,有钱就有底气,还不快去挣钱~ 文章目录CAS 和 Synchronized 优化过程1. CAS1.1 CAS的原理1.2 CAS实现自增自减的原子性1.3 CAS实现自旋锁1.4 CAS针对ABA问题的优化2. synchronized2.1 synchronized加锁阶段分析2.2 synchronized优化CAS 和 Synchroniz…

nodejs+vue大学生在线选课系统vscode - Visual Studio Code

3、数据库进行设计&#xff0c;建立约束和联系。 4、创建程序框架&#xff0c;代码分成三层结构&#xff1a;接口层、业务层、表示层&#xff0c;设计窗口和主窗口&#xff0c;主窗口菜单项依照系统模块图设计。 5、设计数据访问的接口&#xff0c;供各模块调用。完成登录功能…

【JavaWeb项目】简单搭建一个前端的博客系统

博客系统项目 本项目主要分成四个页面: 博客列表页博客详情页登录页面博客编辑页 该系统公共的CSS样式 common.css /* 放置一些各个页面都会用到的公共样式 */* {margin: 0;padding: 0;box-sizing: 0; }/* 给整个页面加上背景 */ html, body{height: 100%; }body {backgrou…

printf的返回值

参考资料 点击下面的链接https://legacy.cplusplus.com/reference/cstdio/printf/?kwprintf, 返回值的理解 如果返回成功后&#xff0c;将返回写入的字符总数。 如果发生写入错误&#xff0c;则设置错误指示器&#xff08;ferror&#xff09;并返回负数。 如果在写入宽字符…

微信中如何接入chatgpt机器人才比较安全(不会收到警告或者f号)之第一步登录微信

大家好,我是雄雄,欢迎关注微信公众号:雄雄的小课堂。 前言 为什么会有这个话题?大家都知道最近有个AI机器人很火,那就是chatgpt,关于它的介绍,大家可以自行百度去,我这边就不多介绍了。 好多人嫌网页版玩的不过瘾,就把这个机器人接入到了QQ上,接入到了钉钉上,TG 上…

设计模式:原型模式解决对象创建成本大问题

一、问题场景 现在有一只猫tom&#xff0c;姓名为: tom, 年龄为&#xff1a;1&#xff0c;颜色为&#xff1a;白色&#xff0c;请编写程序创建和tom猫属性完全相同的10只猫。 二、传统解决方案 public class Cat {private String name;private int age;private String color;…

JMeter 接口测试/并发测试/性能测试

Jmter工具设计之初是用于做性能测试的&#xff0c;它在实现对各种接口的调用方面已经做的比较成熟&#xff0c;因此&#xff0c;本次直接使用Jmeter工具来完成对Http接口的测试。因为再做接口测试时可以设置线程组&#xff0c;所以也可做接口性能测试。本篇使用JMeter完成了一个…

TrueNas篇-trueNas Scale安装

安装TrueNAS Scale 在尝试trueNas core时发下可以成功安装&#xff0c;但是一直无法成功启动&#xff0c;而且国内对我遇见的错误几乎没有案例&#xff0c;所以舍弃掉了&#xff0c;而且trueNas core是基于Linux的&#xff0c;对Linux的生态好了很多&#xff0c;还可以可以在t…

DNS 原理入门指南(二)

三、DNS服务器 下面我们根据前面这个例子&#xff0c;一步步还原&#xff0c;本机到底怎么得到域名math.stackexchange.com的IP地址。 首先&#xff0c;本机一定要知道DNS服务器的IP地址&#xff0c;否则上不了网。通过DNS服务器&#xff0c;才能知道某个域名的IP地址到底是什么…

Qt优秀开源项目之十六:SQLite数据库管理系统—SQLiteStudio

首先&#xff0c;感谢CSDN官方认可 SQLiteStudio是一款开源、跨平台&#xff08;Windows、Linux和MacOS&#xff09;的SQLite数据库管理系统。 github地址&#xff1a;https://github.com/pawelsalawa/sqlitestudio 官网&#xff1a;https://sqlitestudio.pl/ 特性很多&#xf…

11-git-查看提交历史

查看提交历史前言查看提交历史常用选项-p-n--stat--pretty--since限制输出选项前言 本篇来学习git中查看提交历史命令 查看提交历史 官方项目例子&#xff1a; git clone https://github.com/schacon/simplegit-progit git log说明: 不传任何参数会按时间先后顺序列出所有的…