Kafka生产者与消费者api示例

news2025/1/6 19:24:29

生产者api示例

 

一个正常的生产逻辑需要具备以下几个步骤

  1. 配置生产者参数及创建相应的生产者实例

  2. 构建待发送的消息

  3. 发送消息

  4. 关闭生产者实例

采用默认分区方式将消息散列的发送到各个分区当中

 

package com.doitedu;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerDemo {
    public static void main(String[] args) throws InterruptedException {
        /**
         * 1.构建一个kafka的客户端
         * 2.创建一些待发送的消息,构建成kafka所需要的格式
         * 3.调用kafka的api去发送消息
         * 4.关闭kafka生产实例
         */
        //1.创建kafka的对象,配置一些kafka的配置文件
        //它里面有一个泛型k,v
        //要发送数据的key
        //要发送的数据value
        //他有一个隐含之意,就是kafka发送的消息,是一个key,value类型的数据,但是不是必须得,其实只需要发送value的值就可以了
        Properties pros = new Properties();
        //指定kafka集群的地址
        pros.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092");
        //指定key的序列化方式
        pros.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //指定value的序列化方式
        pros.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //ack模式,取值有0,1,-1(all),all是最慢但最安全的  服务器应答生产者成功的策略
        pros.put("acks", "all");
        //这是kafka发送数据失败的重试次数,这个可能会造成发送数据的乱序问题
        pros.setProperty("retries", "3");
        //数据发送批次的大小 单位是字节
        pros.setProperty("batch.size", "10000");
        //一次数据发送请求所能发送的最大数据量
        pros.setProperty("max.request.size", "102400");
        //消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
        pros.put("linger.ms", 10000);
        //整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端
        //buffer.memory要大于batch.size,否则会报申请内存不足的错误
        pros.put("buffer.memory", 10240);

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(pros);
        for (int i = 0; i < 1000; i++) {
            //key value  0 --> doit32+-->+0
            //key value  1 --> doit32+-->+1
            //key value  2 --> doit32+-->+2
            //2.创建一些待发送的消息,构建成kafka所需要的格式
            ProducerRecord<String, String> record = new ProducerRecord<>("test01", i + "", "doit32-->" + i);
            //3.调用kafka的api去发送消息
            kafkaProducer.send(record);
            Thread.sleep(100);
        }
        kafkaProducer.flush();
        kafkaProducer.close();
    }
}

 对于properties配置的第二种写法,相对来说不会出错,简单举例:

public static void main(String[] args) {
    Properties pros = new Properties();
    pros.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092,linux02:9092,linux03:9092");
    pros.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    pros.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
}

 

1.kafka的生产者可以持续不断的向topic中发送数据不?

可以

2.kafak的生产者有哪些必须配置的参数:

//指定kafka集群的地址 pros.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092"); //指定key的序列化方式 pros.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); //指定value的序列化方式 pros.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

3.kafka生产者发送数据的时候,可以用jdk的序列化器来将数据进行序列化不?

不可以,kafka有指定的序列化接口 org.apache.kafka.common.serialization.Serializer

4.构造一个kafka的生产者后,是不是就已经确定了,我的数据是需要发送到哪一个topic里面不?

不是哦,咱们构造生产者对象的时候不需要指定topic,是在构造发送数据对象的时候才指定的

 消费者Api示例

一个正常的消费逻辑需要具备以下几个步骤:

  1. 配置消费者客户端参数及创建相应的消费者实例;

  2. 订阅主题topic;

  3. 拉取消息并消费;

  4. 定期向__consumer_offsets主题提交消费位移offset;

  5. 关闭消费者实例

 

package com.doitedu;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
import java.util.Properties;

public class ConsumerDemo {
    public static void main(String[] args) {
        //1.创建kafka的消费者对象,附带着把配置文件搞定
        Properties props = new Properties();
        //props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092,linux02:9092,linux03:9092");
        //props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 定义kakfa 服务的地址,不需要将所有broker指定上
       // props.put("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092");
        // 制定consumer group
        props.put("group.id", "g3");
        // 是否自动提交offset  __consumer_offset   有多少分区  50 
        props.put("enable.auto.commit", "true");
        // 自动提交offset的时间间隔   -- 这玩意设置的大小怎么控制
        props.put("auto.commit.interval.ms", "5000");  //50000   1000
        // key的反序列化类
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value的反序列化类
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 如果没有消费偏移量记录,则自动重设为起始offset:latest, earliest, none
        props.put("auto.offset.reset","earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        //2.订阅主题(确定需要消费哪一个或者多个主题)
        consumer.subscribe(Arrays.asList("test02"));
        //3.开始从topic中获取数据
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                //这是数据所属的哪一个topic
                String topic = record.topic();
                //该条数据的偏移量
                long offset = record.offset();
                //这条数据是哪一个分区的
                int partition = record.partition();
                //这条数据记录的时间戳,但是这个时间戳有两个类型
                long timestamp = record.timestamp();
                //上面时间戳的类型,这个类型有两个,一个是CreateTime(这条数据创建的时间), LogAppendTime(这条数据往日志里面追加的时间)
                TimestampType timestampType = record.timestampType();
                //这个数据key的值
                String key = record.key();
                //这条数据value的值
                String value = record.value();
                //分区leader的纪元
                Optional<Integer> integer = record.leaderEpoch();
                //key的长度
                int keySize = record.serializedKeySize();
                //value的长度
                int valueSize = record.serializedValueSize();
                //数据的头部信息
                Headers headers = record.headers();
//            for (Header header : headers) {
//                String hKey = header.key();
//                byte[] hValue = header.value();
//                String valueString = new String(hValue);
//                System.out.println("header的key值 = " + hKey + "header的value的值 = "+ valueString);
//            }
                System.out.printf("topic = %s ,offset = %d, partition = %d, timestampType = %s ,timestamp = %d , key = %s , value = %s ,leader的纪元 = %d , key序列化的长度 = %d ,value 序列化的长度 = %d \r\n" ,
                        topic,offset,partition,timestampType + "",timestamp,key,value,integer.get(),keySize,valueSize);
            }
        }

        //4.关闭消费者对象
//        consumer.close();
    }
}

 subscribe订阅主题

 subscribe有如下重载方法:

public void subscribe(Collection<String> topics,ConsumerRebalanceListener listener) 
public void subscribe(Collection<String> topics) 
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) 
public void subscribe(Pattern pattern)
  1. 指定集合方式订阅主题

 consumer.subscribe(Arrays.asList(topicl ));

2.正则方式订阅主题

如果消费者采用的是正则表达式的方式(subscribe(Pattern))订阅, 在之后的过程中,如果有人又创建了新的主题,并且主题名字与正表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息。如果应用程序需要消费多个主题,并且可以处理不同的类型,那么这种订阅方式就很有效。

正则表达式的方式订阅的示例

 consumer.subscribe(Pattern.compile ("topic.*" ));

 利用正则表达式订阅主题,可实现动态订阅

 assign订阅主题

消费者不仅可以通过 KafkaConsumer.subscribe() 方法订阅主题,还可直接订阅某些主题的指定分区;

在 KafkaConsumer 中提供了 assign() 方法来实现这些功能,此方法的具体定义如下:

 

package com.doitedu;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Optional;
import java.util.Properties;

public class ConsumerDemo1 {
    public static void main(String[] args) {
        //1.创建kafka的消费者对象,附带着把配置文件搞定
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092,linux02:9092,linux03:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"doit01");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        //2.订阅主题(确定需要消费哪一个或者多个主题)
//        consumer.subscribe(Arrays.asList("test03"));

//        consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
//        //我现在想手动指定,我需要从哪边开始消费
//        //如果用subscribe去订阅主题的时候,他内部会给这个消费者组来一个自动再均衡
//        consumer.seek(new TopicPartition("test03",0),2);
        TopicPartition tp01 = new TopicPartition("test03", 0);

        //他就是手动去订阅主题和partition,有了这个就不需要再去订阅subscribe主题了,手动指定以后,他的内部就不会再来自动均衡了
        consumer.assign(Arrays.asList(tp01)); // 手动订阅指定主题的指定分区的指定位置
        consumer.seek(new TopicPartition("test03",0),2);

        //3.开始从topic中获取数据
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                //这是数据所属的哪一个topic
                String topic = record.topic();
                //该条数据的偏移量
                long offset = record.offset();
                //这条数据是哪一个分区的
                int partition = record.partition();
                //这条数据记录的时间戳,但是这个时间戳有两个类型
                long timestamp = record.timestamp();
                //上面时间戳的类型,这个类型有两个,一个是CreateTime(这条数据创建的时间), LogAppendTime(这条数据往日志里面追加的时间)
                TimestampType timestampType = record.timestampType();
                //这个数据key的值
                String key = record.key();
                //这条数据value的值
                String value = record.value();

                //分区leader的纪元
                Optional<Integer> integer = record.leaderEpoch();
                //key的长度
                int keySize = record.serializedKeySize();
                //value的长度
                int valueSize = record.serializedValueSize();
                //数据的头部信息
                Headers headers = record.headers();
//            for (Header header : headers) {
//                String hKey = header.key();
//                byte[] hValue = header.value();
//                String valueString = new String(hValue);
//                System.out.println("header的key值 = " + hKey + "header的value的值 = "+ valueString);
//            }
                System.out.printf("topic = %s ,offset = %d, partition = %d, timestampType = %s ,timestamp = %d , key = %s , value = %s ,leader的纪元 = %d , key序列化的长度 = %d ,value 序列化的长度 = %d \r\n" ,
                        topic,offset,partition,timestampType + "",timestamp,key,value,integer.get(),keySize,valueSize);
            }
        }

        //4.关闭消费者对象
//        consumer.close();
    }
}

 这个方法只接受参数partitions,用来指定需要订阅的分区集合。示例如下:

consumer.assign(Arrays.asList(new TopicPartition ("tpc_1" , 0),new TopicPartition(“tpc_2”,1))) ;

 subscribe与assign的区别

 

  • 通过subscribe()方法订阅主题具有消费者自动再均衡功能 ;

在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。

  • assign() 方法订阅分区时,是不具备消费者自动均衡的功能的;

其实这一点从assign方法参数可以看出端倪,两种类型subscribe()都有ConsumerRebalanceListener类型参数的方法,而assign()方法却没有。

 消息的消费模式

Kafka中的消费是基于拉取模式的。

 消息的消费一般有两种模式:推送模式和拉取模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。

 

public class ConsumerRecord<K, V> {
    public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
    public static final int NULL_SIZE = -1;
    public static final int NULL_CHECKSUM = -1;

    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final TimestampType timestampType;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final Headers headers;
    private final K key;
    private final V value;

    private volatile Long checksum;
  • topic partition 这两个属性分别代表消息所属主题的名称和所在分区的编号。

  • offset 表示消息在所属分区的偏移量。

  • timestamp 表示时间戳,与此对应的timestampType 表示时间戳的类型。

  • timestampType 有两种类型 CreateTime 和LogAppendTime ,分别代表消息创建的时间戳和消息追加到日志的时间戳。

  • headers 表示消息的头部内容。

  • key value 分别表示消息的键和消息的值,一般业务应用要读取的就是value ;

  • serializedKeySize、serializedValueSize分别表示key、value 经过序列化之后的大小,如果 key 为空,则 serializedKeySize 值为 -1,同样,如果value为空,则serializedValueSize 的值也会为 -1;

  • checksum 是CRC32的校验值。

示例代码片段 

/**
 * 订阅与消费方式2
 */
TopicPartition tp1 = new TopicPartition("x", 0);
TopicPartition tp2 = new TopicPartition("y", 0);
TopicPartition tp3 = new TopicPartition("z", 0);
List<TopicPartition> tps = Arrays.asList(tp1, tp2, tp3);
consumer.assign(tps);
​
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (TopicPartition tp : tps) {
        List<ConsumerRecord<String, String>> rList = records.records(tp);
        for (ConsumerRecord<String, String> r : rList) {
            r.topic();
            r.partition();
            r.offset();
            r.value();
            //do something to process record.
        }
    }
}

 指定位移消费

有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而 KafkaConsumer 中的seek()方法正好提供了这个功能,让我们可以追前消费或回溯消费。

seek()方法的具体定义如下:

seek都是和assign这个方法一起用 指定消费位置
public void seek(TopicPartiton partition,long offset)

 代码示例:

public class ConsumerDemo3指定偏移量消费 {
    public static void main(String[] args) {

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"g002");
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"doit01:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");
        // 是否自动提交消费位移
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");

        // 限制一次poll拉取到的数据量的最大值
        props.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,"10240000");
         KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // assign方式订阅doit27-1的两个分区
        TopicPartition tp0 = new TopicPartition("doit27-1", 0);
        TopicPartition tp1 = new TopicPartition("doit27-1", 1);
        
        consumer.assign(Arrays.asList(tp0,tp1));
        // 指定分区0,从offset:800开始消费    ;  分区1,从offset:650开始消费
        consumer.seek(tp0,200);
        consumer.seek(tp1,250);

        // 开始拉取消息
        while(true){
            ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(3000));
            for (ConsumerRecord<String, String> rec : poll) {
                System.out.println(rec.partition()+","+rec.key()+","+rec.value()+","+rec.offset());
            }
        }
    }
}

 自动提交消费者偏移量

 

Kafka中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数enable.auto.commit 配置,默认值为 true 。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数 auto.commit.interval.ms配置,默认值为5秒,此参数生效的前提是 enable. auto.commit 参数为 true。

在默认的方式下,消费者每隔5秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在 poll() 方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。

Kafka 消费的编程逻辑中位移提交是一大难点,自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,让编码更简洁。但随之而来的是重复消费和消息丢失的问题

  • 重复消费

假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用)。我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样并不能避免重复消费的发送,而且也会使位移提交更加频繁。

 

 

  • 丢失消息

按照一般思维逻辑而言,自动提交是延时提交,重复消费可以理解,那么消息丢失又是在什么情形下会发生的呢?我们来看下图中的情形:

拉取线程不断地拉取消息并存入本地缓存,比如在BlockingQueue 中,另一个处理线程从缓存中读取消息并进行相应的逻辑处理。设目前进行到了第 y+l 次拉取,以及第m次位移提交的时候,也就是 x+6 之前的位移己经确认提交了,处理线程却还正在处理x+3的消息;此时如果处理线程发生了异常,待其恢复之后会从第m次位移提交处,也就是 x+6 的位置开始拉取消息,那么 x+3至x+6 之间的消息就没有得到相应的处理,这样便发生消息丢失的现象。

 

  

 手动提交消费者偏移量(调用kafka api

 

自动位移提交的方式在正常情况下不会发生消息丢失或重复消费的现象,但是在编程的世界里异常无可避免;同时,自动位移提交也无法做到精确的位移管理。在 Kafka中还提供了手动位移提交的方式,这样可以使得开发人员对消费位移的管理控制更加灵活。

很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费;

手动的提交方式可以让开发人员根据程序的逻辑在合适的时机进行位移提交。开启手动提交功能的前提是消费者客户端参数 enable.auto.commit 配置为false ,示例如下:

props.put(ConsumerConf.ENABLE_AUTO_COMMIT_CONFIG, false); 

 

手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync()和

commitAsync()两种类型的方法。

  • 同步提交的方式

commitSync()方法的定义如下:

 

/**
 * 手动提交offset
 */
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> r : records) {
        //do something to process record.
    }
    consumer.commitSync();
}

 对于采用 commitSync()的无参方法,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的,如果想寻求更细粒度的、更精准的提交,那么就需要使用commitSync()的另一个有参方法,具体定义如下:

public void commitSync(final Map<TopicPartition,OffsetAndMetadata> offsets)

 示例代码如下:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> r : records) {
        long offset = r.offset();
        //do something to process record.

        TopicPartition topicPartition = new TopicPartition(r.topic(), r.partition());
        consumer.commitSync(Collections.singletonMap(topicPartition,new OffsetAndMetadata(offset+1)));
    }
}

 提交的偏移量 = 消费完的record的偏移量 + 1

因为,__consumer_offsets中记录的消费偏移量,代表的是,消费者下一次要读取的位置!!!

  • 异步提交方式

异步提交的方式( commitAsync())在执行的时候消费者线程不会被阻塞;可能在提交消费位移的结果还未返回之前就开始了新一次的拉取。异步提交可以让消费者的性能得到一定的增强。 commitAsync方法有一个不同的重载方法,具体定义如下

 

 

/**
 * 异步提交offset
 */
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> r : records) {
        long offset = r.offset();

        //do something to process record.
        TopicPartition topicPartition = new TopicPartition(r.topic(), r.partition());
        consumer.commitSync(Collections.singletonMap(topicPartition,new OffsetAndMetadata(offset+1)));
        consumer.commitAsync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset + 1)), new OffsetCommitCallback() {
     @Override
     public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
                if(e == null ){
                    System.out.println(map);
                }else{
                    System.out.println("error commit offset");
                }
            }
        });
    }
}

 手动提交位移(时机的选择)

 

  • 数据处理完成之前先提交偏移量

可能会发生漏处理的现象(数据丢失)反过来说,这种方式实现了: at most once的数据处理(传递)语义

  • 数据处理完成之后再提交偏移量

可能会发生重复处理的现象(数据重复)反过来说,这种方式实现了: at least once的数据处理(传递)语义当然,数据处理(传递)的理想语义是: exactly once(精确一次)Kafka也能做到exactly once(基于kafka的事务机制)

代码示例:

package com.doitedu;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.sql.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

public class CommitOffsetByMyself {
    public static void main(String[] args) throws SQLException {

        //获取mysql的连接对象
        Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/football", "root", "123456");
        connection.setAutoCommit(false);
        PreparedStatement pps = connection.prepareStatement("insert into user values (?,?,?)");
        PreparedStatement pps_offset = connection.prepareStatement("insert into offset values (?,?) on duplicate key update offset = ?");

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092,linux02:9092,linux03:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //设置手动提交偏移量参数,需要将自动提交给关掉
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        //设置从哪里开始消费
//        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        //设置组id
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group001");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //订阅主题
        consumer.subscribe(Arrays.asList("kafka2mysql"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {

            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                for (TopicPartition topicPartition : collection) {
                    try {
                        PreparedStatement get_offset = connection.prepareStatement("select offset from offset where topic_partition = ?");
                        String topic = topicPartition.topic();
                        int partition = topicPartition.partition();
                        get_offset.setString(1, topic + "_" + partition);
                        ResultSet resultSet = get_offset.executeQuery();
                        if (resultSet.next()){
                            int offset = resultSet.getInt(1);
                            System.out.println("发生了再均衡,被分配了分区消费权,并且查到了目标分区的偏移量"+partition+" , "+offset);
                            //拿到了offset后就可以定位消费了
                            consumer.seek(new TopicPartition(topic, partition), offset);
                        }
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        //拉去数据后写入到mysql
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
                for (ConsumerRecord<String, String> record : records) {
                    String data = record.value();
                    String[] arr = data.split(",");
                    String id = arr[0];
                    String name = arr[1];
                    String age = arr[2];

                    pps.setInt(1, Integer.parseInt(id));
                    pps.setString(2, name);
                    pps.setInt(3, Integer.parseInt(age));
                    pps.execute();

                    //埋个异常,看看是不是真的是这样
//                    if (Integer.parseInt(id) == 5) {
//                        throw new SQLException();
//                    }

                    long offset = record.offset();
                    int partition = record.partition();
                    String topic = record.topic();
                    pps_offset.setString(1, topic + "_" + partition);
                    pps_offset.setInt(2, (int) offset + 1);
                    pps_offset.setInt(3, (int) offset + 1);
                    pps_offset.execute();
                    //提交jdbc事务
                    connection.commit();
                }
            } catch (Exception e) {
                connection.rollback();
            }
        }
    }
}

 消费者提交偏移量方式的总结

consumer的消费位移提交方式:

  • 全自动

    • auto.offset.commit = true

    • 定时提交到consumer_offsets

  • 半自动

    • auto.offset.commit = false;

    • 然后手动触发提交 consumer.commitSync();

    • 提交到consumer_offsets

  • 全手动

    • auto.offset.commit = false;

    • 写自己的代码去把消费位移保存到你自己的地方mysql/zk/redis/

    • 提交到自己所涉及的存储;初始化时也需要自己去从自定义存储中查询到消费位移

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/608930.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【人工智能】— 线性分类器、感知机、损失函数的选取、最小二乘法分类、模型复杂性和过度拟合、规范化

【人工智能】— 感知机、线性分类器、感知机、感知机、最小二乘法分类、模型复杂性和过度拟合、规范化 Linear predictions 线性预测分类线性分类器感知机感知机学习策略损失函数的选取距离的计算 最小二乘法分类求解最小二乘分类矩阵解法一般线性分类模型复杂性和过度拟合训练…

重估端到端原则

评价技术迭代的旧的定势眼光来自于该技术诞生时。 1970/80/90 年代&#xff0c;相比传输带宽技术&#xff0c;处理器更强。网络协议倾向于字段多&#xff0c;字段小且紧凑&#xff0c;尽可能减少传输量&#xff0c;用 “算法技巧” 等价&#xff0c;如果 TCP 序列号 48 位&…

【iOS】消息传递和消息转发机制

消息传递机制 在OC语言中&#xff0c;调用对象的方法被叫做消息传递。消息有名称和选择子(selector)&#xff0c;可以接受参数&#xff0c;还可能有返回值。 在Objective-C中&#xff0c;如果向某对象传递消息&#xff0c;那就会使用动态绑定机制来决定需要调用的方法。在底层…

C++进阶 —— 范围for(C++11新特性)

目录 一&#xff0c;范围for介绍 二&#xff0c;范围for注意事项 一&#xff0c;范围for介绍 范围for&#xff08;range-based for loop&#xff09;是C11新引入的特性&#xff0c;可遍历各种序列结构的容器&#xff08;如数组、vector、list等&#xff09;&#xff1b;每次循…

【QT】Qt ApplicationManager Compositor源码分析

Qt ApplicationManager的Compositor功能分析 根据Qt ApplicationManager官网介绍&#xff0c;它基于Wayland协议实现了Compositor功能。下述为官网介绍。实际上&#xff0c;QtApplicationManager是使用了QtWayland模块来实现Compositor的。Wayland是一套旨在替代XWindow的 Com…

微机实验:第5章——存储器设计

存储器设计 将两片6116所有的存储单元都写入11H。 提示&#xff1a;6116的存储容量为2K*8b&#xff0c;片内地址为0000H-07FFH,两片一起构成F8000H-F8FFFH的内存空间。 仿真调试时可以看到&#xff1a;每片从0000H-07FFH的每个存储单元均显示11H。 CODE SEGMENTASSUME CS:C…

4-4 哈夫曼编码

博主简介&#xff1a;一个爱打游戏的计算机专业学生博主主页&#xff1a; 夏驰和徐策所属专栏&#xff1a;算法设计与分析 1.什么是哈夫曼编码&#xff1f; 哈夫曼编码&#xff08;Huffman coding&#xff09;是一种用于数据压缩的无损编码方法。它是由David A. Huffman在1952…

STM32F4_软件模拟SPI

目录 1. 硬件连接 2. SPI通讯协议 3. W25Q64 简介 4. 程序详解 4.1 main.c 4.2 SPI.c 4.3 SPI.h 4.4 W25Q128.c 4.5 W25Q128.h 4.6 OLED.c 4.7 OLED.h 4.8 OLED_Font.h 5. 实验结果 我们都知道&#xff0c;SPI 和 IIC 一样&#xff0c;都可以通过硬件方式和软件方…

JSON基础(待补充)

一、JSON初识 1.1基础认识 JSON是一种轻量级的数据交换格式&#xff0c;它基于JavaScript语言的对象表示法&#xff0c;可以在多种语言之间进行数据交换。JSON的基本数据类型有数值、字符串、布尔值、数组、对象和空值。JSON的格式简洁易读&#xff0c;也易于解析和处理。JSON…

【数据结构】由完全二叉树引申出的堆的实现

【数据结构】由完全二叉树引申出的堆的实现 一、什么是堆二、目标三、实现1、初始化工作2、堆的插入(堆的创建)2.1、向上调整建堆2.1.1、向上调整算法原理解析2.1.2、代码实现 2.2、向下调整建堆2.2.1、向下调整算法原理解析2.2.2、代码实现 2.3、“向上”和“向下”复杂度的差…

初识网络安全

目录 HTML前置基础知识 1、id和class区别&#xff1a; 2、一些常用的属性&#xff1a; 3、HTML字符编码和实体编码 4、URL介绍 网址的组成部分&#xff1a; TTL值 DNS工作原理和资源记录及其种类&#xff1a; 5、正确区分“加密”和“签名” 6、状态码 1xx &#xf…

如何安装pycharm

PyCharm是JetBrains公司推出的一款Python集成开发环境&#xff08;IDE&#xff09;&#xff0c;可以提供高效的Python代码编写、调试和测试。以下是一些PyCharm的主要功能&#xff1a; 代码智能提示和自动补全功能&#xff1b;支持调试和测试Python代码&#xff1b;完整的Pyth…

基于Springboot+Vue的幼儿园管理系统设计与实现

博主介绍&#xff1a; 大家好&#xff0c;我是一名在Java圈混迹十余年的程序员&#xff0c;精通Java编程语言&#xff0c;同时也熟练掌握微信小程序、Python和Android等技术&#xff0c;能够为大家提供全方位的技术支持和交流。 我擅长在JavaWeb、SSH、SSM、SpringBoot等框架下…

汽车相关知识及术语

1 汽车构造与制造流程 1.1 汽车构造 汽车可以分为四大部分 车身&#xff1a; 骨架、车身钣金件以及座椅、仪表、天窗、车外后视镜等车身附件 动力系统&#xff1a; 发动机和变速器 底盘&#xff1a; 传动系统、悬架系统、转向系统、制动系统和车轮轮胎 电气电子系统&#…

《Apollo 智能驾驶进阶课程》三、无人车自定位技术

1. 什么是无人车自定位系统 相对一个坐标系来确定无人车的位置和姿态 定位的指标要求大概分为三个部分&#xff1a;精度、鲁棒性、场景 定位精度必须控制在10厘米以内&#xff0c;才能使行驶中的自动驾驶车辆避免出现碰撞/车道偏离的情况。鲁棒性一般情况下用最大值来衡量。…

Java IO流详细教程

目录 一、IO介绍 IO流体系 字节流 字节输出流&#xff1a;FileoutputStream 字节输入流FilelnputStream 字符流 字符输入流 字符输出流 缓冲流 字节缓冲流 字符缓冲流 序列化、反序列化流 序列化/对象操作输出流 反序列化/对象操作输入流 打印流 字节打印流 字…

firewalld与iptables练习

1、禁止一个IP访问 iptables -I INPUT -s $ip -j REJECT 2、清空默认的防火墙默认规则&#xff1a; iptables -F 3、保存清空后的防火墙规则表 service iptables save 4、firewall-cmd --list-all #查看防火墙规则&#xff08;只显示/etc/firewalld/zones/public.xml中防火墙…

投票活动链接创建微信链接视频投票线上免费投票链接

近些年来&#xff0c;第三方的微信投票制作平台如雨后春笋般络绎不绝。随着手机的互联网的发展及微信开放平台各项基于手机能力的开放&#xff0c;更多人选择微信投票小程序平台&#xff0c;因为它有非常大的优势。 1.它比起微信公众号自带的投票系统、传统的H5投票系统有可以图…

从零手写操作系统之RVOS协作式多任务切换实现-03

从零手写操作系统之RVOS协作式多任务切换实现-03 任务&#xff08;task&#xff09;多任务 &#xff08;Multitask&#xff09;任务上下文&#xff08;Context&#xff09;多任务系统的分类协作式多任务 创建和初始化第 1 号任务切换到第一号任务执行协作式多任务 - 调度初始化…

字典树算法(C/C++)

目录 一、字典树算法的概念介绍 二、字典树算法的实现 三、例题 &#xff08;注&#xff1a;借鉴蓝桥杯国赛特训营&#xff09; 一、字典树算法的概念介绍 首先我们看下字典的组织方式 Trie 的核心思想是空间换时间。利用字符串的公共前缀来降低查询时间的开销以达到提高效…