【基础】Kafka -- 基础架构及核心概念

news2024/11/15 8:26:48

Kafka -- 基础架构及核心概念

  • 初识 Kafka
    • Kafka 基本架构
    • Kafka 主题与分区
      • 主题与分区
      • 分区副本机制 Replica
      • 高水位 HW
  • 生产者
    • 生产者客户端
      • 必要的参数配置
      • 消息的发送
      • 序列化
      • 分区器
      • 生产者拦截器
    • 原理分析
    • 重要的生产者参数
  • 消费者
    • 消费者与消费者组
    • 消费者客户端
      • 必要的参数配置
      • 订阅主题与分区
      • 反序列化
      • 消息消费
      • 位移提交
      • 控制或关闭消费
      • 指定位移消费
      • 再均衡
      • 消费者拦截器
      • 多线程实现
    • 重要的消费者参数

初识 Kafka

Kafka 在日常开发使用中主要扮演三大角色:

  • 消息系统:Kafka 与传统消息中间件相同,都具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、扩展性、可恢复性等功能。除此之外,Kafka 还提供了多数消息中间件所不具备的消息顺溪行保障以及回溯消费等功能;

  • 存储系统:Kafka 可以将消息持久化到磁盘,与基于内存进行存储的系统相比,有效降低了数据丢失的风险。得益于 Kafka 的消息持久化和多副本的机制,其也可以作为数据存储系统来使用;

  • 流式处理平台:Kafka 为大部分流式处理框架提供了可靠的数据来源,并且提供了一个完整的流式处理类库,如窗口、连接、变换、聚合等操作;

Kafka 基本架构

一个典型的 Kafka 架构包括若干的 Productor、若干 Broker、若干 Consumer 以及一个 ZooKeeper 集群,如下图所示:

在这里插入图片描述

其中:

  • ZooKeeper 负责集群元数据的管理以及控制器的选举等操作;

  • Productor 即生产者,其负责创建消息并将消息投放至 Broker;

  • Broker 是服务代理节点,可以将其简单的视作一个独立的 Kafka 服务节点或者 Kafka 服务实例。一个或者多个 Broker 可以形成一个 Kafka 集群;

  • Consumer 即消费者,其负责从 Broker 中获取消息并进行相应的业务处理;

Kafka 主题与分区

主题与分区

Kafka 中另外两个很重要的概念即主题与分区。Kafka 中的消息以主题进行分类,生产者创建消息后将消息发送到指定的主题,而消费者订阅主题并从该主题中获取消息进行消费。

主题是一个逻辑上的概念,其可以细化为多个分区。分区在物理层面可以看作一个可追加的日志文件,同一主题下的不同分区存储的内容是不同的。消息在被添加到分区日志文件是都会被分配一个偏移量 offset,偏移量唯一标识这条消息,Kafka 通过偏移量来维持分区内消息的顺序性。需要注意的是,offset 并不跨越分区,即 Kafka 保持分区有序而不是主题有序。Kafka 中的分区可以分布在不同的服务器 broker 上,即一个主题可以跨越多个 broker。

每一条消息被发送到 broker 之前,将会根据分区策略选择存储到哪一个分区。若分区策略设置的合理,消息将被均匀的分配到所有的分区进行存储,这也解决了单一分区 IO 操作的性能瓶颈。分区可以在创建主题时通过参数进行个数设置,也可以在主题创建完成后再进行修改,通过增加分区数实现水平扩展。

在这里插入图片描述

分区副本机制 Replica

Kafka 为分区引入了多副本 Replica 机制,通过多副本增加分区的容灾能力。同一个分区的不同副本中保存的内容完全相同(但在同一时刻未必完全相同)。副本采取一主多从的策略,leader 副本负责处理读写请求,follower 副本只负责与 leader 进行消息同步。副本存储在不同的 broker 当中,当 leader 出现故障时,将从 follower 副本中选举产生新的 leader 对外进行服务。多副本的机制实现了故障的自动转移,当 Kafka 集群中的某个 broker 失效时仍可以保证服务可用。

分区中的所有副本统称为 AR(Assigned Replicas),所有与 leader 副本保持一定程度同步的副本(包含 leader 在内)组成 ISR(In-Sync Replicas),ISR 是 AR 的一个子集。当有消息时,首先会发送给 leader 副本,然后 follower 副本才能从 leader 副本中拉取消息进行同步,同步期间允许一定程度的滞后。这个滞后指可容忍的滞后范围,可以通过参数进行配置。与 leader 副本的滞后超过容忍范围的副本的集合组成 OSR(Out-of-Sync Replicas)。因此,AR=ISR+OSR。一般情况下,AR=ISR,OSR 为空,即所有 follower 副本都应该与 leader 副本保持 一定程度的同步。

leader 副本负责维护和跟踪 ISR 集合中所有 follower 副本的滞后状态。当 follower 副本落后太多或失效时,leader 副本会把它从 ISR 集合中剔除。如果 OSR 集合中有 follower 副本“追上”了 leader 副本,那么 leader 副本会把它从 OSR 集合转移至 ISR 集合。默认情况下,当 leader 副本发生故障时,只有在 ISR 集合中的副本才有资格被选举为新的 leader,而在 OSR 集合中的副本则没有任何机会(该原则也可以通过修改相应的参数配置来改变)。

高水位 HW

HW 是 High Watermark 的缩写,俗称高水位。它标识一个特定的消息偏移量 offset,消费者只能够拉取到这个 offset 之前的消息。

如下图所示,代表一个日志文件,该文件中有 9 条消息,第一条消息的 offset 为 0,称为 LogStartOffset;最后一条消息的 offset 为 8;offset 为 9 的消息用虚线表示,代表下一条待写入的消息。日志文件的 HW 为 6,这意味着消费者只能拉取到 0-5 的消息进行消费,offset 为 6 的消息对消费者而言是不可见的。

在这里插入图片描述

LEO是 Log End Offset 的缩写,它标识当前日志文件中下一条待写入消息的 offset,上图中 offset 为 9 的位置即为当前日志文件的 LEO,LEO 的大小相当于当前日志分区中最后一条消息的 ofset 值加 1。分区 ISR 集合中的每个副本都会维护自身的 LEO,而 ISR 集合中最小的 LEO 即为分区的 HW,对消费者而言只能消费 HW 之前的消息。

生产者

生产者客户端

一个正常的生产逻辑主要分为以下几个步骤:

  1. 配置生产者客户端参数以及创建相应的生产者实例;

  2. 构建待发送的消息;

  3. 发送消息;

  4. 关闭生产者实例;

编码前首先引入依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>

生产者客户端的代码如下:

public class KafkaProducerDemo {

    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";

    public static Properties initConfig() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", brokerList);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("client.id", "producer.client.id.demo");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = initConfig();
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, "hello Kafka~");
        try {
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

必要的参数配置

在创建生产者实例之前需要配置相应的参数,参照上述生产者客户端代码中的 initConfig() 方法,有三个参数是必须进行配置的:

  • bootstrap.servers:该参数用于指定生产者客户端要连接的 Kafka 集群的地址清单,内容格式为host1:port1,host2:port2。这里不需要配置所有 broker 的地址,因为生产者客户端会从给定的 broker 查找其他 broker 的信息。建议设置两个以上的地址,防止其中的 broker 宕机时连接集群失败;

  • key.serializervalue.serializer:broker 接收的消息必须以字节数组 byte[] 的形式存在。KafkaProducer<String, String>ProdecerRecord<String, String>中的泛型对应的就是消息中的 key 和 value 的类型。这里必须填写序列化器的全限定名;

为使配置代码更加规范化,减少拼写错误,可以对代码进行下列优化:

public class ProducerConfig {

    public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
    public static final String KEY_SERIALIZER = "key.serializer";
    public static final String VALUE_SERIALIZER = "value.serializer";
    public static final String CLIENT_ID = "client.id";

}
public static Properties initConfig() {
    Properties properties = new Properties();
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS, brokerList);
    properties.put(ProducerConfig.KEY_SERIALIZER, StringSerializer.class.getName());
    properties.put(ProducerConfig.VALUE_SERIALIZER, StringSerializer.class.getName());
    properties.put(ProducerConfig.CLIENT_ID, "producer.client.id.demo");
    return properties;
 }

KafkaProducer 是线程安全的,因此可以在多个线程中共享单个 KafkaProducer 实例,也可以将 KafkaProducer 进行池化供其它线程调用。

消息的发送

生产者客户端实例创建完成后,需要进行消息的构建,即创建 ProducerRecord 对象。

消息对象 ProducerRecord 并不是单纯意义上的消息,其内部包含多个属性,如下所示:

public class ProducerRecord<K, V> {
    private final String topic;
    private final Integer partition;
    private final Headers headers;
    private final K key;
    private final V value;
    private final Long timestamp;
}
属性概述
topic消息发往的主题
partition消息发往的分区号
headers消息头部,一般用来设定一些与应用相关的信息,若不需要则不用设置
key指定消息的键,按主题归类后会再按照 key 进行分区,key 相同的消息会被划分到同一个分区当中
value传输的消息体,一般不为空。为空时表示特定的消息–墓碑消息
timestamp消息的时间戳,有两种类型 CreateTime 以及 LogAppendTime,前者表示消息创建的时间,后者表示消息追加到日志文件的时间

其中,主题 topic 和消息体 value 是必填项,其余属性为选填项,因此对应 ProducerRecord 的构造方法也有很多:

public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers) {...}
public ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value) {...}
public ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers) {...}
public ProducerRecord(String topic, Integer partition, K key, V value) {...}
public ProducerRecord(String topic, K key, V value) {...}
public ProducerRecord(String topic, V value) {...}

对于不同的消息,需要构建不用的 ProducerRecord 对象,因此再实际应用中创建 ProducerRecord 对象是一个十分频繁的操作。

在创建生产者客户端以及消息体后,就可以进行消息的发送。消息的发送有三种模式,分别为发后即忘(fire-and-forget)、同步(sync)以及异步(async)。

发后即忘的模式直接调用 KafkaProducer 的send()方法即可,该模式下只负责向 Kafka 中发送消息而不关心消息是否正确到达。该模式的性能最好,但是可靠性最差。

实际上,send()方法的返回值并不是 void,而是Future<RecordMetadata>类型:

public Future<RecordMetadata> send(ProducerRecord<K, V> record) {
    return this.send(record, (Callback)null);
}

public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    ProducerRecord<K, V> interceptedRecord = this.interceptors.onSend(record);
    return this.doSend(interceptedRecord, callback);
}

因此,同步模式可以通过 Future 对象实现:

try {
   producer.send(record).get();
} catch (Exception e) {
   e.printStackTrace();
}

send()方法本身就是异步的操作,在使用异步模式时,一般会在send()方法中指定一个 Callback 回调函数,在 Kafka 返回响应时做相应的处理:

try {
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
            if (e != null) {
                e.printStackTrace();
            } else {
                System.out.println(recordMetadata.topic() + "|" + recordMetadata.partition() + "|" + recordMetadata.offset());
            }
        }
    });
} catch (Exception e) {
    e.printStackTrace();
}

其中,onCompletion()方法的两个参数是互斥的。消息发送成功时,metadata 不为 null 而 exception 为 null;消息发送异常时,metadata 为 null 而 exception 不为 null。回调函数的调用可以保证分区的有序性。

一般情况下,KafkaProducer 在发送完所有数据后,需要调用close()方法来回收资源。该方法会等待所有的发送请求完毕后再关闭 KafkaProducer。此外,close()方法也可以传入等待时间,在等待相应的时间后强制退出。实际开发中一般使用无参的方法。

public void close() {
    this.close(9223372036854775807L, TimeUnit.MILLISECONDS);
}

public void close(long timeout, TimeUnit timeUnit) {
    this.close(timeout, timeUnit, false);
}

序列化

生产者需要使用序列化器 Serializer 将对象转换成字节数组才能通过网络发送给 Kafka。客户端提供了 String、ByteArray、ByteBuffer、Bytes、Double、Integer、Long 等类型的序列化器,它们都实现了org.apache.kafka.common.serialization.Serialier接口:

public interface Serializer<T> extends Closeable {
    void configure(Map<String, ?> var1, boolean var2);
    byte[] serialize(String var1, T var2);
    void close();
}

接口定义了三个方法需要实现:

  • configue()方法用来配置当前类;

  • serialize()方法用来执行序列化的操作;

  • close()方法用来关闭当前序列化器,该方法一般为空;

若 Kafka 客户端提供的序列化器无法满足需求,则可以使用 Avro、JSON、Thrif、ProtoBuf和 Protostuff 等通用的序列化工具来实现,或使用自定义的序列化器实现。

下述代码针对 Company 类实现了自定义序列化器:

@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class Company {

    private String name;
    private String address;

}
public class CompanySerializer implements Serializer<Company> {

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public byte[] serialize(String topic, Company data) {
        if (data == null) {
            return null;
        }
        byte[] name, address;
        try {
            if (data.getName() != null) {
                name = data.getName().getBytes(StandardCharsets.UTF_8);
            } else {
                name = new byte[0];
            }
            if (data.getAddress() != null) {
                address = data.getAddress().getBytes(StandardCharsets.UTF_8);
            } else {
                address = new byte[0];
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + name.length + address.length);
            buffer.putInt(name.length);
            buffer.put(name);
            buffer.putInt(address.length);
            buffer.put(address);
            return buffer.array();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return new byte[0];
    }

    @Override
    public void close() {

    }
}

在使用时,只需要在进行生产者客户端配置时,对value.serializer属性传入自定义序列化器的全限定名即可。

分区器

在消息通过send()方法发往 broker 的过程中,可能需要经过拦截器、序列化器、分区器等一系列的处理之后才能被真正的发往 broker。其中,拦截器不是必须的,消息在经过序列化器处理后就需要经过分区器确定其要发往的分区,如果在 ProducerRecord 中指定了 partition 字段,那么则不需要分区器的作用;若没有指定 partition 字段,那么就需要依赖分区器为消息分配分区。

Kafka 中提供的默认分区器是org.apache.kafka.clients.producer.internals.DefaultPartitioner,其实现了org.apache.kafka.clients.producer.Partitioner接口:

public interface Partitioner extends Configurable, Closeable {
    int partition(String var1, Object var2, byte[] var3, Object var4, byte[] var5, Cluster var6);

    void close();
}

该接口定义了两个方法:

  • partition()方法用于计算分区号,返回 int 类型。该方法中的参数依次代表主题、键、序列化后的键、值、序列化后的值以及集群的元数据消息;

  • close()方法用于关闭分区器,一般为空方法;

Partitioner 还有一个父接口org.apache.kafka.common.Configurable

public interface Configurable {
    void configure(Map<String, ?> var1);
}

其中:

  • configure()方法用于配置信息及初始化数据;

在默认分区器的实现中,如果消息的 key 不为 null,则分区器会对 key 采用 MurmurHash2 (高运算性能以及低碰撞率)算法进行哈希,并根据最终的哈希值来计算分区号,因此具有相同 key 的消息会被写入到同一个分区;如果消息的 key 为 null,那么消息将会以轮询的方式发往主题内的各个可用分区。

在不改变分区数量的情况下,key 与分区之间映射保持不变;若分区数量产生变化,则难以保证之前的映射关系。

可以通过实现 Partitioner 接口来自定义分区的逻辑,如下所示:

public class CustomPartitioner implements Partitioner {

    private final AtomicInteger counter = new AtomicInteger(0) ;

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitionInfos = cluster.partitionsForTopic(topic);
        int numPartitions = partitionInfos.size();
        if (null == keyBytes) {
            return counter.getAndIncrement() % numPartitions;
        } else {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

实现自定义的分区器之后,需要通过配置参数partitioner.class显示的指定自定义分区器:

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());

生产者拦截器

生产者拦截器可以用来在消息发送前进行一些准备工作。要使用生产者拦截器只需要自定义实现org.apache.kafka.clients.producer.ProducerInterceptor接口即可:

public interface ProducerInterceptor<K, V> extends Configurable {
    ProducerRecord<K, V> onSend(ProducerRecord<K, V> var1);

    void onAcknowledgement(RecordMetadata var1, Exception var2);

    void close();
}

该接口定义了三个方法:

  • onSend()方法用于在消息序列化和计算分区之前对消息进行定制化的处理;

  • onAcknowledgement()方法在消息被应答或消息发送失败时触发,将先于 Callback 执行。该方法运行于 Producer 的 IO 线程中,因此代码逻辑越简单越好,否则将影响消息发送的速度;

  • close()方法用于关闭拦截器时执行一些资源的清理工作;

下列代码实现了一个生产者拦截器,该拦截器为每一条消息添加一个“prefix”前缀并计算消息发送的成功率:

public class CustomProducerInterceptor implements ProducerInterceptor<String, String> {

    private volatile long sendSuccess = 0;
    private volatile long sendFailure = 0;

    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {
        String modifiedValue = "prefix" + producerRecord.value();
        return new ProducerRecord<>(producerRecord.topic(),
                producerRecord.partition(),
                producerRecord.timestamp(),
                producerRecord.key(),
                modifiedValue,
                producerRecord.headers());
    }

    @Override
    public void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {
        if (e == null) {
            sendSuccess++;
        } else {
            sendFailure++;
        }
    }

    @Override
    public void close() {
        double successRatio = (double) sendSuccess / (sendSuccess + sendFailure);
        System.out.println("发送成功率>>>" + String.format("%f", successRatio * 100) + "%");
    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

实现自定义的拦截器之后,需要通过配置参数interceptor.class指定该拦截器:

properties.put(ProducerConfig.INTERCEPTOR_CLASS_CONFIG, CustomProducerInterceptor.class.getName());

在通过配置进行拦截器的配置时,不仅可以指定一个拦截器,还可以指定多个拦截器形成拦截器链。拦截器将按照配置的顺序依次执行。

原理分析

在这里插入图片描述

整个生产者客户端由两个线程协调运行,分别为主线程和 Sender 线程:

  • 在主线程中,由 KafkaProducer 创建消息,通过拦截器、序列化器、分区器等一系列逻辑处理后,将消息缓存至消息累加器 RecordAccumulator 中;

  • Sender 线程主要负责从消息累加器 RecordAccumulator 中获取消息并发送至 Kafka;

RecordAccumulator

RecordAccumulator 主要负责缓存消息,使 Sender 可以批量的发送数据,进而减少网络传输的资源消耗以提升性能。RecordAccumulator 缓存区大小通过生产者客户端参数buffer.memory进行配置,默认为 32M。当生产者生成消息的速度大于发送到服务器的速度,则会导致生产者空间不足,此时调用生产者的send()方法可能被阻塞也可能抛出异常,这取决于max.block.ms,该参数默认为 60s。

RecordAccumulator 内部还有一块内存区域用于保存消息的字节形式,该区域为 BufferPool,它主要用来实现 ByteBuffer 的复用。但是 BufferPool 只针对特定大小的 ByteBuffer 进行管理,该大小由batch.size参数指定,默认 16KB。

ProducerBatch

RecordAccumulator 内部维护了双端队列Deque<ProducerBatch>。ProducerBatch 包含了一条或多条 ProducerRecord,代表一个批次的消息。当一条消息流入 RecordAccumulator 时,首先从队列尾部获取一个 ProducerBatch(若没有则新建),并查看当前的 ProducerBatch 能否写入,若可以则写入,否则就新建一个。

ProducerBatch 的大小由配置参数batch.size决定。在新建时,需要评估当前 ProducerRecord 的大小是否超过上述参数设置的大小。若不超过,则以参数指定的大小创建 ProducerBatch,该段内存区域可以被复用;若超过,则以当前 ProducerRecord 的大小创建 ProducerBatch,该段内存将不会被复用。

Sender

Sender 线程从 RecordAccumulator 中获取到缓存的消息后,会将原本的<分区, Deque<ProducerBatch>>的形式转换成<Node, List<ProducerBatch>>的形式,这里的 Node 指示的是具体的 broker 节点。

随后 Sender 会进一步的将<Node, List<ProducerBatch>>封装成<Node, Request>的形式,这样就可以将请求发往各个节点。

在请求被发送之前,还会被保存到 InFlightRequests 中,其保存的对象的具体形式为Map<NodeId, Deque<Request>>,他的作用是缓存已经发出但是好没有收到响应的请求。此外,InFlightRequests 可以通过配置参数max.in.flight.requests.per.connection来限制客户端与 Node 之间的连接最多可缓存的请求数。该配置参数默认为 5,即每个连接最多缓存 5 个未响应的请求。

重要的生产者参数

acks

该参数用于指定分区中有多少副本收到该消息时生产者就认为该消息已经成功写入。

  • acks=1。该值默认为 1,表示消息发送后只要分区的 leader 副本成功写入了消息,那么生产者就会收到来自服务端的成功响应。若消息无法写入 leader 副本,则生产者会收到一个错误响应,此时可以选择重发消息。在这种情况下,若 leader 副本成功写入,但是在 follower 副本拉取消息进行同步时 leader 副本崩溃,该条消息就会丢失。asks 设置为 1 时消息可靠性和吞吐量之间的折中方案。

  • acks=0。生产者发送消息后不需要等待任何服务端的响应。若消息在发送到写入服务端的过程中产生异常,则消息丢失。acks 设置为 0 可以保证最大的吞吐量。

  • acks=-1或acks=all。生产者在发送消息后需要等待 ISR 中的所有副本将消息成功写入之后才会收到来自服务端的成功响应。acks 设置为 -1 可以保证最强的可靠性。

max.request.size

该参数用于设置生产者能发送的消息的最大值,默认值为 1MB。一般不建议修改该参数,因为该参数与其它的参数存在联动。

retries 和 retry.backoff.ms

retires 参数用于设置生产者消息发送失败时重试的次数,当重试的次数达到该值时,生产者就会放弃重试并返回异常。

retry.backoff.ms 用于设置两次重试之间的时间间隔,以避免无效的频繁重试。

compression.type

该参数用于指定消息的压缩方式,默认为 none,即不进行压缩。该参数可以配置为 gzip、snappy、lz4 等对消息进行压缩以减少网络传输量、降低网络 IO。

connections.max.idle.ms

该参数用于设置多久之后关闭限制的连接,默认值为 540000 ms,即 9 分钟。

linger.ms

该参数用于指定生产者发送 ProducerBatch 的等待时间,默认为 0。生产者客户端会在 ProducerBatch 填满或者等待时间超过该参数的设定值时执行发送。

recevier.buffer.bytes

该参数用于设置 Socket 接收消息缓冲区的大小,默认值为 32KB。

send.buffer.bytes

该参数用于设置 Socket 发送消息缓冲区的大小,默认这为 128KB。

request.timeout.ms

该参数用于配置生产者等待请求响应的最长时间,默认值为 30000 ms,请求超时后可以选择重试。

消费者

消费者与消费者组

消费者负责订阅 Kafka 中的消息并进行消费,每一个消费者都有一个对应的消费组。当消息发布到主题后,该消息只能被订阅该主题的消费者组中的一个消费者所消费。

对于消息中间件而言,一般存在两种消息投递模式:即 P2P 模式以及发布-订阅模式。基于上述的消费者与消费者组的模式,Kafka 同时支持两种消息的投递模式:

  • 若所有的消费者都属于同一个消费者组,则所有消息都会被均衡的分配给该消费者组中的每一个消费者,即一条消息只会被一个消费者处理,这相当于点对点模式;
  • 若所有的消费者都隶属于不同的消费者组,那么所有消息都会被广播给所有的消费者,即每条消息都会被所有的消费者消费,这相当于发布-订阅模式;

消费者客户端

一个正常的消费逻辑需要具备以下步骤:

  1. 配置消费者客户端参数并创建相应的消费者实例;

  2. 订阅主题;

  3. 拉取消息进行消费;

  4. 提交消费位移;

  5. 关闭消费者实例;

消费者客户端的代码如下:

public class KafkaConsumerDemo {

    public static final String brokerList = "localhost:9092";
    public static final String topic = "topic-demo";
    public static final String groupId = "group.demo";
    public static final AtomicBoolean isRunning = new AtomicBoolean(true);

    public static Properties initConfig() {
        Properties properties = new Properties();
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("bootstrap.servers", brokerList);
        properties.put("group.id", groupId);
        properties.put("client.id", "consumer.client.id.demo");
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = initConfig();
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(List.of(topic));

        try {
            while (isRunning.get()) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("topic>>>" + record.topic() + "| partition>>>" + record.partition()
                            + "| offset>>>" + record.offset());
                    System.out.println("key>>>" + record.key() + "| value>>>" + record.value());
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

必要的参数配置

在创建消费者实例之前需要配置相应的参数,参照上述消费者客户端代码中的 initConfig() 方法,有四个参数是必须进行配置的:

  • bootstrap.servers:指定 Kafka 集群的 broker 地址清单;

  • key.deserializervalue.deserializer:指定 key 与 value 反序列化器;

  • group.id:消费者隶属的消费者组的名称,若设置为空则会产生异常;

为使配置代码更加规范化,减少拼写错误,可以对代码进行下列优化:

public class ConsumerConfig {

    public static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
    public static final String KEY_SERIALIZER = "key.deserializer";
    public static final String VALUE_SERIALIZER = "value.deserializer";
    public static final String CLIENT_ID = "client.id";
    public static final String GROUP_ID = "group.id";

}
public static Properties initConfig() {
    Properties properties = new Properties();
    properties.put(ConsumerConfig.KEY_SERIALIZER, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.VALUE_SERIALIZER, StringDeserializer.class.getName());
    properties.put(ConsumerConfig.BOOTSTRAP_SERVERS, brokerList);
    properties.put(ConsumerConfig.GROUP_ID, groupId);
    properties.put(ConsumerConfig.CLIENT_ID, "consumer.client.id.demo");
    return properties;
}

订阅主题与分区

订阅主题

消费者客户端创建完毕后,可以调用subscribe()方法进行主题的订阅,该方法定义的源码如下:

public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {...}
public void subscribe(Collection<String> topics) {...}
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {...}
public void subscribe(Pattern pattern) {...}

当使用集合的方式订阅主题时,订阅了什么主题就消费什么主题中的消息。若前后两次调用该方法订阅了不同的主题,则以第二次订阅的主题为准。

当使用正则表达式的方式订阅主题时,只要主题与正则表达式相匹配,该消费者就可以对当前主题中的消息进行消费。

subscribe()方法还可以传入一个 ConsumerRebalanceListener 类对象,用于设置再均衡监听器。

订阅分区

消费者客户端还可以调用assign()方法直接订阅某些主题的特定分区,方法定义源码如下:

public void assign(Collection<TopicPartition> partitions) {...}

该方法需要传入要订阅的分区集合,集合中的 TopicPartition 类主要包含两个属性:topic 与 partition,即分区主题和分区编号。订阅“topic-demo”主题下的 0 分区的代码如下:

consumer.assign(List.of(new TopicPartition("topic-demo", 0)));

如果需要订阅分区,那么我们需要直到主题中到底存在多少个分区,消费者客户端中提供了partitionFor()方法用于查询指定主题的元数据信息:

List<PartitionInfo> partitionsFor(String topic);

返回结果集中的 PartitionInfo 类包含下列属性:

public class PartitionInfo {
    // 主题名称
    private final String topic;
    // 分区编号
    private final int partition;
    // leader 副本所在节点
    private final Node leader;
    // 分区的 AR 集合
    private final Node[] replicas;
    // 分区的 ISR 集合
    private final Node[] inSyncReplicas;
    // 分区的 OSR 集合
    private final Node[] offlineReplicas;
    ....
}

因此,通过partitionFor()方法与assign()方法,可以实现订阅主题的全部分区:

List<TopicPartition> partitions = new ArrayList<>();
List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
if (partitionInfos != null) {
    for (PartitionInfo info: partitionInfos) {
        partitions.add(new TopicPartition(info.topic(), info.partition()));
    }
}

集合订阅方式、正则表达式订阅方式以及指定分区订阅方式分别代表了三种不同的订阅状态,即 AUTO_TOPIC、AUTO_PATTERN 以及 USER_ASSIGNED,三种状态是互斥的,在一个消费者中仅能存在一种状态,否则将会产生异常。

消费者客户端还提供了unsubscribe()的方法用于取消订阅。

反序列化

消息经生产者端的序列化发送到 Kafka 中,同样在进行消息的消费时需要进行反序列化操作。消费者客户端提供了 String、ByteArray、ByteBuffer、Bytes、Double、Integer、Long 等类型的反序列化器,它们都实现了org.apache.kafka.common.serialization.Deserialier接口:

public interface Deserializer<T> extends Closeable {
    void configure(Map<String, ?> var1, boolean var2);
    T deserialize(String var1, byte[] var2);
    void close();
}

接口定义了三个方法需要实现:

  • configue()方法用来配置当前类;

  • serialize()方法用来执行反序列化的操作,若 data 为 null,那么在进行处理时直接返回 null 而不会抛出异常;

  • close()方法用来关闭当前反序列化器;

同序列化器一样,若 Kafka 客户端提供的反序列化器无法满足需求,则需要自定义反序列化器,下述代码针对 Company 类实现了自定义反序列化器:

public class CompanyDeserializer implements Deserializer<Company> {

    @Override
    public void configure(Map<String, ?> map, boolean b) {

    }

    @Override
    public Company deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        if (data.length < 8) {
            throw new SerializationException("Size of data received by DemoDeserializer is shorter than expected!");
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        int nameLen, addressLen;
        String name, address;
        nameLen = buffer.getInt();
        byte[] nameBytes = new byte[nameLen];
        buffer.get(nameBytes);
        addressLen = buffer.getInt();
        byte[] addressBytes = new byte[addressLen];
        buffer.get(addressBytes);
        try {
            name = new String(nameBytes, StandardCharsets.UTF_8);
            address = new String(addressBytes, StandardCharsets.UTF_8);
        } catch (Exception e) {
            throw new SerializationException("Error occur when deserializing!");
        }
        return new Company(name, address);
    }

    @Override
    public void close() {

    }
}

生产过程中还是建议通过 Avro、JSON、Thrif、ProtoBuf和 Protostuff 等通用的序列化工具来实现。

消息消费

Kafka 中的消息消费是一个不断轮询的过程,消费者需要做的就是不断地调用poll()方法拉取消息,该方法返回的是该消费者所订阅的主题上的一组消息,返回类型为 ConsumerRecords,其内部包含了若干的 ConsumerRecord 对象。

poll()方法中需要传入一个超时时间参数,用于控制该方法的阻塞时间:

public ConsumerRecords<K, V> poll(Duration timeout) {...}

poll()方法返回的是 ConsumerRecords 类型,其提供iterator()方法来循环遍历消息集内部,在上述的消费者客户端的代码中,就是通过这种方式获取到消息集中的每一条消息:

public Iterator<ConsumerRecord<K, V>> iterator() {...}

此外,还可以通过records(TopicPartition)方法来获取消息集当中指定分区的消息:

public List<ConsumerRecord<K, V>> records(TopicPartition partition) {...}

使用上述方法实现消费的代码如下,其中partitions()方法用于获取消息集当中的所有分区:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (TopicPartition topicPartition: records.partitions()) {
    for (ConsumerRecord<String, String> record: records.records(topicPartition)) {
        System.out.println(record.partition() + ">>>" + record.value());
    }
}

此外,还可以通过records(String topic)方法按照主题的维度进行消费:

public Iterable<ConsumerRecord<K, V>> records(String topic) {...}

使用上述方法实现消费的代码如下,ConsumerRecords 类没有提供方法来获取消息集中所包含的主题列表,因此主题需要根据订阅时的主题列表进行消费:

ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (String topic: List.of(topic)) {
    for (ConsumerRecord<String, String> record: records.records(topic)) {
        System.out.println(record.topic() + ">>>" + record.value());
    }
}

消费者消费到的每条消息的类型都为 ConsumerRecord,其包含若干属性,如下:

public class ConsumerRecord<K, V> {
    // 消息所在主题
    private final String topic;
    // 消息所在分区编号
    private final int partition;
    // 消息所在分区的偏移量
    private final long offset;
    // 时间戳
    private final long timestamp;
    // 时间戳类型:分为 CreateTime 和 LogAppendTime,分别代表消息创建的时间戳和消息追加到日志的时间戳
    private final TimestampType timestampType;
    // 键和值经反序列化后的大小
    private final int serializedKeySize;
    private final int serializedValueSize;
    // 消息的头部内容
    private final Headers headers;
    // 消息的键与值     
    private final K key;
    private final V value;    
    // CRC32 校验值
    private volatile Long checksum;    
    ...
}

位移提交

此处对 offset 做一些区分:当描述消息在分区中的位置时,称 offset 为偏移量;当描述消费者消费到的位置时,称 offset 为消费位移。

当消费者调用poll()时,拉取的是还没有被消费过的消息集,因此需要记录上一次消费时的消费位移。消费位移存储在 Kafka 的内部主题__consumer_offsets当中,将消费位移存储起来的动作称为“提交”。

下图展示了消费位移,每一个方块代表着一条消息,x 代表某一次拉取操作中的消费消息的最大偏移量,即当前消费者消费到了 x 处。需要注意的是,当前消费者提交的消费位移是 x + 1,即下一条需要拉取的消息的位置。

在这里插入图片描述

KafkaConsumer 类提供了position(TopicPartition)方法和committed(TopicPartition)方法来分别获取下一次拉取的消息的位置以及当前消费者提交的消费位移:

public long position(TopicPartition partition) {...}
public OffsetAndMetadata committed(TopicPartition partition) {...}

自动提交

Kafka 中默认的消费位移提交方式为自动提交,由消费者客户端的参数enable.auto.commit进行配置,默认为 true。默认的自动提交方式为定期提交,提交周期由客户端参数auto.commit.interval.ms进行配置,默认值为 5s。

在默认的方式下,消费者每间隔 5s 会拉取每个分区当中最大的消息位移进行提交,该方式省区了复杂的位移提交逻辑,但是在某些情况下会导致消息重复消费或消息丢失:

  • 重复消费:自动提交是延时提交,可能存在某些消息已经消费,但位移还没有提交的情况,此时若消费者崩溃,重启之后会读取上一次的消息位移,导致部分消息重复消费;

  • 消息丢失:若消息被接收后还需要处理线程进行处理,若一个批次的消息已经全部被接收并提交位移时,处理线程在依次处理消息时崩溃,那么消费者重启后读取上一次的消息位移,将会导致没有被处理线程处理完毕的消息丢失;

手动提交

为实现对消息位移更加灵活的管理和控制,Kafka 还提供了手动提交位移的方式。因为很多时候不是消息被拉取到就算作消费完成,可能需要后续的业务处理完成之后才可以认为消息被成功消费。因此,手动的提交方式可以让开发人员根据程序的逻辑在何时的地方进行位移的提交。

开启手动提交功能的前提是消费者客户端参数enable.auto.commit设置为 false。手动提交的方式分为同步提交和异步提交,分别对应commitSync()方法和commitAsync()方法。

同步提交方法commitSync()存在四个重载方法:

public void commitSync() {...}
public void commitSync(Duration timeout) {...}
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets) {...}
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {...}

同步提交消息位移的简单使用如下:

while (isRunning.get()) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        // 编写消息的消费逻辑
    }
    consumer.commitSync();
}

无参方法commitSync()提交位移的频率和拉取批次消息、处理批次消息的频率是一样的,若想使用更准确的位移提交,则需要使用传参方法。带参数的同步位移提交的简单使用如下,该代码每消费一条消息便提交一次消息位移:

while (isRunning.get()) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record : records) {
        // 编写消息的消费逻辑
        long offset = record.offset();
        TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
        consumer.commitSync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(offset + 1)));
    }
}

对于上述的使用,因为commitSync()方法是同步的,因此每消费一条消息便提交一次位移非常消耗性能,实际开发中一般不会这样使用。更多的时候我们按照分区来进行位移的提交,简单代码如下:

while (isRunning.get()) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (TopicPartition partition: records.partitions()) {
        List<ConsumerRecord<String, String>> recordList = records.records(partition);
        for (ConsumerRecord<String, String> record: recordList) {
            // 编写消息的消费逻辑
        }
        long offset = recordList.get(recordList.size() - 1).offset();
        consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(offset + 1)));
    }
}

异步提交方法commitAsync()在执行消费位移提交时将不会阻塞消费者线程,该方法存在三个重载方法:

public void commitAsync() {...}
public void commitAsync(OffsetCommitCallback callback) {...}
public void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {...}

异步提交方法的使用与同步提交基本相同,在上述第二第三个重载方法当中,可以传入一个异步提交回调方法,当位移提交成功后会回调 OffsetCommitCallback 中的onComplete()方法,简单的使用代码如下:

while (isRunning.get()) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    for (ConsumerRecord<String, String> record: records) {
        // 编写消息的消费逻辑
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
            if (e == null) {
                System.out.println("消费位移提交成功!");
            } else {
                e.printStackTrace();
            }
        }
    });
}

使用异步方法提交位移时可能会存在位移提交失败的情况,一般通过重试来解决,但这也会导致新的问题:当位移 x 提交失败进行重试时,位移 x+y 提交成功,随后位移 x 重试提交成功,此时会产生重复消费的情况。

为避免上述情况,可以设置一个递增的序号来维持异步提交的顺序,当位移提交失败需要重试时,可以检查已提交位移的序号和大小,若重试提交的序号小于已提交位移的序号,则说明已经有更大的消费位移提交了,不需要进行重试。

控制或关闭消费

消费控制

在某些应用场景下我们需要暂停某些分区的消费,优先消费其他分区的消息,对此消费者客户端提供了pause()reesume()方法分别实现暂停和恢复对某些分区消息的拉取:

public void pause(Collection<TopicPartition> partitions) {...}
public void resume(Collection<TopicPartition> partitions) {...}

此外,消费者客户端还提供了一个无参的方法paused()用于获取被暂停的分区集合:

public Set<TopicPartition> paused() {...}

关闭消费

之前的示例代码中,我们使用while(isRunning.get()循环调用poll()方法实现消息的拉取和消费,当需要关闭消费时,可以调用isRunning.set(false)来退出循环。

跳出循环之后,需要显示的执行消费者客户端的关闭动作以释放各种资源,消费者客户端提供了close()方法来实现客户端的关闭:

public void close() {...}
public void close(Duration timeout) {...}

第二种重载方法中可以传入一个时间来设定关闭方法的最长执行时间,有些内部的关闭逻辑会比较耗费时间,比如设置了自动消费位移提交等等。第一种方法虽然没有传入参数,但其内部设定了默认的最长等待时间为 30s。

指定位移消费

消费位移提交使得消费者在关闭、崩溃重启之后能够根据存储的消费位移继续进行消费。当一个消费者启动后没有查找到对应的消费位移时,则会根据客户端的配置参数auto.offset.reset来确定从何处开始消费,该参数可配置为三种模式:

  • latest:默认配置,表示从分区末尾开始消费;

  • earliest:表示从分区起始处开始消费;

  • none:在该配置下若查询不到消息位移,则抛出 NoOffsetForParititionException;

消费者客户端提供了seek()方法来指定位移进行消费,可以让我们追前消费或回溯消费:

public void seek(TopicPartition partition, long offset) {...}

方法中传入的两个参数分别表示分区与指定的消费位移,seek()只能指定消费者被分配到的分区的消费位移,而分区的分配是在poll()中执行的,即在调用seek()方法前需要调用一次poll()方法进行分区分配。seek()方法的简单使用如下:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(List.of(topic));
consumer.poll(Duration.ofMillis(1000));
Set<TopicPartition> assignment = consumer.assignment();
for (TopicPartition partition: assignment) {
    consumer.seek(partition, 10);
}
while (isRunning.get()) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
    // 编写消息的消费逻辑
}

其中,assignment()方法用于获取消费者所分配到的分区信息:

public Set<TopicPartition> assignment() {...}

使用seek()方法从分区末尾获取消息的代码如下:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(List.of(topic));
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
    consumer.poll(Duration.ofMillis(1000));
    assignment = consumer.assignment();
}
Map<TopicPartition, Long> offsets = consumer.endOffsets(assignment);
for (TopicPartition partition: assignment) {
    consumer.seek(partition, offsets.get(partition));
}

其中,endOffsets()方法用于获取指定分区末尾的消息位置:

public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {...}
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions, Duration timeout) {...}

同样的,消费者客户端也提供了beginningOffsets()方法用于获取指定分区开头的消息位置:

public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {...}
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions, Duration timeout) {...}

实际上消费者客户端也直接提供了相应的方法直接从分区起始或末尾获取消息:

public void seekToBeginning(Collection<TopicPartition> partitions) {...}
public void seekToEnd(Collection<TopicPartition> partitions) {...}

在某些场景下,我们不知道具体的消费位置,而是需要根据某个时间点来进行消费,例如我们需要消费今天早上八点之后产生的消息。为解决这种应用场景,消费者客户端提供了offsetsForTimes()方法:

public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {...}
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch, Duration timeout) {...}

上述方法需要传入一个 map,其 key 为待查询的分区,value 为待查询的时间戳。该方法会返回时间戳大于等于待查询时间的第一条消息的位置和时间戳。下列代码简单实现了从一天前的同一时刻开始消费:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(List.of(topic));
Set<TopicPartition> assignment = new HashSet<>();
while (assignment.size() == 0) {
    consumer.poll(Duration.ofMillis(1000));
    assignment = consumer.assignment();
}
HashMap<TopicPartition, Long> timestampToSearch = new HashMap<>();
for (TopicPartition partition: assignment) {
    timestampToSearch.put(partition, System.currentTimeMillis() - 24 * 60 * 60 * 1000);
}
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch);
for (TopicPartition partition: assignment) {
    OffsetAndTimestamp offsetAndTimestamp = offsets.get(partition);
    if (offsetAndTimestamp != null) {
        consumer.seek(partition, offsetAndTimestamp.offset());
    }
}

Kafka 中的消息位移是存储在内部主题当中的,而利用seek()方法便可以打破这一限制,我们可以将消息位移存储在其他的地方,如数据库当中。而在进行消费之前从数据库或其他位置获取到相应的消息位移,调用seek()方法指定位移消费即可。

再均衡

再均衡指分区的所属权从一个消费者转移到另一个消费者的行为,再均衡发生期间,消费组内的消费者无法读取消息,此外,当一个分区被重新分配给另外一个消费者时,消费者当前的状态也将丢失。

subscribe()方法中可以传入一个再均衡监听器 ConsumerRebalanceListener,用于设定发生再均衡动作前后的一些准备或者收尾的工作。ConsumerRebalanceListener 接口包含两个方法:

public interface ConsumerRebalanceListener {
    void onPartitionsRevoked(Collection<TopicPartition> var1);
    void onPartitionsAssigned(Collection<TopicPartition> var1);
}
  • onPartitionsRevoked():该方法会在再均衡开始之前和消费者停止读取消息之后调用。可以通过这个回调方法来处理消费位移的提交,从而避免重复消费的现象。传入的参数为 TopicPartition 的集合,表示再均衡前所分配到的分区。

  • onPartitionsAssigned():刚方法会在重新分区之后和消费者开始读取消息之前调用。传入的参数为 TopicPartition 的集合,表示再均衡后所分配到的分区。

再均衡监听器的简单用法如下:

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        consumer.commitSync(currentOffsets);
        currentOffsets.clear();
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        // do nothing
    }
});

try {
    while (isRunning.get()) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
        for (ConsumerRecord<String, String> record: records) {
            // 编写消息的消费逻辑
            currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                               new OffsetAndMetadata(record.offset() + 1));
        }
        consumer.commitAsync(currentOffsets, null);
    }
} finally {
    consumer.close();
}

此外,再均衡监听器还可以配合外部存储使用,伪代码如下:

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
consumer.subscribe(List.of(topic), new ConsumerRebalanceListener() {
    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> collection) {
        storeOffsetToDB(currentOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> collection) {
        for (TopicPartition partition: collection) {
            consumer.seek(partition, getOffsetFromDB(partition);
        }
    }
});

消费者拦截器

消费者拦截器可以在消费到消息或者提交消费位移时进行一些定制化的操作,自定义消费者拦截器需要实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口:

public interface ConsumerInterceptor<K, V> extends Configurable {
    ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> var1);
    void onCommit(Map<TopicPartition, OffsetAndMetadata> var1);
    void close();
}

该接口定义了三个方法:

  • onConsumer():该方法会在poll()方法返回之前被调用以对消息进行处理,如修改返回的消息内容、按某种规则过滤数据等等;

  • onCommit():该方法会在消费位移提交后调用,通过该方法可以记录所提交的位移信息;

  • close():该方法用于关闭拦截器时执行一些资源的清理工作;

下列代码简单自定义了一个消费者拦截器,通过设置过期时间实现了对消息的过滤:

public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {

    private static final long EXPIRE_INTERVAL = 10 * 1000;

    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> consumerRecords) {
        long now = System.currentTimeMillis();
        Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>();
        for (TopicPartition partition: consumerRecords.partitions()) {
            List<ConsumerRecord<String, String>> records = consumerRecords.records(partition);
            List<ConsumerRecord<String, String>> newTpRecord = new ArrayList<>();
            for (ConsumerRecord<String, String> record: records) {
                if (now - record.timestamp() < EXPIRE_INTERVAL) {
                    newTpRecord.add(record);
                }
            }
            if (!newTpRecord.isEmpty()) {
                newRecords.put(partition, newTpRecord);
            }
        }
        return new ConsumerRecords<>(newRecords);
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
        map.forEach(((topicPartition, offsetAndMetadata) ->
                System.out.println(topicPartition + ">>>" + offsetAndMetadata.offset())));
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

在使用时,需要通过消费者客户端的interceptor.classes参数进行配置(同生产者客户端)。

多线程实现

KafkaProducer 是线程安全的,但是 KafkaConsumer 并不是线程安全的。消费者客户端中定义了一对方法acquire()release()表示相应的加锁和解锁操作。

要实现消费者客户端的多线程,一个思路就是在进行消息的处理逻辑时开启多线程进行处理,如下图所示。在此过程中,需要慎重的考虑位移提交的逻辑。

在这里插入图片描述

重要的消费者参数

fetch.min.bytes

该参数用于配置消费者在进行一次拉取请求时能从 Kafka 中拉取的最小数据量,默认值为 1B。

fetch.max.bytes

该参数用于配置消费者在进行一次拉取请求时能从 Kafka 中拉取的最小数据量,默认值为 50MB。

fetch.max.wait.ms

该参数用于设置 Kafka 的等待时间。若 Kafka 中的消息总是无法满足 fetch.min.bytes 所配置的值,那么就会按照等待时间进行消息的拉取。

max.partition.fetch.bytes

该参数用于配置每个分区返回给消费者的最大数据量,默认为 1MB。

max.poll.records

该参数用于配置消费者在一次拉取当中可获得的最大消息数,默认值为 500 条。

connections.max.idle.ms

该参数用于指定在多久之后关闭限制的连接,默认为 9 分钟。

exclude.internal.topics

该参数用于指定 Kafka 内部主题是否可以向消费者公开,默认为 true。如果设置为 true,那么只能使用subscribe(Collection)的方式而不能使用subscribe(Pattern)的方式来订阅内部主题,设置为 false 则没有这个限制。

receive.buffer.bytes

该参数用于指定 Socket 接收消息缓冲区的大小,默认值为 64KB。

send.buffer.bytes

该参数用于指定 Socket 发送消息缓冲区的大小,默认值为 128KB。

request.timeout.ms

该参数用于配置消费者客户端等待请求响应的最大等待时间,默认值为 30000ms。

metadata.max.age.ms

该参数用于配置元数据的过期时间,默认值 30000ms。

reconnect.backoff.ms

该参数用于配置尝试重新连接指定主机前的等待时间,以避免频繁连接主机,默认值为 50ms。

retry.backoff.ms

该参数用于配置尝试重新发送失败的请求到指定的主题分区之前的等待时间,以避免在某些故障情况下频繁重复发送,默认值为 100ms。

isolation.level

该参数用于配置消费者事务的隔离级别:

  • read_committed:消费者忽略未提交的消息,只能消费到 LastStableOffset 的位置;

  • read_uncommitted:默认值,消费者可以消费到 High Watermark 的位置;
    消息的处理逻辑时开启多线程进行处理,如下图所示。在此过程中,需要慎重的考虑位移提交的逻辑。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/439930.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL 按关键字进行截取

一、问题背景 取MySQL数据表中某个字段中的IP信息。 如&#xff1a;t_log 表中的 user_ip 字段值为 {“username”:“miracle”,“ip”:“110.230.128.186”}&#xff0c;取出IP信息 110.230.128.186。 建表和初始化SQL语句&#xff0c;如下&#xff1a; SET NAMES utf8mb4…

GORM操作mysql数据库

对象就是程序的数据结构&#xff0c;关系是数据库。就是将程序的数据结构与数据库表对应起来。 在GORM是Go语言的ORM框架&#xff0c;将go的数据结构转化为数据库表&#xff0c;例如将结构体转化为数据库表。 引入gorm框架 远程下载gorm框架 go get -u gorm.io/driver/mysq…

HTTP中的Content-type详解

Content-Type Content-Type&#xff08;MediaType&#xff09;&#xff0c;即是Internet Media Type&#xff0c;互联网媒体类型&#xff0c;也叫做MIME类型。在互联网中有成百上千中不同的数据类型&#xff0c;HTTP在传输数据对象时会为他们打上称为MIME的数据格式标签&#x…

关于java.io的学习记录(写入文本)

可以通过字节流&#xff08;FileOutputStream&#xff09;、字符流&#xff08;OutputStreamWriter&#xff09;、字符缓冲流&#xff08;BufferedWriter&#xff09;读取文本中的数据。 FileOutputStream写入文本 public void write(){String path "writeTest.txt"…

【是C++,不是C艹】 什么是C++ | C++从哪来 | 学习建议

&#x1f49e;&#x1f49e;欢迎来到 Claffic 的博客 &#x1f49e;&#x1f49e; &#x1f449;专栏&#xff1a;《是C&#xff0c;不是C艹》&#x1f448; 前言&#xff1a; 我知道你急着学C&#xff0c;但你先别急&#xff0c;薛之谦认识认识C还是很有必要的。本期跟大家聊…

文件夹改名,如何在改名之后批量复制文件夹名称

在日常时候中会遇到给文件夹改名的时候&#xff0c;那么我们又如何在改名之后批量复制文件夹名称&#xff1f;今天就由小编来给大家分享一下操作办法。 首先第一步&#xff0c;我们要进入文件批量改名高手&#xff0c;并在板块栏里选择“文件夹批量改名”板块。 第二步&#xf…

SpringBoot 接入chatGPT API

SpringBoot 接入chatGPT API 一、准备工作二、补全接口示例三、申请API-KEY**四、JavaScript调用API**五、SpringBoot整合ChatGPT六、使用curl模拟请求ChatGPT平台已经为技术提供了一个入口了,作为一个Java程序员,我们第一时间想到的就是快速开发一个应用,接入ChatGPT的接口…

第十四天本地锁、Redis分布锁、Redisson锁三者的区别

一、为什么要有redis分布式锁&#xff0c;它解决了什么问题&#xff1f; 在传统单体架构的项目下&#xff0c;使用本地锁synchronized和lock锁就可以锁住当前进程&#xff0c;保证线程的安全性&#xff0c;但是本地锁解决不了分布式环境下多个服务资源共享的问题&#xff0c;而…

产品研发流程管理

先看一张图&#xff0c;该图适应绝大部分的产品的 研发流程 &#xff08;需要的可以去下 产品研发流程| ProcessOn免费在线作图,在线流程图,在线思维导图&#xff09; 该图详细描述了&#xff0c;不同阶段应该做什么&#xff0c;具体的来说&#xff0c;是确定了什么时候 “开会…

高精度人员定位系统源码,采用vue+spring boot框架,支持二次开发

智慧工厂人员定位系统源码&#xff0c;高精度人员定位系统源码&#xff0c;UWB定位技术 文末获取联系&#xff01; 在工厂日常生产活动中&#xff0c;企业很难精准地掌握访客和承包商等各类人员的实际位置&#xff0c;且无法实时监控巡检人员的巡检路线&#xff0c;当厂区发生灾…

【Python】实战:生成无关联单选问卷 csv《精神状态评估表》

目录 一、适用场景 二、业务需求 三、Python 文件 &#xff08;1&#xff09;创建文件 &#xff08;2&#xff09;代码示例 四、csv 文件 一、适用场景 实战场景&#xff1a; 问卷全部为单选题问卷问题全部为必填问题之间无关联关系每个问题的答案分数不同根据问卷全部问…

使用pandas和seaborn绘图

使用pandas和seaborn绘图 matplotlib实际上是一种比较低级的工具。要绘制一张图表&#xff0c;你组装一些基本组件就行&#xff1a;数据展示 &#xff08;即图表类型&#xff1a;线型图、柱状图、盒形图、散布图、等值线图等&#xff09;、图例、标题、刻度标签以及其他注解型…

【远程开发】VSCode使用Remote SSH远程连接Linux服务器

文章目录 前言视频教程1、安装OpenSSH2、vscode配置ssh3. 局域网测试连接远程服务器4. 公网远程连接4.1 ubuntu安装cpolar4.2 创建隧道映射4.3 测试公网远程连接 5. 配置固定TCP端口地址5.1 保留一个固定TCP端口地址5.2 配置固定TCP端口地址5.3 测试固定公网地址远程 转发自CSD…

【Unity】创建一个自己的可交互AR安卓程序

目录 1 创建一个AR场景2 配置AR Camera为前置摄像头3 配置打包场景4 下载官方提供的InteractiveFaceFilterAssets资源5 配置AR Face Manager6 创建眼镜预制件7 设置AR面部追踪8 测试效果8.1 在Unity中测试8.2 在安卓设备上测试 9 在该AR场景的基础上添加自己的想法9.1 改变眼镜…

【Java|golang】1042. 不邻接植花---邻接表着色

有 n 个花园&#xff0c;按从 1 到 n 标记。另有数组 paths &#xff0c;其中 paths[i] [xi, yi] 描述了花园 xi 到花园 yi 的双向路径。在每个花园中&#xff0c;你打算种下四种花之一。 另外&#xff0c;所有花园 最多 有 3 条路径可以进入或离开. 你需要为每个花园选择一…

C++用户信息管理服务 Thrift框架 Mysql数据落地 Redis数据缓存 Kafka 消息队列 总结 附主要源码

不知不觉入职已经一个月了&#xff0c;近期提交了考核2&#xff0c;要求如下&#xff1a; 1、编写一个管理用户信息的服务&#xff0c;通过thrift的远程过程调用实现用户信息管理功能 2、用户信息至少包括 唯一ID、用户名、性别、年龄、手机号、邮箱地址、个人描述 3、提供创建…

PHP+Vue+java导师学生双选系统设计与实现springnboot+pyton

为了直观显示系统的功能&#xff0c;运用用例图这样的工具显示分析的结果。分析的管理员功能如下。管理员管理学员&#xff0c;导师&#xff0c;管理项目信息&#xff0c;管理项目提交&#xff0c;管理指导项目信息。运行环境:phpstudy/wamp/xammp等 卓越导师双选系统根据调研&…

GCC 常用命令

GCC 编译过程 一个 C/C文件要经过预处理(preprocessing)、编译(compilation)、汇编(assembly)和链接(linking) 等 4 步才能变成可执行文件 &#xff08;1&#xff09; 预处理 C/C源文件中&#xff0c;以“#”开头的命令被称为预处理命令&#xff0c;如包含命令“#include”、…

系统集成项目管理工程师案例分析考点汇总(沟通/干系人、风险、合同等)

沟通及干系人管理常见考点1. 沟通管理计划的内容2. 项目绩效报告的主要内容3. 沟通中容易出现的问题4. 如何采取有效措施改进沟通5. 如何召开有效的会议 合同管理常见考点1. 合同签订时应注意的内容及条款2. 合同管理常见的问题3. 合同管理问题的应对措施 采购管理常见考点1. 采…

IntelliJ 上 Azure Event Hubs 全新支持来了!

大家好&#xff0c;欢迎来到 Java on Azure Tooling 的3月更新。在这次更新中&#xff0c;我们将介绍 Azure Event Hubs 支持、Azure Functions 的模板增强&#xff0c;以及在 IntelliJ IDEA 中部署 Azure Spring Apps 时的日志流改进。要使用这些新功能&#xff0c;请下载并安…