分布式消息队列Kafka(二)- 生产者

news2024/11/26 10:30:34

1.生产者消息发送流程

(1)消息发送原理

​ 在消息发送的过程中,涉及到了两个线程——main线程和 Sender 线程。在 main 线程中创建了一个双端队列 RecordAccumulator。main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka Broker。

在这里插入图片描述

(2)生产者重要参数列表

参数名称描述
bootstrap.servers生产者连接集群所需的 broker 地 址 清 单 。例如 hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置 1个或者多个,中间用逗号隔开。注意这里并非需要所有的 broker 地址,因为生产者从给定的 broker里查找到其他 broker 信息。
key.serializer 和 value.serializer指定发送消息的 key 和 value 的序列化类型。一定要写全类名。
buffer.memoryRecordAccumulator 缓冲区总大小,默认 32m。
batch.size缓冲区一批数据最大值,默认 16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。
linger.ms如果数据迟迟未达到 batch.size,sender 等待 linger.time之后就会发送数据。单位 ms,默认值是 0ms,表示没有延迟。生产环境建议该值大小为 5-100ms 之间。
acks0:生产者发送过来的数据,不需要等数据落盘应答。 1:生产者发送过来的数据,Leader 收到数据后应答。-1(all):生产者发送过来的数据,Leader+和 isr 队列里面的所有节点收齐数据后应答。默认值是-1,-1 和all 是等价的。
max.in.flight.requests.per.connection允许最多没有返回 ack 的次数,默认为 5,开启幂等性要保证该值是 1-5 的数字。
retries当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是 int 最大值,2147483647。如果设置了重试,还想保证消息的有序性,需要设置MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。
retry.backoff.ms两次重试之间的时间间隔,默认是 100ms。
enable.idempotence是否开启幂等性,默认 true,开启幂等性。
compression.type生产者发送的所有数据的压缩方式。默认是 none,也就是不压缩。支持压缩类型:none、gzip、snappy、lz4 和 zstd。

2.生产者异步发送API

(1)普通异步发送

在这里插入图片描述

(2)发送代码

1)创建工程

2)pom依赖

<dependencies>
 <dependency>
 <groupId>org.apache.kafka</groupId>
 <artifactId>kafka-clients</artifactId>
 <version>3.0.0</version>
 </dependency>
</dependencies>

3)普通异步代码

package com.zrclass.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducer {
 public static void main(String[] args) throws InterruptedException {
  // 1. 创建 kafka 生产者的配置对象
  Properties properties = new Properties();
  // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
  properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
 
  // key,value 序列化(必须):key.serializer,value.serializer
  properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
 
  properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
  // 3. 创建 kafka 生产者对象
  KafkaProducer<String, String> kafkaProducer = new 
  KafkaProducer<String, String>(properties);
  // 4. 调用 send 方法,发送消息
  for (int i = 0; i < 5; i++) {
    kafkaProducer.send(new ProducerRecord<>("test","PHP " + i));
  }
 // 5. 关闭资源
  kafkaProducer.close();
 }
}

4)观察kafka消费者控制台是否接收到消息

[zrclass@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic test
PHP 0
PHP 1
PHP 2
PHP 3
PHP 4

5)带回调函数的异步发送

回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

package com.zrclass.kafka.producer;
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class CustomProducerCallback {
 public static void main(String[] args) throws InterruptedException {
		// 1. 创建 kafka 生产者的配置对象
		Properties properties = new Properties();
		// 2. 给 kafka 配置对象添加配置信息
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
		"hadoop102:9092");
		// key,value 序列化(必须):key.serializer,value.serializer
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
		StringSerializer.class.getName());
		
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
		StringSerializer.class.getName());
		// 3. 创建 kafka 生产者对象
		KafkaProducer<String, String> kafkaProducer = new 
		KafkaProducer<String, String>(properties);
		// 4. 调用 send 方法,发送消息
		for (int i = 0; i < 5; i++) {
		 // 添加回调
		  kafkaProducer.send(new ProducerRecord<>("first", "PHP " + i), new Callback() {
		    // 该方法在 Producer 收到 ack 时调用,为异步调用
		    @Override
		    public void onCompletion(RecordMetadata metadata, Exception exception) {
			  if (exception == null) {
			  // 没有异常,输出信息到控制台
			  System.out.println(" 主题: " + metadata.topic() + "->" + "分区:" + metadata.partition());
			  } else {
			   // 出现异常打印
			   exception.printStackTrace();
			  }
		    }
		   });
		  // 延迟一会会看到数据发往不同分区
		  Thread.sleep(2);
		}
		// 5. 关闭资源
		kafkaProducer.close();
	} 
 }

3.生产者分区

(1)kafka分区优势

(1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。

(2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-irf7Miq4-1682404805995)(分布式消息队列Kafka.assets/在这里插入图片描述

(2)生产者发送消息的分区策略

1)默认的分区器 DefaultPartitioner

/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it
* <li>If no partition is specified but a key is present choose a 
partition based on a hash of the key
* <li>If no partition or key is present choose the sticky 
partition that changes when the batch is full.
* 
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {
 … …
}

2)生产者消息类ProduceRecord构造方法

在这里插入图片描述

<1>指定partition的情况下,直接发送消息到指定的partition分区

// 指定数据发送到 1 号分区,key 为空(IDEA 中 ctrl + p 查看参数)
 kafkaProducer.send(new ProducerRecord<>("test", 1,"","java " + i));

<2>没有指定partition值但有key的情况下,将key的hash值与topic的partition数进行取余得到partition值,例如:key1的hash值=5,key2的hash值=6,topic的partition数=2,那么key1对应的value1写入1号分区,key2对应的value2写入0号分区

// 依次指定 key 值为 a,b,f ,数据 key 的 hash 值与 3 个分区求余,分别发往 1、2、0
 kafkaProducer.send(new ProducerRecord<>("test", "a","java " + i));

<3>没有指定分区也没有指定key,kafka会随机选择一个分区使用

<4>自定义分区

例如我们实现一个分区器实现,发送过来的数据中如果包含 java,就发往 0 号分区, 不包含 java,就发往 1 号分区。

实现步骤

1>定义类实现 Partitioner 接口。

2>重写 partition()方法。

package com.zrclass.kafka.producer;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
/**
* 1. 实现接口 Partitioner
* 2. 实现 3 个方法:partition,close,configure
* 3. 编写 partition 方法,返回分区号
*/
public class MyPartitioner implements Partitioner {
 /**
  * 返回信息对应的分区
  * @param topic 主题
  * @param key 消息的 key
  * @param keyBytes 消息的 key 序列化后的字节数组
  * @param value 消息的 value
  * @param valueBytes 消息的 value 序列化后的字节数组
  * @param cluster 集群元数据可以查看分区信息
  * @return
  */
	@Override
	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		// 获取消息
		String msgValue = value.toString();
		// 创建 partition
		int partition;
		// 判断消息是否包含 java
		if (msgValue.contains("java")){
		partition = 0;
		}else {
		partition = 1;
		}
		// 返回分区号
		return partition;
	}
	// 关闭资源
	@Override
	public void close() {
	}
	// 配置方法
	@Override
	public void configure(Map<String, ?> configs) {
	} 
 }

3>定义kafka配置信息时指定自定义分区

// 添加自定义分区器
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.zrclass.kafka.producer.MyPartitioner");
 KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

4.生产者如何提高吞吐量

合理调整以下参数的组合

• batch.size:批次大小,默认16k

• linger.ms:等待时间,默认0

• compression.type:压缩,默认 none,可配置值 gzip、snappy、 lz4 和 zstd

• RecordAccumulator:缓冲区大小,默认32M

5.生产者消息可靠性(消息发送不丢失)

在这里插入图片描述

(1)消息的ack机制

1)ACK应答级别

0:生产者发送过来的数据,不需要等数据落盘应答

1:生产者发送过来的数据,Leader收到数据后应答。

-1all:生产者发送过来的数据,Leader和ISR队列里面的所有节点收齐数据后应答。

在这里插入图片描述

在这里插入图片描述

可靠性总结:

在这里插入图片描述

2)代码设置

package com.zrclass.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerAck {
	public static void main(String[] args) throws InterruptedException {
		// 1. 创建 kafka 生产者的配置对象
		Properties properties = new Properties();
		// 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
		
		// key,value 序列化(必须):key.serializer,value.serializer
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
		StringSerializer.class.getName());
		
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
		StringSerializer.class.getName());
		// 设置 acks
		properties.put(ProducerConfig.ACKS_CONFIG, "all");
		// 重试次数 retries,默认是 int 最大值,2147483647
		properties.put(ProducerConfig.RETRIES_CONFIG, 3);
		// 3. 创建 kafka 生产者对象
		KafkaProducer<String, String> kafkaProducer = new 
		KafkaProducer<String, String>(properties);
		// 4. 调用 send 方法,发送消息
		for (int i = 0; i < 5; i++) {
			kafkaProducer.send(new ProducerRecord<>("first","java " + i));
		}
		// 5. 关闭资源
		kafkaProducer.close();
	}
}

6.生产者发送消息不重复

(1)ACK机制引入保证了消息不丢失,但没有处理消息重复的问题

至少一次(At Least Once= ACK级别设置为-1 + 分区副本大于等于2 + ISR里应答的最小副本数量大于等于2

最多一次(At Most Once= ACK级别设置为0

总结:

At Least Once可以保证数据不丢失,但是不能保证数据不重复;

At Most Once可以保证数据不重复,但是不能保证数据不丢失。

• 精确一次(Exactly Once):对于一些非常重要的信息,比如和钱相关的数据,要求数据既不能重复也不丢失。

Kafka 0.11版本以后,引入了一项重大特性:幂等性和事务。

(2)幂等性

1)幂等性原理

幂等性 :就是指Producer不论向Broker发送多少次重复数据,Broker端都只会持久化一条,保证了不重复。

精确一次(Exactly Once=幂等性 + 至少一次( ack=-1 + 分区副本数>=2 + ISR最小副本数量>=2) 。

重复数据的判断标准:具有<PID, Partition, SeqNumber>相同主键的消息提交时,Broker只会持久化一条。其

中PID是Kafka每次重启都会分配一个新的;Partition 表示分区号;Sequence Number是单调自增的。

所以幂等性只能保证的是在单分区单会话内不重复。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-2bapiMoo-1682404805998)(分布式消息队列Kafka.assets/image-20230424143930098.png)]

2)开启幂等性

开启参数 enable.idempotence 默认为 true,false 关闭。

(3)生产者事务

在这里插入图片描述

1)kafka事务api

// 1 初始化事务
void initTransactions();
// 2 开启事务
void beginTransaction() throws ProducerFencedException;
// 3 在事务内提交已经消费的偏移量(主要用于消费者)
void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,
 String consumerGroupId) throws 
ProducerFencedException;
// 4 提交事务
void commitTransaction() throws ProducerFencedException;
// 5 放弃事务(类似于回滚事务的操作)
void abortTransaction() throws ProducerFencedException;

2)生产者开启事务代码

package com.zrclass.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class CustomProducerTransactions {
 public static void main(String[] args) throws InterruptedException {
	// 1. 创建 kafka 生产者的配置对象
	Properties properties = new Properties();
	// 2. 给 kafka 配置对象添加配置信息
	properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
	// key,value 序列化
	properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
	
	properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
	// 设置事务 id(必须),事务 id 任意起名
	properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id_0");
	// 3. 创建 kafka 生产者对象
	KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);
	// 初始化事务
	kafkaProducer.initTransactions();
	// 开启事务
	kafkaProducer.beginTransaction();
	try {
		// 4. 调用 send 方法,发送消息
		for (int i = 0; i < 5; i++) {
		// 发送消息
		kafkaProducer.send(new ProducerRecord<>("test", "java " + i));
		}
		// int i = 1 / 0;
		// 提交事务
		kafkaProducer.commitTransaction();
	} catch (Exception e) {
		// 终止事务
		kafkaProducer.abortTransaction();
	} finally {
		// 5. 关闭资源
		kafkaProducer.close();
	}
  }
 }

7.kafka在一定条件下保证单分区数据有序

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/459649.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

刘浩:当谈到RTO < 8s时,OceanBase究竟在说什么?

本文为 OceanBase 高级技术专家刘浩在第一届 OceanBase 开发者大会带来的分享。欢迎访问 OceanBase 官网获取更多信息&#xff1a;https://www.oceanbase.com/ 3 月 25 日&#xff0c;第一届 OceanBase 开发者大会在北京举行&#xff0c;OceanBase 高级技术专家刘浩为大家带来了…

VuePress打包后没有样式或者没有图片或者js加载失败

原因是没有部署到服务器上&#xff01;&#xff01;&#xff01; 这可能是我们打包后的东西 直接点击index.html 变成这样了&#xff01;&#xff01;什么样式都没有了&#xff0c;怎么办那&#xff1f; 很简单&#xff0c;找个服务器部署以下就什么都有了&#xff01;&…

NC 打开系统提示“安全日志数据源异常,请联系环境管理员处理”

问题&#xff1a;NC 用的是sql sever数据库&#xff0c;在sysConfig中正常配置好数据源后&#xff0c;点击测试&#xff0c;测试通过&#xff0c;但是打开系统后还提示“安全日志数据源异常&#xff0c;请联系环境管理员处理”&#xff0c;如下图&#xff1a; 原因&#xff1a;…

echarts 画中国地图

数据可视化平台&#xff0c;中国各省数据和坐标 阿里云可视化 效果 使用echart画中国地图&#xff0c;步骤如下 1.安装依赖 npm i echarts4 创建一个js文件 当你可以访问https请求的时候则使用&#xff0c;如下代码 import axios from "axios"; export default a…

网络安全SSRF漏洞 检测

SSRF 检测的一些思考 DNS 平台没有立刻收到请求&#xff0c;是在之后的某个时间段收到了不同的请求信息&#xff0c;这至少表明了一点&#xff0c;此处存在有无回显的 SSRF&#xff0c;虽然想要证明有更大的危害比较困难&#xff0c;但是至少说明了存在有 SSRF 的风险&#xf…

AI机器人ChatGPT使用体验(注册,使用,简易方式)

最近ChatGPT很火 号称下一代搜索引擎 吊打谷歌百度 它可以做到代替很多职业 究竟有多厉害呢&#xff1f; 看看这个例子&#xff1a; 你问他答&#xff0c;是不是感觉啥都知道&#xff1f; 文员、程序员全被打败 这个究竟怎么用呢&#xff1f; 注册 国内99%的人都卡在了…

超详细Docker的安装以及Docker部署C++

系列文章目录 这学期&#xff0c;学校开了一门云计算大数据课程&#xff0c;老师要求从OpenStack、Hadoop、Docker等软件进行部署一个框架。 我去从中选择了一个Docker&#xff0c;来对这个作业进行实现。以下就是我对这次作业的实现过程以及注意事项&#xff0c;还有犯的错误总…

牛客网Verilog刷题——VL27

牛客网Verilog刷题——VL27 题目答案 题目 请编写一个序列检测模块&#xff0c;检测输入信号&#xff08;a&#xff09;是否满足011100序列&#xff0c; 要求以每六个输入为一组&#xff0c;不检测重复序列&#xff0c;例如第一位数据不符合&#xff0c;则不考虑后五位。一直到…

【C++】反向迭代器的设计

前言 STL中不少的容器需要有迭代器这样的设计&#xff0c;特别是正向迭代器&#xff0c;几乎每个容器都有自己的特定实现方式&#xff0c;有了正向迭代器之后&#xff0c;我们还要提供反向迭代器以供一些特殊的需求&#xff0c;但是许多容器的正向迭代器实现的方式不一样&#…

华东师范大学副校长周傲英:未来,中国需要什么样的数据库?

本文为华东师范大学副校长&#xff0c;CCF 会士周傲英教授在第一届 OceanBase 开发者大会带来的分享。欢迎访问 OceanBase 官网获取更多信息&#xff1a;https://www.oceanbase.com/ 3 月 25 日&#xff0c;第一届 OceanBase 开发者大会在北京举行&#xff0c;华东师范大学副校…

AI智能智能课程第四讲 -数据库领域专家

使用chatGPT让你成为数据库领域专家 作业 现在要测试电商的下单功能&#xff1a;测试员张三在公司的电商平台上下了几个单&#xff0c;现在需要验证&#xff1a;张三这个客户下单的所有订单信息&#xff0c;包含订单编号&#xff0c;商品名称&#xff0c;商品价格&#xff0c;…

什么是gpt4-如何用上gpt-4

gpt4主要强化了哪些功能 OpenAI尚未公布GPT-4的详细信息&#xff0c;不过可以根据OpenAI前CEO Sam Altman在2020年所发表的一篇博客中提到的&#xff0c;GPT-4可能会具有更强大和智能的能力&#xff0c;包括更准确的理解和表达自然语言、更高效的记忆和推理、更全面的知识和视…

thinkphp:数值(保留小数点后N位,四舍五入,左侧补零,格式化货币,取整,生成随机数,数字与字母进行转换)

一、保留小数点后N位/类似四舍五入&#xff08;以保留小数点后三位为准&#xff09; number_format()函数&#xff1a;第一个参数为要格式化的数字&#xff0c;第二个参数为保留的小数位数 方法一&#xff1a; public function test() {$num 12.56789; // 待格式化的数字$r…

Maven配置阿里云仓库

Maven简介&#xff1a; Maven项目对象模型(POM)&#xff0c;可以通过一小段描述信息来管理项目的构建&#xff0c;报告和文档的项目管理工具软件。 Maven 除了以程序构建能力为特色之外&#xff0c;还提供高级项目管理工具。由于 Maven 的缺省构建规则有较高的可重用性&#x…

QMS-云质说质量 - 11 我和我的客户投诉(3) - 明枪易躲 暗箭难防

云质QMS原创 转载请注明来源 作者&#xff1a;王洪石 君子思义 小人贪利 金庸老先生在《笑傲江湖》中写道&#xff0c;“只要有人的地方就有恩怨&#xff0c;有恩怨就会有江湖&#xff0c;人就是江湖。”这句话映射到现实社会中&#xff0c;就是“社会险恶&#xff0c;人心叵测…

链表(JS实现、LeetCode例题)

&#x1f4dd;个人主页&#xff1a;爱吃炫迈 &#x1f48c;系列专栏&#xff1a;数据结构与算法 &#x1f9d1;‍&#x1f4bb;座右铭&#xff1a;道阻且长&#xff0c;行则将至&#x1f497; 文章目录 链表链表的分类创建链表LinkedList类的骨架 实现链表的方法push尾部添加元…

『网络基础 一 』

目录 网络发展 认识 “协议” 网络协议初始 协议分层 OSI七层模型 TCP/IP五层&#xff08;或四层&#xff09;模型 网络传输基本流程 ​编辑 协议报头 数据包封装和分用 网络中的地址管理 认识IP地址 认识MAC地址 网络发展 独立设计&#xff1a;计算机之间的相互独立…

Flink系列-10、Flink DataStream的Transformation

版权声明&#xff1a;本文为博主原创文章&#xff0c;遵循 CC 4.0 BY-SA 版权协议&#xff0c;转载请附上原文出处链接和本声明。 大数据系列文章目录 官方网址&#xff1a;https://flink.apache.org/ 学习资料&#xff1a;https://flink-learning.org.cn/ 目录 官网所有的…

探析Android中的四类性能优化

作者&#xff1a;Yj家的孺子牛 流畅性优化 主线程模型 了解 Android 的流畅性优化之前&#xff0c;我们需要先了解Android的线程结构。在 Android 中&#xff0c;有一个主线程模型&#xff0c;其中所有的绘制以及交互都是在主线程中进行的&#xff0c;所以&#xff0c;当我们…

【LaTex】Elsevier投稿系统到底何时整顿?‘expl3.sty‘ aborted!

前言 两年前&#xff0c;我在投稿Elsevier旗下的Knoeldeg-based systems时就被这个投稿系统整得是头昏脑胀&#xff0c;直接肝爆。首先&#xff0c;第一次提交手稿时可以接受PDF&#xff0c;很方便。然而&#xff0c;后面大修时提交可编辑的源文件时给我狠狠的打脸了。记得当时…