kafka学习-概念与简单实战

news2024/11/27 4:19:06

目录

1、核心概念

消息和批次

Topic和Partition

Replicas

Offset

broker和集群

生产者和消费者

2、开发实战

2.1、消息发送

介绍

代码实现

2.2、消息消费

介绍

代码实现

2.3、SpringBoot Kafka

pom

application.yaml

KafkaConfig

producer

consumer


1、核心概念

消息和批次

        kafka的基本数据单元,由字节数组组成。可以理解成数据库的一条数据。

        批次就是一组消息,把同一个主题和分区的消息分批次写入kafka,可以减少网络开销,提高效率;批次越大,单位时间内处理的消息就越多,单个消息的传输时间就越长。

Topic和Partition

        topic主题,kafka通过主题进行分类。主题可以理解成数据库的表或者文件系统里的文件夹。

        partition分区可以理解成一个FIFO的消息队列。(同一个分区的消息保证顺序消费)

        主题可以被分为若干分区,一个主题通过分区将消息存储在kafka集群中,提供横向扩展的能力。消息以追加的方式写入分区,每个分区保证先入先出的顺序读取。在需要严格保证消息顺序消费的场景下,可以将partition设置为1,即主题只有一个分区。

        主题的分区策略有如下几种:

  1. 直接指定分区;
  2. 根据消息的key散列取模得出分区;
  3. 轮询指定分区。

Replicas

  1. 副本,每个分区都有多个副本。其中包含一个首领副本和多个跟随者副本。
  2. 首领副本用于响应生产者的消息写入请求与消费者的消息读取请求;
  3. 跟随者副本用于同步首领副本的数据,保持与首领副本一致的状态,有数据备份的功能。
  4. 一旦首领副本所在的服务器宕机,就会从跟随者中选出一个升级为首领副本。

Offset

        偏移量。

        生产者offset:每个分区都有一个offset,叫做生产者的offset,可以理解为当前这个分区队列的最大值,下一个消息来的时候,就会将消息写入到offset这个位置。

        消费者offset:每个消费者消费分区中的消息时,会记录消费的位置(offset),下一次消费时就会从这个位置开始消费。

broker和集群

broker为一个独立的kafka服务器;一个kafka集群里有多个broker。

        broker接收来自生产者的消息,为消息设置偏移量,并将消息保存到磁盘。同时,broker为消费者提供服务,对读取分区的请求做出响应,返回已经保存到磁盘上的消息。(单个broker可以轻松处理数千个分区以及每秒百万级的消息量)。

        集群中同一个主题的同一个分区,会在多个broker上存在;其中一个broker上的分区被称为首领分区,用于与生产者和消费者交互,其余broker上的分区叫做副本分区,用于备份分区数据,防止broker宕机导致消息丢失。

        每个集群都有一个broker是集群控制器,作用如下:

  1. 将分区分配给首领分区的broker;
  2. 监控broker,首领分区切换

生产者和消费者

        生产者生产消息,消息被发布到一个特定的主题上。默认情况下,kafka会将消息均匀地分布到主题的所有分区上。分区策略有如下几种:

  1. 直接指定分区;
  2. 根据消息的key散列取模得出分区;
  3. 轮询指定分区。

        消费者通过偏移量来区分已经读过的消息,从而消费消息。消费者是消费组的一部分,消费组可以保证每个分区只能被一个消费者使用,避免重复消费。

2、开发实战

2.1、消息发送

介绍

  • 生产者主要有KafkaProducer和ProducerRecord两个对象:KafkaProducer用于发送消息,ProducerRecord用于封装kafka消息。
  • 生产者生产消息后,需要broker的确认,可以选择同步或者异步确认:同步确认效率低;异步确认效率高,但需要设置回调对象。        

代码实现

public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
	Map<String, Object> configs = new HashMap<>();
	// 设置连接Kafka的初始连接⽤到的服务器地址
	// 如果是集群,则可以通过此初始连接发现集群中的其他broker
	 configs.put("bootstrap.servers", "node1:9092");
	// 设置key和value的序列化器
	 configs.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
	 configs.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
	 configs.put("acks", "1");
	 KafkaProducer<Integer, String> producer = new KafkaProducer<Integer, String>(configs);
	 // 用于封装Producer的消息
	 ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>(
		 "topic_1", // 主题名称
		 0, // 分区编号,现在只有⼀个分区,所以是0
		 0, // 数字作为key
		 "message 0" // 字符串作为value
	 );
	 // 发送消息,同步等待消息的确认
	 // producer.send(record).get(3_000, TimeUnit.MILLISECONDS);
	 
	 // 使用回调异步等待消息的确认
	 producer.send(record, new Callback() {
		 @Override
		 public void onCompletion(RecordMetadata metadata, Exception exception) {
			if (exception == null) {
				 System.out.println(
					 "主题:" + metadata.topic() + "\n"
					 + "分区:" + metadata.partition() + "\n"
					 + "偏移量:" + metadata.offset() + "\n"
					 + "序列化的key字节:" + metadata.serializedKeySize() + "\n"
					 + "序列化的value字节:" + metadata.serializedValueSize() + "\n"
					 + "时间戳:" + metadata.timestamp()
				 );
			 } else {
				System.out.println("有异常:" + exception.getMessage());
			 }
		 }
	 });
	 // 关闭连接
	 producer.close();
}

2.2、消息消费

介绍

        消费者主要有KafkaConsumer对象,用于消费消息。Kafka不支持消息的推送,我们可以通过消息拉取(poll)方式实现消息的消费。KafkaConsumer主要参数如下:

代码实现

public static void main(String[] args) {
	Map<String, Object> configs = new HashMap<>();
	// 指定bootstrap.servers属性作为初始化连接Kafka的服务器。
	// 如果是集群,则会基于此初始化连接发现集群中的其他服务器。
	configs.put("bootstrap.servers", "node1:9092");
	// key和value的反序列化器
	configs.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
	configs.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	configs.put("group.id", "consumer.demo");
	// 创建消费者对象
	KafkaConsumer<Integer, String> consumer = new KafkaConsumer<Integer, String>(configs);

	final Pattern pattern = Pattern.compile("topic_[0-9]");
	// 消费者订阅主题或分区
	// consumer.subscribe(pattern);
	// consumer.subscribe(pattern, new ConsumerRebalanceListener() {
	final List<String> topics = Arrays.asList("topic_1");
	consumer.subscribe(topics, new ConsumerRebalanceListener() {
		@Override
		public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
			partitions.forEach(tp -> {
				System.out.println("剥夺的分区:" + tp.partition());
			});	
		}
		@Override
		public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
			partitions.forEach(tp -> {
				System.out.println(tp.partition());
			});
		}
	});
	// 拉取订阅主题的消息
	final ConsumerRecords<Integer, String> records = consumer.poll(3_000);
	// 获取topic_1主题的消息
	final Iterable<ConsumerRecord<Integer, String>> topic1Iterable = records.records("topic_1");
	// 遍历topic_1主题的消息
	topic1Iterable.forEach(record -> {
		System.out.println("========================================");
		System.out.println("消息头字段:" + Arrays.toString(record.headers().toArray()));
		System.out.println("消息的key:" + record.key());
		System.out.println("消息的值:" + record.value());
		System.out.println("消息的主题:" + record.topic());
		System.out.println("消息的分区号:" + record.partition());
		System.out.println("消息的偏移量:" + record.offset());
	});
	// 关闭消费者
	consumer.close();
}

2.3、SpringBoot Kafka

pom

<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.kafka</groupId>
		<artifactId>spring-kafka</artifactId>
	</dependency>
</dependencies>

application.yaml

spring:
  kafka:
    bootstrap-servers: node1:9092       # 用于建立初始连接的broker地址
    producer:
      key-serializer: org.apache.kafka.common.serialization.IntegerSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      batch-size: 16384                 # 默认的批处理记录数
      buffer-memory: 33554432           # 32MB的总发送缓存
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: spring-kafka-02-consumer    # consumer的消费组id
      enable-auto-commit: true              # 是否自动提交消费者偏移量
      auto-commit-interval: 100             # 每隔100ms向broker提交一次偏移量
      auto-offset-reset: earliest           # 如果该消费者的偏移量不存在,则自动设置为最早的偏移量

KafkaConfig

@Configuration
public class KafkaConfig {
	@Bean
	public NewTopic topic1() {
		return new NewTopic("ntp-01", 5, (short) 1);
	}
	@Bean
	public NewTopic topic2() {
		return new NewTopic("ntp-02", 3, (short) 1);
	}
}

producer

@RestController
public class KafkaSyncProducerController {
	@Autowired
	private KafkaTemplate template;
	
	@RequestMapping("send/sync/{message}")
	public String sendSync(@PathVariable String message) {
		ListenableFuture future = template.send(new ProducerRecord<Integer, String>("topic-spring-02", 0, 1, message));
		try {
			// 同步等待broker的响应
			Object o = future.get();
			SendResult<Integer, String> result = (SendResult<Integer, String>) o;
			System.out.println(result.getRecordMetadata().topic() + result.getRecordMetadata().partition() + result.getRecordMetadata().offset());
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
		return "success";
	}
}

@RestController
public class KafkaAsyncProducerController {
	@Autowired
	private KafkaTemplate<Integer, String> template;
	
	@RequestMapping("send/async/{message}")
	public String asyncSend(@PathVariable String message) {
		ProducerRecord<Integer, String> record = new ProducerRecord<Integer, String>("topic-spring-02", 0, 3, message);
		ListenableFuture<SendResult<Integer, String>> future = template.send(record);
		// 添加回调,异步等待响应
		future.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>(){
			@Override
			public void onFailure(Throwable throwable) {
				System.out.println("发送失败: " + throwable.getMessage());
			}
			
			@Override
			public void onSuccess(SendResult<Integer, String> result) {
				System.out.println("发送成功:" + result.getRecordMetadata().topic() + "\t" + result.getRecordMetadata().partition() + "\t" + result.getRecordMetadata().offset());
			}
		});
		return "success";
	}
}

consumer

@Component
public class MyConsumer {

	@KafkaListener(topics = "topic-spring-02")
	public void onMessage(ConsumerRecord<Integer, String> record) {
		Optional<ConsumerRecord<Integer, String>> optional = Optional.ofNullable(record);
		if (optional.isPresent()) {
			System.out.println(record.topic() + "\t" + record.partition() + "\t" + record.offset() + "\t" + record.key() + "\t" + record.value());
		}
	}
}

以上内容为个人学习理解,如有问题,欢迎在评论区指出。

部分内容截取自网络,如有侵权,联系作者删除。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/978290.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【变分法】【书籍阅读笔记】Calculus of Variation by gelfand 第一章 总结与习题题解 【更新中】

文章目录 前言1 第一章 变分法基础1.1 泛函 与 一些简单的变分问题1.2 Function Spaces/ 赋范线性空间1.3 泛函的变分: 具有极值的必要条件1. 重要引理/线性泛函的等零条件2. 泛函变分 1.4 最简单的变分问题&#xff1a;欧拉方程1. 欧拉方程2. 证明/欧拉方程的得出3. 欧拉方程的…

机器学习:基于梯度下降算法的线性拟合实现和原理解析

机器学习&#xff1a;基于梯度下降算法的线性拟合实现和原理解析 线性拟合梯度下降算法步骤算法实现数据可视化&#xff08;动态展示&#xff09;应用示例 当我们需要寻找数据中的趋势、模式或关系时&#xff0c;线性拟合和梯度下降是两个强大的工具。这两个概念在统计学、机器…

SQLI-labs-第五关

知识点&#xff1a;布尔盲注 思路&#xff1a; 1、判断注入点 首先&#xff0c;我们看看正常的回显内容 ?id1 接着输入?id1 &#xff0c;结果出现语句错误 这里说明存在单引号的闭合错误 ?id1 and 11-- ?id1 and 12-- 这里没有任何回显信息&#xff0c;可以准确的确…

基于STM32程序万年历液晶1602显示-proteus仿真-源程序

一、系统方案 本设计采用STM32单片机作为主控器&#xff0c;液晶1602显示&#xff0c;按键设置万年历。 二、硬件设计 原理图如下&#xff1a; 三、单片机软件设计 1、首先是系统初始化 //通用定时器3中断初始化 //这里时钟选择为APB1的2倍&#xff0c;而APB1为36M //arr&…

官方YOLOV5的torch模型->ONNX模型->RKNN模型

1、环境配置 1.1 RKNN Toolkit2的环境配置 下载RKNN Toolkit2 git clone https://github.com/rockchip-linux/rknn-toolkit2.git打开一个终端命令行窗口,安装 Python3.6 和 pip3 sudo apt-get install python3 python3-dev python3-pip安装所需的依赖包 sudo apt-get inst…

机器学习笔记之最优化理论与方法(七)无约束优化问题——常用求解方法(上)

机器学习笔记之最优化理论与方法——基于无约束优化问题的常用求解方法[上] 引言总体介绍回顾&#xff1a;线搜索下降算法收敛速度的衡量方式线性收敛范围高阶收敛范围 二次终止性朴素算法&#xff1a;坐标轴交替下降法最速下降法(梯度下降法)梯度下降法的特点 针对最速下降法缺…

Vue + Element UI 前端篇(十二):用户管理模块

Vue Element UI 实现权限管理系统 前端篇&#xff08;十二&#xff09;&#xff1a;用户管理模块 用户管理模块 添加接口 在 http/moduls/user.js 中添加用户管理相关接口。 import axios from ../axios/* * 用户管理模块*/// 保存 export const save (params) > {ret…

Unity中Shader的变体shader_feature

文章目录 前言一、变体的类型1、multi_compile —— 无论如何都会被编译的变体2、shader_feature —— 通过材质的使用情况来决定是否编译的变体 二、使用 shader_feature 来控制 shader 效果的变化1、首先在属性面板暴露一个开关属性&#xff0c;用于配合shader_feature来控制…

解决deepspeed框架的bug:不保存调度器状态,模型训练重启时学习率从头开始

deepspeed存在一个bug&#xff0c;即在训练时不保存调度器状态&#xff0c;因此如果训练中断后再重新开始训练&#xff0c;调度器还是会从头开始而不是接着上一个checkpoint的调度器状态来训练。这个bug在deepspeed的github中也有其他人提出&#xff1a;https://github.com/mic…

清理Maven仓库中下载失败的文件

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…

【SpringBoot】统一功能处理

目录 &#x1f383;1 拦截器 &#x1f380;1.1 拦截器的代码实现 &#x1f3a8;1.2 拦截器的实现原理 &#x1f9f6;2 拦截器应用——登录验证 &#x1f9ba;3 异常统一处理 &#x1f3ad;4 统一数据返回格式 &#x1f9e4;4.1 为什么需要统一数据返回格式 &#x1f9e3;4.2 统…

Cisco Packet Tracer入门篇

&#x1f490; &#x1f338; &#x1f337; &#x1f340; &#x1f339; &#x1f33b; &#x1f33a; &#x1f341; &#x1f343; &#x1f342; &#x1f33f; &#x1f344;&#x1f35d; &#x1f35b; &#x1f364; &#x1f4c3;个人主页 &#xff1a;阿然成长日记 …

Python中的文件I/O操作:常见问题与解决方案

在Python编程中&#xff0c;文件I/O操作是常见的任务。本文将介绍一些关于Python文件I/O操作的常见问题及其解决方案&#xff0c;并提供详细的代码示例。 1、问题&#xff1a;如何正确地打开和关闭文件&#xff1f; 解决方案&#xff1a;使用with语句可以确保文件在操作完成后…

查漏补缺 - ES6

目录 1&#xff0c;let 和 const1&#xff0c;会产生块级作用域。2&#xff0c;如何理解 const 定义的变量不可被修改? 2&#xff0c;数组3&#xff0c;对象1&#xff0c;Object.is()2&#xff0c;属性描述符3&#xff0c;常用API4&#xff0c;得到除某个属性之外的新对象。 4…

华为云云服务器评测|使用Docker可视化Portainer部署Yolov5项目进行AI识别

目录 初始化配置使用Xshell连接 项目准备 docker-compose Dockerfile .dockerignore 在服务器中启动Docker项目 初始化配置使用Xshell连接 因为我比较喜欢用xshell来操作服务器&#xff0c;如果你是使用华为在线的CloudShell或其他方式&#xff0c;可以跳过第一步的连接…

【Redis专题】Redis持久化、主从与哨兵架构详解

目录 前言课程目录一、Redis持久化1.1 RDB快照&#xff08;Snapshot&#xff09;&#xff1a;二进制文件基本介绍开启/关闭方式触发方式bgsave的写时复制&#xff08;COW&#xff0c;Copy On Write&#xff09;机制优缺点 1.2 AOF&#xff08;append-only file&#xff09;&…

Git—版本控制系统

git版本控制系统 1、什么是版本控制2、常见的版本控制工具3、版本控制分类3.1、本地版本控制3.2、集中版本控制 SVN3.3、分布式版本控制 Git 4、Git与SVN的主要区别5、Git环境配置6、启动Git7、常用的Linux命令8、Git配置9、设置用户名与邮箱&#xff08;用户标识&#xff0c;必…

数学建模--逻辑回归算法的Python实现

首先感谢CSDN上发布吴恩达的机器学习逻辑回归算法任务的各位大佬. 通过大佬的讲解和代码才勉强学会. 这篇文章也就是简单记录一下过程和代码. CSDN上写有关这类文章的大佬有很多,大家都可以多看一看学习学习. 机器学习方面主要还是过程和方法. 这篇文章只完成了线性可分方面的任…

Mac Homebrew中常用的 Brew 命令

Mac 中常用的 Brew 命令集 Brew&#xff08;Homebrew&#xff09;是一个强大的包管理器&#xff0c;用于在 macOS 上安装、更新和管理各种软件包。它使得在 Mac 上安装开发工具、应用程序和库变得轻松和便捷。本博客将介绍一些在 Mac 中常用的 Brew 命令&#xff0c;以帮助您更…

SpringMVC_SSM整合

一、回顾SpringMVC访问接口流程 1.容器加载分析 容器分析 手动注册WebApplicationContext public class ServletConfig extends AbstractDispatcherServletInitializer {Overrideprotected WebApplicationContext createServletApplicationContext() {//获取SpringMVC容器An…