4.API开发

准备: 创建项目 , 添加依赖
XML <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> </properties>
<dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>2.3.1</version> </dependency> </dependencies>
<!-- 依赖下载国内镜像库 --> <repositories> <repository> <id>nexus-aliyun</id> <name>Nexus aliyun</name> <layout>default</layout> <url>http://maven.aliyun.com/nexus/content/groups/public</url> <snapshots> <enabled>false</enabled> <updatePolicy>never</updatePolicy> </snapshots> <releases> <enabled>true</enabled> <updatePolicy>never</updatePolicy> </releases> </repository> </repositories>
<!-- maven插件下载国内镜像库 --> <pluginRepositories> <pluginRepository> <id>ali-plugin</id> <url>http://maven.aliyun.com/nexus/content/groups/public/</url> <snapshots> <enabled>false</enabled> <updatePolicy>never</updatePolicy> </snapshots> <releases> <enabled>true</enabled> <updatePolicy>never</updatePolicy> </releases> </pluginRepository> </pluginRepositories>
<build> <plugins>
<!-- 指定编译java的插件 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin>
<!-- 指定编译scala的插件 --> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.2.2</version> <executions> <execution> <goals> <goal>compile</goal> <goal>testCompile</goal> </goals> <configuration> <args> <arg>-dependencyfile</arg> <arg>${project.build.directory}/.scala_dependencies</arg> </args> </configuration> </execution> </executions> </plugin>
<!-- 把依赖jar中的用到的类,提取到自己的jar中 --> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-assembly-plugin</artifactId> <version>2.6</version> <configuration> <archive> <manifest> <mainClass></mainClass> </manifest> </archive> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <!--下面是为了使用 mvn package命令,如果不加则使用mvn assembly--> <executions> <execution> <id>make-assemble</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin>
</plugins>
</build> |
4.1topic管理
如果希望将管理类的功能集成到公司内部的系统中,打造集管理、监控、运维、告警为一体的生态平台,那么就需要以程序调用 API 方式去实现。
工具类KafkaAdminClient可以用来管理broker、配置和ACL (Access Control List),管理topic

构造一个KafkaAdminClient
Java AdminClient adminClient = KafkaAdminClient.create(props); |
4.1.1列出主题
Java ListTopicsResult listTopicsResult = adminClient.listTopics(); Set<String> topics = listTopicsResult.names().get(); System.out.println(topics); |
4.1.2查看主题信息
Java DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList("tpc_4", "tpc_3")); Map<String, TopicDescription> res = describeTopicsResult.all().get(); Set<String> ksets = res.keySet(); for (String k : ksets) { System.out.println(res.get(k)); } |
4.1.2创建主题
Java // 参数配置 Properties props = new Properties(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"doit01:9092,doit02:9092,doit03:9092"); props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG,3000);
// 创建 admin client 对象 AdminClient adminClient = KafkaAdminClient.create(props); // 由服务端controller自行分配分区及副本所在broker NewTopic tpc_3 = new NewTopic("tpc_3", 2, (short) 1); // 手动指定分区及副本的broker分配 HashMap<Integer, List<Integer>> replicaAssignments = new HashMap<>(); // 分区0,分配到broker0,broker1 replicaAssignments.put(0,Arrays.asList(0,1)); // 分区1,分配到broker0,broker2 replicaAssignments.put(1,Arrays.asList(0,1));
NewTopic tpc_4 = new NewTopic("tpc_4", replicaAssignments); CreateTopicsResult result = adminClient.createTopics(Arrays.asList(tpc_3,tpc_4));
// 从future中等待服务端返回 try { result.all().get(); } catch (Exception e) { e.printStackTrace(); } adminClient.close(); |
4.1.3删除主题
Java DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("tpc_1", "tpc_1")); Map<String, KafkaFuture<Void>> values = deleteTopicsResult.values(); System.out.println(values); |
汇总代码示例:
Java package com.doit.ApisDemo;
import org.apache.kafka.clients.admin.*; import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartitionInfo;
import java.util.*; import java.util.concurrent.ExecutionException;
public class AdminDemo { public static void main(String[] args) { Properties props = new Properties(); props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092"); props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG,"3000"); //创建kafkaAdmin对象 AdminClient adminClient = KafkaAdminClient.create(props);
//列出所有主题 ListTopicsResult listTopicsResult = adminClient.listTopics(); System.out.println(listTopicsResult.names().get());
//创建topic NewTopic topic1 = new NewTopic("idea_topic_1", 2, (short) 2); NewTopic topic2 = new NewTopic("idea_topic_2", 3, (short) 3); //创建topic 指定分区数和副本数 CreateTopicsResult res1 = adminClient.createTopics(Arrays.asList(topic1, topic2)); try { res1.all().get(); } catch (InterruptedException e) { System.out.println("有异常了:"+e); } catch (ExecutionException e) { System.out.println("有异常了:"+e); }
//创建topic 手动指定分区和副本分别在哪个broker上 HashMap<Integer, List<Integer>> replicaAssignments = new HashMap<>(); //需要传一个key value key 代表的是broker value代表的是副本分别在哪些broker 上 replicaAssignments.put(0,Arrays.asList(0,1)); replicaAssignments.put(1,Arrays.asList(1,2));
//创建一个新的topic信息 NewTopic idea_topic_3 = new NewTopic("idea_topic_3", replicaAssignments); //创建topic CreateTopicsResult res2 = adminClient.createTopics(Arrays.asList(idea_topic_3)); try { res2.all().get(); } catch (InterruptedException e) { System.out.println("有异常了:"+e); } catch (ExecutionException e) { System.out.println("有异常了:"+e); }
//描述主题 DescribeTopicsResult res = adminClient.describeTopics(Arrays.asList("idea_topic_3")); //获取到主题的所有信息 Map<String, TopicDescription> map = res.all().get(); Set<Map.Entry<String, TopicDescription>> entries = map.entrySet(); for (Map.Entry<String, TopicDescription> entry : entries) { String topicName = entry.getKey(); TopicDescription des = entry.getValue(); //分区的详细信息,leader在哪,副本在哪,分区具体是什么情况,isr副本是什么情况 List<TopicPartitionInfo> partitions = des.partitions(); //主题的名称 String name = des.name(); System.out.println(topicName+","+name+","+partitions); }
//删除主图 DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("idea_topic_1", "idea_topic_2")); try { deleteTopicsResult.all().get(); } catch (InterruptedException e) { System.out.println("有异常了:"+e); } catch (ExecutionException e) { System.out.println("有异常了:"+e); } } } |
4.1生产者api示例
一个正常的生产逻辑需要具备以下几个步骤
- 配置生产者参数及创建相应的生产者实例
- 构建待发送的消息
- 发送消息
- 关闭生产者实例
采用默认分区方式将消息散列的发送到各个分区当中
Java package com.doitedu;
import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class KafkaProducerDemo { public static void main(String[] args) throws InterruptedException { /** * 1.构建一个kafka的客户端 * 2.创建一些待发送的消息,构建成kafka所需要的格式 * 3.调用kafka的api去发送消息 * 4.关闭kafka生产实例 */ //1.创建kafka的对象,配置一些kafka的配置文件 //它里面有一个泛型k,v //要发送数据的key //要发送的数据value //他有一个隐含之意,就是kafka发送的消息,是一个key,value类型的数据,但是不是必须得,其实只需要发送value的值就可以了 Properties pros = new Properties(); //指定kafka集群的地址 pros.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092"); //指定key的序列化方式 pros.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //指定value的序列化方式 pros.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //ack模式,取值有0,1,-1(all),all是最慢但最安全的 服务器应答生产者成功的策略 pros.put("acks", "all"); //这是kafka发送数据失败的重试次数,这个可能会造成发送数据的乱序问题 pros.setProperty("retries", "3"); //数据发送批次的大小 单位是字节 pros.setProperty("batch.size", "10000"); //一次数据发送请求所能发送的最大数据量 pros.setProperty("max.request.size", "102400"); //消息在缓冲区保留的时间,超过设置的值就会被提交到服务端 pros.put("linger.ms", 10000); //整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端 //buffer.memory要大于batch.size,否则会报申请内存不足的错误 pros.put("buffer.memory", 10240);
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(pros); for (int i = 0; i < 1000; i++) { //key value 0 --> doit32+-->+0 //key value 1 --> doit32+-->+1 //key value 2 --> doit32+-->+2 //2.创建一些待发送的消息,构建成kafka所需要的格式 ProducerRecord<String, String> record = new ProducerRecord<>("test01", i + "", "doit32-->" + i); //3.调用kafka的api去发送消息 kafkaProducer.send(record); Thread.sleep(100); } kafkaProducer.flush(); kafkaProducer.close(); } } |
对于properties配置的第二种写法,相对来说不会出错,简单举例:
Java public static void main(String[] args) { Properties pros = new Properties(); pros.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092,linux02:9092,linux03:9092"); pros.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); pros.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); } |
小问题: 1.kafka的生产者可以持续不断的向topic中发送数据不? 可以 2.kafak的生产者有哪些必须配置的参数: //指定kafka集群的地址 pros.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092"); //指定key的序列化方式 pros.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer"); //指定value的序列化方式 pros.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer"); 3.kafka生产者发送数据的时候,可以用jdk的序列化器来将数据进行序列化不? 不可以,kafka有指定的序列化接口 org.apache.kafka.common.serialization.Serializer 4.构造一个kafka的生产者后,是不是就已经确定了,我的数据是需要发送到哪一个topic里面不? 不是哦,咱们构造生产者对象的时候不需要指定topic,是在构造发送数据对象的时候才指定的 |
4.2消费者Api示例
一个正常的消费逻辑需要具备以下几个步骤:
- 配置消费者客户端参数及创建相应的消费者实例;
- 订阅主题topic;
- 拉取消息并消费;
- 定期向__consumer_offsets主题提交消费位移offset;
- 关闭消费者实例
Java package com.doitedu;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration; import java.util.Arrays; import java.util.Iterator; import java.util.Optional; import java.util.Properties;
public class ConsumerDemo { public static void main(String[] args) { //1.创建kafka的消费者对象,附带着把配置文件搞定 Properties props = new Properties(); //props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092,linux02:9092,linux03:9092"); //props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); // 定义kakfa 服务的地址,不需要将所有broker指定上 // props.put("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092"); // 制定consumer group props.put("group.id", "g3"); // 是否自动提交offset __consumer_offset 有多少分区 50 props.put("enable.auto.commit", "true"); // 自动提交offset的时间间隔 -- 这玩意设置的大小怎么控制 props.put("auto.commit.interval.ms", "5000"); //50000 1000 // key的反序列化类 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value的反序列化类 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 如果没有消费偏移量记录,则自动重设为起始offset:latest, earliest, none props.put("auto.offset.reset","earliest"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//2.订阅主题(确定需要消费哪一个或者多个主题) consumer.subscribe(Arrays.asList("test02")); //3.开始从topic中获取数据 while (true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE)); for (ConsumerRecord<String, String> record : records) { //这是数据所属的哪一个topic String topic = record.topic(); //该条数据的偏移量 long offset = record.offset(); //这条数据是哪一个分区的 int partition = record.partition(); //这条数据记录的时间戳,但是这个时间戳有两个类型 long timestamp = record.timestamp(); //上面时间戳的类型,这个类型有两个,一个是CreateTime(这条数据创建的时间), LogAppendTime(这条数据往日志里面追加的时间) TimestampType timestampType = record.timestampType(); //这个数据key的值 String key = record.key(); //这条数据value的值 String value = record.value(); //分区leader的纪元 Optional<Integer> integer = record.leaderEpoch(); //key的长度 int keySize = record.serializedKeySize(); //value的长度 int valueSize = record.serializedValueSize(); //数据的头部信息 Headers headers = record.headers(); // for (Header header : headers) { // String hKey = header.key(); // byte[] hValue = header.value(); // String valueString = new String(hValue); // System.out.println("header的key值 = " + hKey + "header的value的值 = "+ valueString); // } System.out.printf("topic = %s ,offset = %d, partition = %d, timestampType = %s ,timestamp = %d , key = %s , value = %s ,leader的纪元 = %d , key序列化的长度 = %d ,value 序列化的长度 = %d \r\n" , topic,offset,partition,timestampType + "",timestamp,key,value,integer.get(),keySize,valueSize); } }
//4.关闭消费者对象 // consumer.close(); } } |
4.2.1subscribe订阅主题
subscribe有如下重载方法:
Java public void subscribe(Collection<String> topics,ConsumerRebalanceListener listener) public void subscribe(Collection<String> topics) public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) public void subscribe(Pattern pattern) |
- 指定集合方式订阅主题
Java consumer.subscribe(Arrays.asList(topicl )); |
- 正则方式订阅主题
如果消费者采用的是正则表达式的方式(subscribe(Pattern))订阅, 在之后的过程中,如果有人又创建了新的主题,并且主题名字与正表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息。如果应用程序需要消费多个主题,并且可以处理不同的类型,那么这种订阅方式就很有效。
正则表达式的方式订阅的示例
Java consumer.subscribe(Pattern.compile ("topic.*" )); |
4.2.2assign订阅主题
消费者不仅可以通过 KafkaConsumer.subscribe() 方法订阅主题,还可直接订阅某些主题的指定分区;
在 KafkaConsumer 中提供了 assign() 方法来实现这些功能,此方法的具体定义如下:
Java package com.doitedu;
import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.header.Headers; import org.apache.kafka.common.record.TimestampType; import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration; import java.util.Arrays; import java.util.Optional; import java.util.Properties;
public class ConsumerDemo1 { public static void main(String[] args) { //1.创建kafka的消费者对象,附带着把配置文件搞定 Properties props = new Properties(); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092,linux02:9092,linux03:9092"); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"doit01");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
//2.订阅主题(确定需要消费哪一个或者多个主题) // consumer.subscribe(Arrays.asList("test03"));
// consumer.poll(Duration.ofMillis(Integer.MAX_VALUE)); // //我现在想手动指定,我需要从哪边开始消费 // //如果用subscribe去订阅主题的时候,他内部会给这个消费者组来一个自动再均衡 // consumer.seek(new TopicPartition("test03",0),2); TopicPartition tp01 = new TopicPartition("test03", 0);
//他就是手动去订阅主题和partition,有了这个就不需要再去订阅subscribe主题了,手动指定以后,他的内部就不会再来自动均衡了 consumer.assign(Arrays.asList(tp01)); // 手动订阅指定主题的指定分区的指定位置 consumer.seek(new TopicPartition("test03",0),2);
//3.开始从topic中获取数据 while (true){ ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE)); for (ConsumerRecord<String, String> record : records) { //这是数据所属的哪一个topic String topic = record.topic(); //该条数据的偏移量 long offset = record.offset(); //这条数据是哪一个分区的 int partition = record.partition(); //这条数据记录的时间戳,但是这个时间戳有两个类型 long timestamp = record.timestamp(); //上面时间戳的类型,这个类型有两个,一个是CreateTime(这条数据创建的时间), LogAppendTime(这条数据往日志里面追加的时间) TimestampType timestampType = record.timestampType(); //这个数据key的值 String key = record.key(); //这条数据value的值 String value = record.value();
//分区leader的纪元 Optional<Integer> integer = record.leaderEpoch(); //key的长度 int keySize = record.serializedKeySize(); //value的长度 int valueSize = record.serializedValueSize(); //数据的头部信息 Headers headers = record.headers(); // for (Header header : headers) { // String hKey = header.key(); // byte[] hValue = header.value(); // String valueString = new String(hValue); // System.out.println("header的key值 = " + hKey + "header的value的值 = "+ valueString); // } System.out.printf("topic = %s ,offset = %d, partition = %d, timestampType = %s ,timestamp = %d , key = %s , value = %s ,leader的纪元 = %d , key序列化的长度 = %d ,value 序列化的长度 = %d \r\n" , topic,offset,partition,timestampType + "",timestamp,key,value,integer.get(),keySize,valueSize); } }
//4.关闭消费者对象 // consumer.close(); } } |
这个方法只接受参数partitions,用来指定需要订阅的分区集合。示例如下:
Java consumer.assign(Arrays.asList(new TopicPartition ("tpc_1" , 0),new TopicPartition(“tpc_2”,1))) ; |
4.2.3subscribe与assign的区别
- 通过subscribe()方法订阅主题具有消费者自动再均衡功能 ;
在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。
- assign() 方法订阅分区时,是不具备消费者自动均衡的功能的;
其实这一点从assign方法参数可以看出端倪,两种类型subscribe()都有ConsumerRebalanceListener类型参数的方法,而assign()方法却没有。
4.2.4取消订阅
既然有订阅,那么就有取消订阅;
可以使用KafkaConsumer中的unsubscribe()方法采取消主题的订阅,这个方法既可以取消通过 subscribe( Collection)方式实现的订阅;
也可以取消通过subscribe(Pattem)方式实现的订阅,还可以取消通过assign( Collection)方式实现的订阅。示例码如下:
Java consumer.unsubscribe(); |
如果将subscribe(Collection )或assign(Collection)集合参数设置为空集合,作用与unsubscribe()方法相同,如下示例中三行代码的效果相同:
Java consumer.unsubscribe(); consumer.subscribe(new ArrayList<String>()) ; consumer.assign(new ArrayList<TopicPartition>()); |
4.2.5消息的消费模式
Kafka中的消费是基于拉取模式的。
消息的消费一般有两种模式:推送模式和拉取模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。 |
Java public class ConsumerRecord<K, V> { public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP; public static final int NULL_SIZE = -1; public static final int NULL_CHECKSUM = -1;
private final String topic; private final int partition; private final long offset; private final long timestamp; private final TimestampType timestampType; private final int serializedKeySize; private final int serializedValueSize; private final Headers headers; private final K key; private final V value;
private volatile Long checksum; |
- topic partition 这两个属性分别代表消息所属主题的名称和所在分区的编号。
- timestamp 表示时间戳,与此对应的timestampType 表示时间戳的类型。
- timestampType 有两种类型 CreateTime 和LogAppendTime ,分别代表消息创建的时间戳和消息追加到日志的时间戳。
- key value 分别表示消息的键和消息的值,一般业务应用要读取的就是value ;
- serializedKeySize、serializedValueSize分别表示key、value 经过序列化之后的大小,如果 key 为空,则 serializedKeySize 值为 -1,同样,如果value为空,则serializedValueSize 的值也会为 -1;
示例代码片段
Java /** * 订阅与消费方式2 */ TopicPartition tp1 = new TopicPartition("x", 0); TopicPartition tp2 = new TopicPartition("y", 0); TopicPartition tp3 = new TopicPartition("z", 0); List<TopicPartition> tps = Arrays.asList(tp1, tp2, tp3); consumer.assign(tps); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (TopicPartition tp : tps) { List<ConsumerRecord<String, String>> rList = records.records(tp); for (ConsumerRecord<String, String> r : rList) { r.topic(); r.partition(); r.offset(); r.value(); //do something to process record. } } } |
4.2.6指定位移消费
有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而 KafkaConsumer 中的seek()方法正好提供了这个功能,让我们可以追前消费或回溯消费。
seek()方法的具体定义如下:
Java seek都是和assign这个方法一起用 指定消费位置 public void seek(TopicPartiton partition,long offset) |
代码示例:
Java public class ConsumerDemo3指定偏移量消费 { public static void main(String[] args) {
Properties props = new Properties(); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"g002"); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"doit01:9092"); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest"); // 是否自动提交消费位移 props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
// 限制一次poll拉取到的数据量的最大值 props.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,"10240000"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// assign方式订阅doit27-1的两个分区 TopicPartition tp0 = new TopicPartition("doit27-1", 0); TopicPartition tp1 = new TopicPartition("doit27-1", 1); consumer.assign(Arrays.asList(tp0,tp1)); // 指定分区0,从offset:800开始消费 ; 分区1,从offset:650开始消费 consumer.seek(tp0,200); consumer.seek(tp1,250);
// 开始拉取消息 while(true){ ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(3000)); for (ConsumerRecord<String, String> rec : poll) { System.out.println(rec.partition()+","+rec.key()+","+rec.value()+","+rec.offset()); } } } } |
4.2.7自动提交消费者偏移量
Kafka中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数enable.auto.commit 配置,默认值为 true 。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数 auto.commit.interval.ms配置,默认值为5秒,此参数生效的前提是 enable. auto.commit 参数为 true。
在默认的方式下,消费者每隔5秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在 poll() 方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。
Kafka 消费的编程逻辑中位移提交是一大难点,自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,让编码更简洁。但随之而来的是重复消费和消息丢失的问题。
假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用)。我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样并不能避免重复消费的发送,而且也会使位移提交更加频繁。

按照一般思维逻辑而言,自动提交是延时提交,重复消费可以理解,那么消息丢失又是在什么情形下会发生的呢?我们来看下图中的情形:
拉取线程不断地拉取消息并存入本地缓存,比如在BlockingQueue 中,另一个处理线程从缓存中读取消息并进行相应的逻辑处理。设目前进行到了第 y+l 次拉取,以及第m次位移提交的时候,也就是 x+6 之前的位移己经确认提交了,处理线程却还正在处理x+3的消息;此时如果处理线程发生了异常,待其恢复之后会从第m次位移提交处,也就是 x+6 的位置开始拉取消息,那么 x+3至x+6 之间的消息就没有得到相应的处理,这样便发生消息丢失的现象。


4.2.8手动提交消费者偏移量(调用kafka api)
自动位移提交的方式在正常情况下不会发生消息丢失或重复消费的现象,但是在编程的世界里异常无可避免;同时,自动位移提交也无法做到精确的位移管理。在 Kafka中还提供了手动位移提交的方式,这样可以使得开发人员对消费位移的管理控制更加灵活。
很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费;
手动的提交方式可以让开发人员根据程序的逻辑在合适的时机进行位移提交。开启手动提交功能的前提是消费者客户端参数 enable.auto.commit 配置为false ,示例如下:
Java props.put(ConsumerConf.ENABLE_AUTO_COMMIT_CONFIG, false); |
手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync()和
commitAsync()两种类型的方法。
commitSync()方法的定义如下:
Java /** * 手动提交offset */ while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> r : records) { //do something to process record. } consumer.commitSync(); } |
对于采用 commitSync()的无参方法,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的,如果想寻求更细粒度的、更精准的提交,那么就需要使用commitSync()的另一个有参方法,具体定义如下:
Java public void commitSync(final Map<TopicPartition,OffsetAndMetadata> offsets) |
示例代码如下:
Java while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> r : records) { long offset = r.offset(); //do something to process record.
TopicPartition topicPartition = new TopicPartition(r.topic(), r.partition()); consumer.commitSync(Collections.singletonMap(topicPartition,new OffsetAndMetadata(offset+1))); } } |
提交的偏移量 = 消费完的record的偏移量 + 1 因为,__consumer_offsets中记录的消费偏移量,代表的是,消费者下一次要读取的位置!!! |
异步提交的方式( commitAsync())在执行的时候消费者线程不会被阻塞;可能在提交消费位移的结果还未返回之前就开始了新一次的拉取。异步提交可以让消费者的性能得到一定的增强。 commitAsync方法有一个不同的重载方法,具体定义如下

Java /** * 异步提交offset */ while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000)); for (ConsumerRecord<String, String> r : records) { long offset = r.offset();
//do something to process record. TopicPartition topicPartition = new TopicPartition(r.topic(), r.partition()); consumer.commitSync(Collections.singletonMap(topicPartition,new OffsetAndMetadata(offset+1))); consumer.commitAsync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset + 1)), new OffsetCommitCallback() { @Override public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) { if(e == null ){ System.out.println(map); }else{ System.out.println("error commit offset"); } } }); } } |
4.2.9手动提交位移(时机的选择)
可能会发生漏处理的现象(数据丢失)反过来说,这种方式实现了: at most once的数据处理(传递)语义
可能会发生重复处理的现象(数据重复)反过来说,这种方式实现了: at least once的数据处理(传递)语义当然,数据处理(传递)的理想语义是: exactly once(精确一次)Kafka也能做到exactly once(基于kafka的事务机制)
代码示例:
Java package com.doitedu;
import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.StringDeserializer;
import java.sql.*; import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Properties;
public class CommitOffsetByMyself { public static void main(String[] args) throws SQLException {
//获取mysql的连接对象 Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/football", "root", "123456"); connection.setAutoCommit(false); PreparedStatement pps = connection.prepareStatement("insert into user values (?,?,?)"); PreparedStatement pps_offset = connection.prepareStatement("insert into offset values (?,?) on duplicate key update offset = ?");
Properties props = new Properties(); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092,linux02:9092,linux03:9092"); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); //设置手动提交偏移量参数,需要将自动提交给关掉 props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); //设置从哪里开始消费 // props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); //设置组id props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group001");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); //订阅主题 consumer.subscribe(Arrays.asList("kafka2mysql"), new ConsumerRebalanceListener() { @Override public void onPartitionsRevoked(Collection<TopicPartition> collection) {
}
@Override public void onPartitionsAssigned(Collection<TopicPartition> collection) { for (TopicPartition topicPartition : collection) { try { PreparedStatement get_offset = connection.prepareStatement("select offset from offset where topic_partition = ?"); String topic = topicPartition.topic(); int partition = topicPartition.partition(); get_offset.setString(1, topic + "_" + partition); ResultSet resultSet = get_offset.executeQuery(); if (resultSet.next()){ int offset = resultSet.getInt(1); System.out.println("发生了再均衡,被分配了分区消费权,并且查到了目标分区的偏移量"+partition+" , "+offset); //拿到了offset后就可以定位消费了 consumer.seek(new TopicPartition(topic, partition), offset); } } catch (SQLException e) { e.printStackTrace(); } } } });
//拉去数据后写入到mysql while (true) { try { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE)); for (ConsumerRecord<String, String> record : records) { String data = record.value(); String[] arr = data.split(","); String id = arr[0]; String name = arr[1]; String age = arr[2];
pps.setInt(1, Integer.parseInt(id)); pps.setString(2, name); pps.setInt(3, Integer.parseInt(age)); pps.execute();
//埋个异常,看看是不是真的是这样 // if (Integer.parseInt(id) == 5) { // throw new SQLException(); // }
long offset = record.offset(); int partition = record.partition(); String topic = record.topic(); pps_offset.setString(1, topic + "_" + partition); pps_offset.setInt(2, (int) offset + 1); pps_offset.setInt(3, (int) offset + 1); pps_offset.execute(); //提交jdbc事务 connection.commit(); } } catch (Exception e) { connection.rollback(); } } } } |
4.2.10消费者提交偏移量方式的总结
consumer的消费位移提交方式:
- auto.offset.commit = true
- 定时提交到consumer_offsets
- auto.offset.commit = false;
- 然后手动触发提交 consumer.commitSync();
- 提交到consumer_offsets
- auto.offset.commit = false;
- 写自己的代码去把消费位移保存到你自己的地方mysql/zk/redis/
- 提交到自己所涉及的存储;初始化时也需要自己去从自定义存储中查询到消费位移
4.2.11一些相对重要参数(了解)
Java fetch.min.bytes=1B // 一次拉取的最小字节数
fetch.max.bytes=50M //一次拉取的最大数据量 fetch.max.wait.ms=500ms //拉取时的最大等待时长 max.partition.fetch.bytes = 1MB //每个分区一次拉取的最大数据量 max.poll.records=500 //一次拉取的最大条数 connections.max.idle.ms=540000ms //网络连接的最大闲置时长 request.timeout.ms=30000ms //一次请求等待响应的最大超时时间 metadata.max.age.ms=300000ms //元数据在限定时间内没有进行更新,则会被强制更新 reconnect.backoff.ms=50ms //尝试重新连接指定主机之前的退避时间 ==》 TODO retry.backoff.ms=100ms //尝试重新拉取数据的重试间隔 //事务的时候会出现这个 isolation.level=read_uncommitted //隔离级别! 决定消费者能读到什么样的数据
read_uncommitted: //可以消费到LSO(LastStableOffset)位置; read_committed: //可以消费到HW(High Watermark)位置 max.poll.interval.ms //超过时限没有发起poll操作,则消费组认为该消费者已离开消费组 enable.auto.commit=true //开启消费位移的自动提交
auto.offset.reset = latest auto.commit.interval.ms=5000 //自动提交消费位移的时间间隔 |
4.2.12练一练
需求:写一个生产者,不断的去生产用户行为数据,写入到kafka的一个topic中 生产的数据格式: 造数据 {"guid":1,"eventId":"pageview","timestamp":1637868346789} isNew = 1 {"guid":1,"eventId":"addcard","timestamp":1637868347625} isNew = 0 {"guid":2,"eventId":"collect","timestamp":16378683463219} {"guid":3,"eventId":"paid","timestamp":16378683467829} ...... 再写一个消费者,不断的从kafka中消费上面的用户行为数据,做一个统计 1.每5s输出一次当前来了多少用户(去重) uv 2.将每条数据添加一个字段来标识,如果这个用户的id是第一次出现,那么就标注1,否则就是0 |
依赖:
XML <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>${kafka.version}</version> </dependency>
<dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.75</version> </dependency>
<dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.8.1</version> </dependency>
<dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.22</version> </dependency>
<!-- https://mvnrepository.com/artifact/org.roaringbitmap/RoaringBitmap --> <dependency> <groupId>org.roaringbitmap</groupId> <artifactId>RoaringBitmap</artifactId> <version>0.9.0</version> </dependency>
<!-- https://mvnrepository.com/artifact/com.google.guava/guava --> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>31.1-jre</version> </dependency>
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.28</version> </dependency> |
生产者代码示例:
Java package com.doitedu;
import com.alibaba.fastjson.JSON; import lombok.AllArgsConstructor; import lombok.Getter; import lombok.NoArgsConstructor; import lombok.Setter; import org.apache.commons.lang3.RandomStringUtils; import org.apache.commons.lang3.RandomUtils; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/** * 验证数据: * 创建topic * kafka-topics.sh --create --topic event-log --zookeeper linux01:2181 --partitions 3 --replication-factor 3 * 搞个消费者消费数据 * kafka-console-consumer.sh --bootstrap-server linux01:9092 --topic event-log * {"eventId":"zTUAbXcWbn","guid":7170,"timeStamp":1659944455262} * {"eventId":"KSzaaNmczb","guid":9743,"timeStamp":1659944455823} * {"eventId":"FNUERLlCNu","guid":7922,"timeStamp":1659944456295} * {"eventId":"VmXVJHlpOF","guid":2505,"timeStamp":1659944458267} * {"eventId":"pMIHwLzSIE","guid":7668,"timeStamp":1659944460088} * {"eventId":"ZvGYIvmKTx","guid":3636,"timeStamp":1659944460461} * {"eventId":"jBanTDSlCO","guid":3468,"timeStamp":1659944460787} * {"eventId":"vXregpYeHu","guid":1107,"timeStamp":1659944462525} * {"eventId":"PComosCafr","guid":7765,"timeStamp":1659944463640} * {"eventId":"xCHFOYIJlb","guid":3443,"timeStamp":1659944464697} * {"eventId":"xDToApWwFo","guid":5034,"timeStamp":1659944465953} */ public class Exercise_kafka编程练习 { public static void main(String[] args) throws InterruptedException { MyData myData = new MyData(); myData.genData(); } }
class MyData{ KafkaProducer<String, String> producer = null; public MyData(){ Properties props = new Properties(); props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092,linux02:9092,linux03:9092"); props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); producer = new KafkaProducer<String, String>(props); } public void genData() throws InterruptedException { UserEvent userEvent = new UserEvent(); while (true){ //造数据 userEvent.setGuid(RandomUtils.nextInt(0,10000)); userEvent.setEventId(RandomStringUtils.randomAlphabetic(10)); userEvent.setTimeStamp(System.currentTimeMillis()); String json = JSON.toJSONString(userEvent); //数据造完了就往kafka中写 ProducerRecord<String, String> stringProducerRecord = new ProducerRecord<>("event-log", json); Thread.sleep(RandomUtils.nextInt(200,1000)); producer.send(stringProducerRecord); } } } /* {"guid":1,"eventId":"pageview","timestamp":1637868346789} {"guid":1,"eventId":"addcard","timestamp":1637868347625} {"guid":2,"eventId":"collect","timestamp":16378683463219} */ @NoArgsConstructor @AllArgsConstructor @Getter @Setter class UserEvent{ private Integer guid; private String eventId; private long timeStamp; } |
消费者代码示例:用hashset来实现:
Java package com.doitedu;
import com.alibaba.fastjson.JSON; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration; import java.util.*;
/** * 分两步走: * 第一步:一个消费者不断的去消费数据 * 第二步:5分钟计算一次,返回用户数这个结果 */ public class Exercise_consumerDemo { public static void main(String[] args) { HashSet<Integer> set = new HashSet<>(); new Thread(new ConsumerThread(set)).start(); //定时的任务调度 Timer timer = new Timer(); //调度,第一个参数,你给我一个任务, //第二个参数代表过多久之后我开始执行任务 //第三个参数代表每隔多久执行一次 timer.schedule(new ConsumerTask(set),5000,10000);
} }
class ConsumerThread implements Runnable { HashSet<Integer> set = null; KafkaConsumer<String, String> consumer = null;
public ConsumerThread(HashSet<Integer> set) { this.set = set; Properties props = new Properties(); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092"); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");
consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList("event-log")); } /** * 重写run方法的话,我需要在里面实现什么逻辑? * 消费者消费数据,拿到数据以后,只需要获取到用户id * 将用户id写到hashset集合里面 */ @Override public void run() { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE)); for (ConsumerRecord<String, String> record : records) { String json = record.value(); UserEvent userEvent = JSON.parseObject(json, UserEvent.class); Integer guid = userEvent.getGuid(); set.add(guid); } } } }
class ConsumerTask extends TimerTask { HashSet<Integer> set = null;
public ConsumerTask(HashSet<Integer> set) { this.set = set; } /** * 这里面就是返回的一个用户数 */ @Override public void run() { int userCount = set.size(); System.out.println(System.currentTimeMillis() + ",截至到当前为止的一个用户数为:"+userCount); } } |
用hashset来实现很显然会出问题,如果数据量一直往上增长,会出现oom的问题,而且占用资源越来越多,影响电脑性能!!!
方案二:将HashSet改成bitMap来计数,就很完美,大逻辑不变,小逻辑就是将HashMap改成bitMap
Java package com.doitedu;
import com.alibaba.fastjson.JSON; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import org.roaringbitmap.RoaringBitmap;
import java.time.Duration; import java.util.*;
/** * 分两步走: * 第一步:一个消费者不断的去消费数据 * 第二步:5分钟计算一次,返回用户数这个结果 */ public class BitMap_consumerDemo { public static void main(String[] args) { //原来我用的是Hashset来记录,现在我用RoaringBitmap来记录 RoaringBitmap bitMap = RoaringBitmap.bitmapOf();
new Thread(new BitMapConsumerThread(bitMap)).start(); //定时的任务调度 Timer timer = new Timer(); timer.schedule(new BitMapConsumerTask(bitMap),1000,5000);
} }
class BitMapConsumerThread implements Runnable { RoaringBitmap bitMap = null; KafkaConsumer<String, String> consumer = null;
public BitMapConsumerThread(RoaringBitmap bitMap) { this.bitMap = bitMap; Properties props = new Properties(); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092"); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");
consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList("event-log")); } /** * 重写run方法的话,我需要在里面实现什么逻辑? * 消费者消费数据,拿到数据以后,只需要获取到用户id * 将用户id写到hashset集合里面 */ @Override public void run() { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE)); for (ConsumerRecord<String, String> record : records) { String json = record.value(); UserEvent userEvent = JSON.parseObject(json, UserEvent.class); Integer guid = userEvent.getGuid(); bitMap.add(guid); } } } }
class BitMapConsumerTask extends TimerTask { RoaringBitmap bitMap = null;
public BitMapConsumerTask(RoaringBitmap bitMap) { this.bitMap = bitMap; } /** * 这里面就是返回的一个用户数 */ @Override public void run() { int userCount = bitMap.getCardinality(); System.out.println(System.currentTimeMillis() + ",截至到当前为止的一个用户数为:"+userCount); } } |
需求二:判断来没来过的问题,可以用bitmap来搞,当然还可以用布隆过滤器来搞
Java package com.doitedu;
import com.alibaba.fastjson.JSON; import com.google.common.hash.BloomFilter; import com.google.common.hash.Funnels; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration; import java.util.Arrays; import java.util.Properties; import java.util.Timer; import java.util.TimerTask;
/** * 用布隆过滤器来判定是否重复,当然,bitMap也是可以操作的 */ public class BloomFilter_consumerDemo { public static void main(String[] args) {
BloomFilter<Long> longBloomFilter = BloomFilter.create(Funnels.longFunnel(), 100000);
new Thread(new BloomFilterConsumerThread(longBloomFilter)).start(); } }
class BloomFilterConsumerThread implements Runnable { BloomFilter<Long> longBloomFilter = null; KafkaConsumer<String, String> consumer = null;
public BloomFilterConsumerThread(BloomFilter<Long> longBloomFilter) { this.longBloomFilter = longBloomFilter; Properties props = new Properties(); props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092"); props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001"); consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList("event-log")); }
/** * 重写run方法的话,我需要在里面实现什么逻辑? * 消费者消费数据,拿到数据以后,只需要获取到用户id * 将用户id写到hashset集合里面 */ @Override public void run() { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE)); for (ConsumerRecord<String, String> record : records) { String json = record.value(); UserEvent userEvent = JSON.parseObject(json, UserEvent.class); Integer guid = userEvent.getGuid(); boolean flag = longBloomFilter.mightContain((long) guid); if (flag) { userEvent.setIsNew(0); } else { userEvent.setIsNew(1); } //判断完成以后,得把他加进去 longBloomFilter.put((long) guid); System.out.println(JSON.toJSONString(userEvent)); } } } } |