Kafka 之生产者与消费者基础知识:基本配置、拦截器、序列化、分区器

news2024/12/28 2:06:34

一、配置

1. 必须要配置的参数:

  • kafaf集群地址列表:理论上写一个节点地址,就相当于绑定了整个kafka集群了,但是建议多写几个,如果只写一个,万一宕机就麻烦了
  • kafka消息的key和value要指定序列化方法
  • kafka对应的生产者id

使用java代码表示则为以下代码:

//BOOTSTRAP_SERVERS_CONFIG:连接kafka集群的服务列表,如果有多个,使用"逗号"进行分隔
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");

//	使用字符串序列化类:org.apache.kafka.common.serialization.StringSerializer
//	KEY: 是kafka用于做消息投递计算具体投递到对应的主题的哪一个partition而需要的
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
//	VALUE: 实际发送消息的内容
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

//CLIENT_ID_CONFIG:这个属性的目的是标记kafkaclient的ID
properties.put(ProducerConfig.CLIENT_ID_CONFIG, "producer-id");

2. 消息发送重试机制

 可使用 retries 参数 进行设置,同时要注意记住两个概念:可重试异常(重试可能会成功)、不可重试异常(无论重试多少次都不会成功);

retries设置的代码:

properties.put(ProducerConfig.RETRIES_CONFIG, 3);  # 默认是0

3. 一点说明

kafka的生产者是多线程安全的,表示多个线程可以同时共享同一个kafka生产者实例对象;但是kafka的消费者不是线程安全的。

kafka生产者提供的两个send()方法都是异步的,如下:

Future<RecordMetadata> send(ProducerRecord<K, V> record); # 这个send()虽然是异步的,但是可以通过 返回对象调用get()方法达到同步的效果
Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback);

kafka在生产环境中,一定要在在代码中关闭自动创建 topic .可通过 kafka-manage 控制台创建好 topic,再进行消息的发送与接收。

测试代码:

public class NormalProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.31.101:9092");
        properties.put(ProducerConfig.CLIENT_ID_CONFIG, "normal-producer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //	kafka 消息的重试机制: RETRIES_CONFIG该参数默认是0:
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);

        //	可重试异常, 意思是执行指定的重试次数 如果到达重试次数上限还没有发送成功, 也会抛出异常信息
        //	NetworkException
        //	LeaderNotAvailableException

        //	不可重试异常
        //	RecordTooLargeException

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        User user = new User("100", "里德");
        // kafka默认是可以在没有主题的情况下创建的
        // 自动创建主题的特性,在生产环境中一定是禁用的
        ProducerRecord<String, String> record =
                new ProducerRecord<String, String>("normal-topic",
                        JSON.toJSONString(user));
        /**
         * //一条消息 必须通过key 去计算出来实际的partition, 按照partition去存储的
         * ProducerRecord(
         * topic=topic_normal,
         * partition=null,
         * headers=RecordHeaders(headers = [], isReadOnly = false),
         * key=null,
         * value={"id":"001","name":"xiao xiao"},
         * timestamp=null)
         */
        System.err.println("新创建的消息:"+record);
        // 一个参数的send方法 本质上也是异步的 返回的是一个future对象; 可以实现同步阻塞方式
        /*
        Future<RecordMetadata> metadataFuture = producer.send(record);
        RecordMetadata recordMetadata = metadataFuture.get();
        System.err.println(String.format("发送结果:分区位置:%s, 偏移量:%s, 时间戳:%s",
                        recordMetadata.partition(),
                        recordMetadata.offset(),
                        recordMetadata.timestamp()));
         */
        //	带有两个参数的send方法 是完全异步化的。在回调Callback方法中得到发送消息的结果
        Future<RecordMetadata> metadataFuture = producer.send(record, new Callback() {
            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                if(null == exception) {
                    System.err.println(String.format("发送结果:分区位置:%s, 偏移量:%s, 时间戳:%s",
                            metadata.partition(),
                            metadata.offset(),
                            metadata.timestamp()));
                }else {
                    exception.printStackTrace();
                    return;
                }
            }
        });

        producer.close();

    }
}

二. 生产者端的重要参数

1. acks: 表示发送消息后,broker端至少有多少副本接收到该消息:

  • 默认acks=1, 表示只要 leader 副本接收到消息,就能收到来自服务端的成功响应
  • acks=0: 生产者发送消息之后,不要等待任务服务端的响应。
  • acks=-1 或 acks=all:生产者在消息发送之后,需要等待 ISR(In-sync Replication) 中的所有副本都成功写入消息之后,才能够收到来自服务端的成功响应。
  • 并不是asks=-1 或 acks=all 就一定会被投递成功,因为可能只有leader副本在ISR中,follower副本都在 OSR(Out-sync Replication)中,而消息还没来得及传给 OSR 中的副本,leader副本就宕机了。
  • 想要100%投递成立,还要配合参数 min.insync.replicas=2,表示至少两个副本接收到该消息,但是容易影响性能。 

关于ISR与OSR:最开始所有的副本都在ISR中,在kafka工作的过程中,如果某个副本同步速度慢于replica.lag.time.max.ms指定的阈值,则被踢出ISR存入OSR,如果后续速度恢复可以回到ISR中。

2. 批量发送相关的参数

linger.ms:指定生产者发送ProducerBatch之前等待更多的消息加入producerBatch的时间,默认值为0,就像是等人上车的时间

batch.size:累计多少条消息,则一次进行批量发送,就是满多少人即发车的意思

buffer.memory:缓存大小,可以修改它提升缓存性能,默认32M

3. 其他参数

max.request.size:该参数用来限制生产者客户端能发送的消息的最大值,默认值是 1M

retries和retry.backoff.msretries:重试次数和重试间隔时间,第一个默认0,第二个默认100ms

compression.type:指定对发送的消息的压缩方式,默认为“none”,可选gzip,snappy,lz4

connections.max.idle.ms:这个参数用来指定连接空闲多久之后关闭,默认540000ms,即9分钟

receive.buffer.bytes:设置socket接收消息缓冲区 默认32KB

send.buffer.bytes:设置socket发送消息缓冲区 默认128KB

request.timeout.ms:配置producer等待请求broker响应的最长时间,默认30000ms

三、自定义拦截器

1. 自定义生产者拦截器

自定义生产者拦截器类需要继承 org.apache.kafka.clients.producer.ProducerInterceptor,并实现其中的方法: 

  • onSend(ProducerRecord record)是发送消息之前的切面方法;
  • onAcknowledgement(RecordMetadata metadata, Exception exception)是发送消息之后的切面方法;
  • close()是生产者关闭前调用的方法;’
  • configure(Map<String, ?> configs)是拦截器用于配置一些属性的方法;

拦截器代码示例CustomProducerInterceptor.java: 

public class CustomProducerInterceptor implements org.apache.kafka.clients.producer.ProducerInterceptor<String, String> {

    private volatile int success;
    private volatile int failure;

    //	发送消息之前的切面拦截
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        System.err.println("生产者发送前置方法!");
        String value = "prefix:"+record.value();
        return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(), value, record.headers());
    }

    //	发送消息之后的切面拦截
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        if(null == exception){
            success++;
        }else {
            failure++;
        }
        System.err.println("生产者发送后置方法!");
    }

    @Override
    public void close() {
        System.err.println(String.format("发送成功率:%s %%", success*100/success+failure));
    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

将拦截器类定义好之后,只需要在生产者创建时,作为一个属性配置传进去(CustomProducerInterceptor.class是自定义拦截器类):

properties.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, CustomProducerInterceptor.class.getName());

2. 自定义消费者拦截器

需要实现的接口为 org.apache.kafka.clients.consumer.ConsumerInterceptor ,并实现其中的方法:

  • onConsume(ConsumerRecords records)是接到消息,但是处理之前的切面方法;
  • onCommit(Map<TopicPartition, OffsetAndMetadata> offsets)是消息处理完成之后,提交处理结果之前的切面方法,(如果为自动提交,会按时间间隔不停进行提交操作,那么该切面方法也会被不断地执行)
  • close()是消费者关闭前的切面方法;
  • configure(Map<String, ?> configs)是拦截器配置一些属性的方法;

拦截器代码示例 CustomProducerInteceptor.java:

public class CustomConsumerInterceptor implements ConsumerInterceptor<String, String> {

    //	onConsume:消费者接到消息处理之前的拦截器
    @Override
    public ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {
        System.err.println("消费者消费前置方法!");
        return records;
    }

    @Override
    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        System.err.println("消费者消费后置方法!");
        offsets.forEach((tp, om) -> {
            System.err.println(String.format("分区位置:%s,提交偏移量:%s", tp, om));
        });
    }

    @Override
    public void close() {
    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

四、序列化

1. 实现自定义对象的序列化

这里自定义对象为 User.java:

public class User {

	private String id;
	
	private String name;

	public User() {
	}

	public User(String id, String name) {
		this.id = id;
		this.name = name;
	}
    // getter、setter省略
}

自定义序列化类需要实现 org.apache.kafka.common.serialization.Serializer接口:

SerializerProducer.java

public class SerializerProducer implements Serializer<User> {

    @Override
    public byte[] serialize(String topic, User user) {
        try {
            if (user == null) {
                return null;
            }
            else {
                String id = user.getId();
                String name = user.getName();
                byte[] idBytes, nameBytes;
                if(null == id){
                    idBytes = new byte[0];
                }else {
                    idBytes = id.getBytes("UTF-8");
                }
                if(null == name){
                    nameBytes = new byte[0];
                }else {
                    nameBytes = name.getBytes("UTF-8");
                }
                ByteBuffer byteBuffer = ByteBuffer.allocate(4 + 4 + idBytes.length + nameBytes.length);
                //	4个字节 也就是一个 int类型 : putInt 盛放 idBytes的实际真实长度
                byteBuffer.putInt(idBytes.length);
                //	put bytes[] 实际盛放的是idBytes真实的字节数组,也就是内容
                byteBuffer.put(idBytes);
                byteBuffer.putInt(nameBytes.length);
                byteBuffer.put(nameBytes);
                return byteBuffer.array();
            }
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("Error when serializing string to byte[] due to unsupported encoding ", e);
        }
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public void close() {

    }
}

这里是对 消息的value,也就是 User 对象进行序列化,所以需要在生产者配置属性中加入自定义的序列化类:

properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, SerializerProducer.class.getName());

2. 实现自定义对象的反序列化

反序列化类需要实现 org.apache.kafka.common.serialization.Deserializer类:

DeserializerConsumer.java

public class DeserializerConsumer implements Deserializer<User> {

    @Override
    public User deserialize(String topic, byte[] data) {
        if(null == data){
            return null;
        }
        if(data.length < 8){
            throw new SerializationException("size is wrong, must be data.length >= 8");
        }
        ByteBuffer byteBuffer = ByteBuffer.wrap(data);
        //	idBytes 字节数组的真实长度
        int idSize = byteBuffer.getInt();
        byte[] idBytes = new byte[idSize];
        byteBuffer.get(idBytes);

        //	nameBytes 字节数组的真实长度
        int nameSize = byteBuffer.getInt();
        byte[] nameBytes = new byte[nameSize];
        byteBuffer.get(nameBytes);

        String id, name;
        try {
            id = new String(idBytes, "UTF-8");
            name = new String(nameBytes, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new SerializationException("deserializing error! ", e);
        }
        return new User(id, name);
    }

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {

    }

    @Override
    public void close() {

    }
}

“将User对象直接转为json字符串,然后将字符串直接使用 getBytes("UTF-8") 方法转为字节数组”这种序列化方法也可以,不过这里是尝试另一种方法,即上面使用ByteBuffer拼接字节数组的方法  

这里是对 消息的value,也就是 User 对象进行反序列化,所以需要在消费者配置属性中加入自定义的反序列化类:

properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, DeserializerConsumer.class.getName());

五、分区器

1. 分区器

默认分区器:是对kafka消息中的key进行一个hash计算,从而得到投递到具体哪个分区的区号;

另外可根据自己的实际业务场景自定义分区器,需要实现 org.apache.kafka.clients.producer.Partitioner 类:

public class CustomPartitioner implements Partitioner {

    private AtomicInteger counter = new AtomicInteger(0);
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitionInfoList = cluster.partitionsForTopic(topic);
        int numPartitions = partitionInfoList.size();
        System.err.println("---- 进入自定义分区器,当前分区个数:" + numPartitions);
        if(keyBytes == null){
            return counter.getAndIncrement() % numPartitions;
        }else {
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

 并在生产者的配置属性中增加该分区器类:

properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitioner.class.getName());

2. 应用场景

什么情况下会需要自定义分区器?

比如有四种类型的订单:零食、衣服、灯泡、汽车,根据业务类型,让消息进入到各自的分区,也就是一个分区一种类型的数据,能够让各自类型的consumer快速获取属于自己的业务数据。

如果把所有数据随机的放到某个partation中,那么就会造成数据混乱,因为消息队列是顺序消费的(partition中的数据是先进先出),一些热门类型的业务占据大部分消息,比如零食的订单量远远高于汽车的订单量,零食的订单在消息partition中的前面,汽车的在后面,这就会一直堵塞汽车的消息迟迟到不了consumer端,导致汽车明明有订单,但是状态却是一直无法处理中。

所以最好的方法就是根据类型进行分区,不同的类型数据单独放到对应的partation中,一个类型的数据对应一个partation,可以通过类型自定义分区器。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/518082.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023年湖北省建设厅特种作业操作证报名条件是什么?

建筑施工特种作业人员是指在房屋建筑和市政工程施工活动中&#xff0c;从事可能对本人、他人及周围设备设施的安全造成重大危害作业的人员。建筑施工特种作业人员必须经建设主管部门考核合格&#xff0c;取得建筑施工特种作业人员操作资格证书&#xff08;以下简称“资格证书”…

零代码基础,一分钟教你快速搭建微信 ChatGPT 机器人

零代码基础,一分钟教你快速搭建微信 ChatGPT 机器人 1.注册 Railway 账号:2.部署3. 配置1)点击 Configure2)选择仓库地址3)点击安装4)配置信息如下图:5)部署:deploy6)部署完成:7)微信扫码登录8) 根据需要修改配置:最后总结:效果预览:致谢开源项目:本教程收集于…

U盘分区合并的方法有哪些?

大多数用户在硬盘的使用中会进行分区操作&#xff0c;同时为了方便整理&#xff0c;部分用户也会选择给自己的U盘分区&#xff0c;可是在后续操作中发现U盘分区没有什么用处&#xff0c;因此就想要重新将其合并&#xff0c;但要把分区重新合并没那么容易&#xff0c;那么U盘被分…

笔试练习Day02

一.选择题&#xff1a; 1.A 派生出子类 B &#xff0c; B 派生出子类 C &#xff0c;并且在 java 源代码有如下声明&#xff1a; 1. A a0new A(); 2. A a1new B(); 3. A a2new C(); 问以下哪个说法是正确的&#xff08;&#xff09; A 只有第一行能通过编译 B 第1、2行能通过编…

mysql distinct 和 group by 去重

标题mysql distinct 和 group by 去重 一、先说结论&#xff1a; MySQL中常用去重复数据的方法是使用 distinct 或者 group by group by 分组后&#xff0c;如果没有对分组后的数据进行操作&#xff0c;如使用聚合函数/分组函数&#xff1a;count、sum、avg、max 、min&…

封装Python脚本:使用pymysql+sshtunnel,支持通过SSH隧道方式链接mysql数据库

一、前言 通常为了保证数据库安全&#xff0c;不会允许直接连接数据库&#xff0c;而是需要通过SSH隧道去连接服务器背后的数据库&#xff1b;通过Navicat操作如下&#xff1a; 二、python封装脚本 # -*- coding: utf-8 -*- # Time : 2023/5/12 11:04 # Author : chen…

Activity内存泄漏时包含的view还有没有的救?

Activity泄漏会导致该Activity引用到的Bitmap、DrawingCache等无法释放&#xff0c;对内存造成大的压力&#xff0c;兜底回收是指对于已泄漏Activity&#xff0c;尝试回收其持有的资源&#xff0c;泄漏的仅仅是一个Activity空壳&#xff0c;从而降低对内存的压力。 做法也非常简…

ssh终端工具推荐-WindTerm

什么是WindTerm 官方github https://github.com/kingToolbox/WindTerm A Quicker and better SSH/Telnet/Serial/Shell/Sftp client for DevOps. 按官方说明&#xff0c;WindTerm是一个更快更好的SSH/Telnet/Serial/Shell/Sftp的DevOps工具。 WindTerm目前对商业是免费无限制…

QML APP开发套路(二):前/后端交互概述

&#xff08;1&#xff09;QML开发简介 Qt应用框架在传统UI&#xff08;QWidget窗体&#xff09;的基础上&#xff0c;提供了Qt Quick模块&#xff0c;该模块基于 QML 语言来定义UI及交互方式。区别于 QWidget 定义UI的方式&#xff0c;QML利于将UI交互与业务逻辑处理剥离成前…

什么是智慧校园?

什么是智慧校园&#xff1f; 智慧校园平台是目前教育信息化领域的热点之一。 随着数字化转型的加速&#xff0c;越来越多的学校开始寻求解决方案&#xff0c;以提高教育管理的效率和质量。 在使用智慧校园平台的过程中&#xff0c;一些痛点问题也浮现出来。为解决这些问题&a…

基于AT89C51单片机的贪吃蛇游戏设计

点击链接获取Keil源码与Project Backups仿真图: https://download.csdn.net/download/qq_64505944/87778030 源码获取 主要内容: 设计一个贪吃蛇游戏,使其具有以下游戏规则:①当没有改变方向时,贪吃蛇沿原来路径一直前进②贪吃蛇无法回头,只能异于当前方向改变行动③蛇…

网页更新提醒是什么?如何自动监控网页并自动记录或发送通知?

网页更新提醒是什么&#xff1f; 在互联网信息资源丰富&#xff0c;且更新速度快的情况下&#xff0c;如果需要监控一些网页变化&#xff0c;实现例如热点/热搜/热评监测、商品上新/价格/库存监测、作品上新/评论/点赞监测、招标/投标/拍卖/竞价监测、公告/通知/活动监测等情况…

南京大学主办 | EIScopus检索 | 2023年人工智能与统计学前沿国际会议

2023年人工智能与统计学前沿国际会议 会议简介 Brief Introduction 2023年人工智能与统计学前沿国际会议(CFAIS 2023) 会议时间&#xff1a;2023年8月18日-20日 召开地点&#xff1a;中国南京 大会官网&#xff1a;www.cfais.org 2023年人工智能与统计学国际会议(CFAIS 2023)将…

测评,补单是什么神仙利器?能提高速卖通,国际站,newegg店铺的销量

测评&#xff0c;补单相信这个词对于大部分卖家来说&#xff0c;想必都不陌生&#xff0c;因为都知道它能够快速帮助自己的产品添加评论&#xff0c;获取排名&#xff0c;打造爆款。于是许多卖家潜意识里认为“测评其实就是刷评”。 简单点来说&#xff0c;测评就是卖家通过【评…

485测试

注意如果十六进制发送数据 数据打印时 如果是%c 打印出来的数据会十进制显示 解决方案 1.改变打印方式 %x 打印 2.因为什么也不勾选 &#xff0c;默认asii显示 利用串口助手发ltz &#xff0c;打印也是ltz&#xff08;发什么就显示什么&#xff09; 如下图所示

如何使用appuploader制作描述文件​

如何使用appuploader制作描述文件​ 承接上文我们讲述了怎么制作证书&#xff0c;本文我们来看下怎么制作描述文件吧。​制作描述文件前我们首先我们来添加一个测试设备&#xff0c;后面再制作描述文件。 1.添加测试设备​ 其中添加设备一项中&#xff0c;根据提示操作添加…

OpenGL ES特效分析 --- 跳动的心

很早之前就见过一个博主发的shader图片&#xff0c;一个跳动的心https://blog.csdn.net/Kennethdroid/article/details/104536532&#xff0c; 感觉太好玩了&#xff0c;于是想要分析一下原理&#xff0c;上面的博主也已经做了初步分析&#xff0c;但是对于我这个特效小白来说还…

VMware Aria Operations 8.12 - 自动驾驶式 IT 运维管理

VMware Aria Operations 8.12 - 自动驾驶式 IT 运维管理 请访问原文链接&#xff1a;https://sysin.org/blog/vmware-aria-operations/&#xff0c;查看最新版。原创作品&#xff0c;转载请保留出处。 作者主页&#xff1a;sysin.org 自动驾驶式 IT 运维管理 VMware Aria Op…

YOLOv5白皮书-第Y3周:yolov5s.yaml文件解读

&#x1f368; 本文为&#x1f517;365天深度学习训练营 中的学习记录博客 &#x1f356; 原作者&#xff1a;K同学啊|接辅导、项目定制 ● 难度&#xff1a;夯实基础⭐⭐ ● 语言&#xff1a;Python3、Pytorch3 ● 时间&#xff1a;5月8日-5月12日 &#x1f37a;要求&#xff…

电脑永久删除文件怎么找回来?分享一种数据恢复方法

电脑对于日常生活和工作都起着重要作用&#xff0c;但是在我们日常办公中&#xff0c;有时会误删除一些文档&#xff0c;甚至永久删除&#xff0c;当我们想找回的时候却手足无措&#xff0c;不知道该怎么办。同时在很多用户的观念里&#xff0c;当我们无法从回收站恢复时&#…