kafka生产者API

news2024/11/20 0:41:22

生产者工作流程

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7nbn8JO8-1673512011693)(./assets/5f9ec46a650a4cc195be42e2abfbe1bb.png)]

  1. 首先生产者调用send方法发送消息后,会先经过拦截器,接着进入序列化器。序列化器主要用于对消息的Key和Value进行序列化。接着进入分区器选择消息的分区。

  2. 上面这几步完成之后,消息会进入到一个名为RecordAccumulator的缓冲队列,这个队列默认32M。当满足以下两个条件的任意一个之,消息由sender线程发送。

    条件一:消息累计达到batch.size,默认是16kb。
    条件二:等待时间达到linger.ms,默认是0毫秒。
    所以在默认情况下,由于等待时间是0毫秒,所以只要消息来一条就会发送一条。

  3. Sender线程首先会通过sender读取数据,并创建发送的请求,针对Kafka集群里的每一个Broker,都会有一个InFlightRequests请求队列存放在NetWorkClient中,默认每个InFlightRequests请求队列中缓存5个请求。接着这些请求就会通过Selector发送到Kafka集群中。

  4. 当请求发送到Kafka集群后,Kafka集群会返回对应的acks信息。生产者可以根据具体的情况选择处理acks信息。
    0:生产者发送过来的数据,不需要等数据落盘应答。
    1:生产者发送过来的数据,Leader收到数据后应答。
    -1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。默认值是-1,-1和all是等价的。

  5. 发送成功的话,Deque会将NetworkClient中的缓存删除,然后将这个batch从Deque中删除。发送失败的话,sender会对这个发送请求进行重试(默认重试次数为int最大值,可以自定义)

Kafka生产者

操作kafka导入依赖

 		<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.2.8.RELEASE</version>
        </dependency>

异步发送

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GLPASiGT-1673511940020)(./assets/aa88689c85fb47909e8178865a5308b2.png)]

  public static void main(String[] args) {

        // 创建Kafka生产者配置对象
        Properties props = new Properties();

        // 给kafka配置对象添加配置信息
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop001:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 创建生产者 指定配置
        Producer<String, String> producer = new KafkaProducer<>(props);
		
		// 循环发送多条消息
        for (int i = 0; i < 10; i++) {
            // 创建生产者发送的消息和topic
            ProducerRecord<String, String> record = new ProducerRecord<>("Hello-Kafka", "hello" + i,
                    "world" + i);
            // 发送消息
            producer.send(record);
        }
        // 关闭生产者
        producer.close();
    }

这里并没有使用Boot的自动装配,而是使用,通过API来进行操作Kafka的生产者

里面对于配置信息部分,都是可以方式Spring的YAML中的

如何查看消息发送的分区等信息,因为我们在发送完消息,并不知道消息的位置在哪里

在send()方法中进行 Callback中进行获取回调信息

 			/* 发送消息*/
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (e == null) {
                        System.out.println("topic:"+metadata.topic());
                        System.out.println("partition:"+metadata.partition());
                    }else{
                        e.printStackTrace();
                    }
                }
            });

同步发送

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-PVJKpmm9-1673511940021)(./assets/1e1d67b103254649adf8a8d5f63cb4e2.png)]

	for (int i = 0; i < 10; i++) {

            // 创建生产者发送的消息和topic
            ProducerRecord<String, String> record = new ProducerRecord<>("Hello-Kafka", "hello" + i,
                    "world" + i);

            //  同步发送消息 需要处理check异常
            try {
                // 返回的对象就是 回调函数返回的对象获取回调信息
                RecordMetadata metadata = producer.send(record).get();


            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        // 关闭生产者
        producer.close();

分区操作

为什么使用分区

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pAax6ehH-1673511940021)(./assets/aa998ca49b7547f79fb4336da491757c.png)]

消息对象指定分区

在创建消息的时候可以去指定分区

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Rx0fcVv4-1673511940022)(./assets/61dd2f538e134626a32a30a533fbcb4e.png)]

代码展示


            // 创建生产者消息
            // 指定发送到 1号 分区
            ProducerRecord<String, String> record =
                    new ProducerRecord<>("Hello-Kafka", 1 ,"", "world");

            // 创建生产者消息
            // 指定发送到 同一个key下。kafka会对Key进行操作来通过key来指定一个分区
            // 所以我们可以使用key来进行将 同一组的消息放在一个分区下
            ProducerRecord<String, String> record1 =
                    new ProducerRecord<>("Hello-Kafka" ,"type", "world");


            // 创建生产者消息
            // 不指定分区和key  采取默认规则进行 多个分区选择传入
            ProducerRecord<String, String> record2 =
                    new ProducerRecord<>("Hello-Kafka", "world");


[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gnQX992o-1673511940022)(./assets/image-20230111102927506.png)]

生产者提高吞吐量

当我们发送的消息过多的时候,我们可以通过调节kafk的配置来提高我们想kafka传入的数据

buffer.memory

设置发送消息的缓冲区,默认值是33554432,就是32MB

如果发送消息出去的速度小于写入消息进去的速度,就会导致缓冲区写满,此时生产消息就会阻塞住,所以说这里就应该多做一些压测,尽可能保证说这块缓冲区不会被写满导致生产行为被阻塞住

compression.type

默认是none,不压缩,但是也可以使用lz4压缩,效率还是不错的,压缩之后可以减小数据量,提升吞吐量,但是会加大producer端的cpu开销。

batch.size

设置merge batch的大小,如果 batch 太小,会导致频繁网络请求,吞吐量下降;

如果batch太大,会导致一条消息需要等待很久才能被发送出去,而且会让内存缓冲区有很大压力,过多数据缓冲在内存里

默认值是:16384,就是16kb,也就是一个batch满了16kb就发送出去,一般在实际生产环境,这个batch的值可以增大一些来提升吞吐量,可以自己压测一下。

linger.ms

这个值默认是0,意思就是消息必须立即被发送,但是这是不对的。

一般设置一个100毫秒之类的,这样的话就是说,这个消息被发送出去后进入一个batch,如果100毫秒内,这个batch满了16kb,自然就会发送出去。

但是如果100毫秒内,batch没满,那么也必须把消息发送出去了,不能让消息的发送延迟时间太长,也避免给内存造成过大的一个压力。

这些可以通过Boot的yaml配置文件。或者通过编写的指定消费者配置信息时候指定

spring.kafka.proucer.buffer.memory
spring.kafka.proucer.batch.size
spring.kafka.proucer.linger.ms
spring.kafka.proucer.compression.type

数据丢失问题

ACK应答级别

spring.kafka.producer.ack = 
## 0
## 1
## -1/all

0的时候,数据发过来还没落盘就应答,结果leader挂了导致了数据丢失。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-7g0C1dJo-1673513735362)(./assets/b6fbd906a7084a85bd1ccdbe6252674e.png)]

1的时候,数据发送过来,leader落盘后就会应答,生产者收到ack应答认为信息已经发送成功,随后就会清除掉队列中的消息,但是此时follwer可能还没完成同步,这个时候leader挂掉,就会有一个follwer成为新的leader,可是生产者已经认为信息发送成功从队列中清除了消息,这就导致了数据的丢失。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-0Bcrrffw-1673511940023)(./assets/4fadbde7b4374e5ea6a11e7958937e26.png)]

-1(all):leader收到消息,并且所有follwer都完成消息同步后返回ack应答

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fIExCJpN-1673511940024)(./assets/796577d933ae4b05aac033099840eb6f.png)]

最后一种 应答模式看似安全其实也是存在问题

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-n9Hd519R-1673511940024)(./assets/image-20230111160154487.png)]

可靠性总结:

  1. acks=0,生产者发送过来数据就不管了,可靠性差,效率高;
  2. acks=1,生产者发送过来数据Leader应答,可靠性中等,效率中等;
  3. acks=-1,生产者发送过来数据Leader和ISR队列里面所有Follwer应答,可靠性高,效率低; 在生产环境中,
  4. acks=0很少使用;acks=1,一般用于传输普通日志,允许丢个别数据;acks=-1,一般用于传输和钱相关的数据, 对可靠性要求比较高的场景。

SpringBoot整合Kafka

上面操作通过Kafka整合的java的api同样可以在SpringBoot中使用

但是SpringBoot自动装配之后进行了封装 KafkaTemplate<> 使用

前面的配置信息很多可以通过YAML进行添加配置信息

kafka配置类


// 配置类
@Configuration
public class KafkaConfig {


    /**
     * 使用Spring配置类注解 将yaml中的信息引入到SpringBoot的kafka配置信息类中
     */
    @Bean
    @ConfigurationProperties(prefix = "spring.kafka")
    @Primary
    public KafkaProperties KafkaListenerKafkaProperties(){
        return new KafkaProperties();
    }

    /**
     *  创建kafka的工厂类
     */
    @Bean
    @Primary
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> props = new HashMap<>();
        //kafka 集群地址
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
        //重试次数
        props.put(ProducerConfig.RETRIES_CONFIG, 3);
        //应答级别
        //acks=0 把消息发送到kafka就认为发送成功
        //acks=1 把消息发送到kafka leader分区,并且写入磁盘就认为发送成功
        //acks=all 把消息发送到kafka leader分区,并且leader分区的副本follower对消息进行了同步就任务发送成功
        props.put(ProducerConfig.ACKS_CONFIG, "all");
        //Key 序列化方式
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //Value 序列化方式
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //消息压缩:none、lz4、gzip、snappy,默认为 none。
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
        return new DefaultKafkaProducerFactory<>(props);
    }

    /**
     * kafkaTemplate 通过SpringBoot中的  kafka生产者工厂 和 kafka配置项
     * 来创建 生产者的
     */
    @Bean("kafkaTemplate")
    @Primary
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(
                new DefaultKafkaProducerFactory<>(KafkaListenerKafkaProperties().buildAdminProperties()));
    }


}

SpringBoot的Kafka生产者


/**
 * kafka 生产服务
 *
 * @author Leo
 * @create 2020/12/31 16:06
 **/
@Slf4j
@Service
public class KafkaProducerService {
    @Qualifier("kafkaTemplate")
    @Resource
    private KafkaTemplate<String, String> kafkaTemplate;

    @Qualifier("kafkaTemplateWithTransaction")
    @Resource
    private KafkaTemplate<String, String> kafkaTemplateWithTransaction;

    /**
     * 发送消息(同步)
     *
     * @param topic   主题
     * @param key     键
     * @param message 值
     */
    public void sendMessageSync(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {
        //可以指定最长等待时间,也可以不指定
        kafkaTemplate.send(topic, message).get(10, TimeUnit.SECONDS);
        log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message);
        //指定key,kafka根据key进行hash,决定存入哪个partition
//        kafkaTemplate.send(topic, key, message).get(10, TimeUnit.SECONDS);
        //存入指定partition
//        kafkaTemplate.send(topic, 0, key, message).get(10, TimeUnit.SECONDS);
    }

    /**
     * 发送消息并获取结果
     *
     * @param topic
     * @param message
     * @throws ExecutionException
     * @throws InterruptedException
     */
    public void sendMessageGetResult(String topic, String key, String message) throws ExecutionException, InterruptedException {
        SendResult<String, String> result = kafkaTemplate.send(topic, message).get();
        log.info("sendMessageSync => topic: {}, key: {}, message: {}", topic, key, message);
        log.info("The partition the message was sent to: " + result.getRecordMetadata().partition());
    }

    /**
     * 发送消息(异步)
     *
     * @param topic   主题
     * @param message 消息内容
     */
    public void sendMessageAsync(String topic, String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        //添加回调
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                log.error("sendMessageAsync failure! topic : {}, message: {}", topic, message);
            }

            @Override
            public void onSuccess(SendResult<String, String> stringStringSendResult) {
                log.info("sendMessageAsync success! topic: {}, message: {}", topic, message);
            }
        });
    }

    /**
     * 可以将消息组装成 Message 对象和 ProducerRecord 对象发送
     *
     * @param topic
     * @param key
     * @param message
     * @throws InterruptedException
     * @throws ExecutionException
     * @throws TimeoutException
     */
    public void testMessageBuilder(String topic, String key, String message) throws InterruptedException, ExecutionException, TimeoutException {
        // 组装消息
        Message msg = MessageBuilder.withPayload(message)
                .setHeader(KafkaHeaders.MESSAGE_KEY, key)
                .setHeader(KafkaHeaders.TOPIC, topic)
                .setHeader(KafkaHeaders.PREFIX, "kafka_")
                .build();
        //同步发送
        kafkaTemplate.send(msg).get();
        // 组装消息
//        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, message);
//        kafkaTemplate.send(producerRecord).get(10, TimeUnit.SECONDS);
    }
}

afkaHeaders.MESSAGE_KEY, key)
.setHeader(KafkaHeaders.TOPIC, topic)
.setHeader(KafkaHeaders.PREFIX, “kafka_”)
.build();
//同步发送
kafkaTemplate.send(msg).get();
// 组装消息
// ProducerRecord<String, String> producerRecord = new ProducerRecord<>(topic, message);
// kafkaTemplate.send(producerRecord).get(10, TimeUnit.SECONDS);
}
}








本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/159465.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

javaWeb servlet

使用案例&#xff1a; <dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <version>3.1.0</version> <scope>provided</scope> </dependency> 导了包之…

五指山[《信息学奥赛一本通》](扩展欧几里得算法)

题目如下&#xff1a; 题解 or 思路 我们可以将题目 抽象 成数学模型 xkd≡y(modn)x kd \equiv y\ (mod\ n)xkd≡y (mod n) xkdya∗nx kd y a * nxkdya∗n k∗d−a∗ny−xk*d - a*n y - xk∗d−a∗ny−x 式子 在这里 kkk, aaa 是变量&#xff0c;其余是常数 我们可以扩展…

Java 快速开发几 MB 独立 EXE,写图形界面很方便

Java 写的桌面软件带上运行时只有 6 MB&#xff0c;而且还是独立 EXE 文 件&#xff0c;是不是难以置信&#xff1f; 想一想 Electron 没写多少功能就可能超过百 MB 的体积&#xff0c;Java 写的桌面软件算不算得上小、轻、快呢&#xff1f; aardio 可以支持很多编程语言&…

什么是 智慧行业 App?

智慧行业 是帮助大家将智能设备和边缘网关设备添加到云开发项目的移动端 App。App 提供 iOS 和安卓版本&#xff0c;您可以在对应系统的应用商店下载。 应用场景 智慧行业 能帮助用户将不同网络协议的设备&#xff08;包含边缘网关子设备&#xff09;配置到相关项目中。开发者…

对于TP, TN, FP, FN, Pre, Recall的举例

对于以上参数的概念以及理解&#xff0c;请参考我的文章&#xff1a; 机器学习中TP&#xff0c;TN&#xff0c;FP&#xff0c;FN&#xff0c;Acc&#xff0c;Pre&#xff0c;Sen, Rec的含义_汤宪宇的博客-CSDN博客_机器学习accz 这里我们在将上面概念的定义重新梳理一遍&#…

深入源码剖析 Mybatis 缓存机制

深入源码剖析 Mybatis 缓存机制 Mybatis 为了减轻数据库压力&#xff0c;提高数据库性能。提供了两级缓存机制 一级缓存 sqlSession 级别缓存&#xff0c;缓存的数据只在 sqlSession 内有效&#xff0c;一级缓存默认为我们开起来&#xff0c;不需要我们手动操作&#xff0c;而…

【Dash搭建可视化网站】项目14:美国机场交通数据可视化看板制作步骤详解

美国机场交通数据可视化看板制作步骤详解1 项目效果图2 项目架构3 文件介绍和功能完善3.1 assets文件夹介绍3.2 app.py和index.py文件完善3.3 sider.py文件完善3.4 mapchart.py文件完善3.5 barchart.py文件完善3.6 api.py和api.ipynb文件完善4 样式修改4.1 整体样式修改4.2 sid…

联合证券|A股汽车板块爆发,北向资金半日净买入43亿

今日上午&#xff0c;A股商场震动胶着&#xff0c;轿车、电力设备、通信、机械设备等板块领涨。房地产、美容护理、商贸零售等板块领跌。 北向资金再度成为商场重要亮点之一&#xff0c;半个交易日净买入约43亿元。至此&#xff0c;2023年1月以来&#xff0c;北向资金累计净买入…

Jmeter常用函数

1、生成随机数函数 ${__Random(m,n)}&#xff0c;其中m,n参数是必填 2、 生成随机日期函数${__RandomDate(dateTimeFormat,from,end,locale,var)}&#xff0c;其中end是必选的&#xff0c;代表最大的日期&#xff1b; 3、 随机生成字符串函数${__RandomString(length,chars,)}…

Linux---vim编辑器

目录 1. vim的基本概念 2. vim正常/命令模式命令集 3. vim底行模式命令集 1. vim的基本概念 vim是Linux下一款常用编辑器&#xff0c;vim的三种模式(其实有好多模式&#xff0c;主要掌握这3种即可),分别是命令模式&#xff08;command mode&#xff09;、插入模式&#xff0…

Synchronized锁原理及 ConcurrentHashMap

文章目录一、Synchronized原理加锁过程锁消除锁粗化二、线程安全的集合类多线程环境使用ArrayList多线程环境使用队列多线程环境下使用哈希表一、Synchronized原理 我们表面看到的&#xff0c;两个线程针对同一对象加锁&#xff0c;就会产生阻塞等待&#xff0c;但实际我们的S…

2023我的创作纪念日

文章目录机缘收获日常憧憬机缘 这个博客还是我上大一的时候注册的&#xff0c;在大一、大二、大三期间更多的是为了方便搜索&#xff0c;学校里边的习题大部分是可以在CSDN上找到的。真正写博客是在大三下学习实习&#xff0c;当时为了方便记录实习中遇到的问题。在C站对我影响…

【Git 从入门到精通】2023最新版的Git安装与卸载每一步附详细讲解

文章目录安装1.下载Git2.开始安装卸载1.找到电脑中的Git2.卸载3.删除环境变量安装 1.下载Git 首先去官网下载Git安装包&#xff0c;可以直接在百度搜索Git&#xff0c;以下几个网站都可以。也可以点击直达&#xff0c;官网上下载如果不科学上网的话还是很慢的&#xff0c;所以…

[Zombodb那些事]Zombodb与ElasticSearch的Bulk通信

Zombodb与ElasticSearch的Bulk通信0.前言Zombodb是一个PostgreSQL插件&#xff0c;使用rust编写&#xff0c;支持pg14以下版本。Zombodb可以允许PostgreSQL查询ElasticSearch中的内容。本篇为《Zombodb那些事》第一篇&#xff0c;后面将更新其他部分内容。Zombodb会在pg数据库上…

智能文字识别技术推动彝文识别弘扬中华文化

前言 谈起图像识别自己颇有感触&#xff0c;因为之前的两段工作经历都和图像识别密切相关&#xff1b;之前一家公司的主营业务就是将历史上珍贵文献进行数字化&#xff1b;上家公司自己负责图像识别模块相关的工作&#xff1b;不但使用了第三方平台产品而且进行了自建&#xff…

设计模式相关内容介绍—UML

统一建模语言(Unified ModelingLanguage&#xff0c;UML)是用来设计软件的可视化建模语言。它的特点是简单、统一、图形化、能表达软件设计中的动态与静态信息。 UML从目标系统的不同角度出发&#xff0c;定义了用例图、类图、对象图、状态图、活动图、时序图、协作图、构件图、…

经过2022年这大环境,我学会了如何管理我的领导

2022年这大环境&#xff0c;可以说是我干软件开发这些年来&#xff0c;经历的最残酷的一年&#xff0c;所以做为职场软件开发一员的我&#xff0c;不得不修炼一下真本事。 很多时候不是你不努力&#xff0c;不是你连mysql连的不溜&#xff0c;不是你布局页面布局的不精细&#…

16.Isaac教程--Codelets详解

Codelets详解 ISAAC教程合集地址: https://blog.csdn.net/kunhe0512/category_12163211.html 文章目录Codelets详解Codelets 和 tick接收消息传输消息方便的 ToProto/FromProto 函数配置参数应用程序 JSON子图姿态组件是机器人应用程序的基本构建块。 Isaac SDK 包含可在您的应…

「数据结构详解·九」图的初步

「数据结构详解一」树的初步「数据结构详解二」二叉树的初步「数据结构详解三」栈「数据结构详解四」队列「数据结构详解五」链表「数据结构详解六」哈希表「数据结构详解七」并查集的初步「数据结构详解八」带权并查集 & 扩展域并查集「数据结构详解九」图的初步 注意&…

基于JavaWEB SSM SpringBoot婚纱影楼摄影预约网站设计和实现

基于JavaWEB SSM SpringBoot婚纱影楼摄影预约网站设计和实现 博主介绍&#xff1a;5年java开发经验&#xff0c;专注Java开发、定制、远程、文档编写指导等,csdn特邀作者、专注于Java技术领域 作者主页 超级帅帅吴 Java毕设项目精品实战案例《500套》 欢迎点赞 收藏 ⭐留言 文末…