Kafka Producer 开发

news2025/1/11 21:55:54

Kafka Producer 开发

kafka包含5个核心的API接口定义:

  • Producer API - 允许应用程序往kafka集群中的topic中发送事件消息
  • Consumer API - 允许应用程序从kafka topic 中读取数据
  • Streams API - 允许对输入数据流进行数据计算、转换,并发送到其他主题进行消费
  • Connect API - 实现connector API,从某个源系统、应用程序持续的拉入数据至kafka,或者从kafka推数据至sink 应用
  • Admin API - 允许管理、监控 消息主题、broker、其它kafka元数据对象

简单的理解,kafka Producer 就是向kafka写入数据的应用程序,接下来实现一个最简单的 Producer 生产者程序。

Maven依赖

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>3.3.1</version>
</dependency>

程序代码

public class SimpleProducer {

    public static void main(String[] args) {
        String topicName = "simple_topic";

        Properties props = new Properties();
        //指定kafka 服务器连接地址
        props.put("bootstrap.servers", "localhost:9092");
        // ack 机制
        props.put("acks", "all");
        // 发送失败 重试次数
        props.put("retries", 0);
        // 消息发送延迟时间 默认为0 表示消息立即发送,单位为毫秒
        props.put("linger.ms", 1);
        // 序列化方式
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);

        for(int i = 0; i < 10; i++) {
            producer.send(new ProducerRecord<>(topicName,
                    Integer.toString(i), "message : " + i));
        }
        System.out.println("Message sent successfully");
        producer.close();
    }
}

验证测试

创建topic

$ ./bin/kafka-topics.sh --create --topic simple_topic --bootstrap-server localhost:9092

启动消费者

# 启动kafka自带的消费者脚本
$ ./bin/kafka-console-consumer.sh --topic simple_topic  --bootstrap-server localhost:9092

测试结果

首先,启动生产者代码,消费者控制台输出如下

在这里插入图片描述

发送机制

同步获取结果

Kafka提供了一种同步发送方法,用于向主题发送记录。让我们使用这个方法向我们之前创建的Kafka主题发送一些消息id和消息。

for(int i = 0; i < 10; i++) {
  try {
    ProducerRecord<String, String> record = new ProducerRecord<>(topicName,
                                                                 Integer.toString(i), "message : " + i);
    Future<RecordMetadata> send = producer.send(record);
    //同步等待 发送结果
    RecordMetadata metadata = send.get();
    System.out.println(String.format("sent record(key=%s value=%s) 分区数: %d, 偏移量: %d, 时间戳: %d",
                                     record.key(),
                                     record.value(),
                                     metadata.partition(),
                                     metadata.offset(),
                                     metadata.timestamp()
                                    ));
  }catch (Exception e){
    e.printStackTrace();
  }
}

在这里插入图片描述

异步回调

Kafka提供了一种异步发送方法,用于向主题发送记录。生产者发送成功后,自动调用回调方法输出消息的元数据,这里的最大区别是使用lambda表达式来定义回调。

for(int i = 0; i < 10; i++) {
  ProducerRecord<String, String> record = new ProducerRecord<>(topicName,
                                                               Integer.toString(i), "message : " + i);
  // 发送成功 执行callback回调函数
  producer.send(record, (metadata, exception) -> {
    if (metadata != null) {
      System.out.println(String.format("sent record(key=%s value=%s) 分区数: %d, 偏移量: %d, 时间戳: %d",
                                       record.key(),record.value(),
                                       metadata.partition(), metadata.offset(),metadata.timestamp()
                                      ));
    }
  });
}
//等待回调执行完毕
TimeUnit.SECONDS.sleep(1);

批量发送

默认情况下,kafka会尽快的发送message至Broker,这对于消息有时效性的场景非常的有效,但是却降低了kafka的吞吐量,原因是每次发送数据都需要进行一次网络传输。如果允许一定时间的延迟(通常情况下,是毫秒级别),且消息吞吐量非常大的场景,可以采用批量发送方式。

kafka批处理机制允许Kafka提高吞吐量,同时保持非常低的延迟。批处理具有更高的压缩比,因此具有更好的磁盘和网络效率。对开发这而言,kafka的批量发送机制,并不需要更改任何业务代码,主要由两个生成器设置控制:linger.ms和batch.size。

如下图所示,生产者允许等待指定的时间,将等待时间内的消息 M0 - M100 一次性发送到服务端。这里有两种情况

在这里插入图片描述

  • batch.size

    Producer生产者的最重要的参数之一,对于调优producer吞吐量和延迟性指标有非常重要的作用。producer将发往同一分区的消息封装到一个batch中,当batch中的消息数量,满足指定的 大小时 ,producer 会发送batch中的所有消息。

    但是,考虑一个极端的场景,由于消息迟迟不能满足 batch.size的大小,导致消息不能发送到服务端,那么消息就会出现延迟,甚至是丢失的风险,此时 linger.ms参数派上用场

  • linger.ms

    该参数是控制消息延迟发送行为,默认为0,表示立即发送数据,并不关心 batch 是否已满。上面提到的极端情况,可以允许指定消息延迟发送时间,单位是毫秒,时间到达,不管batch是否满足,立即发送。

    实际上该参数是系统吞吐量和延迟时间之间的权衡。

分区机制

在这里插入图片描述

kafka是一个分布式流处理系统,当kafka集群部署时,如上图所示,一个topic可能存在N个分区。那么producer生产者将如何选择对应的分区进行数据发送呢?

核心接口

kafka提供了分区策略以及默认的分区方式供用户使用,分区策略的核心接口定义:

public interface Partitioner extends Configurable, Closeable {

    /**
     * 根据指定记录 计算分区
     */
    int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

    /**
     * 分区关闭时 调用该方法
     */
    void close();

    @Deprecated
    default void onNewBatch(String topic, Cluster cluster, int prevPartition) {
    }
}

默认策略

在这里插入图片描述

在3.3.1的版本中,提供了三个分区方法,其中两个已经过期,默认使用RoundRobin轮训的方式计算分区,核心代码如下

public class RoundRobinPartitioner implements Partitioner {
    private final ConcurrentMap<String, AtomicInteger> topicCounterMap = new ConcurrentHashMap<>();

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 获取集群分区数
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        //根据topic 累加
        int nextValue = nextValue(topic);
        List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
        if (!availablePartitions.isEmpty()) {
            // 累加值取模 计算分区数
            int part = Utils.toPositive(nextValue) % availablePartitions.size();
            return availablePartitions.get(part).partition();
        } else {
            return Utils.toPositive(nextValue) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.computeIfAbsent(topic, k -> new AtomicInteger(0));
        return counter.getAndIncrement();
    }
    //...
}

轮训策略效果如下图

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/101745.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Ultra-high Resolution Image Segmentation via Locality-aware Context Fusion

极高图像语义分割。 作者使用了一个高分辨率分割的pipeline&#xff0c;将原始的超高分辨率图像分成一块一块的用于局部分割&#xff0c;然后将局部的分割结果融合形成最终的高分辨率分割。 方法&#xff1a;1&#xff1a;作者引入了一个局部感知上下文融合&#xff08;LCF&…

怎么提升360网站权重?怎么查询网站在360权重

怎么提升360网站权重&#xff1f; 一、增加网站流量 1、做高指数的关键词排名。 2、关键词的合理布局。 3、关键词的布局必须注意密度。 4、网站关键词的页面布局必须合理。二、网站页面内容布局 网站页面的内容可以说是网站的灵魂。网站的好坏完全取决于网站的内容是否能给访问…

Redis6新数据类型Bitmaps

Redis6新数据类型1.Bitmaps2.命令1.Bitmaps 简介&#xff1a;现代计算机用二进制(位)作为信息的基础单位&#xff0c;1个字节等于8位&#xff0c;例如“abc”字符串由3个字节组成&#xff0c;但实际在计算机存储时将其用二进制表示&#xff0c;“abc”分别对应的ASCII码是97、…

​草莓熊python turtle绘图(圣诞元旦倒数雪花版)附源代码

​草莓熊python turtle绘图&#xff08;圣诞元旦倒数雪花版&#xff09;附源代码 本篇目录&#xff1a; 一、前言 二、​草莓熊python绘图&#xff08;圣诞元旦倒数雪花版&#xff09;效果图 三、源代码保存方法 四、代码命令解释 &#xff08;1&#xff09;、绘图基本代码…

LaTeX教程(四)——文档内元素

文章目录1. 表格2. 插入图片3. 盒子4. 浮动体1. 表格 LaTeX的表格不想Word能够做到所见即所得&#xff0c;当表格较小还好&#xff0c;一旦表格内容逐渐增多&#xff0c;那么编写表格就变得十分麻烦了&#xff0c;为此&#xff0c;一般都是用在线表格并生成LaTeX代码的形式来得…

Linux——管道和重定向

一、Linux的文件 linux中奉行一切皆文件&#xff0c;包括目录、链接&#xff08;类似windows的快捷方式&#xff09;、设备文件。 在内核中,所有打开的文件都使用文件描述符&#xff08;一个非负整数&#xff09;标记。文件描述符的变化范围是0~OPEN_MAX – 1。早期的unix系统…

前端CDN和DNS

DNS的基础知识 统一资源定位符(URL) scheme: 方案&#xff0c;包括http&#xff0c;https协议。 host&#xff1a;主机 port&#xff1a;端口 path&#xff1a;路径 query&#xff1a;查询 fragment&#xff1a;片段&#xff0c;访问网址时候定位某个位置 DNS &#xff08;Do…

Java 开发环境配置

在本章节中我们将为大家介绍如何搭建Java开发环境。 Windows 上安装开发环境Linux 上安装开发环境安装 Eclipse 运行 Javawindow系统安装java 下载JDK 首先我们需要下载 java 开发工具包 JDK&#xff0c;下载地址&#xff1a;Java Downloads | Oracle&#xff0c;在下载页面…

Kaggle房价预测 特征工程模型聚合

目录 一&#xff1a;Kaggle数据集准备 二&#xff1a;数据集分析 三&#xff1a;空值处理 四&#xff1a;空值填充 五&#xff1a;查找所有字符列 六&#xff1a;实例化独热编码对象 七&#xff1a;方差过滤 八&#xff1a;特征数据提取 九&#xff1a;查看特征之间…

跨域/解决跨域方法

一、同源策略 同源策略(Same Origin Policy)是一种约定&#xff0c;它是浏览器最核心也是最基本的安全功能。同源策略会阻止一个域的javascrip脚本和另一个域的内容进行交互&#xff0c;是用于隔离潜在恶意文件的关键安全机制&#xff1b;关于这一点我们后面会举例说明。如果缺…

C语言—指针

指针用来存放一个内存地址&#xff1b; 指针的类型就是要存放地址的变量的数据类型&#xff1b; #include <stdio.h>int main() {int a 123;char b H;int *pa &a;char *pb &b;printf("%d\n", *pa);printf("%c", *pb); } pa要存放int类…

评估篇 | 单元测试评估也能复用到集成测试?脚本帮你高效评估

上次我们分享了单元测试用例的复用&#xff0c;单元测试的用例可以复用到集成测试&#xff0c;那单元测试的评估是否也可以复用到集成测试&#xff1f;答案是可以的。 TPT中提供了多种多样的评估方式&#xff0c;其中的脚本评估使我们复用测试评估成为可能。脚本评估&#xff…

@EnableCaching如何一键开启缓存

EnableCaching如何一键开启缓存手动挡CacheManagerCache使用演示小结自动挡CachingConfigurationSelectorAutoProxyRegistrarProxyCachingConfigurationCacheOperationSourceCacheOperationBeanFactoryCacheOperationSourceAdvisorCacheInterceptor小结手动挡 我们首先来看看S…

成本、利润分析法在企业管理中的应用

1 、成本、利润分析法的主要内容 成本、利润分析法主要是指&#xff0c;利用数学模型&#xff0c;对关于企业成本、利润的要素分析&#xff0c;然后计算出要素的改变对企业成本、利润的影响&#xff0c;进而对企业决策提出建议的一种方法。在成本、利润分析法中&#xff0c;最主…

基础IO——文件描述符

文章目录1. 文件描述符fd1.1 open返回值2. 理解Linux下一切皆文件3. 文件描述符的分配规则4. 重定向的本质4.1 使用 dup2 系统调用4.2 追加重定向4.3 输入重定向1. 文件描述符fd 1.1 open返回值 我们先来看下面的例子&#xff1a; 运行结果如下&#xff1a; 我们知道open的…

磺基-CY5 马来酰亚胺 Cyanine5 Maleimide

磺基-CY5 马来酰亚胺 Cyanine5 Maleimide Cyanine5 maleimide是单一活性染料&#xff0c;有选择性的与硫醇基团&#xff08;比如蛋白和多肽的半胱氨酸&#xff09;结合以进行标记。我们使用水溶的Sulfo-Cyanine5 maleimide标记抗体和其他敏感蛋白。Cyanine5是Cy5的类似物&am…

Pb协议的接口测试

Protocol Buffers 是谷歌开源的序列化与反序列化框架。它与语言无关、平台无关、具有可扩展的机制。用于序列化结构化数据&#xff0c;此工具对标 XML &#xff0c;支持自动编码&#xff0c;解码。比 XML 性能好&#xff0c;且数据易于解析。更多有关工具的介绍可参考官网。 P…

Java8新特性学习

文章目录Lambda表达式为什么使用Lambda表达式Lambda表达式语法语法格式一&#xff1a;无参数&#xff0c;无返回值语法格式二&#xff1a;有一个参数&#xff0c;并且无返回值语法格式三&#xff1a;若只有一个参数&#xff0c;小括号可以省略不写语法格式四&#xff1a;有两个…

Docker容器数据卷

是什么 卷就是目录或文件&#xff0c;存在于一个或多个容器中&#xff0c;由docker挂载到容器&#xff0c;但不属于联合文件系统&#xff0c;因此能够绕过Union File System提供一些用于持续存储或共享数据的特性&#xff1a;卷的设计目的就是数据的持久化&#xff0c;完全独立…

LSTM(Long Short-Term Memory)

长短期记忆&#xff08;long short-term memory&#xff0c;LSTM&#xff09;&#xff0c;LSTM 中引入了3个门&#xff0c;即输入门&#xff08;input gate&#xff09;、遗忘门&#xff08;forget gate&#xff09;和输出门&#xff08;output gate&#xff09;&#xff0c;以…