kafka原理之生产者

news2025/1/24 17:50:28

batch.size:只有数据累计到batch.size后,sender才会发送数据。默认16k
linger.ms:如果迟迟没有达到batch.size,sender等待linger.ms设置时间之后,发送数据。单位:ms,默认0(没有延迟)
acks设置:

  • 0:不需要等待数据落盘应答;
  • 1:leader落盘后应答;
  • -1(all):leader和follower落盘后应答;

一、ApI调用

(1)、异步API:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerTest {

    public static void main(String[] args){
        //配置
        Properties properties = new Properties();
        //连接,如果是集群,不需要全部都配置,只需要配置几个
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop102:9092");
        //设置序列化方式
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //创建连接对象
        KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
        //发送数据--向first发送10条数据
        for (int i = 1; i <= 10; i++){
            kafkaProducer.send(new ProducerRecord("first","test1", "hello-kafka" + i));
        }
        //关闭资源
        kafkaProducer.close();
    }
}

image.png

(2)、带回调的异步API:

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerTest {

	public static void main(String[] args){
		//配置
		Properties properties = new Properties();
		//连接,如果是集群,不需要全部都配置,只需要配置几个
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.88:9092");
		//设置序列化方式
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		//创建连接对象
		KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
		//发送数据--向first发送10条数据
		for (int i = 1; i <= 10; i++){
			kafkaProducer.send(new ProducerRecord("first", "test1", "hello-kafka" + i), new Callback() {
				@Override
				public void onCompletion(RecordMetadata recordMetadata, Exception e) {
					if(e == null){
						System.out.println("topic:"+recordMetadata.topic()+";partition:"+ recordMetadata.partition());
					}
				}
			});
		}
		//关闭资源
		kafkaProducer.close();
	}
}

控制台输出:
image.png
image.png

(3)、同步API:

同步API只是比异步API多加了一个 .get()

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerTest {

	public static void main(String[] args) throws ExecutionException, InterruptedException {
		//配置
		Properties properties = new Properties();
		//连接,如果是集群,不需要全部都配置,只需要配置几个
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.88:9092");
		//设置序列化方式
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		//创建连接对象
		KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
		//发送数据--向first发送10条数据
		for (int i = 1; i <= 10; i++){
			kafkaProducer.send(new ProducerRecord("first", "test1", "同步API-发送数据" + i)).get();
		}
		//关闭资源
		kafkaProducer.close();
	}
}

image.png

(4)、解决本地无法连接虚拟机中kafka的问题

这个问题表现为,在虚拟机中,使用kafka-console-consumer.sh和kafka-console-producer.sh是可以正常连接和发送数据的。但是本地使用java代码,一直没有发送数据。
如果是集群,不要只启动一台测试!!!!

方案一:

如果本地代码是使用主机名连接,如properties.put("bootstrap.servers", "hadoop102:9092")。修改本地的ip主机名映射,如图,在hosts文件(win路径:C:\Windows\System32\drivers\etc)中添加
image.png

方案二

如果本地代码是使用ip地址连接,如properties.put("bootstrap.servers", "192.168.10.102:9092")。修改/opt/module/kafka_2.13-3.0.0/config/server.properties中的advertised.listeners配置,如图:
image.png

二、生产者分区

创建一个名为first的topic,设置3个分区,3个副本,如图:

image.png

(1)、好处:

  • 1.便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。
  • 2.提高并行度,生产者可以以分区为单位发送数据,消费者可以以分区为单位消费数据

(2)、默认分区策略

  • 1.如果指定分区,发往指定分区
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerTest {

	public static void main(String[] args){
		//配置
		Properties properties = new Properties();
		//连接,如果是集群,不需要全部都配置,只需要配置几个
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
		//设置序列化方式
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		//创建连接对象
		KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
		//发送数据--向first发送10条数据
		for (int i = 1; i <= 10; i++){
			//指定分支发送,发往二号分区
			kafkaProducer.send(new ProducerRecord("first",2, "key" + i, "hello-kafka" + i), new Callback() {
				@Override
				public void onCompletion(RecordMetadata recordMetadata, Exception e) {
					if(e == null){
						System.out.println("topic:"+recordMetadata.topic()+";partition:"+ recordMetadata.partition());
					}
				}
			});
		}
		//关闭资源
		kafkaProducer.close();
	}
}

image.png

  • 2.未指定分区,有key,用key的hash值对topic的partition数量取余,得到partition
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
public class KafkaProducerTest {
	public static void main(String[] args){
		//配置
		Properties properties = new Properties();
		//连接,如果是集群,不需要全部都配置,只需要配置几个
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
		//设置序列化方式
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		//创建连接对象
		KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
		//发送数据--向first发送10条数据
		for (int i = 1; i <= 10; i++){
			//未指定分区,使用key的来决定分区
			kafkaProducer.send(new ProducerRecord("first","key" + i, "hello-kafka" + i), new Callback() {
				@Override
				public void onCompletion(RecordMetadata recordMetadata, Exception e) {
					if(e == null){
						System.out.println("topic:"+recordMetadata.topic()+";partition:"+ recordMetadata.partition());
					}
				}
			});
		}
		//关闭资源
		kafkaProducer.close();
	}
}

image.png

  • 3.没有指定分区,也没有key,使用粘性分区(一批数据,一次一个分区)。尽可能一直用一个分区,该分区的batch.size/linger.ms已到,再随机选择一个分区,且和上一次的分区不同
import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerTest {

	public static void main(String[] args){
		//配置
		Properties properties = new Properties();
		//连接,如果是集群,不需要全部都配置,只需要配置几个
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
		//设置序列化方式
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		//创建连接对象
		KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
		//发送数据--//粘性分区测试,需要数据量大一点才能看出效果
		for (int i = 1; i <= 500; i++){
			//不指定分区,也不指定key
			kafkaProducer.send(new ProducerRecord("first", "hello-kafka" + i), new Callback() {
				@Override
				public void onCompletion(RecordMetadata recordMetadata, Exception e) {
					if(e == null){
						System.out.println("topic:"+recordMetadata.topic()+";partition:"+ recordMetadata.partition());
					}
				}
			});
		}
		//关闭资源
		kafkaProducer.close();
	}
}

(3)、自定义分区策略

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

import java.util.List;
import java.util.Map;
public class MyPartitioner implements Partitioner {
	/**
	 *
	 * @param topic
	 * @param key 发送的key
	 * @param keyBytes
	 * @param value 发送的value
	 * @param valueBytes
	 * @param cluster
	 * @return
	 */
	@Override
	public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
		List<PartitionInfo> partitionInfos = cluster.availablePartitionsForTopic(topic);
		//分区数量
		int num = partitionInfos.size();
		//根据value与分区数求余的方式得到分区ID
		return Math.abs(value.hashCode()) % num;
	}

	@Override
	public void close() {

	}

	@Override
	public void configure(Map<String, ?> map) {

	}
}

import org.apache.kafka.clients.producer.*;
import java.util.Properties;

public class KafkaProducerTest {

	public static void main(String[] args){
		//配置
		Properties properties = new Properties();
		//连接,如果是集群,不需要全部都配置,只需要配置几个
		properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
		//设置序列化方式
		properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
		//自定义分区
		properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
		//创建连接对象
		KafkaProducer kafkaProducer = new KafkaProducer<>(properties);
		//发送数据--向first发送20条数据
		for (int i = 1; i <= 20; i++){
			//原本key一致,应该发往相同的分区,但是因为使用了自定义分区
            //使用自定义分区规则决定分区
			kafkaProducer.send(new ProducerRecord("first", "key","hello-kafka" + i), new Callback() {
				@Override
				public void onCompletion(RecordMetadata recordMetadata, Exception e) {
					if(e == null){
						System.out.println("topic:"+recordMetadata.topic()+";partition:"+ recordMetadata.partition());
					}
				}
			});
		}
		//关闭资源
		kafkaProducer.close();
	}
}

image.png

三.提高吞吐量

  • batch.size:批次大小,默认16。增加到32k
  • linger.ms: 等待时间,默认0(无延迟)。 修改为5-100ms
  • compression.type:压缩方式(gZip、snappy、Lz4、Zstd)。修改为snappy
  • buffer.memory:RecordAccumulator缓冲区大小,默认32m。修改为64m

四.数据可靠性

通过配置acks来实现不同的数据可靠性

(1)、acks(0)

生产者发送数据,不需要等到数据落盘应答。
数据可靠性:数据丢失

(2)、acks(1)

生产者发送数据,等待leader收到数据并落盘应答。
数据可靠性:比0可靠,但还是会丢数(leader接收到数据,落盘并响应,但是follower还没有同步数据并落盘,如果leader挂掉后,重新选出一个leader,但新leader没有对应数据)

(3)、acks(-1/all)

生产者发送数据,等待leader和follower全都收到数据并落盘应答
数据可靠性:需要有一定的条件(分区副本数 ≥ 2 + isr队列应答最小副本数 ≥ 2)

  • isr队列(in-sync replicas set):和leader保持同步的follower+leader集合(leader:0,isr:0,1,2)
  • ISR队列应答最小副本数:min.insync.replicas

解决(ack设置为-1/all时,某一个follower挂了,导致迟迟没有它的应答)
设置replica.log.time.max.ms参数,默认30ms。达到设置值,将follower踢出isr队列

(4)、总结

akc=0,很少使用
ack=1,一般用于传输日志,允许丢个别数据
ack=-1/all,用于可靠性要求高的场景:钱

akc设置数据可靠性传输效率
0
1中等中等
-1/all

五.数据重复

ack设置为(-1/all)时,follower同步完后,leader准备应答时,忽然挂了。follower成为leader后,重新接收数据,导致数据重复

(1)、至少一次(at least one)

akc=-1/allmin.insync.replice ≥ 2。保证数据不丢失,不保证数据不重复

(2)、最多一次(at most one)

ack = 0。保证数据不重复,不保证不丢失

(3)、精确一次(exactly ont)

幂等性(不管producer发多少次,broker都只有持久化一份数据)+事务+至少一次
幂等性参数:enable.idempotence=true(默认开启)

(4)、幂等判重标准

<PID,partition,SeqNumber>:幂等只能在单分区单会话内不重复
PID:kafka每次重启,都会分配一个新的PID
partition:分区号
Sequence Number:单调自增

(5)、完善幂等的PID问题

在使用事务功能时,自定义一个唯一的transactionalId。这样即使重启后,也可以根据transactionalId继续处理

六.事务原理

  • 开启事务,必须开启幂等性
  • 确认事务信息存储主体的分区(默认50个):transactionalId的hashCode值%50

七、数据有序

(1)、多分区有序

如果想要多分区有序,可以把所有数据都拉倒消费者中,然后排序

(2)、单分区有序

  • 1.x版本之前,通过设置max.in.flight.requests.per.connection = 1,不需要考虑是否开启幂等
  • 1.x版本之后,如果没有开启幂等,同1.x配置。 如果开启幂等,设置max.in.flight.requests.per.connection ≤ 5

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/521568.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java ---多线程(下)

&#xff08;一&#xff09;目录 线程的优先级 守护线程 线程同步 线程并发协作 主要内容 &#xff08;二&#xff09;线程的优先级 1 什么是线程的优先级 每一个线程都是有优先级的&#xff0c;我们可以为每个线程定义线程的优先级&#xff0c;但是这并不能保 证高优…

Anaconda下载与安装详解

文章目录 1 Anaconda1.1 简介1.2 下载安装1.3 配置环境变量1.4 下载配置1.4.1 conda配置1.4.1.1 修改conda下载源1.4.1.2 删除下载源1.4.1.3 包下载目录1.4.1.4 下载报错 1.4.2 pip配置1.4.2.1 配置源1.4.2.2 下载目录1.4.2.3 修改下载目录 1.5 修改虚拟环境地址1.5.1 通过配置…

【软件开发】Memcached(理论篇)

Memcached&#xff08;理论篇&#xff09; 1.Memcached 简介 Memcached 是一个开源的&#xff0c;支持高性能&#xff0c;高并发的分布式内存缓存系统&#xff0c;由 C 语言编写&#xff0c;总共 2000 多行代码。从软件名称上看&#xff0c;前 3 个字符 Mem 就是内存的意思&am…

quartz原理

1.如何实现任务 2.3个组件 3.工作原理 在Quartz中&#xff0c;有两类线程&#xff0c;Scheduler调度线程和任务执行线程&#xff0c;其中任务执行线程通常使用一个线程池维护一组线程。 Scheduler调度线程主要有两个&#xff1a;执行常规调度的线程&#xff0c;和执行misfir…

【C++】关联式容器——mapset的使用

文章目录 1.关联式容器和键值对1. 关联式容器2. 键值对 2. 树形结构的关联式容器——set1. 模版参数列表2. 默认成员函数3. 迭代器4.容量相关操作5.modify6.其他操作接口 3. 树形结构的关联式容器——map1. 默认成员函数2. 迭代器3. 容量与数据访问4.数据修改5. 其他操作接口 1…

vue-5:router

router路由配置&#xff0c;使用 在vue-cli构建的vue单页面应用中&#xff0c;需要借助vue-router库实现路由功能 配置路由 (构建项目时要下载) router文件夹下创建&#xff1a;index.js&#xff0c;routerConfig.js配置路由 路由懒加载&#xff1a; 按需加载&#xff1a;…

轻松掌握线程基础知识和四种创建方式及区别

1、线程与进程 2、并行与并发 3、创建线程 1、方式一&#xff1a;继承Thread类 2、方式二&#xff1a;实现Runnable接口 3、方式三&#xff1a;实现Callable接口 4、方式四&#xff1a;线程池创建线程&#xff08;项目中使用的方式&#xff09; 5、Runnable和Callable区别 6、…

在 Windows 10/11、7/8 上清空后从回收站恢复已删除文件的 6 种方法

Windows&#xff08;包括 Windows 11、10、8、7 和 Vista&#xff09;上的回收站用于回收您打算删除的不需要的文件。如果您删除了一些重要的文件或文件夹并且不小心清空了回收站&#xff0c;您仍然有机会恢复从回收站中删除的文件。这是一个教程&#xff0c;将阐明“如何在清空…

HTML第一天

HTML第一天 我们接下来是进行的网页开发网页的相关概念: 什么是网页?什么是HTML?网页的形成? 什么是网页&#xff1a; 1.网站是指在因特网上根据一定的规则&#xff0c;使用 HTML 等制作的用于展示特定内容相关的网页集合。 2.网页是网站中的一“页”&#xff0c;通常是…

钓鱼圈子钓点钓场鱼漂钓位小程序开发

钓鱼圈子钓点钓场鱼漂钓位小程序开发 功能: 关注好友功能。一键导航至钓鱼点。学习交流。鱼票功能是本系统的—大亮点&#xff0c;此功能可应用于鱼场举办活动比赛以及日常预定位置&#xff0c;在小程序进行预定&#xff0c;线下核销。系统拥有商城功能&#xff0c;可以为运营…

深度学习环境配置系列文章(三):配置VS Code和Jupyter的Python环境

深度学习环境配置系列文章目录 第一章 专业名称和配置方案介绍 第二章 Anaconda配置Python和PyTorch 第三章 配置VS Code和Jupyter的Python环境 第四章 配置Windows11和Linux双系统 第五章 配置Docker深度学习开发环境 第三章文章目录 深度学习环境配置系列文章目录前言一、VS…

C语言中链表经典面试题目——复制带随机指针的链表

&#x1f436;博主主页&#xff1a;ᰔᩚ. 一怀明月ꦿ ❤️‍&#x1f525;专栏系列&#xff1a;线性代数&#xff0c;C初学者入门训练&#xff0c;题解C&#xff0c;C的使用文章&#xff0c;「初学」C&#xff0c;数据结构​​​​​​​ &#x1f525;座右铭&#xff1a;“不…

计算机数据表示和数据转换

1、计算机数据表示和数据转换 送入计算机的数字、字母和符号等信息必须转换成0、1组合的二进制形式形式才能被计算机所接收、存储和运算。能够进行计算的数据并且能得出一个明确的数值叫数值数据&#xff0c;其余信息是非数值数据。 1.1 数值数据的表示 数值数据的计数方式是进…

SpringBoot的自动装配

前言 众所周知&#xff0c;SpringBoot的自动装配是其核心功能之一&#xff0c;SpringBoot提供了许多自动配置类&#xff0c;我们通常会有这样的一个概念&#xff1a;“当应用程序启动时&#xff0c;SpringBoot会扫描路径上的自动配置类进行加载&#xff0c;从而大大简化了项目…

小白量化《穿云箭集群量化》(6)巡航导弹策略

小白量化《穿云箭集群量化》&#xff08;6&#xff09;响尾蛇导弹 响尾蛇导弹是非常著名的武器装备&#xff0c;响尾蛇导弹发射者只需雷达瞄准和发射动作&#xff0c;发射动作完成尽快脱离战场保全自身安全。响尾蛇导弹会自动追踪敌机&#xff0c;直至击毁敌机。 证券交易犹如…

MySQL基础(三十一)数据库其它调优策略

1 数据库调优的措施 1.1 调优的目标 尽可能 节省系统资源 &#xff0c;以便系统可以提供更大负荷的服务。&#xff08;吞吐量更大&#xff09;合理的结构设计和参数调整&#xff0c;以提高用户操作 响应的速度 。&#xff08;响应速度更快&#xff09;减少系统的瓶颈&#xf…

day37_jdbc

今日内容 上课同步视频:CuteN饕餮的个人空间_哔哩哔哩_bilibili 同步笔记沐沐霸的博客_CSDN博客-Java2301 零、 复习昨日 零、 复习昨日 见晨考 一、作业 package com.qf.homework;import com.qf.model.User;import java.sql.Connection; import java.sql.DriverManager; impo…

【笔试强训】(红与黑,五子棋,走迷宫)DFS+BFS算法解析

博主简介&#xff1a;想进大厂的打工人博主主页&#xff1a;xyk:所属专栏: 笔试强训专栏 深度优先遍历&#xff08;Depth First Search, 简称 DFS&#xff09; 与广度优先遍历&#xff08;Breath First Search&#xff09;是图论中两种非常重要的算法。 本文就以习题的方式来给…

STM32F10X--EXTI--外部中断/事件控制器

一、EXTI是什么&#xff1f; EXTI&#xff08;External interrupt/event controller&#xff09;—外部中断/事件控制器&#xff0c;管理了控制器的20 个中断/事 件线。每个中断/事件线都对应有一个边沿检测器&#xff0c;可以实现输入信号的上升沿检测和下降沿的 检测。EXTI 可…

SpringMVC的基础知识

创建SpringMVC项目 SpringMVC项目其实和SpingBoot项目差不多,就多引入了一个SpringWeb项目而已拉 可以看这篇博客,创建的就是一个SpringMVC项目--创建项目の博客 SpringMVC是啥 Spring是啥相信大家都了解 啥是MVC呢?MVC是Model View Controller的缩写 我们分开看这三个词Model…