kafka 02——三个重要的kafka客户端

news2025/1/11 2:31:58

kafka 02——三个重要的kafka客户端

  • 1. 前言
    • 1.1 关于 Kafka 的安装
    • 1.2 常用客户端简介
    • 1.3 依赖
  • 2. AdminClient
    • 2.1 Admin Configs
    • 2.2 AdminClient API
      • 2.2.1 设置 AdminClient 对象
      • 2.2.2 创建 topic + 获取 topic 列表
      • 2.2.3 删除topic
      • 2.2.4 查看 topic 的描述信息
      • 2.2.5 查看 topic 的配置信息
      • 2.2.6 修改 topic 的配置信息
      • 2.2.7 新增 Partition
        • 2.2.7.1 相关概念
        • 2.2.7.2 演示
    • 2.3 附代码
  • 3. 生产者(Producer API)
    • 3.1 Producer Configs
      • 3.1.1 参考官网
      • 3.1.2 关于acks 的配置(消息传递保障)
    • 3.2 Producer API
      • 3.2.1 异步发送
      • 3.2.2 异步阻塞发送(同步发送)
      • 3.2.3 异步发送并回调
      • 3.2.4 总结 ( 异步阻塞发送 与 异步发送)
        • 3.2.3.1 异步阻塞发送
        • 3.2.3.2 异步发送
        • 3.2.3.3 参考
    • 3.3 Producer 自定义Partition分区规则(负载均衡)
      • 3.3.1 把 Partition 增加到3
      • 3.3.2 核心代码
      • 3.3.3 效果
  • 4. 消费者
    • 4.1 Consumer Configs
    • 4.2 消费者消费例子
      • 4.2.1 官网参考
      • 4.2.2 简单入门例子——自动偏移提交
      • 4.2.2 手动偏移控制
        • 4.2.3.1 解释
        • 4.2.3.2 代码
      • 4.2.3 每个 partition 单独处理
        • 4.2.3.1 解释
        • 4.2.3.2 代码
        • 4.2.3.3 注意
      • 4.2.4 手动控制消费哪个partition(手动分区分配)
        • 4.2.4.1 描述
        • 4.2.4.2 代码
        • 4.2.4.3 效果
      • 4.2.5 消费者多线程处理
        • 4.2.5.1 消费者线程不安全
        • 4.2.5.2 两种方式实现
          • 4.2.5.2.1 每个线程一个消费者
          • 4.2.5.2.1 将消费和处理分离
        • 4.2.5.3 典型的模式(每个线程一个消费者)
        • 4.2.5.4 将消费和处理分离(线程池处理)
      • 4.2.6
    • 4.3

1. 前言

1.1 关于 Kafka 的安装

  • 请参考下面的文章:
    Kafka 01——Kafka的安装及简单入门使用.

1.2 常用客户端简介

  • AdminClient API:
    允许管理和检测Topic、Broker以及其他Kafka对象。
  • Producer API:
    发布消息到一个或多个API。
  • Consumer API:
    订阅一个或多个Topic,并处理产生的消息。

1.3 依赖

  • 如下:
    在这里插入图片描述

            <!--kafka客户端-->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.8.2</version>
            </dependency>
            
    
  • 完整的pom

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
        <parent>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-parent</artifactId>
            <version>2.7.6</version>
            <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>com.liu.susu</groupId>
        <artifactId>kafka-api</artifactId>
        <version>0.0.1-SNAPSHOT</version>
        <name>kafka-api</name>
        <description>kafka-api</description>
        <properties>
            <java.version>1.8</java.version>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    
            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-test</artifactId>
                <scope>test</scope>
            </dependency>
    
            <dependency>
                <groupId>ch.qos.logback</groupId>
                <artifactId>logback-classic</artifactId>
                <version>1.2.11</version>
            </dependency>
    
            <!--kafka客户端-->
            <dependency>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
                <version>2.8.2</version>
            </dependency>
    
        </dependencies>
    
        <build>
            <plugins>
                <plugin>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-maven-plugin</artifactId>
                </plugin>
            </plugins>
        </build>
    
    </project>
    

2. AdminClient

2.1 Admin Configs

  • 关于配置,可参考官网:
    https://kafka.apache.org/documentation/#adminclientconfigs.

2.2 AdminClient API

2.2.1 设置 AdminClient 对象

  • 详细配置请参考官网,简单配置使用,如下:

    package com.liu.susu.admin;
    
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    
    /**
     * @Description
     * @Author susu
     */
    public class AdminExample1 {
    
        public final static String TOPIC_NAME = "";
    
        /**
         * 1. 创建并设置 AdminClient 对象
         */
        public static AdminClient getAdminClient(){
            Properties properties = new Properties();
            properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "Kafka服务IP:9092");
    
            AdminClient adminClient = AdminClient.create(properties);
            return adminClient;
        }
    
        public static void main(String[] args) {
            //1. 测试 创建并设置 AdminClient 对象
            AdminClient adminClient = AdminExample1.getAdminClient();
            System.out.println("adminClient==>" + adminClient);
        }
    }
    
    

    在这里插入图片描述

2.2.2 创建 topic + 获取 topic 列表

  • 如下:

        /**
         * 2. 创建topic
         */
        public static void createTopic(){
            AdminClient adminClient = getAdminClient();
            // 副本因子
            short rs = 1;
            NewTopic newTopic = new NewTopic("new_topic_test", 1, rs);//new_topic_test 是 topic的name
            CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
            System.out.println("创建的新topic为::::" + topics);
        }
    
        /**
         * 3. 获取已经创建的 topic 的列表
         */
        public static ListTopicsResult getTopicList(){
            AdminClient adminClient = getAdminClient();
            ListTopicsResult topicList = adminClient.listTopics();
            return topicList;
        }
    
  • 测试如下:

        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //1. 测试 创建并设置 AdminClient 对象
    //        AdminClient adminClient = AdminExample1.getAdminClient();
    //        System.out.println("adminClient==>" + adminClient);
    
            //2. 测试 创建topic
            createTopic();
    
            //3. 获取已经创建的 topic 的列表
            ListTopicsResult topicList = getTopicList();
            Collection<TopicListing> topicListings = topicList.listings().get();
            for (TopicListing topic : topicListings) {
                System.out.println(topic);
            }
    
        }
    

    在这里插入图片描述
    在这里插入图片描述

2.2.3 删除topic

  • 如下:
        /**
         * 4. 删除 topic
         */
        public static void deleteTopic(String topicName) throws ExecutionException, InterruptedException {
            AdminClient adminClient = getAdminClient();
            DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(topicName));
            deleteTopicsResult.all().get();
        }
    

2.2.4 查看 topic 的描述信息

  • 如下:

        /**
         * 5. 获取描述 topic 的信息
         */
        public static void getDescribeTopics(String topicName) throws ExecutionException, InterruptedException {
            AdminClient adminClient = getAdminClient();
            DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList(topicName));
            Map<String, TopicDescription> descriptionMap = result.all().get();
            descriptionMap.forEach((k,v)->{
                System.out.println("k==>"+k +",v===>"+v);
            });
        }
    

    在这里插入图片描述

    k==>susu-topic,v===>(name=susu-topic, internal=false, partitions=(partition=0, leader=IP:9092 (id: 0 rack: null), replicas=IP:9092 (id: 0 rack: null), isr=IP:9092 (id: 0 rack: null)), authorizedOperations=null)
    

2.2.5 查看 topic 的配置信息

  • 如下:
        /**
         * 6. 获取 topic 的配置信息
         */
        public static void getDescribeConfig(String topicName) throws ExecutionException, InterruptedException{
            AdminClient adminClient = getAdminClient();
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC,topicName);
            DescribeConfigsResult configsResult = adminClient.describeConfigs(Arrays.asList(resource));
            Map<ConfigResource, Config> configMap = configsResult.all().get();
            configMap.forEach((k,v)->{
                System.out.println("k==>"+k +",v===>"+v);
            });
        }
        
        //查看某一项配置(eg:message.downconversion.enable)的值
        Config config = configMap.get(resource);
        ConfigEntry configEntry = config.get("message.downconversion.enable");
        System.out.println("message.downconversion.enable===>" + configEntry.value());
    
    在这里插入图片描述

2.2.6 修改 topic 的配置信息

  • 如下:
        /**
         * 7. 修改 topic 的配置信息
         *    本例修改 message.downconversion.enable,将默认的 true 改为 false
         */
        public static void editConfig(String topicName) throws ExecutionException, InterruptedException {
            AdminClient adminClient = getAdminClient();
            Map<ConfigResource,Config> configMap = new HashMap<>();
    
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC,topicName);
    
            String keyName = "message.downconversion.enable";
            String value = "false";
            ConfigEntry configEntry = new ConfigEntry(keyName, value);
            Config config = new Config(Arrays.asList(configEntry));
    
            configMap.put(configResource,config);
    
            AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(configMap);
            alterConfigsResult.all().get();
        }
    
  • 效果如下:
    在这里插入图片描述

2.2.7 新增 Partition

2.2.7.1 相关概念

  • Topic:主题,一个虚拟的概念,由1到多个 Partitions 组成,可以理解为一个队列,生产者和消费者都是面向一个Topic。
  • Partition:分区,实际消息存储单位。为了实现扩展性,一个非常大的 Topic 可以分布到多个 Broker 上,一个Topic 可以分为多个 Partition,每个 Partition 是一个有序的队列(分区有序,不能保证全局有序)
  • Producer:消息生产者,向 Kafka 中发布消息的角色。
  • Consumer:消息消费者,从 Kafka 中拉取消息消费的客户端。
  • Broker:经纪人,一台 Kafka 服务器就是一个 Broker,一个集群由多个 Broker 组成,一个 Broker 可以容纳多个 Topic。

2.2.7.2 演示

  • 代码如下:
        /**
         * 8. 增加 topic 的Partitions
         */
        public static void addPartitionNum(String topicName, int partitionNum) throws ExecutionException, InterruptedException {
            AdminClient adminClient = getAdminClient();
            Map<String,NewPartitions> partitionsMap = new HashMap<>() ;
    
            NewPartitions newPartitions = NewPartitions.increaseTo(partitionNum);//增加到的数量
    
            partitionsMap.put(topicName,newPartitions);
    
            CreatePartitionsResult request = adminClient.createPartitions(partitionsMap);
            request.all().get();
        }
    
  • 效果如下:
    在这里插入图片描述

2.3 附代码

  • 如下:
    package com.liu.susu.admin;
    
    import org.apache.kafka.clients.admin.*;
    import org.apache.kafka.common.KafkaFuture;
    import org.apache.kafka.common.config.ConfigResource;
    import org.apache.kafka.common.requests.CreatePartitionsRequest;
    
    import java.util.*;
    import java.util.concurrent.ExecutionException;
    
    /**
     * @Description
     * @Author susu
     */
    public class AdminExample1 {
    
        public final static String TOPIC_NAME = "new_topic_test";
    
        /**
         * 1. 创建并设置 AdminClient 对象
         */
        public static AdminClient getAdminClient(){
            Properties properties = new Properties();
            properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "43.143.190.116:9092");
    
            AdminClient adminClient = AdminClient.create(properties);
            return adminClient;
        }
    
    
        /**
         * 2. 创建topic
         */
        public static void createTopic(){
            AdminClient adminClient = getAdminClient();
            // 副本因子
            short rs = 1;
            NewTopic newTopic = new NewTopic("new_topic_test", 1, rs);
            CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
            System.out.println("创建的新topic为::::" + topics);
        }
    
        /**
         * 3. 获取已经创建的 topic 的列表
         */
        public static ListTopicsResult getTopicList(){
            AdminClient adminClient = getAdminClient();
            ListTopicsResult topicList = adminClient.listTopics();
            return topicList;
        }
    
        /**
         * 4. 删除 topic
         */
        public static void deleteTopic(String topicName) throws ExecutionException, InterruptedException {
            AdminClient adminClient = getAdminClient();
            DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(topicName));
            deleteTopicsResult.all().get();
        }
    
        /**
         * 5. 获取描述 topic 的信息
         */
        public static void getDescribeTopics(String topicName) throws ExecutionException, InterruptedException {
            AdminClient adminClient = getAdminClient();
            DescribeTopicsResult result = adminClient.describeTopics(Arrays.asList(topicName));
            Map<String, TopicDescription> descriptionMap = result.all().get();
            descriptionMap.forEach((k,v)->{
                System.out.println("k==>"+k +",v===>"+v);
            });
        }
    
        /**
         * 6. 获取 topic 的配置信息
         */
        public static void getDescribeConfig(String topicName) throws ExecutionException, InterruptedException{
            AdminClient adminClient = getAdminClient();
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC,topicName);
            DescribeConfigsResult configsResult = adminClient.describeConfigs(Arrays.asList(resource));
            Map<ConfigResource, Config> configMap = configsResult.all().get();
            configMap.forEach((k,v)->{
                System.out.println("\nk==>"+k +",v===>"+v);
            });
    
            //查看某一项配置(eg:message.downconversion.enable)的值
            Config config = configMap.get(resource);
            ConfigEntry configEntry = config.get("message.downconversion.enable");
            System.out.println("message.downconversion.enable===>" + configEntry.value());
        }
    
        /**
         * 7. 修改 topic 的配置信息
         *    本例修改 message.downconversion.enable,将默认的 true 改为 false
         */
        public static void editConfig(String topicName) throws ExecutionException, InterruptedException {
            AdminClient adminClient = getAdminClient();
            Map<ConfigResource,Config> configMap = new HashMap<>();
    
            ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC,topicName);
    
            String keyName = "message.downconversion.enable";
            String value = "false";
            ConfigEntry configEntry = new ConfigEntry(keyName, value);
            Config config = new Config(Arrays.asList(configEntry));
    
            configMap.put(configResource,config);
    
            AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(configMap);
            alterConfigsResult.all().get();
        }
    
        /**
         * 8. 增加 topic 的Partitions
         */
        public static void addPartitionNum(String topicName, int partitionNum) throws ExecutionException, InterruptedException {
            AdminClient adminClient = getAdminClient();
            Map<String,NewPartitions> partitionsMap = new HashMap<>() ;
    
            NewPartitions newPartitions = NewPartitions.increaseTo(partitionNum);//增加到的数量
    
            partitionsMap.put(topicName,newPartitions);
    
            CreatePartitionsResult request = adminClient.createPartitions(partitionsMap);
            request.all().get();
        }
    
    
        public static void main(String[] args) throws ExecutionException, InterruptedException {
            //1. 测试 创建并设置 AdminClient 对象
    //        AdminClient adminClient = AdminExample1.getAdminClient();
    //        System.out.println("adminClient==>" + adminClient);
    
    
            //2. 测试 创建topic
    //        createTopic();
    
            //3. 获取已经创建的 topic 的列表
            ListTopicsResult topicList = getTopicList();
            Collection<TopicListing> topicListings = topicList.listings().get();
            for (TopicListing topic : topicListings) {
                System.out.println(topic);
            }
    
            // 4. 删除topic
    //        deleteTopic("new_topic_test");
    
            // 5.
    //        getDescribeTopics("susu-topic");
    
            //6. 获取 topic 的配置信息
    //        getDescribeConfig("susu-topic");
    
            // 7. 修改 topic 的配置信息
    //        editConfig("susu-topic");
    //
    //        System.out.println("\n=============修改之后的配置===========\n");
    //
    //        getDescribeConfig("susu-topic"); //修改之后再查看配置
    
            //8. 增加 topic 的Partitions
            addPartitionNum("susu-topic",2);
            System.out.println("添加完毕");
    
        }
    
    }
    
    

3. 生产者(Producer API)

3.1 Producer Configs

3.1.1 参考官网

  • 关于 Producer Configs 更多配置,参考官网
    https://kafka.apache.org/documentation/#producerconfigs.

  • 简单看个配置,如下:
    在这里插入图片描述

    在这里插入图片描述

3.1.2 关于acks 的配置(消息传递保障)

关于 acks 的配置,在考虑请求完成之前,生产者要求领导已收到的确认次数。这控制发送的记录的持久性。允许以下设置:

  • acks=0 ,如果设置为0,那么生产者将不会等待服务器的任何确认。(即:消息发送之后就不管了,无论消息是否写成功)

    • 该记录将立即添加到套接字缓冲区并被认为已发送。
    • 在这种情况下,不能保证服务器已经接收到记录,重试配置将不会生效(因为客户端通常不会知道任何失败)。为每条记录返回的偏移量将始终设置为-1。
    • 即:这种情况消息发送之后,要么根本没收到要么收到一次,所以,最多收到一次消息(收到0次或多次)
  • acks=1 ,这将意味着leader将记录写入其本地日志,但将在不等待所有follower完全确认的情况下进行响应。

    • 在这种情况下,如果leader在确认记录后立即失败,但在follower复制它之前,那么记录将丢失。
    • 两种情况:
      • 一是,没收到消息没有回应的重复发送,这时还是收到1次;
      • 二是,收到消息但是回应出了问题,即仅没收到回应的重发,这时就会重复收到消息,所以多次。
    • 即:这种情况至少收到一次消息(一次或多次)
  • acks=all(或者acks=-1) ,这意味着leader将等待同步副本的完整集合来确认记录

    • 这保证了只要至少有一个同步副本保持活动状态,记录就不会丢失。这是最有力的保证。
    • 即:这种情况下收到消息有且仅有一次,如果重复发送会拒收。
  • 注意,启用幂等性需要这个配置值为“all”。如果设置了冲突的配置,并且幂等性没有显式启用,则幂等性被禁用。

3.2 Producer API

3.2.1 异步发送

  • 代码如下:
    在这里插入图片描述
    package com.liu.susu.producer;
    
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    
    import java.util.Properties;
    
    /**
     * @Description
     * @Author susu
     */
    public class ProducerExample1 {
    
        public static Properties getProperties(){
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:9092");
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
            properties.put(ProducerConfig.RETRIES_CONFIG, "0");
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16348");
            properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            return properties;
        }
    
        /**
         * 1. 发送消息(异步发送)
         *    1.1 一次发一条消息
         */
        public static void producerSendOne(String topicName){
            Properties properties = getProperties();
            //Producer对象
            Producer<String, String> producer = new KafkaProducer<>(properties);
            //消息对象
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName,"num1","A-10001");
            //发送消息
            producer.send(producerRecord);
            //所有的通道打开都要记得关闭
            producer.close();
        }
        /**
         * 1. 发送消息(异步发送)
         *    1.2 一次发多条消息
         */
        public static void producerSendMore(String topicName){
            Properties properties = getProperties();
            //Producer对象
            Producer<String, String> producer = new KafkaProducer<>(properties);
            for (int i = 0; i < 5; i++) {
                //消息对象
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName,"Record-"+i,"R-1000"+i);
                //发送消息
                producer.send(producerRecord);
            }
            producer.close();
        }
    
        public static void main(String[] args) {
            //1.1 一次发一条消息
    //        producerSendOne("susu-topic");
    
            //1.2 一次发多条消息
            producerSendMore("susu-topic");
        }
    
    }
    
    
  • 测试效果如下:
    在这里插入图片描述

3.2.2 异步阻塞发送(同步发送)

  • 代码如下:

    package com.liu.susu.producer;
    
    import org.apache.kafka.clients.producer.*;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    /**
     * @Description
     * @Author susu
     */
    public class ProducerExample2 {
    
        public static Properties getProperties(){
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "43.143.190.116:9092");
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
            properties.put(ProducerConfig.RETRIES_CONFIG, "0");
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16348");
            properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            return properties;
        }
        
        /**
         * 1. 异步阻塞发送(同步发送)
         */
        public static void producerSendMore(String topicName) throws ExecutionException, InterruptedException {
            Properties properties = getProperties();
            //Producer对象
            Producer<String, String> producer = new KafkaProducer<>(properties);
            for (int i = 0; i < 5; i++) {
                //消息对象
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName,"Z-Record-"+i,"Z-R-1000"+i);
                //发送消息
    //            producer.send(producerRecord);
                Future<RecordMetadata> send = producer.send(producerRecord);
                RecordMetadata recordMetadata = send.get();//future.get会进行阻塞直到返回数据表示发送成功,才会继续下一条消息的发送
    
                System.out.println("Z-Record-"+i + ",partition-->"+recordMetadata.partition() + ",offset-->"+recordMetadata.offset());
    
            }
            producer.close();
        }
    
        public static void main(String[] args) throws ExecutionException, InterruptedException{
            //1. 异步阻塞发送(同步发送)
            producerSendMore("susu-topic");
        }
    
    }
    
    
  • 测试如下:
    在这里插入图片描述
    在这里插入图片描述

3.2.3 异步发送并回调

  • 生产者发消息,发送完之后不用等待broker给回复,直接执行下面的业务逻辑。可以提供回调方法,让broker异步的调用callback,告知生产者,消息发送的结果。这种方式就不用像异步阻塞那样,发送完之后还得阻塞等着。

  • 效果如下:
    在这里插入图片描述

  • 代码如下:

    package com.liu.susu.producer;
    
    import org.apache.kafka.clients.producer.*;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.Future;
    
    /**
     * @Description
     * @Author susu
     */
    public class ProducerExample3 {
    
        public static Properties getProperties(){
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "43.143.190.116:9092");
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
            properties.put(ProducerConfig.RETRIES_CONFIG, "0");
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16348");
            properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            return properties;
        }
        
        /**
         * 1. 异步发送并回调
         */
        public static void producerSendMore(String topicName) throws ExecutionException, InterruptedException {
            Properties properties = getProperties();
            //Producer对象
            Producer<String, String> producer = new KafkaProducer<>(properties);
            for (int i = 0; i < 5; i++) {
                //消息对象
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName,"H4-Record-"+i,"H4-R-1000"+i);
    
                //1 发送消息:异步发送并回调
                producer.send(producerRecord, (recordMetadata, exception) -> {
                    if(exception == null) {
                        System.out.println("partition-->"+recordMetadata.partition() + ",offset-->"+recordMetadata.offset());
                    }
                    else {
                        exception.printStackTrace();
                    }
                });
    
                //2 发送消息:异步发送并回调
    //            producer.send(producerRecord, new Callback() {
    //                @Override
    //                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
    //                    if(e == null) {
    //                        System.out.println("partition-->"+recordMetadata.partition() + ",offset-->"+recordMetadata.offset());
    //                    }
    //                    else {
    //                        e.printStackTrace();
    //                    }
    //                }
    //            });
    
            }
            producer.close();// 要关闭Producer实例
        }
        public static void main(String[] args) throws ExecutionException, InterruptedException{
            //1. 异步发送并回调
            producerSendMore("susu-topic");
        }
    
    }
    
    

3.2.4 总结 ( 异步阻塞发送 与 异步发送)

3.2.3.1 异步阻塞发送

  • 此方式可理解为同步发送(即:同步就是逐条发送。)。
    • 一定是逐条发送的,第一条响应到达后,才会请求第二条。会对每条消息的结果进行判断,future.get() 会进行阻塞直到返回数据表示发送成功,才会继续下一条消息的发送,可以直到每条信息的发送情况。
    • 此方式如果发送失败会进行重试并抛出异常,直至重试达到retries最大次数,此方式也是最大程度确保数据可靠性,可以记录对应的结果日志。

3.2.3.2 异步发送

  • 异步就是批量发送。
    • 如果设置成异步的模式,可以运行生产者以batch的形式push数据,这样会极大的提高broker的性能,但是这样会增加丢失数据的风险。
    • 异步方式,可以发送一条,也可以批量发送多条,特性是不需等第一次(注意这里单位是次,因为单次可以是单条,也可以是批量数据)响应,就立即发送第二次。

3.2.3.3 参考

  • Kafka同步发送与异步发送消息.

3.3 Producer 自定义Partition分区规则(负载均衡)

3.3.1 把 Partition 增加到3

  • 如下,0 ,1 ,2:
    在这里插入图片描述

3.3.2 核心代码

  • 如下:
    在这里插入图片描述
    在这里插入图片描述

  • MyPartition.java

    package com.liu.susu.producer;
    
    import org.apache.kafka.clients.producer.Partitioner;
    import org.apache.kafka.common.Cluster;
    
    import java.util.Map;
    
    /**
     * @Description
     * @Author susu
     */
    public class MyPartition implements Partitioner {
    
    
        @Override
        public int partition(String topic, Object key, byte[] bytes, Object value, byte[] bytes1, Cluster cluster) {
            String newsKey = key + "";  //格式:"P-Record-"+i
    
            String newKeyNum = newsKey.substring(newsKey.length()-1);//取最后一位
            int keyNum = Integer.parseInt(newKeyNum);
    
            int partition = keyNum % 3;
    
            System.out.println("newsKey--->"+newsKey + ",newKeyNum-->"+newKeyNum+",partition-->"+partition);
    
            return partition;
        }
    
        @Override
        public void close() {
    
        }
    
        @Override
        public void configure(Map<String, ?> map) {
    
        }
    
    }
    
    
  • 例子:

    package com.liu.susu.producer;
    
    import org.apache.kafka.clients.producer.*;
    
    import java.util.Properties;
    import java.util.concurrent.ExecutionException;
    
    /**
     * @Description
     * @Author susu
     */
    public class ProducerExample4 {
    
        public static Properties getProperties(){
            Properties properties = new Properties();
            properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "43.143.190.116:9092");
            properties.put(ProducerConfig.ACKS_CONFIG, "all");
            properties.put(ProducerConfig.RETRIES_CONFIG, "0");
            properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16348");
            properties.put(ProducerConfig.LINGER_MS_CONFIG, "1");
            properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432");
    
            properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.liu.susu.producer.MyPartition");
    
            properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
            return properties;
        }
        
        /**
         * 1. 异步发送并回调
         */
        public static void producerSendMore(String topicName) throws ExecutionException, InterruptedException {
            Properties properties = getProperties();
            //Producer对象
            Producer<String, String> producer = new KafkaProducer<>(properties);
            for (int i = 1; i <= 15; i++) {
                //消息对象
                ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topicName,"P-Record-"+i,"P-R-1000"+i);
    
                //发送消息:异步发送并回调
                producer.send(producerRecord, new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                        if(e == null) {
                            System.out.println("partition-->"+recordMetadata.partition() + ",offset-->"+recordMetadata.offset());
                        }
                        else {
                            e.printStackTrace();
                        }
                    }
                });
    
            }
            producer.close();// 要关闭Producer实例
        }
        public static void main(String[] args) throws ExecutionException, InterruptedException{
            // 异步发送并回调
            producerSendMore("susu-topic");
        }
    
    }
    
    

3.3.3 效果

  • 使用异步发送并回调,效果如下:
    在这里插入图片描述

4. 消费者

4.1 Consumer Configs

  • 参考官网:
    https://kafka.apache.org/documentation/#consumerconfigs.
    在这里插入图片描述

4.2 消费者消费例子

4.2.1 官网参考

  • 如下:
    https://kafka.apache.org/28/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html.
  • 例子可参考 Class KafkaConsumer<K,​V>
    在这里插入图片描述

4.2.2 简单入门例子——自动偏移提交

  • 这种情况下,消费过的不会再消费,代码如下:
    在这里插入图片描述

    package com.liu.susu.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Properties;
    
    /**
     * @Description
     * @Author susu
     */
    public class ConsumerExample1 {
    
        public static void consumerTest(){
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "IP:9092");
            props.setProperty("group.id", "test");
            props.setProperty("enable.auto.commit", "true");//设置enable.auto.commit意味着自动提交偏移量,其频率由配置auto.commit.interval.ms控制
            props.setProperty("auto.commit.interval.ms", "1000");
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
            /**
             * 消费订阅哪一个topic或者哪几个topic
             *   我这里:消费者订阅了主题susu-topic和susu-topic-2,作为消费者组test的一部分,并配置了group.id。
             */
            consumer.subscribe(Arrays.asList("susu-topic", "susu-topic-2"));
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));//每100毫秒拉取一次
                for (ConsumerRecord<String, String> record : records)
                    System.out.printf("topic = %s,partition = %d, offset = %d, key = %s, value = %s%n",
                            record.topic(),record.partition(),record.offset(), record.key(), record.value());
            }
        }
    
        public static void main(String[] args) {
            consumerTest();
        }
    
    }
    
    
  • 效果如下:
    在这里插入图片描述

4.2.2 手动偏移控制

4.2.3.1 解释

  • 用户还可以控制何时将记录视为已消耗记录,从而提交其偏移量,而不是依赖于消费者定期提交所消耗的偏移量。当消息的消费与一些处理逻辑相结合时,这很有用,因此在消息完成处理之前不应将其视为消费。
  • 在本例中,我们将使用一批记录并在内存中批量处理它们。当我们有足够的记录时,我们将把它们插入到数据库中。如果我们像前面的例子一样允许偏移量自动提交,那么记录在poll中返回给用户后就会被认为是消耗了。这样,我们的流程就有可能在对记录进行批处理之后,但在将它们插入数据库之前失败。
    为了避免这种情况,我们将只在将相应的记录插入数据库之后
    手动提交偏移量
    。这使我们能够精确控制记录何时被消费。这引发了相反的可能性:进程可能在插入数据库之后但在提交之前的时间间隔内失败(尽管这可能只有几毫秒,但这是有可能的)。在这种情况下,接管消费的进程将从最后提交的偏移量中消费,并将重复插入最后一批数据。使用这种方式,Kafka提供了通常被称为==“至少一次”的交付保证==,因为每个记录可能只交付一次,但在失败的情况下可以复制

4.2.3.2 代码

  • 代码如下:
    在这里插入图片描述
    在这里插入图片描述

    package com.liu.susu.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    
    import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Properties;
    
    /**
     * @Description 手动提交
     * @Author susu
     */
    public class ConsumerExample2 {
    
        public static void consumerTest(){
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "43.143.190.116:9092");
            props.setProperty("group.id", "test");
            props.setProperty("enable.auto.commit", "false");//false 手动提交
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
    //        consumer.subscribe(Arrays.asList("susu-topic", "susu-topic-2"));
            consumer.subscribe(Arrays.asList("susu-topic"));
    
            final int minBatchSize = 20;
            List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
    
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (ConsumerRecord<String, String> record : records) {
                    buffer.add(record);
                    System.out.printf("topic = %s,partition = %d, offset = %d, key = %s, value = %s%n",
                            record.topic(),record.partition(),record.offset(), record.key(), record.value());
                }
    
                if (buffer.size() >= minBatchSize) {
                    System.out.println(buffer);
    
                    try {
                        /**
                         * 这里是业务逻辑,把数据保存到数据库中
                         *    如果失败,则回滚
                         */
    //                insertIntoDb(buffer);
    
                        //如果成功,则手动通知offset提交
                        consumer.commitSync();//消费过之后不会再重复消费
                    } catch (Exception e) {
                        System.out.println("失败,不提交");//失败不执行commitSync,后续重复发送会消费
                        throw new RuntimeException(e);
                    }
    
                    buffer.clear();
                }
            }
    
    
        }
    
        public static void main(String[] args) {
            consumerTest();
        }
    
    }
    
    

4.2.3 每个 partition 单独处理

4.2.3.1 解释

  • 上面的例子使用 commitSync 将所有收到的记录标记为已提交。在某些情况下,你可能希望通过显式指定偏移量来更好地控制已提交的记录。在本示例中,我们在处理完每个分区中的记录后提交偏移量

4.2.3.2 代码

  • 代码如下:
    在这里插入图片描述

    package com.liu.susu.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    
    import java.time.Duration;
    import java.util.*;
    
    /**
     * @Description 处理完每个分区中的记录后提交偏移量
     * @Author susu
     */
    public class ConsumerExample3 {
    
        public static void consumerTest(){
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "43.143.190.116:9092");
            props.setProperty("group.id", "test");
            props.setProperty("enable.auto.commit", "false");//false 手动提交
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
    //        consumer.subscribe(Arrays.asList("susu-topic", "susu-topic-2"));
            consumer.subscribe(Arrays.asList("susu-topic"));
    
            try {
                while(true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                    // 每个partition单独处理
                    for (TopicPartition partition : records.partitions()) {
    
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        for (ConsumerRecord<String, String> record : partitionRecords) {
    //                        System.out.println(record.offset() + ": " + record.value());
                            System.out.printf("topic = %s,partition = %d, offset = %d, key = %s, value = %s%n",
                                    record.topic(),record.partition(),record.offset(), record.key(), record.value());
                        }
    
                        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                        // 循环一个partition,提交一次
                        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(lastOffset + 1);
                        consumer.commitSync(Collections.singletonMap(partition, offsetAndMetadata));
                    }
                }
            } finally {
                consumer.close();
            }
    
        }
    
        public static void main(String[] args) {
            consumerTest();
        }
    
    }
    
    
  • 效果如下:
    在这里插入图片描述

4.2.3.3 注意

  • 注意:提交的偏移量应该始终是应用程序将读取的下一条消息的偏移量。因此,在调用commitSync(offsets)时,应该在最后处理的消息的偏移量上添加一个。

4.2.4 手动控制消费哪个partition(手动分区分配)

4.2.4.1 描述

  • 在前面的例子中,我们订阅了我们感兴趣的主题,并让Kafka根据组中活跃的消费者动态地为这些主题分配公平的分区份额。但是,在某些情况下,您可能需要对分配的特定分区进行更好的控制。例如:
    • 如果进程正在维护与该分区相关的某种本地状态(比如本地磁盘上的键值存储),那么它应该只获取它在磁盘上维护的分区的记录。
    • 如果进程本身是高可用的,并且在失败时将重新启动(可能使用像YARN、Mesos或AWS设施这样的集群管理框架,或者作为流处理框架的一部分)。在这种情况下,Kafka不需要检测故障并重新分配分区,因为消费进程将在另一台机器上重新启动。
  • 要使用这种模式,不需要使用subscribe订阅主题,只需调用assign(Collection),其中包含要使用的分区的完整列表。

4.2.4.2 代码

  • 如下:
    在这里插入图片描述

    package com.liu.susu.consumer;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    
    import java.time.Duration;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Properties;
    
    /**
     * @Description 指定消费某个分区
     * @Author susu
     */
    public class ConsumerExample4 {
    
        public static void consumerTest(){
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "43.143.190.116:9092");
            props.setProperty("group.id", "test");
            props.setProperty("enable.auto.commit", "false");//false 手动提交
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
            String topicName = "susu-topic";
            TopicPartition partition0 = new TopicPartition(topicName, 0);
            TopicPartition partition1 = new TopicPartition(topicName, 1);
            TopicPartition partition2 = new TopicPartition(topicName, 2);
    
            consumer.assign(Arrays.asList(partition2)); //只有partition2消费
    //        consumer.assign(Arrays.asList(partition0, partition1));  //只有partition0, partition1消费
    
    
            try {
                while(true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Long.MAX_VALUE));
                    // 每个partition单独处理
                    for (TopicPartition partition : records.partitions()) {
    
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        for (ConsumerRecord<String, String> record : partitionRecords) {
    //                        System.out.println(record.offset() + ": " + record.value());
                            System.out.printf("topic = %s,partition = %d, offset = %d, key = %s, value = %s%n",
                                    record.topic(),record.partition(),record.offset(), record.key(), record.value());
                        }
    
                        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                        // 循环一个partition,提交一次
                        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(lastOffset + 1);
                        consumer.commitSync(Collections.singletonMap(partition, offsetAndMetadata));
                    }
                }
            } finally {
                consumer.close();
            }
    
        }
    
        public static void main(String[] args) {
            consumerTest();
        }
    
    }
    
    

4.2.4.3 效果

  • 如下:
    在这里插入图片描述

    在这里插入图片描述

4.2.5 消费者多线程处理

4.2.5.1 消费者线程不安全

  • Kafka消费者不是线程安全的。所有网络I/O都发生在发出调用的应用程序线程中。确保多线程访问正确同步是用户的责任。非同步访问将导致ConcurrentModificationException。
  • 该规则的唯一例外是wakeup(),它可以安全地从外部线程中断活动操作。在这种情况下,阻塞操作的线程将抛出WakeupException。这可以用于从另一个线程关闭消费者。
    • 然后在一个单独的线程中,可以通过设置closed标志并唤醒消费者来关闭消费者。
      closed.set(true);
      consumer.wakeup ();
      

4.2.5.2 两种方式实现

4.2.5.2.1 每个线程一个消费者
  • 一个简单的选择是为每个线程提供自己的消费者实例。以下是这种方法的优点和缺点:
    • 利:这是最容易实现的
    • 优点:它通常是最快的,因为不需要线程间的协调
    • 优点:它使得基于每个分区的有序处理非常容易实现(每个线程只按照接收消息的顺序处理消息)。
    • 缺点:更多的消费者意味着更多的TCP连接到集群(每个线程一个)。一般来说,Kafka处理连接非常有效,所以这通常是一个小成本。
    • 缺点:多个消费者意味着更多的请求被发送到服务器,稍微少一些数据批处理,这可能会导致I/O吞吐量下降。
    • 缺点:所有进程的线程总数将受到分区总数的限制。
4.2.5.2.1 将消费和处理分离
  • 这种方法是让一个或多个消费者线程完成所有数据消费,并将ConsumerRecords实例交给阻塞队列,该队列由实际处理记录处理的处理器线程池使用。这个选项同样也有利弊:
    • 优点:这个选项允许独立地扩展消费者和处理器的数量。这使得单个消费者可以为多个处理器线程提供服务,从而避免了对分区的任何限制。
    • 缺点:保证跨处理器的顺序需要特别注意,因为线程将独立执行,由于线程执行时间的运气,较早的数据块实际上可能在较晚的数据块之后处理。对于没有订购要求的处理,这不是问题。
    • 缺点:手动提交位置变得更加困难,因为它需要所有线程协调以确保对该分区的处理完成。
      这种方法有许多可能的变体。例如,每个处理器线程可以有自己的队列,消费者线程可以使用TopicPartition散列到这些队列中,以确保有序消费并简化提交。

4.2.5.3 典型的模式(每个线程一个消费者)

  • 代码如下:
    在这里插入图片描述

    package com.liu.susu.consumer.thread;
    
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.WakeupException;
    
    import java.time.Duration;
    import java.util.*;
    import java.util.concurrent.atomic.AtomicBoolean;
    
    /**
     * @Description
     * @Author susu
     */
    public class KafkaConsumerRunner implements Runnable {
        private final AtomicBoolean closed = new AtomicBoolean(false);
        private final KafkaConsumer consumer;
        public KafkaConsumerRunner(KafkaConsumer consumer) {
            this.consumer = consumer;
        }
    
        @Override
        public void run() {
            try {
                consumer.subscribe(Arrays.asList("susu-topic"));//订阅
                while (!closed.get()) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10000));
                    // Handle new records
                    for (TopicPartition partition : records.partitions()) {
    
                        List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                        for (ConsumerRecord<String, String> record : partitionRecords) {
                            System.out.printf("Thread = %s,topic = %s,partition = %d, offset = %d, key = %s, value = %s%n",
                                    Thread.currentThread().getName(),
                                    record.topic(),record.partition(),record.offset(), record.key(), record.value());
                        }
    
                        long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                        // 循环一个partition,提交一次
                        OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(lastOffset + 1);
                        consumer.commitSync(Collections.singletonMap(partition, offsetAndMetadata));
                    }
                }
            } catch (WakeupException e) {
                // Ignore exception if closing
                if (!closed.get()) throw e;
            } finally {
                consumer.close();
            }
        }
    
        // Shutdown hook which can be called from a separate thread
        public void shutdown() {
            closed.set(true);
            consumer.wakeup();
        }
    
    
        /**
         * 构建 consumer
         * @return consumer
         */
        public static KafkaConsumer<String, String> getKafkaConsumer(){
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "43.143.190.116:9092");
            props.setProperty("group.id", "test");
            props.setProperty("enable.auto.commit", "false");//false 手动提交
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    
            return consumer;
        }
    
        public static void main(String[] args) {
            KafkaConsumer<String, String> consumer = getKafkaConsumer();
    
            KafkaConsumerRunner runner = new KafkaConsumerRunner(consumer);
    
            Thread thread = new Thread(runner);
            thread.start();
    
    //        runner.shutdown();
        }
    
    }
    
    

4.2.5.4 将消费和处理分离(线程池处理)

4.2.6

4.3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/872972.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

小白带你部署LNMP分布式部署

目录 前言 一、概述 二、LNMP环境部署 三、配置nginx 1、yum安装 2、编译安装 四、安装 1、编译安装nginx 2、网络源 3、稍作优化 4、修改配置文件vim /usr/local/nginx/conf/nginx.conf 5、书写测试页面 五、部署应用 前言 LNMP平台指的是将Linux、Nginx、MySQL和…

Spring Boot + Vue3前后端分离实战wiki知识库系统<十二>--用户管理单点登录开发一

目标&#xff1a; 在上一次Spring Boot Vue3前后端分离实战wiki知识库系统&#xff1c;十一&#xff1e;--文档管理功能开发三我们已经完成了文档管理的功能模块开发&#xff0c;接下来则开启新模块的学习---用户登录&#xff0c;这块还是有不少知识点值得学习的&#xff0c;…

谈谈语音助手

目录 1.什么是语音助手 2.语音助手的发展过程 3.现在有哪些成熟的语音助手 4.语音助手对人类发展的影响 1.什么是语音助手 语音助手是一种能够通过语音交互与用户进行沟通和执行任务的虚拟助手。它基于人工智能和自然语言处理技术&#xff0c;能够理解用户的语音指令&#x…

LeetCode 36题:有效的数独

题目 请你判断一个 9 x 9 的数独是否有效。只需要 根据以下规则 &#xff0c;验证已经填入的数字是否有效即可。 数字 1-9 在每一行只能出现一次。数字 1-9 在每一列只能出现一次。数字 1-9 在每一个以粗实线分隔的 3x3 宫内只能出现一次。&#xff08;请参考示例图&#xff…

MEC | 条款4 非必要不提供 default constructor

条款4 非必要不提供 default constructor 文章目录 条款4 非必要不提供 default constructorclasses 必须有默认构造函数&#xff1f;Example1. 产生数组解决方法 2.不适用于 template-based container clases3.虚基函数 探讨>>>>> 欢迎关注公众号【三戒纪元】 …

CANoe自动化工程的搭建

基于XMLCAPL建立自动化工程 1、导入ini文件2、新建 Test Environment3、报告类型4、代码编写 1、导入ini文件 工程的配置的文件&#xff0c;配置DUT相关信息&#xff0c;具体视工程而编写内容。 2、新建 Test Environment 1、新建XML测试用例环境 2、导入XML测试用例文件 …

vulnhub靶机Deathnote

难度&#xff1a;easy 下载地址&#xff1a;https://download.vulnhub.com/deathnote/Deathnote.ova 主机发现 arp-scan -l 端口扫描 nmap --min-rate 10000 -p- 192.168.21.140 进一步查看目标的端口的服务和版本 nmap -sV -sT -O -p22,80 192.168.21.140 扫描端口的漏洞…

微服务实战项目-学成在线-项目优化(redis缓存优化)

微服务实战项目-学成在线-项目优化(redis缓存优化) 1 优化需求 视频播放页面用户未登录也可以访问&#xff0c;当用户观看试学课程时需要请求服务端查询数据&#xff0c;接口如下&#xff1a; 1、根据课程id查询课程信息。 2、根据文件id查询视频信息。 这些接口在用户未认…

Java面试——一分钟搞懂限流算法

为什么限流 运营网站&#xff0c;经常会遇到各种挑战&#xff1a;某黑客发起DoS攻击、网络爬虫网页抓取、商品秒杀活动、双十一与618等场景&#xff0c;会使流量突然激增&#xff0c;如果不限制流量的访问就会使系统宕机。 常见的限流算法 1.漏桶算法&#xff08; LEAKY BUC…

Eclipse-配置彩色输出打印

文章目录 前言配置下载查看是否安装 前言 这是一篇古老的文章&#xff0c;那个时候还在用Eclipse &#xff0c;现在已经换 IDEA 了… 这是一篇 2018 年的文章&#xff0c;我只是将文章从个人比较挪到了CSDN 中 配置 配置完然后下载下面插件即可生成彩色代码。 下载 ANSI …

Vue2-收集表单数据、过滤器、内置指令与自定义指令、Vue生命周期

&#x1f954;&#xff1a;我徒越万重山 千帆过 万木自逢春 更多Vue知识请点击——Vue.js VUE2-Day4 收集表单数据1、不同标签的value属性2、v-model的三个修饰符 过滤器内置指令与自定义指令1、内置指令2、自定义指令定义语法&#xff08;1&#xff09;函数式&#xff08;2&am…

VMware vCenter 6.5 断电后无法启动修复方案

目录 第1章 前言 第2章 问题现象 第3章 解决方案 第4章、Tips 第1章 前言 本文主要介绍VMware vcsa 6.5由于电源异常/存储异常之后无法启动&#xff0c;进入磁盘自检模式处理方法。&#xff08;最近遇到类似情况比较多&#xff0c;Citrix Xenserver也遇到了&#xff0c;后来…

rocketMq启动broker报错找不到或无法加载主类 Files\Java\jdk1.8.0_171\lib\dt.jar;C:\Program]

假如弹出提示框提示‘错误: 找不到或无法加载主类 xxxxxx’。 1.打开runbroker.cmd 将"%CLASSPATH%"加上英文双引号&#xff0c;切勿别加中文双引号 2.打开runserver.cmd 同理 将"%CLASSPATH%"加上英文双引号&#xff0c;切勿别加中文双引号 3.正常执行即…

【Linux】邮件服务器搭建 postfix+dovecot+mysql (终极版 超详细 亲测多遍无问题)

&#x1f341;博主简介 &#x1f3c5;云计算领域优质创作者   &#x1f3c5;华为云开发者社区专家博主   &#x1f3c5;阿里云开发者社区专家博主 &#x1f48a;交流社区&#xff1a;运维交流社区 欢迎大家的加入&#xff01; 文章目录 前言基础原理准备工作一 、安装关于权…

一.RocketMQ概念

RocketMQ概念 1.概念2.应用场景3.MQ的优点和缺点4.常见MQ对比 1.概念 MQ(Message Queue)&#xff0c;是一种提供消息队列服务的中间件&#xff0c;也称为消息中间件&#xff0c;是一套提供了消息生产、存储、消费全过程API的软件系统。 RocketMQ是阿里巴巴2016年MQ中间件&…

C++QT教程3——手册4.11.1自带教程(笔记)——创建一个QT快速应用

文章目录 创建一个QT快速应用创建项目创建主视图添加应用逻辑为视图添加动画素材文件 参考文章 创建一个QT快速应用 本教程使用内置的QML类型&#xff0c;介绍了Qt Quick的基本概念。有关可以选择的用户界面选项的更多信息&#xff0c;请参阅用户界面。 本教程描述了如何使用…

约束综合中的逻辑互斥时钟(Logically Exclusive Clocks)

注&#xff1a;本文翻译自Constraining Logically Exclusive Clocks in Synthesis 逻辑互斥时钟的定义 逻辑互斥时钟是指设计中活跃&#xff08;activate&#xff09;但不彼此影响的时钟。常见的情况是&#xff0c;两个时钟作为一个多路选择器的输入&#xff0c;并根据sel信号…

【人工智能前沿弄潮】——生成式AI系列:Diffusers应用 (2) 训练扩散模型(无条件图像生成)

无条件图像生成是扩散模型的一种流行应用&#xff0c;它生成的图像看起来像用于训练的数据集中的图像。与文本或图像到图像模型不同&#xff0c;无条件图像生成不依赖于任何文本或图像。它只生成与其训练数据分布相似的图像。通常&#xff0c;通过在特定数据集上微调预训练模型…

Idea报错:Cannot resolve symbol “springframework“以及各种依赖包

问题描述&#xff1a; Idea导入了maven项目之后出现报错Cannot resolve symbol “springframework” &#xff0c;识别不了这个标识或者找不到这个包&#xff0c;明明这些依赖和包都有就是出现报错&#xff0c;并且运行按钮变成灰色 解决办法&#xff1a; 其实这个原因大概率就…

React构建的JS优化思路

背景 之前个人博客搭建时&#xff0c;发现页面加载要5s才能完成并显示 问题 React生成的JS有1.4M&#xff0c;对于个人博客服务器的带宽来说&#xff0c;压力较大&#xff0c;因此耗费了5S的时间 优化思路 解决React生成的JS大小&#xff0c;因为我用的是react-router-dom…