kafka学习-生产者

news2024/11/24 6:42:36

目录

1、消息生产流程

2、生产者常见参数配置

3、序列化器

基本概念

自定义序列化器

4、分区器

默认分区规则

自定义分区器

5、生产者拦截器

作用

自定义拦截器

6、生产者原理解析


1、消息生产流程

2、生产者常见参数配置

3、序列化器

基本概念

  • 在Kafka中保存的数据都是字节数组。
  • 消息发送前,需要将消息序列化为字节数组进行发送。
  • 生产者通过key.serializer和value.serializer指定key和value的序列化器。
  • Kafka使用org.apache.kafka.common.serialization.Serializer接口定义序列化器。
  • Kafka已实现的序列化器有:ByteArraySerializer、ByteBufferSerializer、BytesSerializer、DoubleSerializer、FloatSerializer、IntegerSerializer、StringSerializer、LongSerializer、ShortSerializer。

自定义序列化器

实现org.apache.kafka.common.serialization.Serializer<T>接口,并实现其中的serializer方法。

@Data
public class User {
	private Integer userId;
	private String username;
}

public class UserSerializer implements Serializer<User> {
	@Override
	public void configure(Map<String, ?> configs, boolean isKey) {
	// do nothing
	}
	
	@Override
	public byte[] serialize(String topic, User data) {
		try {
			// 如果数据是null,则返回null
			if (data == null) return null;
			Integer userId = data.getUserId();
			String username = data.getUsername();
			int length = 0;
			byte[] bytes = null;
			if (null != username) {
				bytes = username.getBytes("utf-8");
				length = bytes.length;
			}
			// 第一个4字节存储userId的值
			// 第二个4字节存储username字节数组的长度int值
			// 第三个length长度,存储username序列化之后的字节数组
			ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + length);
			buffer.putInt(userId);
			buffer.putInt(length);
			buffer.put(bytes);
			return buffer.array();
		} catch (UnsupportedEncodingException e) {
			throw new SerializationException("序列化数据异常");
		}
	}
	@Override
	public void close() {
	// do nothing
	}
}

4、分区器

默认分区规则

KafkaProducer.partition();DefaultPartitioner.partition();

  1. 如果record提供了分区号,则使⽤record提供的分区号
  2. 如果record没有提供分区号,则使⽤key的序列化后的值的hash值对分区数量取模
  3. 如果record没有提供分区号,也没有提供key,则使⽤轮询的⽅式分配分区号。

自定义分区器

实现org.apache.kafka.clients.producer.Partitioner接口,并实现其中的partition方法。

在生产者参数中通过配置partitioner.class指定自定义分区器。

/**
 * 自定义分区器
 */
public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // 此处可以计算分区的数字。
        // 我们直接指定为2
        return 2;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

5、生产者拦截器

作用

        在发送消息前,或者在执行回调逻辑前,对消息做一些定制化的处理,比如修改消息,打印消息日志等。此外,Producer允许设置多个拦截器从而形成一条拦截器链,Producer将按照指定顺序调用它们。

自定义拦截器

        自定义拦截器实现org.apache.kafka.clients.producer.ProducerInterceptor接口,并实现其中的onSend()、onAcknowledgement()、close()接口。其中:

  • onSend(ProducerRecord):Producer 确保在消息被序列化前调⽤该⽅法。⽤户可以在该⽅法中对消息做任何操作,但最好保证不要修 改消息所属的topic和分区,否则会影响⽬标分区的计算。
  • onAcknowledgement(RecordMetadata, Exception):该⽅法会在消息被应答之前或消息发送失败时调⽤, 并且通常都是在Producer回调逻辑触发之前。
  • close:关闭Interceptor,主要⽤于执⾏⼀些资源清理⼯作。

        在生产者参数中通过配置ProducerConfig.INTERCEPTOR_CLASSES_CONFIG指定自定义拦截器。

public class Interceptor<KEY, VALUE> implements ProducerInterceptor<KEY, VALUE> {
	private static final Logger LOGGER = LoggerFactory.getLogger(InterceptorTwo.class);
	@Override
	public ProducerRecord<KEY, VALUE> onSend(ProducerRecord<KEY, VALUE> record) {
		System.out.println("拦截器---go");
		// 此处根据业务需要对相关的数据作修改
		String topic = record.topic();
		Integer partition = record.partition();
		Long timestamp = record.timestamp();
		KEY key = record.key();
		VALUE value = record.value();
		Headers headers = record.headers();
		// 添加消息头
		headers.add("interceptor", "interceptor".getBytes());
		ProducerRecord<KEY, VALUE> newRecord = 
			new ProducerRecord<KEY, VALUE>(topic, partition, timestamp, key, value, headers);
		return newRecord;
	}
	
	@Override
	public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
		System.out.println("拦截器---back");
		if (exception != null) {
			// 如果发生异常,记录在日志中
			LOGGER.error(exception.getMessage());
		}
	}

	@Override
	public void close() {
	}
	
	@Override
	public void configure(Map<String, ?> configs) {
	}
}

6、生产者原理解析

以上内容为个人学习理解,如有问题,欢迎在评论区指出。

部分内容截取自网络,如有侵权,联系作者删除。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/994071.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

虚拟机上部署K8S集群

虚拟机上部署K8S集群 安装VM Ware安装Docker安装K8S集群安装kubeadm使用kubeadm引导集群 安装VM Ware 参考&#xff1a;http://www.taodudu.cc/news/show-2034573.html?actiononClick 安装Docker 参考&#xff1a;https://www.yuque.com/leifengyang/oncloud/mbvigg#2ASxH …

长安链BaaS服务平台调研

目录 一、菜单功能二、其他说明2.1、服务平台的部署方式2.2、链本身2.3、建链流程2.4、支持连接已部署的链2.5、链治理投票2.6、支持动态节点操作2.7、支持应用 长安链ChainMaker管理平台文档地址&#xff1a;https://docs.chainmaker.org.cn 一、菜单功能 菜单子菜单/功能点…

lock screen password (remove)

解除apple手机锁屏密码步骤 对于老人家来说手机越简单越好 换手机的时候连界面图标&#xff0c;页码&#xff0c;原来放那里&#xff0c;新机也是放那里

Nacos实战(19)-Nacos健康检查机制:保障你的服务稳定运行!

0 前言 注册中心不应仅提供服务注册和发现功能&#xff0c;还应保证对服务可用性监测&#xff0c;对不健康的服务和过期的进行标识或剔除&#xff0c;维护实例的生命周期&#xff0c;以保证客户端尽可能的查询到可用的服务列表。 因此本文介绍Nacos注册中心的健康检查机制。 …

C++函数内联详解

本文旨在讲解C中的函数内联相关知识&#xff0c;读完这篇文章&#xff0c;希望读者们会对函数内联有更深一步的认识&#xff01; 内联函数的定义 在计算机科学中&#xff0c; 内联函数 &#xff08;有时称作 在线函数 或 编译时期展开函数 &#xff09;是一种编程语言结构&…

如何给Mybatis-plus再增加点plus

来源公众号&#xff1a;赵侠客 一、Mybatis-plus基本功能 1.1 Mybatis-plus内置方法 Mybatis-plus给我们造了很多轮子&#xff0c;让我们可以开箱即用&#xff0c;在BaseMapper中有19种操作数据库常用的方法&#xff0c;如Insert()、deleteById()、updateById()、selectById(…

Spring系列文章:Spring事务

一、事务简述 1、什么是事务&#xff08; Transaction&#xff08;tx&#xff09;&#xff09; 在⼀个业务流程当中&#xff0c;通常需要多条DML&#xff08;insert delete update&#xff09;语句共同联合才能完成&#xff0c;这 多条DML语句必须同时成功&#xff0c;或者同…

WSL 在windows 家庭版上面的安装方式

目录 1、前言 2、约束 3、安装 1、安装Hyper 2、Hyper-V启用 3、安装Linux 4、0x800701bc问题处理 结论 1、前言 适用于Windows的Linux子系统 Windows Subsystem for Linux&#xff08;简称WSL&#xff09;是一个在Windows 10\11上能够运行原生Linux二进制可执行文件&am…

一条爬虫抓取一个小网站所有数据

一条爬虫抓取一个小网站所有数据 ​ 今天闲来无事&#xff0c;写一个爬虫来玩玩。在网上冲浪的时候发现了一个搞笑的段子网&#xff0c;发现里面的内容还是比较有意思的&#xff0c;于是心血来潮&#xff0c;就想着能不能写一个Python程序&#xff0c;抓取几条数据下来看看&am…

9.3.4(数据链路层)

一. 以太网帧格式: 二.IP地址和Mac地址在网络传输中的区别: 1.源IP:数据发送方的地址. 目的IP:数据接收发的地址. 2.源Mac:相邻两个路由器传输数据时发送方的地址. 目的Mac: 相邻两个路由器传输数据时接收方的地址. 3. 在一次数据传输中,源IP和目的IP不变,源Mac和目的Mac不…

Databend 数据集成方案 | Data Infra 第 15 期

本期的 Data Infra 直播活动我们邀请到了 Databend Cloud 研发工程师-韩山杰&#xff0c;与大家分享主题为《 Databend 数据集成方案》的相关知识。 在本次分享中&#xff0c;你将会学到在云上基于 Databend 及 Databend Cloud 构建应用&#xff0c;掌握 Databend CDC 和 Data…

Validate表单组件的封装

之前一直是直接去使用别人现成的组件库&#xff0c;也没有具体去了解人家的组件是怎么封装的&#xff0c;造轮子才会更好地提高自己&#xff0c;所以尝试开始从封装Form表单组件开始 一&#xff1a;组件需求分析 本次封装组件&#xff0c;主要是摸索封装组件的流程&#xff0c;…

哪个mac虚拟机软件好?怎么选择

虚拟机软件可以说是部分苹果用户们都会使用到的&#xff0c;因为很多软件在Mac上并不兼容&#xff0c;大部分都是基于Windows的框架进行开发设计的。虽然也有出Mac版本&#xff0c;但往往推迟得比较久才会进行发布。 拥有了虚拟机软件之后&#xff0c;我们就能够虚拟想要的系统…

lenovo联想笔记本ThinkPad P16V Gen 1(21FC,21FD)原装出厂Win11系统

原厂W11系统自带所有驱动、出厂主题壁纸、Office办公软件、联想电脑管家等预装程序 链接&#xff1a;https://pan.baidu.com/s/17dTExDSz-EDN4Qd-PZGJuw?pwdrgl3 提取码&#xff1a;rgl3 所需要工具&#xff1a;32G或以上的U盘 文件格式&#xff1a;ISO 文件大小…

油猴插件(Tampermonkey)的使用教程

以下内容源于网络资源的学习与整理&#xff0c;如有侵权请告知删除。 “油猴插件” 与 “油猴扩展程序” 表示同一个意思&#xff0c;下面统一使用“油猴插件”这个名词。 油猴插件的简介 浏览器插件&#xff0c;包括油猴插件和其他插件&#xff0c;通过它们可以实现浏览器网…

Vue 路由守卫详细介绍与演示

Vue 路由守卫是一种在 Vue.js 应用程序中控制路由导航的机制&#xff0c;它允许你在路由变化前、后或在特定路由上执行代码&#xff0c;以便实现诸如权限控制、数据加载、页面切换动画等功能。在下面的介绍中&#xff0c;我将首先提供官方定义和通俗解释&#xff0c;然后详细介…

rosbag 包转TUM数据集

参考链接&#xff1a; ROS学习&#xff1a;制作自己的TUM数据集 配置环境 1.安装ROS 参考我的博客 https://blog.csdn.net/qin_liang/article/details/127035615 2.查看rosbag中的topic rosbag info xxx.bag3.创建catkin_ws/src文件夹 在src下运行 catkin_create_pkg rosb…

<OpenCV> Mat属性

OpenCV的图像数据类型可参考之前的博客&#xff1a;https://blog.csdn.net/thisiszdy/article/details/120238017 OpenCV-Mat类型的部分属性如下&#xff1a; size&#xff1a;矩阵的大小&#xff0c; s i z e ( c o l s , r o w s ) size(cols,rows) size(cols,rows)&#xf…

如何维持股市稳定?——股市定海神针

中国股市于1989年 开始&#xff0c;至今2023年&#xff0c; 已有30多个年头。而这30多年来&#xff0c;却有20多年钟情于3000点。 股市有赌性在&#xff0c;却也为数以千计的企业提供了养料&#xff0c;更关系着数以亿计的股民、以及企业员工的切身利益。 股市3000点&#xff…

(翻译)JavaFX高级教程:JavaFX2.0的FXML语言

原文地址http://download.oracle.com/javafx/2.0/fxml_get_started/jfxpub-fxml_get_started.htm FXML是JavaFX 2.0新引入的。你可能会问"What is FXML?" 和"Is FXML for me?" FXML 是基于XML的一种声明性标记语言&#xff0c;用来定义应用的用户接口。F…