11、Kafka ------ Kafka 核心API 及 生产者API 讲解

news2025/1/12 10:42:45

目录

  • Kafka核心API 及 生产者API讲解
    • ★ Kafka的核心API
      • Kafka包含如下5类核心API:
    • ★ 生产者API
      • Kafka 的API 文档
    • ★ 使用生产者API发送消息

Kafka核心API 及 生产者API讲解

官方文档

★ Kafka的核心API

Kafka包含如下5类核心API:

在这里插入图片描述

Producer API(生产者API):
应用程序通过该API向主题发布消息。

Consumer API(消费者API):
应用程序通过该API订阅一个或多个主题,并从所订阅的主题中拉取消息(记录)

Streams API(流API):
应用程序可通过该API实现流处理器,可以将一个主题的消息“导流”到另一个主题,并能地对消息进行任意自定义的转换。

类似于 RabbitMQ 的 Exchange

Connector API(连接器API):
应用程序可通过这套API来实现连接器,这些连接器不断地从源系统或应用程序导入数据到Kafka,反过来也可将Kafka消息不断地导入某个接收系统或应用程序。

通过这个API,可以让应用程序和Kafka这个消息系统进行一个实时的交互,我们的系统可以不断的接收来自Kafka的消息,也可以让我们的程序不断的把数据导入到Kafka的消息系统中,就像是一个通道,所以叫连接API。

应用场景:我们的应用程序要和Kafka之间保持实时的数据流的时候,就可以用这个连接API。

AdminAPI(管理API):
应用程序可通过该API管理和检查主题、Broker和其他Kafka实体。

在这里插入图片描述



这5套API中,只有流API使用的是专门的JAR包。

其他都用的是org.apache.kafka:kafka-clients依赖库。

而流API用的是org.apache.kafka:kafka-streams依赖库。



★ 生产者API


在这里插入图片描述

<dependency>
	<groupId>org.apache.kafka</groupId>
	<artifactId>kafka-clients</artifactId>
	<version>3.6.1</version>
</dependency>

生产者API 的核心类是 KafkaProducer,它提供了一个 send()方法 来发送消息,该方法需要传入一个 ProducerRecord<K,V>对象。

ProducerRecord 代表了一条消息,Kafka 的消息是包含了key、value、timestamp。

ProducerRecord定义了如下6个构造器:

- ProducerRecord(String topic, Integer partition, K key, V value):
  创建一条发送到指定主题和指定分区的消息。

- ProducerRecord(String topic, Integer partition, K key, V value, Iterable<Header> headers):
  创建一条发送到指定主题和指定分区的消息,且包含多个消息头。
  
- ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value):
  创建一条发送到指定主题和指定分区的消息,且使用给定的时间戳。
  
- ProducerRecord(String topic, Integer partition, Long timestamp, K key, V value, Iterable<Header> headers):
  创建一条发送到指定主题和指定分区的消息、使用给定的时间戳,且包含多个消息头。
  
- ProducerRecord(String topic, K key, V value):
  创建一条发送到指定主题的消息。

- ProducerRecord(String topic, V value):
  创建一条发送到指定主题的、只带value,不带key的消息。

通过查 API 文档可看这个 ProducerRecord 消息对象 的6个构造器:

在这里插入图片描述

Kafka 的API 文档

Kafka 的API 文档

在这里插入图片描述

★ 使用生产者API发送消息

使用生产者API发送消息很简单,基本只要两步:

1、创建KafkaProducer对象,创建该对象时要传入Properties对象,用于对该生产者进行配置。

2、调用KafkaProducer对象的send()方法发送消息,调用ProducerRecord的构造器即可创建不同的消息。

3、发送完成后,关闭KafkaProducer对象。



为何Kafka的KafkaProducer需要一个Properties来来创建KafkaProducer?

因为Kafka的Producer API提供了海量的配置选项——如果你将这些配置选项每个都定义成方法,那将是一件让人无比痛苦的事情。

所以Kafka在设计该API时,就直接用了一个Properties来封装所有的配置属性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1399822.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ESP32-HTTP_webServer库(Arduino)

ESP32-HTTP 介绍 ESP32是一款功能强大的微控制器&#xff0c;具有丰富的网络和通信功能。其中之一就是支持HTTP协议&#xff0c;这使得ESP32可以用于创建Web服务器。 HTTP是什么&#xff1f; HTTP&#xff08;Hyper Text Transfer Protocol&#xff09;&#xff0c;即超文本传…

如何快速打开github

作为一个资深码农&#xff0c;怎么能不熟悉全球最大的同性交友社区——github呢&#xff0c;但头疼的是github有时能打开&#xff0c;有时打不开&#xff0c;这是怎么回事&#xff1f; 其实问题出在github.com解析DNS上&#xff0c;并不是需要FQ。下面提供一个方法&#xff0c;…

前端打同一个包可以从测试晋升到生产的配置方案

前端打同一个包从测试晋升到生产环境的方案&#xff0c;是一种高效、可靠且易于维护的部署方式。在这种方案中&#xff0c;前端代码在开发完成后&#xff0c;经过测试验证无误后&#xff0c;可以直接打包部署到生产环境&#xff0c;无需进行额外的配置或修改。这样可以减少部署…

HarmonyOS 应用开发入门

HarmonyOS 应用开发入门 前言 DevEco Studio Release版本为&#xff1a;DevEco Studio 3.1.1。 Compile SDK Release版本为&#xff1a;3.1.0&#xff08;API 9&#xff09;。 构建方式为 HVigor&#xff0c;而非 Gradle。 最新版本已不再支持 &#xff08;”Java、JavaScrip…

已解决Error:AttributeError: module ‘numpy‘ has no attribute ‘complex‘

文章目录 引言报错分析解决方案1&#xff1a;降低NumPy版本解决方案2&#xff1a;更改NumPy源码 结尾 引言 在Python编程的世界里&#xff0c;NumPy无疑占据着举足轻重的地位&#xff0c;它承担着处理大规模数值计算的重要任务。然而&#xff0c;与所有强大工具一样&#xff0…

晶振负载电容公式,怎么计算?

晶振&#xff0c;或称晶体振荡器&#xff0c;是电子设备中的关键元件&#xff0c;尤其在时序电路中扮演着重要角色。晶振的性能直接影响到整个电路的稳定性和精度。在晶振设计中&#xff0c;负载电容是一个重要的参数。理解如何计算负载电容对于正确配置和使用晶振至关重要。今…

Find My卡片正成为消费电子香饽饽,伦茨科技ST17H6x可以帮到您

今年CES许多公司发布支持苹果Find My的卡片产品&#xff0c;这种产品轻薄可充电&#xff0c;放在钱包、背包或者手提包可以防丢查找&#xff0c;在智能化加持下&#xff0c;防丢卡片使得人们日益关心自行车的去向。最新的防丢卡片与苹果Find My结合&#xff0c;智能防丢&#x…

Chatgpt+Comfyui绘图源码说明及本地部署文档

其他文档地址&#xff1a; ChatgptComfyui绘图源码运营文档 ChatgptComfyui绘图源码线上部署文档 一、源码说明 1、源码目录说明 app_home&#xff1a;app官网源码chatgpt-java&#xff1a;管理后台服务端源码、用户端的服务端源码chatgpt-pc&#xff1a;电脑网页前端源码cha…

Halcon 边缘提取

文章目录 算子Halcon edges_image 示例Halcon frei_amp 示例Halcon kirsch_amp示例Halcon sobel_amp示例Halcon sobel_amp 算子示例Halcon sobel_dir 算子示例Halcon close_edges关闭图像间隙示例Halcon close_edges_length关闭图像间隙示例 算子 edges_image 对于图像进行边缘…

64 C++对象模型探索。类对象占用空间问题分析

0.没有任何成员变量的空类&#xff0c;实际上占用空间为1字节。 这是因为当你的使用代码实例化一个类的时候&#xff0c;tea对象就有了空间&#xff0c;&tea是有地址的&#xff0c;那么你这个地址至少要占用1个字节才合理。 Teacher tea; 就像我们买房子&#xff0c;你的…

爬虫接口获取外汇数据(汇率,外汇储备,贸易顺差,美国CPI,M2,国债利率)

akshare是一个很好用的财经数据api接口&#xff0c;完全免费&#xff01;&#xff01;和Tushare不一样。 除了我标题显示的数据外&#xff0c;他还提供各种股票数据&#xff0c;债券数据&#xff0c;外汇&#xff0c;期货&#xff0c;宏观经济&#xff0c;基金&#xff0c;银行…

JDK 动态代理(Spring AOP 的原理)(面试重点)

代理模式 也叫委托模式.定义&#xff1a;为其他对象提供⼀种代理以控制对这个对象的访问.它的作⽤就是通过提供⼀个代理类,让我们 在调⽤⽬标⽅法的时候,不再是直接对⽬标⽅法进⾏调⽤,⽽是通过代理类间接调⽤&#xff0c;在某些情况下,⼀个对象不适合或者不能直接引⽤另⼀个对…

Stream toList不能滥用以及与collect(Collectors.toList())的区别

Stream toList()返回的是只读List原则上不可修改&#xff0c;collect(Collectors.toList())默认返回的是ArrayList,可以增删改查 1. 背景 在公司看到开发环境突然发现了UnsupportedOperationException 报错&#xff0c;想到了不是自己throw的应该就是操作collection不当。 发…

Jetbrains Writerside 使用教程

系列文章目录 前言 一、入门 Writerside 是基于 IntelliJ 平台的 JetBrains 集成开发环境。使用它可以编写、构建、测试和发布技术文档。 如果你想将 Writerside 作为另一个 JetBrains IDE 的插件&#xff0c;请参阅 Writerside 作为插件。 1.1 安装 Writerside&#xfeff;…

vue路由-全局前置守卫

1. 介绍 详见&#xff1a;全局前置守卫网址 使用场景&#xff1a; 对于支付页&#xff0c;订单页等&#xff0c;必须是登录的用户才能访问的&#xff0c;游客不能进入该页面&#xff0c;需要做拦截处理&#xff0c;跳转到登录页面 全局前置守卫的原理&#xff1a; 全局前置…

QT+opencv源码编译

时间记录&#xff1a;2024/1/20 一、版本介绍 QT5.12.7cmake3.22.0opencv4.5.4 二、编译步骤 &#xff08;1&#xff09;下载opencv源码&#xff0c;然后安装&#xff0c;opencv的安装即对源码的解压过程&#xff0c;解压后的文件目录如下 &#xff08;2&#xff09;openc…

stm32 FOC 电机介绍

今年开始学习foc控制无刷电机&#xff0c;这几天把所学整理一下&#xff0c;记录一下知识内容。 前言: 为什么要学习FOC? 1.电机控制是自动化控制领域重要一环。 2.目前直流无刷电机应用越来越广泛&#xff0c;如无人机、机械臂、云台、仿生机器人等等。 需要什么基础&…

最长公共前缀(Leetcode14)

例题&#xff1a; 分析&#xff1a; 我们可以先定义两个变量 i &#xff0c; j&#xff0c; j表示数组中的每一个字符串&#xff0c; i 表示每个字符串中的第几个字符。一列一列地进行比较&#xff0c;先比较第一列的字符&#xff0c;若都相同&#xff0c;则 i &#xff0c;继…

字节跳动 ByteHouse 云原生之路 – 计算存储分离与性能优化

01 起源 ByteHouse 的故事从字节跳动对于先进数据处理和分析的需求开始&#xff0c;这一需求随着公司业务规模的迅速扩张而日益增长&#xff0c;起源是对开源数据库管理系统 ClickHouse 的改造和增强。面对数据处理的高延迟、大规模数据操作的复杂性以及数据存储和处理成本的上…

记录一次QT乱码问题

问题描述 在敲陆文周的书《QT5开发及实例》的示例代码时&#xff0c;出现乱码&#xff0c;如下图所示 具体代码如下 Widget::Widget(QWidget *parent): QWidget(parent), ui(new Ui::Widget) {ui->setupUi(this);ui->treeWidget->clear();int groupSize 2;int ite…