Kafka快速入门:Kafka驱动JavaApi的使用

news2024/9/22 3:58:44

生产者和消费者是Kafka的核心概念之一,它们在客户端被创建和使用,并且包含了许多与Kafka性能和机制相关的配置。虽然Kafka提供的命令行工具能够执行许多基本操作,但它无法实现所有可能的性能优化。相比之下,使用Java API可以充分利用编程语言的灵活性,对生产者和消费者进行更精细的性能调优。对于大多数中间件,熟悉服务器的命令行操作可能足以帮助学习其API的使用。然而,Kafka则不同,要全面掌握Kafka的所有特性,必须系统地学习和理解其Java API。

1. 基础消费流程

在javaApi中可以通过创建一个Kafka生产者和消费者的配置对象,在new生产者或消费者的类时将配置对象传入,然后生产者实例通过调用send方法发送数据,消费者通过poll方法消费数据,数据需要通过ProducerRecords类封装key和value,并在生产者和消费者配置中为key和value指定序列化和反序列化类(key可以传null,key是在日志回收策略中发挥作用)。经过这样一套操作,消息就可以成功从生产者发往消费者。

package com.kafak.testkafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.Duration;
import java.util.Properties;

@SpringBootTest
class TestKafkaApplicationTests {

    //预定义Kafka对象实例,因为Kafka对象时线程安全,所以可以定义外面节省资源防止重复创建
    KafkaProducer<String, String> kafkaProducer;

    //创建生产者
    public KafkaProducer<String, String> getKafkaProducer() {

        //创建生产者配置
        Properties props = new Properties();
        //配置Kafka集群地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
        //配置序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //返回生产者
        return new KafkaProducer<String, String>(props);
    }

    //创建消费者
    public KafkaConsumer<String, String> getKafkaConsumer() {
        //创建消费者配置
        Properties props = new Properties();
        //配置Kafka集群地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
        //配置消费者组id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        //配置反序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //返回消费者
        return new KafkaConsumer<>(props);
    }

    //通过生产者生产一百条数据
    @Test
    void kafkaProducerTest() {
        //获取生产者
        kafkaProducer = getKafkaProducer();
        //发送消息
        for (int i = 0; i < 100; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("topicJava", "testKey" + i, "testValue" + i));
        }
    }

    //通过消费者消费消息
    @Test
    void kafkaConsumerTest() {
        //创建消费者,由于消费者是线程不安全,所以使用一次实例化一次,可以方式出现线程安全问题
        KafkaConsumer<String, String> kafkaConsumer = getKafkaConsumer();
        //接受消费者信息,传入100毫秒,消费者会一百毫秒拉去一次消息
        ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
        // 处理消息
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("Consumed message with key %s, value %s, from partition %d with offset %d%n",
                    record.key(), record.value(), record.partition(), record.offset());
        }
    }

}

2. 消息确认

消息确认的原理性知识可以通过下面这篇文章学习,这里主要讲实操。

Kafka运行机制(二):消息确认,消息日志的存储和回收icon-default.png?t=N7T8https://blog.csdn.net/dxh9231028/article/details/141329851?spm=1001.2014.3001.5501

生产者端

生产者端的消息确认策略由acks配置项控制,其由三种配置方式,其中我在下面这篇文章中详细讲述了相关知识。我们可以通过javaApi配置acks来控制确认策略。

生产者端的消息确认有同步和异步两种方式。

  • 同步消息确认:同步消息确认是生产者实例在调用send方法后紧接着调用get方法,该方法会阻塞线程的继续执行,等待消息发送结果。当消息发送失败时,如果配置了重试机制(通过设置 retries 属性),生产者会自动重试指定的次数。如果在所有重试尝试后仍然失败,最终会抛出异常,通知调用方消息发送失败。
  • 异步消息确认:异步消息确认通过生产者实例调用send方法时传入,第二个回调函数的参数。在成功或失败响应时,执行回调函数,异步消息确认不会阻塞代码,也不会触发自动重试。

消费者端

消费者消费成功在客户端的体现是成功获取到了数据,这本没有什么好说的,不过消费者不仅需要响应客户端数据,还要讲偏移量发送给Kafka,在这一过程中,消费者提供了手动提交和自动提交两种方式。启动自动提交是默认开启的,而手动提交则需要配置enable.auto.commit为false,然后通过创建分区和偏移量的映射关系,通过消费者的commit方法提交偏移量。

代码实现

下面代码中,我创建了四个测试单元,其中前两个测试单元,分别是生产者同步提交和异步提交,而后两个测试单元分别时消费者的自动提交和手动提交。

package com.kafak.testkafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@SpringBootTest
class TestKafkaApplicationTests {
    //预定义Kafka对象实例,因为Kafka对象时线程安全,所以可以定义外面节省资源防止重复创建
    KafkaProducer<String, String> kafkaProducer;

    //创建生产者
    public KafkaProducer<String, String> getKafkaProducer() {
        //创建生产者配置
        Properties props = new Properties();

        //配置消息确认策略
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        //配置重试次数
        props.put(ProducerConfig.RETRIES_CONFIG,3);

        //配置Kafka集群地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
        //配置序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //返回生产者
        return new KafkaProducer<String, String>(props);
    }

    //创建消费者
    public KafkaConsumer<String, String> getKafkaConsumer(Boolean isAutoCommit) {
        //创建消费者配置
        Properties props = new Properties();
        //配置Kafka集群地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
        //配置消费者组id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        //配置反序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //判断当前消费者是否开启自动提交
        if (!isAutoCommit) {
            //关闭自动提交
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        }else{
            //设置自动提交间隔时间1s
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        }
        //返回消费者
        return new KafkaConsumer<>(props);
    }

    //生产者同步确认
    @Test
    void kafkaProducerGetTest() {
        if(kafkaProducer == null) {
            kafkaProducer = getKafkaProducer();
        }
        //同步确认消息是否发送成功
        for (int i = 0; i < 100; i++) {
            try{
                RecordMetadata topicJava = kafkaProducer.send(new ProducerRecord<String, String>("topicJava", "testKey" + i, "testValue" + i)).get();
            }catch (Exception e) {
                e.printStackTrace();
            }
        }
        kafkaProducer.close();
    }
    
    //生产者异步确认
    @Test
    void kafkaProduceSyncTest() {
        if(kafkaProducer == null) {
            kafkaProducer = getKafkaProducer();
        }
        //异步确认是否发送成功
        for (int i = 0; i < 100; i++) {
            kafkaProducer.send(new ProducerRecord<String, String>("topicJava", "testKey" + i, "testValue" + i), (metadata, exception) -> {
                if (exception == null) {
                    System.out.printf("发送消息成功, metadata=%s%n", metadata);
                } else {
                    System.err.printf("发送消息失败, exception=%s%n", exception.getMessage());
                }
            });
        }
        kafkaProducer.close();
    }

    //消费者自动提交
    @Test
    void kafkaAutoCommitConsumerTest() {
        //创建消费者开启自动提交
        KafkaConsumer<String, String> kafkaConsumer = getKafkaConsumer(true);
        //消费数据流程中无需负责偏移量提交
        while (true) {
            //接受消费者信息,传入100毫秒,消费者会一百毫秒拉去一次消息
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            //处理消息
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("消息消费成功, key=%s, value=%s, partition=%d, offset=%d%n",
                        record.key(), record.value(), record.partition(), record.offset());
            }
        }
    }

    //消费者手动提交
    @Test
    void kafkaSyncCommitConsumerTest() {
        //创建消费者关闭自动提交
        KafkaConsumer<String, String> kafkaConsumer = getKafkaConsumer(false);
        //消费数据流程中需要在消费数据后,提交偏移量
        while (true) {
            //接受消费者信息,传入100毫秒,消费者会一百毫秒拉去一次消息
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
            // 处理消息
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("消息消费成功, key=%s, value=%s, partition=%d, offset=%d%n",
                        record.key(), record.value(), record.partition(), record.offset());
                //创建分区和偏移量的映射类
                Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                //讲分区和偏移量的数据存入映射类
                offsets.put(new TopicPartition(record.topic(), record.partition()),
                        new OffsetAndMetadata(record.offset() + 1));
                //偏移量提交
                kafkaConsumer.commitSync(offsets);
            }
        }
    }

}

3. 批处理

批处理在生产者端,和消费者端也有不同的实现。我在Kakfa基本概念一文中清楚的讲解了批处理的概念,文章如下

Kafka基本概念icon-default.png?t=N7T8https://blog.csdn.net/dxh9231028/article/details/141270920?spm=1001.2014.3001.5501

生产者

在生产者端,生产者实例的send方法会发送消息到缓冲区中,而缓冲区消息何时发送给Kafka集群,则是通过配置batch.size和linger.ms配置,来实现当缓冲区存入多少消息,和距离上一次发送消息多久后,来发送这一轮缓冲区的消息到Kafka集群,代码实现如下

    //创建生产者
    public KafkaProducer<String, String> getKafkaProducer() {
        //创建生产者配置
        Properties props = new Properties();

        //配置生产者批处理
        //缓冲区大小最大为16384比特
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //距离上次发送消息时间隔3s
        props.put(ProducerConfig.LINGER_MS_CONFIG,"3000");
        
        //配置消息确认策略
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        //配置重试次数
        props.put(ProducerConfig.RETRIES_CONFIG,3);
        //配置Kafka集群地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
        //配置序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //返回生产者
        return new KafkaProducer<String, String>(props);
    }

消费者

消费者端批处理,消费者在拉去消息时,会在fetch.max.bytes,max.partition.fetch.bytes和max.poll.records三个配置项,以及传入poll方法的超时时间参数的限制下,尽可能多的拉取更多消息。

  • fetch.max.bytes:fetch.max.bytes 是指 Kafka 消费者单次从服务器拉取数据时能够获取的最大字节数。这是全局的上限,控制每次 poll() 操作可以拉取的数据量总和。 默认值:50MB(即 52428800 字节)。
  • max.partition.fetch.bytes:max.partition.fetch.bytes 是指 Kafka 消费者从单个分区拉取消息时能获取的最大字节数,当消费者数量少于主题分区数量时,一个消费者可能会负责多个分区。默认值:1MB(即 1048576 字节)。
  • max.poll.records:max.poll.records 是指 Kafka 消费者每次调用 poll() 方法时能够拉取的最大消息条数。 默认值:500 条消息。

代码实现如下

    //创建消费者
    public KafkaConsumer<String, String> getKafkaConsumer(Boolean isAutoCommit) {
        //创建消费者配置
        Properties props = new Properties();

        //消费者批处理相关配置
        //消费缓冲区大小,也就是一次消费最多能消费多少比特消息
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,16384);
        //一次消费一个分区最多能消费多少比特
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG,8192);
        //一次消费最多能消费多少条数据
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,1000);

        //配置Kafka集群地址
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
        //配置消费者组id
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        //配置反序列化
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        //判断当前消费者是否开启自动提交
        if (!isAutoCommit) {
            //关闭自动提交
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        }else{
            //设置自动提交间隔时间1s
            props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
        }
        //返回消费者
        return new KafkaConsumer<>(props);
    }

4. 事务操作

Kafka驱动支持事务操作,允许许生产者在多个主题和分区上以原子方式写入消息。这意味着你可以确保一组消息要么全部成功写入Kafka,要么全部失败。

事务操作首先通过生产者实例调用生产者实例的initTransactions方法,向kafka集群申请一个映射当前生产者的事务Id,然后就可以通过调用生产者实例的beginTransaction方法,开启一个事务,进行消息发送,最终通过调用commitTransaction方法完成事务的提交,如果中途发生异常则通过abortTransaction对当前事务进行回滚,代码实例如下

package com.kafak.testkafka;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

@SpringBootTest
class TestKafkaApplicationTests {
    //预定义Kafka对象实例,因为Kafka对象时线程安全,所以可以定义外面节省资源防止重复创建
    KafkaProducer<String, String> kafkaProducer;

    //创建生产者
    public KafkaProducer<String, String> getKafkaProducer() {
        //创建生产者配置
        Properties props = new Properties();
        //配置生产者批处理
        //缓冲区大小最大为16384比特
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //距离上次发送消息时间隔3s
        props.put(ProducerConfig.LINGER_MS_CONFIG,"3000");
        //配置消息确认策略
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        //配置重试次数
        props.put(ProducerConfig.RETRIES_CONFIG,3);
        //配置Kafka集群地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
        //配置序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //返回生产者
        return new KafkaProducer<String, String>(props);
    }



    //测试事务
    @Test
    void kafkaProducerTransactionTest() {
        if(kafkaProducer == null) {
            kafkaProducer = getKafkaProducer();
        }
        kafkaProducer.initTransactions();
        try{
            kafkaProducer.beginTransaction();
            //消息发送相关操作
            for (int i = 0; i < 100; i++) {
                try{
                    RecordMetadata topicJava = kafkaProducer.send(new ProducerRecord<String, String>("topicJava", "testKey" + i, "testValue" + i)).get();
                }catch (Exception e) {
                    e.printStackTrace();
                }
            }
            kafkaProducer.commitTransaction();
        }catch (Exception e) {
            e.printStackTrace();
            kafkaProducer.abortTransaction();
        }
        kafkaProducer.close();
    }

    

}

5. 自定义分区器

Kafka允许用户自定义分区器,实现特定的分区策略。可以通过实现Partitioner接口来创建自定义分区器。实现Partitioner接口需要实现三个方法,分别是partition,configure,close。

partition方法

partition方法是实现分区逻辑其的主要方法,其接受六个参数,分别是

  • String topic:消息要发送到的Kafka主题名称
  • Object key:消息的 key,可能为 null。
  • byte[] keyBytes:序列化后的 key,可能为 null
  • Object value:消息的 value,可以为任意对象
  • byte[] valueBytes:序列化后的 value,可能为 null。
  • Cluster cluster:Kafka集群的元数据信息,包括主题的分区数、每个分区的领导者等

partition方法的返回值则是发送分区的编号,通过这个机制可以实现不同逻辑的分区器。

configure方法

configuer方法在自定义分区类初始化时调用,当设计一些复杂操作,比如在发送消息前要和数据库交互时,可以在configure中完成数据库的连接。

close方法

close在分区逻辑执行完后调用,和configure一样,在复杂操作时,用于关闭分区逻辑中创建的连接,或一些内存资源等

假设我有一个三主机集群,其中30主机性能最好,31其次,32最差,我要通过自定义分区,将消息发送到三个分区的比例为3:2:1,通过Partitioner接口,可以简单的通过如下方式实现

package com.kafak.testkafka;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;

public class CustomPartitioner implements Partitioner {

    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {
        //获取分区元数据
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(s);
        //创建一个0-100的随机数
        double num = Math.random() * 100;
        //默认传递分区号
        Integer finalPartition = 0;
        for (PartitionInfo partitionInfo : partitionInfos) {
            //获取分区的leader
            Node leader = partitionInfo.leader();
            //获取分区leader的ip和端口
            String leaderAddress = leader.host() + ":" + leader.port(); // 生成 "host:port" 格式的字符串
            //如果随机数在0-50之间,发送消息至192.168.142.30:9092
            if (num < 50 && leaderAddress.equals("192.168.142.30:9092")) {
                finalPartition = partitionInfo.partition();
                break; 
            //如果随机数在50-82之间,发送消息至192.168.142.31:9092
            } else if (num < 82 && num >= 50 && leaderAddress.equals("192.168.142.31:9092")) {
                finalPartition = partitionInfo.partition();
                break;
            //如果随机数在82-100之间,发送消息至192.168.142.32:9092
            } else if (num < 100 && num >= 82 && leaderAddress.equals("192.168.142.32:9092")) {
                finalPartition = partitionInfo.partition();
                break;
            }
        }
        //返回最终分区号
        return finalPartition;
    }


    @Override
    public void close() {

    }
    
    @Override
    public void configure(Map<String, ?> map) {

    }
}

在生产者配置中通过partitioner_class配置自定义分区器,代码如下

 public KafkaProducer<String, String> getKafkaProducer() {
        //创建生产者配置
        Properties props = new Properties();
        
        //启用自定义分区器
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.kafak.testkafka.CustomPartitioner");
        
        //配置生产者批处理
        //缓冲区大小最大为16384比特
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        //距离上次发送消息时间隔3s
        props.put(ProducerConfig.LINGER_MS_CONFIG,"3000");
        //配置消息确认策略
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        //配置重试次数
        props.put(ProducerConfig.RETRIES_CONFIG,3);
        //配置Kafka集群地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.142.30:9092,192.168.142.31:9092,192.168.142.32:9092");
        //配置序列化
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //返回生产者
        return new KafkaProducer<String, String>(props);
    }

如此,便可以实现一个自定义分区策略。 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2057791.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

谁有实际开发权限?

需要查一下&#xff0c;谁有实际开发权限。 正常来说&#xff0c;是按权限查&#xff0c;但是&#xff0c;有权限&#xff0c;不见得能开发&#xff0c;谁开发都得要个sap key。 那切入点就是这个key了&#xff0c;毕竟是问谁有实际开发权限。 直接给结果&#xff1a; 好啦&…

通过POJO生成MySQL的DDL语句

背景 有时候下载的源码没有数据库的DDL语句&#xff0c;需要手动去创建&#xff0c;这就很麻烦了&#xff0c;这时需要一个利器通过POJO对象生成DDL语句&#xff0c;一键生成&#xff0c;直接执行即可。 工程结构示例 pom.xml文件 <?xml version"1.0" encodin…

如何在 Git 中安全撤销提交与更改

文章目录 前言一、Git Reset1. --soft&#xff1a;保留变更在暂存区2. --mixed&#xff08;默认选项&#xff09;&#xff1a;保留变更在工作区3. --hard&#xff1a;彻底丢弃所有变更 二、Git Revert1. 撤销单个提交2. 撤销多个提交3. 撤销合并提交 三、实际例子总结 前言 在…

你知道手机零部件尺寸检测的重要性吗?

手机零部件作为手机制造行业的基础&#xff0c;其品质的优劣直接关系到行业的发展&#xff0c;所以加强手机精密零部件尺寸检测非常重要。如今&#xff0c;手机零部件变得更加精细&#xff0c;对质量的要求也在不断提高&#xff0c;随着生产规模逐渐扩大&#xff0c;传统的检测…

java ssl使用自定义证书

1.证书错误 Caused by: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target 2.生成客户端证书 openssl x509 -in <(openssl s_client -connect 192.168.11.19:8101 -prexit 2>/dev/null) -ou…

C语言 | Leetcode C语言题解之第355题设计推特

题目&#xff1a; 题解&#xff1a; typedef struct {int tweetId;int userId; } Tweet;typedef struct {int* dict[501];Tweet* tweetList;int tweetListLen; } Twitter;Twitter* twitterCreate() {Twitter* obj malloc(sizeof(Twitter));for (int i 0; i < 501; i) {ob…

【Linux】 gdb-调试器初入门(简单版使用)

&#x1f525;系列文章&#xff1a;《Linux入门》 目录 一、背景 二、什么是GDB &#x1f337;定义 &#x1f337;GDB调试工具---提供的帮助 三、GDB的安装教程-Ubuntu &#x1f337;gdb的安装 四、哪类程序可被调试 &#x1f337;程序的发布方式 &#x1f337;Debug版…

力扣 | 背包dp | 279. 完全平方数、518. 零钱兑换 II、474. 一和零、377. 组合总和 Ⅳ

文章目录 一、279. 完全平方数二、518. 零钱兑换 II三、474. 一和零四、377. 组合总和 Ⅳ 一、279. 完全平方数 LeetCode&#xff1a;279. 完全平方数 朴素想法&#xff1a; 这个题目最简单的想法是&#xff0c;可以用 O ( n n ) O(n\sqrt{}n) O(n ​n)的动态规划解决&#x…

OpenCV几何图像变换(1)映射转换函数convertMaps()的使用

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 将图像变换映射从一种表示形式转换为另一种表示形式。 该函数将用于 remap 的映射对从一种表示形式转换为另一种表示形式。以下选项 ((map1.type…

车辆类型检测算法、车辆类型源码及其样本与模型解析

车辆类型检测算法是利用计算机视觉和深度学习技术&#xff0c;对车辆图像进行自动分析和识别&#xff0c;以判断车辆的类型&#xff08;如轿车、SUV、货车等&#xff09;的一种算法。以下是对车辆类型检测算法的详细解析&#xff1a; 一、算法基础 车辆类型检测算法的基础是图…

区间预测|基于长短期记忆网络LSTM分位数单变量时间序列区间预测Matlab程序QRLSTM

区间预测|基于长短期记忆网络LSTM分位数单变量时间序列区间预测Matlab程序QRLSTM 文章目录 前言区间预测|基于长短期记忆网络LSTM分位数单变量时间序列区间预测Matlab程序QRLSTM 一、QRLSTM模型1. 基本原理1.1 LSTM (Long Short-Term Memory)1.2 量化回归&#xff08;Quantile …

移动端GenAI应用的崛起:从市场规模到成功案例分析

随着生成式人工智能&#xff08;GenAI&#xff09;技术的飞速发展&#xff0c;移动应用市场正经历一场前所未有的变革。从图像编辑到聊天机器人&#xff0c;这些基于AI的应用不仅满足了用户日益增长的需求&#xff0c;也为企业带来了巨大的商业机遇。本文将探讨这一领域的最新趋…

网站建设中:高效利用Robots.txt文件的策略与实践

原文&#xff1a;网站建设中&#xff1a;高效利用Robots.txt文件的策略与实践 - 孔乙己大叔 (rebootvip.com) 在网站中使用robots.txt文件是一种控制搜索引擎爬虫访问网站内容的方法。以下是关于如何在网站中使用robots.txt的详细步骤和注意事项&#xff1a; 一、创建robots.t…

集团数字化转型方案(四)

集团数字化转型方案通过全面部署人工智能&#xff08;AI&#xff09;、大数据分析、云计算和物联网&#xff08;IoT&#xff09;技术&#xff0c;创建了一个智能化的企业运营平台&#xff0c;涵盖从业务流程自动化、实时数据监控、精准决策支持&#xff0c;到个性化客户服务和高…

PV、UV、IP:网站流量分析的关键指标

原文&#xff1a;PV、UV、IP&#xff1a;网站流量分析的关键指标 - 孔乙己大叔 (rebootvip.com) 摘要&#xff1a; 在浩瀚的互联网海洋中&#xff0c;PV&#xff08;Page View&#xff0c;页面浏览量&#xff09;、UV&#xff08;Unique Visitor&#xff0c;独立访客数…

Eclipse SVN 插件在线下载地址

Eclipse SVN 插件 Subversive 在线安装 1、选择help下的install new software 2、点击 add 3、Name随便写&#xff0c;Location输入&#xff1a; https://download.eclipse.org/technology/subversive/4.8/release/latest/ 点击Add 4、然后一直下一步&#xff0c;Finish&am…

【QT】——1_QT学习笔记

一、QT是什么&#xff1f; QT 是一个功能强大、应用广泛的跨平台 C 应用程序开发框架&#xff0c;它不仅提供了丰富多样、美观实用的图形界面组件&#xff0c;还具备高效灵活的信号与槽通信机制&#xff0c;能够帮助开发者轻松构建出复杂且性能优越的应用程序&#xff0c;广泛…

VS Code中基于MSTest编写和运行测试

MS Test&#xff08;Microsoft Test Framework&#xff09;是微软提供的一个用于.NET应用程序的单元测试框架。以下是一个使用MS Test进行单元测试的示例&#xff0c;该示例将涵盖测试的基本步骤和概念。 项目搭建 在VS Code中开发C#时&#xff0c;创建solution&#xff08;解…

AI绘画Stable Diffusion画全身图总是人脸扭曲?ADetailer插件实现一键解决!商业级AI人物生成教程

大家好&#xff0c;我是灵魂画师向阳 你是否遇到过SD生成的人物脸部扭曲、甚至令人恶心的情况&#xff1f;也曾感到束手无策&#xff1f;别担心&#xff0c;这份教程专为你而来。 在使用SD生成人物全身照时&#xff0c;你可能经常发现人物的脸部会出现扭曲问题。这是因为人物…

整体思想以及取模

前言&#xff1a;一开始由于失误&#xff0c;误以为分数相加取模不能&#xff0c;但是其实是可以取模的 这个题目如果按照一般方法&#xff0c;到达每个节点再进行概率统计&#xff0c;但是不知道为什么只过了百分之十五的测试集 题目地址 附上没过关的代码 #include<bits…