kafka学习-基本概念与简单实战

news2025/1/2 2:35:25

目录

1、核心概念

消息和批次

Topic和Partition

Replicas

Offset

broker和集群

生产者和消费者

2、开发实战

2.1、消息发送

介绍

代码实现

2.2、消息消费

介绍

代码实现

2.3、SpringBoot Kafka

pom

application.yaml

KafkaConfig

producer

consumer


1、核心概念

消息和批次

        kafka的基本数据单元,由字节数组组成。可以理解成数据库的一条数据。

        批次就是一组消息,把同一个主题和分区的消息分批次写入kafka,可以减少网络开销,提高效率;批次越大,单位时间内处理的消息就越多,单个消息的传输时间就越长。

Topic和Partition

        topic主题,kafka通过主题进行分类。主题可以理解成数据库的表或者文件系统里的文件夹。

        partition分区可以理解成一个FIFO的消息队列。(同一个分区的消息保证顺序消费)

        主题可以被分为若干分区,一个主题通过分区将消息存储在kafka集群中,提供横向扩展的能力。消息以追加的方式写入分区,每个分区保证先入先出的顺序读取。在需要严格保证消息顺序消费的场景下,可以将partition设置为1,即主题只有一个分区。

        主题的分区策略有如下几种:

  1. 直接指定分区;
  2. 根据消息的key散列取模得出分区;
  3. 轮询指定分区。

Replicas

  1. 副本,每个分区都有多个副本。其中包含一个首领副本和多个跟随者副本。
  2. 首领副本用于响应生产者的消息写入请求与消费者的消息读取请求;
  3. 跟随者副本用于同步首领副本的数据,保持与首领副本一致的状态,有数据备份的功能。
  4. 一旦首领副本所在的服务器宕机,就会从跟随者中选出一个升级为首领副本。

Offset

        偏移量。

        生产者offset:每个分区都有一个offset,叫做生产者的offset,可以理解为当前这个分区队列的最大值,下一个消息来的时候,就会将消息写入到offset这个位置。

        消费者offset:每个消费者消费分区中的消息时,会记录消费的位置(offset),下一次消费时就会从这个位置开始消费。

broker和集群

broker为一个独立的kafka服务器;一个kafka集群里有多个broker。

        broker接收来自生产者的消息,为消息设置偏移量,并将消息保存到磁盘。同时,broker为消费者提供服务,对读取分区的请求做出响应,返回已经保存到磁盘上的消息。(单个broker可以轻松处理数千个分区以及每秒百万级的消息量)。

        集群中同一个主题的同一个分区,会在多个broker上存在;其中一个broker上的分区被称为首领分区,用于与生产者和消费者交互,其余broker上的分区叫做副本分区,用于备份分区数据,防止broker宕机导致消息丢失。

        每个集群都有一个broker是集群控制器,作用如下:

  1. 将分区分配给首领分区的broker;
  2. 监控broker,首领分区切换

生产者和消费者

        生产者生产消息,消息被发布到一个特定的主题上。默认情况下,kafka会将消息均匀地分布到主题的所有分区上。分区策略有如下几种:

  1. 直接指定分区;
  2. 根据消息的key散列取模得出分区;
  3. 轮询指定分区。

        消费者通过偏移量来区分已经读过的消息,从而消费消息。消费者是消费组的一部分,消费组可以保证每个分区只能被一个消费者使用,避免重复消费。

2、开发实战

2.1、消息发送

介绍

  • 生产者主要有KafkaProducer和ProducerRecord两个对象:KafkaProducer用于发送消息,ProducerRecord用于封装kafka消息。
  • 生产者生产消息后,需要broker的确认,可以选择同步或者异步确认:同步确认效率低;异步确认效率高,但需要设置回调对象。        

代码实现

public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
	Map<String, Object> configs = new HashMap<>();
	// 设置连接Kafka的初始连接⽤到的服务器地址
	// 如果是集群,则可以通过此初始连接发现集群中的其他broker
	 configs.put("bootstrap.servers", "node1:9092");
	// 设置key和value的序列化器
	 configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
	 configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
	 configs.put("acks", "1");
	 KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);
	 // 用于封装Producer的消息
	 ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
		 "topic_1", // 主题名称
		 0, // 分区编号,现在只有⼀个分区,所以是0
		 0, // 数字作为key
		 "message 0" // 字符串作为value
	 );
	 // 发送消息,同步等待消息的确认
	 // producer.send(record).get(3_000, TimeUnit.MILLISECONDS);
	 
	 // 使用回调异步等待消息的确认
	 producer.send(record, new Callback() {
		 @Override
		 public void onCompletion(RecordMetadata metadata, Exception exception) {
			if (exception == null) {
				 System.out.println(
					 "主题:" + metadata.topic() + "\n"
					 + "分区:" + metadata.partition() + "\n"
					 + "偏移量:" + metadata.offset() + "\n"
					 + "序列化的key字节:" + metadata.serializedKeySize() + "\n"
					 + "序列化的value字节:" + metadata.serializedValueSize() + "\n"
					 + "时间戳:" + metadata.timestamp()
				 );
			 } else {
				System.out.println("有异常:" + exception.getMessage());
			 }
		 }
	 });
	 // 关闭连接
	 producer.close();
}

2.2、消息消费

介绍

        消费者主要有KafkaConsumer对象,用于消费消息。Kafka不支持消息的推送,我们可以通过消息拉取(poll)方式实现消息的消费。KafkaConsumer主要参数如下:

代码实现

public static void main(String[] args) {
	Map<String, Object> configs = new HashMap<>();
	// 指定bootstrap.servers属性作为初始化连接Kafka的服务器。
	// 如果是集群,则会基于此初始化连接发现集群中的其他服务器。
	configs.put("bootstrap.servers", "node1:9092");
	// key和value的反序列化器
	configs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
	configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	configs.put("group.id", "consumer.demo");
	// 创建消费者对象
	KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);

	final Pattern pattern = Pattern.compile("topic_[0-9]");
	// 消费者订阅主题或分区
	// consumer.subscribe(pattern);
	// consumer.subscribe(pattern, new ConsumerRebalanceListener() {
	final List<String> topics = Arrays.asList("topic_1");
	consumer.subscribe(topics, new ConsumerRebalanceListener() {
		@Override
		public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
			partitions.forEach(tp -> {
				System.out.println("剥夺的分区:" + tp.partition());
			});	
		}
		@Override
		public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
			partitions.forEach(tp -> {
				System.out.println(tp.partition());
			});
		}
	});
	// 拉取订阅主题的消息
	final ConsumerRecords<Integer, String> records = consumer.poll(3_000);
	// 获取topic_1主题的消息
	final Iterable<ConsumerRecord<Integer, String>> topic1Iterable = records.records("topic_1");
	// 遍历topic_1主题的消息
	topic1Iterable.forEach(record -> {
		System.out.println("========================================");
		System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));
		System.out.println("消息的key:" + record.key());
		System.out.println("消息的值:" + record.value());
		System.out.println("消息的主题:" + record.topic());
		System.out.println("消息的分区号:" + record.partition());
		System.out.println("消息的偏移量:" + record.offset());
	});
	// 关闭消费者
	consumer.close();
}

2.3、SpringBoot Kafka

pom

<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.kafka</groupId>
		<artifactId>spring-kafka</artifactId>
	</dependency>
</dependencies>

application.yaml

spring:
  kafka:
    bootstrap-servers: node1:9092       # 用于建立初始连接的broker地址
    producer:
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      batch-size: 16384                 # 默认的批处理记录数
      buffer-memory: 33554432           # 32MB的总发送缓存
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: spring-kafka-02-consumer    # consumer的消费组id
      enable-auto-commit: true              # 是否自动提交消费者偏移量
      auto-commit-interval: 100             # 每隔100ms向broker提交一次偏移量
      auto-offset-reset: earliest           # 如果该消费者的偏移量不存在,则自动设置为最早的偏移量

KafkaConfig

@Configuration
public class KafkaConfig {
	@Bean
	public NewTopic topic1() {
		return new NewTopic("ntp-01", 5, (short) 1);
	}
	@Bean
	public NewTopic topic2() {
		return new NewTopic("ntp-02", 3, (short) 1);
	}
}

producer

@RestController
public class KafkaSyncProducerController {
	@Autowired
	private KafkaTemplate template;
	
	@RequestMapping("send/sync/{message}")
	public String sendSync(@PathVariable String message) {
		ListenableFuture future = template.send(new ProducerRecord<Integer, String>("topic-spring-02", 0, 1, message));
		try {
			// 同步等待broker的响应
			Object o = future.get();
			SendResult<Integer, String> result = (SendResult<Integer, String>) o;
			System.out.println(result.getRecordMetadata().topic() + result.getRecordMetadata().partition() + result.getRecordMetadata().offset());
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
		return "success";
	}
}

@RestController
public class KafkaAsyncProducerController {
	@Autowired
	private KafkaTemplate<Integer, String> template;
	
	@RequestMapping("send/async/{message}")
	public String asyncSend(@PathVariable String message) {
		ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic-spring-02", 0, 3, message);
		ListenableFuture<SendResult<Integer, String>> future = template.send(record);
		// 添加回调,异步等待响应
		future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>(){
			@Override
			public void onFailure(Throwable throwable) {
				System.out.println("发送失败: " + throwable.getMessage());
			}
			
			@Override
			public void onSuccess(SendResult<Integer, String> result) {
				System.out.println("发送成功:" + result.getRecordMetadata().topic() + "\t" + result.getRecordMetadata().partition() + "\t" + result.getRecordMetadata().offset());
			}
		});
		return "success";
	}
}

consumer

@Component
public class MyConsumer {

	@KafkaListener(topics = "topic-spring-02")
	public void onMessage(ConsumerRecord<Integer, String> record) {
		Optional<ConsumerRecord<Integer, String>> optional = Optional.ofNullable(record);
		if (optional.isPresent()) {
			System.out.println(record.topic() + "\t" + record.partition() + "\t" + record.offset() + "\t" + record.key() + "\t" + record.value());
		}
	}
}

以上内容为个人学习理解,如有问题,欢迎在评论区指出。

部分内容截取自网络,如有侵权,联系作者删除。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/993946.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C++项目实战——基于多设计模式下的同步异步日志系统-③-前置知识补充-设计模式

文章目录 专栏导读六大原则单例模式饿汉模式懒汉模式 工厂模式简单工厂模式工厂方法模式抽象工厂模式 建造者模式代理模式 专栏导读 &#x1f338;作者简介&#xff1a;花想云 &#xff0c;在读本科生一枚&#xff0c;C/C领域新星创作者&#xff0c;新星计划导师&#xff0c;阿…

LP(六十九)智能文档助手升级

本文在笔者之前研发的大模型智能文档问答项目中&#xff0c;开发更进一步&#xff0c;支持多种类型文档和URL链接&#xff0c;支持多种大模型接入&#xff0c;且使用更方便、高效。 项目介绍 在文章NLP&#xff08;六十一&#xff09;使用Baichuan-13B-Chat模型构建智能文档中…

CodeJock Active-X / COM v22.1.0 Crack

CodeJock Active-X / COM v22.1.0--这个支持 Unicode 啦&#xff0c; Unicode Unicode 创建专业应用程序&#xff0c;其中包含一整套高度可定制的用户界面组件&#xff0c;包括 Visual Studio 风格的对接窗格和 Office 风格的功能区、工具栏和菜单&#xff0c;为您的应用程序…

电商邮件营销攻略:教你如何有效运营邮件营销策略!

作为一种领先的营销渠道&#xff0c;电子邮件营销已被电子商务公司作为推动客户参与度、促进销售和提高ROI的不可或缺的方式。在这篇文章中&#xff0c;我们将深入探讨电子商务公司为什么要做EDM邮件营销&#xff1f;以及电商公司怎么做邮件营销&#xff1f; 一、电子商务公司…

系统架构设计师(第二版)学习笔记----多媒体技术

【原文链接】系统架构设计师&#xff08;第二版&#xff09;学习笔记----多媒体技术 文章目录 一、多媒体概述1.1 媒体的分类1.2 多媒体的特征1.3 多媒体系统的基本组成 二、多媒体系统的关键技术2.1 多媒体系统的关键技术2.2 视频技术的内容2.3 音频技术的内容2.4 数据压缩算法…

时序分解 | MATLAB实现基于SSA奇异谱分析的信号分解分量可视化

时序分解 | MATLAB实现基于LMD局部均值分解的信号分解分量可视化 目录 时序分解 | MATLAB实现基于LMD局部均值分解的信号分解分量可视化效果一览基本介绍程序设计参考资料 效果一览 基本介绍 奇异谱分解奇异谱分析SSA 可直接替换txt数据运行 Matlab 1.包含3D分解效果图 频谱图等…

多路转接之PollEpoll

文章目录 Pollpoll函数接口poll的优缺点poll示例Util.hpp(所用到的函数方法)Server.hppServer.cclog.hpp(日志) Epollepoll的相关系统调用epoll_createepoll_ctlepoll_wait epoll工作原理epoll的优点epoll工作方式对比LT和ETepoll服务器(LT模式)示例Util.hpp(需要调用的函数)Se…

DeepinV20/Ubuntu安装postgresql方法

首先&#xff0c;建议看一下官方的安装文档PostgreSQL: Linux downloads (Ubuntu) PostgreSQL Apt Repository 简单的说&#xff0c;就是Ubuntu下的Apt仓库&#xff0c;可以用来安装任何支持版本的PgSQL。 If the version included in your version of Ubuntu is not the one…

一笑的大型连续剧之第二集

开场白 各位小伙伴们大家晚上好&#xff0c;今天来和大家一起更新一下我的开发之旅的第二集。上周时间也已经匆匆过去了。今天也是周六晚上了&#xff0c;这个周末很充实但是又很空虚。 本周小结 本周完成了我开发旅途中的第一个模块&#xff0c;关于绩效面谈的一个模块的一…

树莓派入门

目录 前言系统烧录使用官方烧录工具选择操作系统选择存储卡配置 Win32DiskImager 有屏幕树莓派开机树莓派关机无屏幕树莓派开机获取树莓派IP地址通过路由器获取共享网络方式获取给树莓派配置静态IP地址查找默认网关分盘给树莓派的IP地址修改树莓派DHCP配置文件 ssh登录 让树莓派…

排序(408)

一、插入排序&#xff08;直接、折半、希尔&#xff09; 【2009统考】若数据元素序列{11,12,13,7,8,9,23,4,5}是采用下列排序方法之一得到的第二趟排序后的结果&#xff0c;则该排序算法只能是&#xff08;B&#xff09; A、冒泡排序 B、插入排序 C、选择排序 …

freemarker模板引擎详解以及使用方法

哈喽&#xff01;大家好&#xff0c;我是旷世奇才李先生 文章持续更新&#xff0c;可以微信搜索【小奇JAVA面试】第一时间阅读&#xff0c;回复【资料】更有我为大家准备的福利哟&#xff0c;回复【项目】获取我为大家准备的项目 文章目录 一、freemarker 介绍1、简介 二、free…

Java 基于 SpringBoot 的酒店管理系统,附源码和数据库

博主介绍&#xff1a;✌程序员徐师兄、7年大厂程序员经历。全网粉丝30W,Csdn博客专家、掘金/华为云/阿里云/InfoQ等平台优质作者、专注于Java技术领域和毕业项目实战✌ 文章目录 一、前言介绍二、系统结构三、系统详细实现3.1用户信息管理3.2会员信息管理3.3客房信息管理3.4收藏…

浅析linux异步io框架 io_uring

前言 Linux内核5.1支持了新的异步IO框架iouring&#xff0c;由Block IO大神也即Fio作者Jens Axboe开发&#xff0c;意在提供一套公用的网络和磁盘异步IO&#xff0c;不过io_uring目前在磁盘方面要比网络方面更加成熟。 目录 背景简介 io_uring 系统API liburing 高级特性…

SpringBoot实例类-@Data

1.配置pom.xml 说明&#xff1a;添加lombok依赖 <!-- lombok--><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency> 2.刷新maven 说明&#xff1a;一般修改xml文件就需要刷…

数据库相关基础知识

第一章 概念 1、数据&#xff1a;描述事物的符号记录称为数据。特点&#xff1a;数据和关于数据的解释不可分。 2、数据库&#xff1a;长期存储在计算机内、有组织、可共享的大量的数据的集合。数据库中的数据按照一定的数据模型组织、描述和存储&#xff0c;具有较小的冗余度、…

Linux —— 信号阻塞

目录 一&#xff0c;信号内核表示 sigset_t sigprocmask sigpending 二&#xff0c;捕捉信号 sigaction 三&#xff0c;可重入函数 四&#xff0c;volatile 五&#xff0c;SIGCHLD 信号常见概念 实际执行信号的处理动作&#xff0c;称为信号递达Delivery&#xff1b;信…

广东智科与涂鸦智能达成合作,引领热泵市场智能转型新风向

全球能源危机正推动热泵市场的增长&#xff0c;据国际能源署报道&#xff0c;2022年全球热泵的销售额增长达11%&#xff0c;欧洲的销售额增长更是达到了40%。中国作为热泵市场的最大出口国&#xff0c;全球热泵市场需求的激增对于中国企业而言无疑是一剂“振奋剂”。 广东智科电…

QT/QTCreator开发/使用技巧

调试模式完整的展示过长的字符串 如图&#xff0c;当字符串过长时在调试模式下&#xff0c;无法非常清晰的看到全部的字符串&#xff0c;此时可以通过 右键菜单→ change value display format→spearate Window。此时字符串将单独显示在一个独立的窗口里。如果你想回到原状勾选…

关于“找不到mfc140u.dll,无法继续执行代码”问题的分析处理方法

我想和大家分享一个在编程过程中经常会遇到的问题——找不到mfc140u.dll,无法继续执行代码。找不到 mfc140u.dll&#xff0c;这个问题可能会让我们感到困扰。mfc140u.dll 是 Microsoft Foundation Classes&#xff08;MFC&#xff09;库的一部分&#xff0c;它是一个 Windows 系…