kafka 02

news2025/2/25 8:23:27

4.API开发

准备: 创建项目 添加依赖

XML
 <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.3.1</version>
        </dependency>
    </dependencies>


    <!--
依赖下载国内镜像库 -->
    <repositories>
        <repository>
            <id>nexus-aliyun</id>
            <name>Nexus aliyun</name>
            <layout>default</layout>
            <url>http://maven.aliyun.com/nexus/content/groups/public</url>
            <snapshots>
                <enabled>false</enabled>
                <updatePolicy>never</updatePolicy>
            </snapshots>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>never</updatePolicy>
            </releases>
        </repository>
    </repositories>

    <!-- maven插件下载国内镜像库 -->
    <pluginRepositories>
        <pluginRepository>
            <id>ali-plugin</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <snapshots>
                <enabled>false</enabled>
                <updatePolicy>never</updatePolicy>
            </snapshots>
            <releases>
                <enabled>true</enabled>
                <updatePolicy>never</updatePolicy>
            </releases>
        </pluginRepository>
    </pluginRepositories>


    <build>
        <plugins>

            <!-- 指定编译java的插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>

            <!-- 指定编译scala的插件 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                        <configuration>
                            <args>
                                <arg>-dependencyfile</arg>
                                <arg>${project.build.directory}/.scala_dependencies</arg>
                            </args>
                        </configuration>
                    </execution>
                </executions>
            </plugin>


            <!--  把依赖jar中的用到的类,提取到自己的jar中 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.6</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass></mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <!--下面是为了使用 mvn package命令,如果不加则使用mvn assembly-->
                <executions>
                    <execution>
                        <id>make-assemble</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>

        </plugins>

    </build>

 

4.1topic管理

如果希望将管理类的功能集成到公司内部的系统中,打造集管理、监控、运维、告警为一体的生态平台,那么就需要以程序调用 API 方式去实现。

工具类KafkaAdminClient可以用来管理broker、配置和ACL Access Control List),管理topic

构造一个KafkaAdminClient

Java
AdminClient adminClient = KafkaAdminClient.create(props);

4.1.1列出主题

Java
ListTopicsResult listTopicsResult = adminClient.listTopics();
Set
<String> topics = listTopicsResult.names().get();
System.out.println(topics);

4.1.2查看主题信息

Java
DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList("tpc_4", "tpc_3"));
Map
<String, TopicDescription> res = describeTopicsResult.all().get();
Set
<String> ksets = res.keySet();
for (String k : ksets) {
    System.out.println(res.get(k));
}

4.1.2创建主题

Java
// 参数配置
Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,
"doit01:9092,doit02:9092,doit03:9092");
props.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG,
3000);

// 创建 admin client 对象
AdminClient adminClient = KafkaAdminClient.create(props);
// 由服务端controller自行分配分区及副本所在broker
NewTopic tpc_3 = new NewTopic("tpc_3", 2, (short) 1);
// 手动指定分区及副本的broker分配
HashMap<Integer, List<Integer>> replicaAssignments = new HashMap<>();
// 分区0,分配到broker0,broker1
replicaAssignments.put(0,Arrays.asList(0,1));
// 分区1,分配到broker0,broker2
replicaAssignments.put(1,Arrays.asList(0,1));

NewTopic tpc_4
= new NewTopic("tpc_4", replicaAssignments);
CreateTopicsResult result
= adminClient.createTopics(Arrays.asList(tpc_3,tpc_4));

// 从future中等待服务端返回
try {
    result.all().get();
}
catch (Exception e) {
    e.printStackTrace();
}
adminClient.close();

4.1.3删除主题

Java
DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("tpc_1", "tpc_1"));
Map
<String, KafkaFuture<Void>> values = deleteTopicsResult.values();
System.out.println(values);

汇总代码示例:

Java
package com.doit.ApisDemo;

import org.apache.kafka.clients.admin.*;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartitionInfo;

import java.util.*;
import java.util.concurrent.ExecutionException;

public class AdminDemo {
    public static void main(String[] args)  {
        Properties props = new Properties();
        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092");
        props.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG,"3000");
        //
创建kafkaAdmin对象
        AdminClient adminClient = KafkaAdminClient.create(props);

//列出所有主题
        ListTopicsResult listTopicsResult = adminClient.listTopics();
        System.out.println(listTopicsResult.names().get());

//创建topic
        NewTopic topic1 = new NewTopic("idea_topic_1", 2, (short) 2);
        NewTopic topic2 = new NewTopic("idea_topic_2", 3, (short) 3);

       
       
//
创建topic  指定分区数和副本数
        CreateTopicsResult res1 = adminClient.createTopics(Arrays.asList(topic1, topic2));
        try {
            res1.all().get();
        } catch (InterruptedException e) {
            System.out.println("有异常了:"+e);
        } catch (ExecutionException e) {
            System.out.println("有异常了:"+e);        }



//创建topic 手动指定分区和副本分别在哪个broker上
        HashMap<Integer, List<Integer>> replicaAssignments = new HashMap<>();
        //需要传一个key value key 代表的是broker value代表的是副本分别在哪些broker 上
        replicaAssignments.put(0,Arrays.asList(0,1));
        replicaAssignments.put(1,Arrays.asList(1,2));

        //创建一个新的topic信息
        NewTopic idea_topic_3 = new NewTopic("idea_topic_3", replicaAssignments);
        //创建topic
        CreateTopicsResult res2 = adminClient.createTopics(Arrays.asList(idea_topic_3));
        try {
            res2.all().get();
        } catch (InterruptedException e) {
            System.out.println("有异常了:"+e);        } catch (ExecutionException e) {
            System.out.println("有异常了:"+e);        }


//描述主题
       DescribeTopicsResult res = adminClient.describeTopics(Arrays.asList("idea_topic_3"));
        //获取到主题的所有信息
        Map<String, TopicDescription> map = res.all().get();
        Set<Map.Entry<String, TopicDescription>> entries = map.entrySet();
        for (Map.Entry<String, TopicDescription> entry : entries) {
            String topicName = entry.getKey();
            TopicDescription des = entry.getValue();
            //分区的详细信息,leader在哪,副本在哪,分区具体是什么情况,isr副本是什么情况
            List<TopicPartitionInfo> partitions = des.partitions();
            //主题的名称
            String name = des.name();
            System.out.println(topicName+","+name+","+partitions);
        }


//删除主图
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("idea_topic_1", "idea_topic_2"));
        try {
            deleteTopicsResult.all().get();
        } catch (InterruptedException e) {
            System.out.println("有异常了:"+e);
        } catch (ExecutionException e) {
            System.out.println("有异常了:"+e);
        }
    }
}

4.1生产者api示例

一个正常的生产逻辑需要具备以下几个步骤

  1. 配置生产者参数及创建相应的生产者实例
  1. 构建待发送的消息
  1. 发送消息
  1. 关闭生产者实例

采用默认分区方式将消息散列的发送到各个分区当中

Java
package com.doitedu;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class KafkaProducerDemo {
    public static void main(String[] args) throws InterruptedException {
        /**
         * 1.
构建一个kafka的客户端
         * 2.创建一些待发送的消息,构建成kafka所需要的格式
         * 3.调用kafka的api去发送消息
         * 4.关闭kafka生产实例
         */
        //1.
创建kafka的对象,配置一些kafka的配置文件
        //它里面有一个泛型k,v
        //要发送数据的key
        //要发送的数据value
        //他有一个隐含之意,就是kafka发送的消息,是一个key,value类型的数据,但是不是必须得,其实只需要发送value的值就可以了
        Properties pros = new Properties();
        //
指定kafka集群的地址
        pros.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092");
        //
指定key的序列化方式
        pros.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //
指定value的序列化方式
        pros.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //ack
模式,取值有0,1,-1(all),all是最慢但最安全的  服务器应答生产者成功的策略
        pros.put("acks", "all");
        //
这是kafka发送数据失败的重试次数,这个可能会造成发送数据的乱序问题
        pros.setProperty("retries", "3");
        //
数据发送批次的大小 单位是字节
        pros.setProperty("batch.size", "10000");
        //
一次数据发送请求所能发送的最大数据量
        pros.setProperty("max.request.size", "102400");
        //
消息在缓冲区保留的时间,超过设置的值就会被提交到服务端
        pros.put("linger.ms", 10000);
        //
整个Producer用到总内存的大小,如果缓冲区满了会提交数据到服务端
        //buffer.memory要大于batch.size,否则会报申请内存不足的错误
        pros.put("buffer.memory", 10240);

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(pros);
        for (int i = 0; i < 1000; i++) {
            //key value  0 --> doit32+-->+0
            //key value  1 --> doit32+-->+1
            //key value  2 --> doit32+-->+2
            //2.
创建一些待发送的消息,构建成kafka所需要的格式
            ProducerRecord<String, String> record = new ProducerRecord<>("test01", i + "", "doit32-->" + i);
            //3.
调用kafka的api去发送消息
            kafkaProducer.send(record);
            Thread.sleep(100);
        }
        kafkaProducer.flush();
        kafkaProducer.close();
    }
}

对于properties配置的第二种写法,相对来说不会出错,简单举例:

Java
public static void main(String[] args) {
    Properties pros = new Properties();
    pros.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092,linux02:9092,linux03:9092");
    pros.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    pros.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
}

小问题:

1.kafka的生产者可以持续不断的向topic中发送数据不?

可以

2.kafak的生产者有哪些必须配置的参数:

 //指定kafka集群的地址
pros.setProperty("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092");
//
指定key的序列化方式
pros.setProperty("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
//
指定value的序列化方式
pros.setProperty("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

3.kafka生产者发送数据的时候,可以用jdk的序列化器来将数据进行序列化不?

不可以,kafka有指定的序列化接口 org.apache.kafka.common.serialization.Serializer

4.构造一个kafka的生产者后,是不是就已经确定了,我的数据是需要发送到哪一个topic里面不?

不是哦,咱们构造生产者对象的时候不需要指定topic,是在构造发送数据对象的时候才指定的

4.2消费者Api示例

一个正常的消费逻辑需要具备以下几个步骤:

  1. 配置消费者客户端参数及创建相应的消费者实例;
  1. 订阅主题topic;
  1. 拉取消息并消费;
  1. 定期向__consumer_offsets主题提交消费位移offset;
  1. 关闭消费者实例

Java
package com.doitedu;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Optional;
import java.util.Properties;

public class ConsumerDemo {
    public static void main(String[] args) {
        //1.创建kafka的消费者对象,附带着把配置文件搞定
        Properties props = new Properties();
        //props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092,linux02:9092,linux03:9092");
        //props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        // 定义kakfa 服务的地址,不需要将所有broker指定上
       // props.put("bootstrap.servers", "linux01:9092,linux02:9092,linux03:9092");
        // 制定consumer group
        props.put("group.id", "g3");
        // 是否自动提交offset  __consumer_offset   有多少分区  50
        props.put("enable.auto.commit", "true");
        // 自动提交offset的时间间隔   -- 这玩意设置的大小怎么控制
        props.put("auto.commit.interval.ms", "5000");  //50000   1000
        // key的反序列化类
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value的反序列化类
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // 如果没有消费偏移量记录,则自动重设为起始offset:latest, earliest, none
        props.put("auto.offset.reset","earliest");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        //2.订阅主题(确定需要消费哪一个或者多个主题)
        consumer.subscribe(Arrays.asList("test02"));
        //3.开始从topic中获取数据
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                //这是数据所属的哪一个topic
                String topic = record.topic();
                //该条数据的偏移量
                long offset = record.offset();
                //这条数据是哪一个分区的
                int partition = record.partition();
                //这条数据记录的时间戳,但是这个时间戳有两个类型
                long timestamp = record.timestamp();
                //上面时间戳的类型,这个类型有两个,一个是CreateTime(这条数据创建的时间), LogAppendTime(这条数据往日志里面追加的时间)
                TimestampType timestampType = record.timestampType();
                //这个数据key的值
                String key = record.key();
                //这条数据value的值
                String value = record.value();
                //分区leader的纪元
                Optional<Integer> integer = record.leaderEpoch();
                //key的长度
                int keySize = record.serializedKeySize();
                //value的长度
                int valueSize = record.serializedValueSize();
                //数据的头部信息
                Headers headers = record.headers();
//            for (Header header : headers) {
//                String hKey = header.key();
//                byte[] hValue = header.value();
//                String valueString = new String(hValue);
//                System.out.println("header的key值 = " + hKey + "header的value的值 = "+ valueString);
//            }
                System.out.printf("topic = %s ,offset = %d, partition = %d, timestampType = %s ,timestamp = %d , key = %s , value = %s ,leader的纪元 = %d , key序列化的长度 = %d ,value 序列化的长度 = %d \r\n" ,
                        topic,offset,partition,timestampType + "",timestamp,key,value,integer.get(),keySize,valueSize);
            }
        }

        //4.关闭消费者对象
//        consumer.close();
    }
}

4.2.1subscribe订阅主题

subscribe有如下重载方法:

Java
public void subscribe(Collection<String> topics,ConsumerRebalanceListener listener)
public void subscribe(Collection<String> topics)
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener)
public void subscribe(Pattern pattern)

  1. 指定集合方式订阅主题

Java
consumer.subscribe(Arrays.asList(topicl ));

  1. 正则方式订阅主题

如果消费者采用的是正则表达式的方式(subscribe(Pattern))订阅, 在之后的过程中,如果有人又创建了新的主题,并且主题名字与正表达式相匹配,那么这个消费者就可以消费到新添加的主题中的消息。如果应用程序需要消费多个主题,并且可以处理不同的类型,那么这种订阅方式就很有效。

正则表达式的方式订阅的示例

Java
consumer.subscribe(Pattern.compile ("topic.*" ));

利用正则表达式订阅主题,可实现动态订阅

4.2.2assign订阅主题

消费者不仅可以通过 KafkaConsumer.subscribe() 方法订阅主题,还可直接订阅某些主题的指定分区;

 在 KafkaConsumer 中提供了 assign() 方法来实现这些功能,此方法的具体定义如下:

Java
package com.doitedu;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Optional;
import java.util.Properties;

public class ConsumerDemo1 {
    public static void main(String[] args) {
        //1.
创建kafka的消费者对象,附带着把配置文件搞定
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092,linux02:9092,linux03:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,"doit01");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        //2.
订阅主题(确定需要消费哪一个或者多个主题)
//        consumer.subscribe(Arrays.asList("test03"));

//        consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
//        //我现在想手动指定,我需要从哪边开始消费
//        //如果用subscribe去订阅主题的时候,他内部会给这个消费者组来一个自动再均衡
//        consumer.seek(new TopicPartition("test03",0),2);
        TopicPartition tp01 = new TopicPartition("test03", 0);

        //
他就是手动去订阅主题和partition,有了这个就不需要再去订阅subscribe主题了,手动指定以后,他的内部就不会再来自动均衡了
        consumer.assign(Arrays.asList(tp01)); // 手动订阅指定主题的指定分区的指定位置
        consumer.seek(new TopicPartition("test03",0),2);

        //3.开始从topic中获取数据
        while (true){
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                //这是数据所属的哪一个topic
                String topic = record.topic();
                //该条数据的偏移量
                long offset = record.offset();
                //这条数据是哪一个分区的
                int partition = record.partition();
                //这条数据记录的时间戳,但是这个时间戳有两个类型
                long timestamp = record.timestamp();
                //上面时间戳的类型,这个类型有两个,一个是CreateTime(这条数据创建的时间), LogAppendTime(这条数据往日志里面追加的时间)
                TimestampType timestampType = record.timestampType();
                //这个数据key的值
                String key = record.key();
                //这条数据value的值
                String value = record.value();

                //分区leader的纪元
                Optional<Integer> integer = record.leaderEpoch();
                //key的长度
                int keySize = record.serializedKeySize();
                //value的长度
                int valueSize = record.serializedValueSize();
                //数据的头部信息
                Headers headers = record.headers();
//            for (Header header : headers) {
//                String hKey = header.key();
//                byte[] hValue = header.value();
//                String valueString = new String(hValue);
//                System.out.println("header的key值 = " + hKey + "header的value的值 = "+ valueString);
//            }
                System.out.printf("topic = %s ,offset = %d, partition = %d, timestampType = %s ,timestamp = %d , key = %s , value = %s ,leader的纪元 = %d , key序列化的长度 = %d ,value 序列化的长度 = %d \r\n" ,
                        topic,offset,partition,timestampType + "",timestamp,key,value,integer.get(),keySize,valueSize);
            }
        }

        //4.关闭消费者对象
//        consumer.close();
    }
}

这个方法只接受参数partitions,用来指定需要订阅的分区集合。示例如下:

Java
consumer.assign(Arrays.asList(new TopicPartition ("tpc_1" , 0),new TopicPartition(“tpc_2”,1))) ;

4.2.3subscribeassign的区别

  • 通过subscribe()方法订阅主题具有消费者自动再均衡功能 ;

在多个消费者的情况下可以根据分区分配策略来自动分配各个消费者与分区的关系。当消费组的消费者增加或减少时,分区分配关系会自动调整,以实现消费负载均衡及故障自动转移。

  • assign() 方法订阅分区时,是不具备消费者自动均衡的功能的;

其实这一点从assign方法参数可以看出端倪,两种类型subscribe()都有ConsumerRebalanceListener类型参数的方法,而assign()方法却没有。

4.2.4取消订阅

既然有订阅,那么就有取消订阅;

可以使用KafkaConsumer中的unsubscribe()方法采取消主题的订阅,这个方法既可以取消通过 subscribe( Collection)方式实现的订阅;

也可以取消通过subscribe(Pattem)方式实现的订阅,还可以取消通过assign( Collection)方式实现的订阅。示例码如下:

Java
consumer.unsubscribe();

如果将subscribe(Collection )或assign(Collection)集合参数设置为空集合,作用与unsubscribe()方法相同,如下示例中三行代码的效果相同:

Java
consumer.unsubscribe();
consumer.subscribe(new ArrayList<String>()) ;
consumer.assign(new ArrayList<TopicPartition>());

4.2.5消息的消费模式

Kafka中的消费是基于拉取模式的。

消息的消费一般有两种模式:推送模式和拉取模式。推模式是服务端主动将消息推送给消费者,而拉模式是消费者主动向服务端发起请求来拉取消息。

Java
public class ConsumerRecord<K, V> {
    public static final long NO_TIMESTAMP = RecordBatch.NO_TIMESTAMP;
    public static final int NULL_SIZE = -1;
    public static final int NULL_CHECKSUM = -1;

    private final String topic;
    private final int partition;
    private final long offset;
    private final long timestamp;
    private final TimestampType timestampType;
    private final int serializedKeySize;
    private final int serializedValueSize;
    private final Headers headers;
    private final K key;
    private final V value;

    private volatile Long checksum;

  • topic partition 这两个属性分别代表消息所属主题的名称和所在分区的编号。
  • offset 表示消息在所属分区的偏移量。
  • timestamp 表示时间戳,与此对应的timestampType 表示时间戳的类型。
  • timestampType 有两种类型 CreateTime 和LogAppendTime ,分别代表消息创建的时间戳和消息追加到日志的时间戳。
  • headers 表示消息的头部内容。
  • key value 分别表示消息的键和消息的值,一般业务应用要读取的就是value ;
  • serializedKeySize、serializedValueSize分别表示key、value 经过序列化之后的大小,如果 key 为空,则 serializedKeySize 值为 -1,同样,如果value为空,则serializedValueSize 的值也会为 -1;
  • checksum 是CRC32的校验值。

示例代码片段

Java
/**
 *
订阅与消费方式2
 */
TopicPartition tp1 = new TopicPartition("x", 0);
TopicPartition tp2 = new TopicPartition("y", 0);
TopicPartition tp3 = new TopicPartition("z", 0);
List<TopicPartition> tps = Arrays.asList(tp1, tp2, tp3);
consumer.assign(tps);

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (TopicPartition tp : tps) {
        List<ConsumerRecord<String, String>> rList = records.records(tp);
        for (ConsumerRecord<String, String> r : rList) {
            r.topic();
            r.partition();
            r.offset();
            r.value();
            //do something to process record.
        }
    }
}

4.2.6指定位移消费

有些时候,我们需要一种更细粒度的掌控,可以让我们从特定的位移处开始拉取消息,而 KafkaConsumer 中的seek()方法正好提供了这个功能,让我们可以追前消费或回溯消费。

seek()方法的具体定义如下:

Java
seek都是和assign这个方法一起用 指定消费位置
public void seek(TopicPartiton partition,long offset)

代码示例:

Java
public class ConsumerDemo3指定偏移量消费 {
   
public static void main(String[] args) {

        Properties props
= new Properties();
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
"g002");
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
"doit01:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.
class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.
class.getName());
        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"latest");
       
// 是否自动提交消费位移
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");

       
// 限制一次poll拉取到的数据量的最大值
        props.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG,"10240000");
         KafkaConsumer
<String, String> consumer = new KafkaConsumer<>(props);

       
// assign方式订阅doit27-1的两个分区
        TopicPartition tp0 = new TopicPartition("doit27-1", 0);
        TopicPartition tp1
= new TopicPartition("doit27-1", 1);
       
        consumer.assign(Arrays.asList(tp0,tp1));
       
// 指定分区0,从offset:800开始消费    ;  分区1,从offset:650开始消费
        consumer.seek(tp0,200);
        consumer.seek(tp1,
250);

       
// 开始拉取消息
        while(true){
            ConsumerRecords
<String, String> poll = consumer.poll(Duration.ofMillis(3000));
           
for (ConsumerRecord<String, String> rec : poll) {
                System.out.println(rec.partition()
+","+rec.key()+","+rec.value()+","+rec.offset());
            }
        }
    }
}

4.2.7自动提交消费者偏移量

Kafka中默认的消费位移的提交方式是自动提交,这个由消费者客户端参数enable.auto.commit 配置,默认值为 true 。当然这个默认的自动提交不是每消费一条消息就提交一次,而是定期提交,这个定期的周期时间由客户端参数 auto.commit.interval.ms配置,默认值为5秒,此参数生效的前提是 enable. auto.commit 参数为 true。

在默认的方式下,消费者每隔5秒会将拉取到的每个分区中最大的消息位移进行提交。自动位移提交的动作是在 poll() 方法的逻辑里完成的,在每次真正向服务端发起拉取请求之前会检查是否可以进行位移提交,如果可以,那么就会提交上一次轮询的位移。

Kafka 消费的编程逻辑中位移提交是一大难点,自动提交消费位移的方式非常简便,它免去了复杂的位移提交逻辑,让编码更简洁。但随之而来的是重复消费和消息丢失的问题

  • 重复消费

假设刚刚提交完一次消费位移,然后拉取一批消息进行消费,在下一次自动提交消费位移之前,消费者崩溃了,那么又得从上一次位移提交的地方重新开始消费,这样便发生了重复消费的现象(对于再均衡的情况同样适用)。我们可以通过减小位移提交的时间间隔来减小重复消息的窗口大小,但这样并不能避免重复消费的发送,而且也会使位移提交更加频繁。

  • 丢失消息

按照一般思维逻辑而言,自动提交是延时提交,重复消费可以理解,那么消息丢失又是在什么情形下会发生的呢?我们来看下图中的情形:

拉取线程不断地拉取消息并存入本地缓存,比如在BlockingQueue 中,另一个处理线程从缓存中读取消息并进行相应的逻辑处理。设目前进行到了第 y+l 次拉取,以及第m次位移提交的时候,也就是 x+6 之前的位移己经确认提交了,处理线程却还正在处理x+3的消息;此时如果处理线程发生了异常,待其恢复之后会从第m次位移提交处,也就是 x+6 的位置开始拉取消息,那么 x+3至x+6 之间的消息就没有得到相应的处理,这样便发生消息丢失的现象。

4.2.8手动提交消费者偏移量(调用kafka api

自动位移提交的方式在正常情况下不会发生消息丢失或重复消费的现象,但是在编程的世界里异常无可避免;同时,自动位移提交也无法做到精确的位移管理。在 Kafka中还提供了手动位移提交的方式,这样可以使得开发人员对消费位移的管理控制更加灵活。

很多时候并不是说拉取到消息就算消费完成,而是需要将消息写入数据库、写入本地缓存,或者是更加复杂的业务处理。在这些场景下,所有的业务处理完成才能认为消息被成功消费;

手动的提交方式可以让开发人员根据程序的逻辑在合适的时机进行位移提交。开启手动提交功能的前提是消费者客户端参数 enable.auto.commit 配置为false ,示例如下:

Java
props.put(ConsumerConf.ENABLE_AUTO_COMMIT_CONFIG, false);

手动提交可以细分为同步提交和异步提交,对应于 KafkaConsumer 中的 commitSync()和

commitAsync()两种类型的方法。

  • 同步提交的方式

commitSync()方法的定义如下:

Java
/**
 * 手动提交offset
 */
while (true) {
    ConsumerRecords
<String, String> records = consumer.poll(Duration.ofMillis(1000));
   
for (ConsumerRecord<String, String> r : records) {
       
//do something to process record.
    }
    consumer.commitSync();
}

对于采用 commitSync()的无参方法,它提交消费位移的频率和拉取批次消息、处理批次消息的频率是一样的,如果想寻求更细粒度的、更精准的提交,那么就需要使用commitSync()的另一个有参方法,具体定义如下:

Java
public void commitSync(final Map<TopicPartition,OffsetAndMetadata> offsets)

示例代码如下:

Java
while (true) {
    ConsumerRecords
<String, String> records = consumer.poll(Duration.ofMillis(1000));
   
for (ConsumerRecord<String, String> r : records) {
       
long offset = r.offset();
       
//do something to process record.

        TopicPartition topicPartition = new TopicPartition(r.topic(), r.partition());
        consumer.commitSync(Collections.singletonMap(topicPartition,
new OffsetAndMetadata(offset+1)));
    }
}

提交的偏移量  =  消费完的record的偏移量  +  1

因为,__consumer_offsets中记录的消费偏移量,代表的是,消费者下一次要读取的位置!!!

  • 异步提交方式

异步提交的方式( commitAsync())在执行的时候消费者线程不会被阻塞;可能在提交消费位移的结果还未返回之前就开始了新一次的拉取。异步提交可以让消费者的性能得到一定的增强。 commitAsync方法有一个不同的重载方法,具体定义如下

Java
/**
 * 异步提交offset
 */
while (true) {
    ConsumerRecords
<String, String> records = consumer.poll(Duration.ofMillis(1000));
   
for (ConsumerRecord<String, String> r : records) {
       
long offset = r.offset();

       
//do something to process record.
        TopicPartition topicPartition = new TopicPartition(r.topic(), r.partition());
        consumer.commitSync(Collections.singletonMap(topicPartition,
new OffsetAndMetadata(offset+1)));
        consumer.commitAsync(Collections.singletonMap(topicPartition,
new OffsetAndMetadata(offset + 1)), new OffsetCommitCallback() {
    
@Override
     public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
               
if(e == null ){
                    System.out.println(map);
                }
else{
                    System.out.println(
"error commit offset");
                }
            }
        });
    }
}

4.2.9手动提交位移(时机的选择)

  • 数据处理完成之前先提交偏移量

可能会发生漏处理的现象(数据丢失)反过来说,这种方式实现了: at most once的数据处理(传递)语义

  • 数据处理完成之后再提交偏移量

可能会发生重复处理的现象(数据重复)反过来说,这种方式实现了: at least once的数据处理(传递)语义当然,数据处理(传递)的理想语义是: exactly once(精确一次)Kafka也能做到exactly once(基于kafka的事务机制)

代码示例:

Java
package com.doitedu;

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.sql.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Properties;

public class CommitOffsetByMyself {
    public static void main(String[] args) throws SQLException {

        //
获取mysql的连接对象
        Connection connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/football", "root", "123456");
        connection.setAutoCommit(false);
        PreparedStatement pps = connection.prepareStatement("insert into user values (?,?,?)");
        PreparedStatement pps_offset = connection.prepareStatement("insert into offset values (?,?) on duplicate key update offset = ?");

        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092,linux02:9092,linux03:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //
设置手动提交偏移量参数,需要将自动提交给关掉
        props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        //
设置从哪里开始消费
//        props.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        //
设置组id
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group001");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);
        //
订阅主题
        consumer.subscribe(Arrays.asList("kafka2mysql"), new ConsumerRebalanceListener() {
            @Override
            public void onPartitionsRevoked(Collection<TopicPartition> collection) {

            }

            @Override
            public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                for (TopicPartition topicPartition : collection) {
                    try {
                        PreparedStatement get_offset = connection.prepareStatement("select offset from offset where topic_partition = ?");
                        String topic = topicPartition.topic();
                        int partition = topicPartition.partition();
                        get_offset.setString(1, topic + "_" + partition);
                        ResultSet resultSet = get_offset.executeQuery();
                        if (resultSet.next()){
                            int offset = resultSet.getInt(1);
                            System.out.println("
发生了再均衡,被分配了分区消费权,并且查到了目标分区的偏移量"+partition+" , "+offset);
                            //
拿到了offset后就可以定位消费了
                            consumer.seek(new TopicPartition(topic, partition), offset);
                        }
                    } catch (SQLException e) {
                        e.printStackTrace();
                    }
                }
            }
        });

        //
拉去数据后写入到mysql
        while (true) {
            try {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
                for (ConsumerRecord<String, String> record : records) {
                    String data = record.value();
                    String[] arr = data.split(",");
                    String id = arr[0];
                    String name = arr[1];
                    String age = arr[2];

                    pps.setInt(1, Integer.parseInt(id));
                    pps.setString(2, name);
                    pps.setInt(3, Integer.parseInt(age));
                    pps.execute();

                    //
埋个异常,看看是不是真的是这样
//                    if (Integer.parseInt(id) == 5) {
//                        throw new SQLException();
//                    }

                    long offset = record.offset();
                    int partition = record.partition();
                    String topic = record.topic();
                    pps_offset.setString(1, topic + "_" + partition);
                    pps_offset.setInt(2, (int) offset + 1);
                    pps_offset.setInt(3, (int) offset + 1);
                    pps_offset.execute();
                    //
提交jdbc事务
                    connection.commit();
                }
            } catch (Exception e) {
                connection.rollback();
            }
        }
    }
}

4.2.10消费者提交偏移量方式的总结

consumer的消费位移提交方式:

  • 全自动 
  1. auto.offset.commit = true
  1. 定时提交到consumer_offsets
  • 半自动 
  1. auto.offset.commit = false;  
  1. 然后手动触发提交 consumer.commitSync();
  1. 提交到consumer_offsets
  • 全手动 
  1. auto.offset.commit = false;  
  1. 写自己的代码去把消费位移保存到你自己的地方mysql/zk/redis/
  1. 提交到自己所涉及的存储;初始化时也需要自己去从自定义存储中查询到消费位移

4.2.11一些相对重要参数(了解)

Java
fetch.min.bytes=1B   // 一次拉取的最小字节数

fetch.max.bytes=50M  //一次拉取的最大数据量
 
fetch.max.wait.ms=500ms  //拉取时的最大等待时长
 
max.partition.fetch.bytes = 1MB  //每个分区一次拉取的最大数据量
 
max.poll.records=500 //一次拉取的最大条数
 
connections.max.idle.ms=540000ms //网络连接的最大闲置时长
 
request.timeout.ms=30000ms  //一次请求等待响应的最大超时时间
 
metadata.max.age.ms=300000ms  //元数据在限定时间内没有进行更新,则会被强制更新
 
reconnect.backoff.ms=50ms  //尝试重新连接指定主机之前的退避时间  ==》 TODO
 
retry.backoff.ms=100ms  //尝试重新拉取数据的重试间隔
 
 //事务的时候会出现这个

isolation.level=read_uncommitted  //隔离级别! 决定消费者能读到什么样的数据

read_uncommitted:  //可以消费到LSO(LastStableOffset)位置;
read_committed:  //可以消费到HW(High Watermark)位置
 
max.poll.interval.ms  //超过时限没有发起poll操作,则消费组认为该消费者已离开消费组
 
enable.auto.commit=true  //开启消费位移的自动提交

auto.offset.reset = latest
 
auto.commit.interval.ms=5000  //自动提交消费位移的时间间隔

4.2.12练一练

需求:写一个生产者,不断的去生产用户行为数据,写入到kafka的一个topic

生产的数据格式:  造数据

{"guid":1,"eventId":"pageview","timestamp":1637868346789}  isNew = 1

{"guid":1,"eventId":"addcard","timestamp":1637868347625}   isNew = 0

{"guid":2,"eventId":"collect","timestamp":16378683463219}

{"guid":3,"eventId":"paid","timestamp":16378683467829}

......

再写一个消费者,不断的从kafka中消费上面的用户行为数据,做一个统计

1.5s输出一次当前来了多少用户(去重)  uv 

2.将每条数据添加一个字段来标识,如果这个用户的id是第一次出现,那么就标注1,否则就是0

依赖:

XML
<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>${kafka.version}</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>

    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-lang3</artifactId>
        <version>3.8.1</version>
    </dependency>


    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.22</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.roaringbitmap/RoaringBitmap -->
    <dependency>
        <groupId>org.roaringbitmap</groupId>
        <artifactId>RoaringBitmap</artifactId>
        <version>0.9.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>31.1-jre</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.28</version>
    </dependency>

生产者代码示例:

Java
package com.doitedu;

import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 *
验证数据:
 * 创建topic
 * kafka-topics.sh --create --topic event-log --zookeeper linux01:2181 --partitions 3 --replication-factor 3
 *
搞个消费者消费数据
 * kafka-console-consumer.sh  --bootstrap-server linux01:9092 --topic event-log
 * {"eventId":"zTUAbXcWbn","guid":7170,"timeStamp":1659944455262}
 * {"eventId":"KSzaaNmczb","guid":9743,"timeStamp":1659944455823}
 * {"eventId":"FNUERLlCNu","guid":7922,"timeStamp":1659944456295}
 * {"eventId":"VmXVJHlpOF","guid":2505,"timeStamp":1659944458267}
 * {"eventId":"pMIHwLzSIE","guid":7668,"timeStamp":1659944460088}
 * {"eventId":"ZvGYIvmKTx","guid":3636,"timeStamp":1659944460461}
 * {"eventId":"jBanTDSlCO","guid":3468,"timeStamp":1659944460787}
 * {"eventId":"vXregpYeHu","guid":1107,"timeStamp":1659944462525}
 * {"eventId":"PComosCafr","guid":7765,"timeStamp":1659944463640}
 * {"eventId":"xCHFOYIJlb","guid":3443,"timeStamp":1659944464697}
 * {"eventId":"xDToApWwFo","guid":5034,"timeStamp":1659944465953}
 */
public class Exercise_kafka
编程练习 {
    public static void main(String[] args) throws InterruptedException {
        MyData myData = new MyData();
        myData.genData();
    }
}

class MyData{
    KafkaProducer<String, String> producer = null;
    public MyData(){
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"linux01:9092,linux02:9092,linux03:9092");
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        producer = new KafkaProducer<String, String>(props);
    }
   
    public void genData() throws InterruptedException {
        UserEvent userEvent = new UserEvent();
        while (true){
            //造数据
            userEvent.setGuid(RandomUtils.nextInt(0,10000));
            userEvent.setEventId(RandomStringUtils.randomAlphabetic(10));
            userEvent.setTimeStamp(System.currentTimeMillis());
            String json = JSON.toJSONString(userEvent);
            //数据造完了就往kafka中写
            ProducerRecord<String, String> stringProducerRecord = new ProducerRecord<>("event-log", json);
            Thread.sleep(RandomUtils.nextInt(200,1000));
            producer.send(stringProducerRecord);
        }
    }
}
/*
{"guid":1,"eventId":"pageview","timestamp":1637868346789}
{"guid":1,"eventId":"addcard","timestamp":1637868347625}
{"guid":2,"eventId":"collect","timestamp":16378683463219}
 */
@NoArgsConstructor
@AllArgsConstructor
@Getter
@Setter
class UserEvent{
    private Integer guid;
    private String eventId;
    private long timeStamp;
}

消费者代码示例:用hashset来实现:

Java
package com.doitedu;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.*;

/**
 *
分两步走:
 * 第一步:一个消费者不断的去消费数据
 * 第二步:5分钟计算一次,返回用户数这个结果
 */
public class Exercise_consumerDemo {
    public static void main(String[] args) {
        HashSet<Integer> set = new HashSet<>();
        new Thread(new ConsumerThread(set)).start();
        //
定时的任务调度
        Timer timer = new Timer();
        //
调度,第一个参数,你给我一个任务,
        //第二个参数代表过多久之后我开始执行任务
        //第三个参数代表每隔多久执行一次
        timer.schedule(new ConsumerTask(set),5000,10000);

    }
}

class ConsumerThread implements Runnable {
    HashSet<Integer> set = null;
    KafkaConsumer<String, String> consumer = null;

    public ConsumerThread(HashSet<Integer> set) {
        this.set = set;
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");

        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("event-log"));
    }
    /**
     * 重写run方法的话,我需要在里面实现什么逻辑?
     * 消费者消费数据,拿到数据以后,只需要获取到用户id
     * 将用户id写到hashset集合里面
     */
    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                String json = record.value();
                UserEvent userEvent = JSON.parseObject(json, UserEvent.class);
                Integer guid = userEvent.getGuid();
                set.add(guid);
            }
        }
    }
}

class ConsumerTask extends TimerTask {
    HashSet<Integer> set = null;

    public ConsumerTask(HashSet<Integer> set) {
        this.set = set;
    }
    /**
     * 这里面就是返回的一个用户数
     */
    @Override
    public void run() {
        int userCount = set.size();
        System.out.println(System.currentTimeMillis() + ",截至到当前为止的一个用户数为:"+userCount);
    }
}

用hashset来实现很显然会出问题,如果数据量一直往上增长,会出现oom的问题,而且占用资源越来越多,影响电脑性能!!!

方案二:将HashSet改成bitMap来计数,就很完美,大逻辑不变,小逻辑就是将HashMap改成bitMap

Java
package com.doitedu;

import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.roaringbitmap.RoaringBitmap;

import java.time.Duration;
import java.util.*;

/**
 *
分两步走:
 * 第一步:一个消费者不断的去消费数据
 * 第二步:5分钟计算一次,返回用户数这个结果
 */
public class BitMap_consumerDemo {
    public static void main(String[] args) {
        //
原来我用的是Hashset来记录,现在我用RoaringBitmap来记录
        RoaringBitmap bitMap = RoaringBitmap.bitmapOf();

        new Thread(new BitMapConsumerThread(bitMap)).start();
        //
定时的任务调度
        Timer timer = new Timer();
        timer.schedule(new BitMapConsumerTask(bitMap),1000,5000);

    }
}

class BitMapConsumerThread implements Runnable {
    RoaringBitmap bitMap = null;
    KafkaConsumer<String, String> consumer = null;

    public BitMapConsumerThread(RoaringBitmap bitMap) {
        this.bitMap = bitMap;
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");

        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("event-log"));
    }
    /**
     *
重写run方法的话,我需要在里面实现什么逻辑?
     * 消费者消费数据,拿到数据以后,只需要获取到用户id
     * 将用户id写到hashset集合里面
     */
    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                String json = record.value();
                UserEvent userEvent = JSON.parseObject(json, UserEvent.class);
                Integer guid = userEvent.getGuid();
                bitMap.add(guid);
            }
        }
    }
}


class BitMapConsumerTask extends TimerTask {
    RoaringBitmap bitMap = null;

    public BitMapConsumerTask(RoaringBitmap bitMap) {
        this.bitMap = bitMap;
    }
    /**
     *
这里面就是返回的一个用户数
     */
    @Override
    public void run() {
        int userCount = bitMap.getCardinality();
        System.out.println(System.currentTimeMillis() + ",
截至到当前为止的一个用户数为:"+userCount);
    }
}

需求二:判断来没来过的问题,可以用bitmap来搞,当然还可以用布隆过滤器来搞

Java
package com.doitedu;

import com.alibaba.fastjson.JSON;
import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import java.util.Timer;
import java.util.TimerTask;

/**
 *
用布隆过滤器来判定是否重复,当然,bitMap也是可以操作的
 */
public class BloomFilter_consumerDemo {
    public static void main(String[] args) {

        BloomFilter<Long> longBloomFilter = BloomFilter.create(Funnels.longFunnel(), 100000);

        new Thread(new BloomFilterConsumerThread(longBloomFilter)).start();
    }
}

class BloomFilterConsumerThread implements Runnable {
    BloomFilter<Long> longBloomFilter = null;
    KafkaConsumer<String, String> consumer = null;

    public BloomFilterConsumerThread(BloomFilter<Long> longBloomFilter) {
        this.longBloomFilter = longBloomFilter;
        Properties props = new Properties();
        props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "linux01:9092");
        props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test001");
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(Arrays.asList("event-log"));
    }

    /**
     *
重写run方法的话,我需要在里面实现什么逻辑?
     * 消费者消费数据,拿到数据以后,只需要获取到用户id
     * 将用户id写到hashset集合里面
     */
    @Override
    public void run() {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(Integer.MAX_VALUE));
            for (ConsumerRecord<String, String> record : records) {
                String json = record.value();
                UserEvent userEvent = JSON.parseObject(json, UserEvent.class);
                Integer guid = userEvent.getGuid();
                boolean flag = longBloomFilter.mightContain((long) guid);
                if (flag) {
                    userEvent.setIsNew(0);
                } else {
                    userEvent.setIsNew(1);
                }
                //
判断完成以后,得把他加进去
                longBloomFilter.put((long) guid);
                System.out.println(JSON.toJSONString(userEvent));
            }
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/618847.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何评价编程语言 Reason?

Reason编程语言的设计目标是为开发人员提供一种易于理解和学习的工具&#xff0c;同时提供静态类型检查和高效的代码执行。Reason基于OCaml语言&#xff0c;并引入了JavaScript的语法和工具链&#xff0c;使得开发者能够在现有的JavaScript生态系统中更好地开发和维护代码。Rea…

2023,数据库国产替代走到哪了?

如今&#xff0c;战场不仅银行&#xff0c;参战者也不仅单独的一家。对中国的国产数据库而言&#xff0c;机会和挑战都在加速涌来。 作者|思杭 编辑|皮爷 出品|产业家 2023&#xff0c;数据库格局正在变化&#xff0c;愈演愈烈。 如果说哪个环节是如今国产替代的最火热…

chatgpt赋能python:Python中如何输出两个数

Python中如何输出两个数 对于任何一种编程语言来说&#xff0c;输出两个数都是非常基础的知识点&#xff0c;Python也不例外。在Python中&#xff0c;我们可以使用print()函数来输出两个数。本篇文章将会介绍如何在Python中输出两个数。 介绍 在Python中&#xff0c;输出两个…

Python让文档处理变得轻松:如何快速替换Word文档中的关键字

应用场景&#xff1a; Python自动化处理Word文档的功能可以应用于许多场景&#xff0c;以下是其中一些常见的应用场景&#xff1a; 批量处理文档&#xff1a;如果您需要处理大量的Word文档&#xff0c;例如替换文本、添加文本、修改格式等&#xff0c;手动完成这些任务将非常耗…

驱动LSM6DS3TR-C实现高效运动检测与数据采集(4)----上报匿名上位机实现可视化

概述 LSM6DS3TR-C是单芯片“3轴陀螺仪 3轴加速度计”的惯性 测量单元(IMU)&#xff0c; 五种种可选满量程的陀螺仪(125/250/500/1000/2000 dps)和加速度计(2/4/8/16 g)。 上述工程中选择的加速度和陀螺仪对应的量程为2g和2000dps&#xff0c;对应的灵敏度如下所示&#xff0c…

具有更多存储空间和带宽的亚马逊云科技Snowball Edge存储优化型设备

亚马逊云科技AWS Snow Family系列设备用于将数据经济高效地迁移到云端并在边缘进行处理。增强型Snowball Edge存储优化型设备专为PB级数据迁移项目而设计&#xff0c;具有210TB的NVMe存储空间&#xff0c;每秒可传输高达1.5千兆字节的数据。这些设备还包括10GBASE-T、SFP48和QS…

缩略图加密学习总结

一、缩略图加密概述 完全加密为噪声图像后&#xff0c;密文图像的文件扩展&#xff0c;传输存储消耗更多的资源。完全加密的噪声图像的可用性建立在对密文进行解密的基础上&#xff0c;耗费大量的计算代价。原始图像中精细的视觉信息被抹去以保护隐私,而粗略的视觉信息被保留以…

Vue+element UI实现列表全部数据排序

element ui 表格中的sortable属性只能实现当前页数据的排序&#xff0c;无法实现整张表全部数据的排序&#xff0c;所以需要采取自定义的排序方式重新触发接口&#xff0c;获取排序好的全部列表 Java后端的分页列表有这两个字段需要前端去传递&#xff1a; el-table 上加上so…

springboot 2.6.12 自定义解析 yaml 加密数据

文章目录 一、简介二、yaml 默认解析简单说明三、自定义 yaml 解密解析器四、配置 PropertySourceLoader五、简单测试 一、简介 为了保证项目的配置文件的安全性&#xff0c;需要对第三方组件 mysql、redis、es、mq 等用户名密码进行加密处理&#xff0c;可以使用现成的三方包…

【springboot项目开发】文件上传与下载

目录 总体介绍 文件上传 介绍 文件上传的前端需求 文件上传的前端代码 文件上传的后端需求 文件上传的后端代码 文件下载 介绍 前端需求 前端代码 后端需求 后端代码 总体介绍 文件的上传和下载功能&#xff0c;是项目开发过程中比较常见的业务需求&#xff0c;我们…

开源 Golang 微服务入门二:RPC 框架 Kitex

前言 前一篇笔记介绍了字节跳动的开源 Golang 微服务 HTTP 框架 Hertz&#xff0c; 如下&#xff1a; 开源 Golang 微服务入门一&#xff1a; HTTP 框架 Hertz 本文将要介绍同样是字节跳动开源的 Golang 微服务 RPC 框架 Kitex。 Kitex 简介 Kitex 字节跳动内部的 Golang 微…

nacos高级

一、什么是配置中心 在微服务架构中&#xff0c;当系统从一个单体应用被拆分成分布式系统上的一个个服务节点后&#xff0c;配置文件也必须跟着迁移&#xff08;分割&#xff09;&#xff0c;这样配置就分散了。不仅配置会分散&#xff0c;分散中还会包含着冗余。 配置中心将…

Linux进程、用户、权限命令

进程管理命令 进程和程序的区别 1 程序是静态概念&#xff0c;本身作为一种软件资源长期保存&#xff1b;而进程是程序的执行过程&#xff0c;它是动态概念&#xff0c;有一定的生命期&#xff0c;是动态产生和消亡的。 2 程序和进程无一一对应关系。一个进程在活动中可有顺序…

顺序表和链表的比较

本文主要内容&#xff1a;从存取方式、逻辑/物理结构、查找/插入/删除操作和空间分配的角度比较顺序表和链表&#xff0c;并从存储、运算、环境的角度对比应如何选取存储结构。 目录 一、顺序表和链表的比较1、存取&#xff08;读写&#xff09;方式2、逻辑/物理结构3、查找、…

AI人工智能领域精美绘图模板分享

1 人工智能的发展历程 如今人工智能的应用渗透了我们生活的方方面面&#xff0c;我们都知道人工智能的前景十分光明&#xff0c;在未来对于推进人类发展进程也是非常重要的&#xff0c;但其实人工智能的发展道路是极其曲折的&#xff0c;下面就将人工智能的发展历程分为如下六…

arcgis for javascript TileLayer 自定义高德地图图层

效果如图&#xff1a; 一、创建自定义切片层 要创建自定义图块层&#xff0c;您必须调用BaseTileLayer类的createSubclass()方法。命名自定义层为TintLayer 由于这一层需要知道在哪里访问预定义的图块&#xff0c;我们将创建一个属性。应用程序将为图层提供值&#xff0c;图…

全球首发 《NGINX 完全指南》中文版

原文作者&#xff1a;Derek DeJonghe of F5 原文链接&#xff1a;全球首发 | 《NGINX 完全指南》中文版 转载来源&#xff1a;NGINX 开源社区 NGINX唯一中文官方社区 &#xff0c;尽在nginx.org.cn 在社区小伙伴们的催促下&#xff0c;我们很高兴地与大家分享这个好消息&#…

关于阵列发射端的波束形成(相控阵)研究与仿真实践

说明 相控阵是一个很大的话题&#xff0c;相控阵说得直白一点就是通过控制天线阵列中各个天线的相位来使得波束指向我们想要的方向。现阶段相控阵雷达用得更多的还是军事领域&#xff0c;不过随着技术的进步、成本的下降以及小型化&#xff0c;相控阵雷达也逐渐被用于民用领域了…

Python之Gradio简单使用

目录 安装Gradio示例用法应用界面1. gr.Interface2. gr.Blocks Gradio的输入和输出组件输入组件&#xff08;Inputs&#xff09;输出组件&#xff08;Outputs&#xff09; 其他 Gradio是一个Python库&#xff0c;用于构建快速的Web界面&#xff0c;以便于使用机器学习模型进行实…

Vue.js 中的 watch 属性详解

Vue.js 中的 watch 属性详解 在 Vue.js 中&#xff0c;watch 属性是一种非常重要的属性&#xff0c;它可以监听 Vue 实例中指定的数据变化&#xff0c;并在数据发生变化时执行相应的操作。本文将对 Vue.js 中的 watch 属性进行详细的介绍&#xff0c;并附上相关的代码示例。 什…