Kafka中的Topic

news2025/1/14 0:55:21

在Kafka中,Topic是消息的逻辑容器,用于组织和分类消息。本文将深入探讨Kafka Topic的各个方面,包括创建、配置、生产者和消费者,以及一些实际应用中的示例代码。

1. 介绍

在Kafka中,Topic是消息的逻辑通道,生产者将消息发布到Topic,而消费者从Topic订阅消息。每个Topic可以有多个分区(Partitions),每个分区可以在不同的服务器上,以实现横向扩展。

2. 创建和配置Topic

2.1 创建Topic

使用Kafka提供的命令行工具(kafka-topics.sh)或Kafka的API来创建Topic。下面是一个使用命令行工具创建Topic的示例:

bin/kafka-topics.sh --create --topic my_topic --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092

这将创建一个名为my_topic的Topic,有3个分区,复制因子为2。

2.2 配置Topic

Kafka的Topic有各种配置选项,可以通过修改Topic的属性来满足不同的需求。例如,可以设置消息保留时间、清理策略等。以下是一个配置Topic属性的示例:

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name my_topic --alter --add-config max.message.bytes=1048576

这将修改my_topic的配置,将最大消息字节数设置为1 MB。

3. 生产者和消费者

3.1 生产者

生产者负责将消息发布到Topic。使用Kafka的Producer API,可以轻松地创建一个生产者。以下是一个简单的Java示例代码:

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(properties);

producer.send(new ProducerRecord<>("my_topic", "key1", "value1"));
producer.close();

3.2 消费者

消费者从Topic中读取消息。Kafka的Consumer API提供了强大而灵活的方式来实现消费者。

以下是一个简单的Java示例代码:

Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("group.id", "my_group");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("my_topic"));

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());
    }
}

4. 实际应用示例

4.1 实时日志处理

在实时日志处理的场景中,Kafka的Topic可以按照日志类型进行划分,每个Topic代表一种日志类型。这样的设计可以使得系统更具可维护性、可扩展性,并且允许不同类型的日志通过独立的消费者进行处理。以下是一个更详细的示例代码,展示如何在实时日志处理中使用Kafka Topic:

4.1.1 创建日志类型Topic

首先,为不同的日志类型创建各自的Topic。以错误日志和访问日志为例:

# 创建错误日志Topic
bin/kafka-topics.sh --create --topic error_logs --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092

# 创建访问日志Topic
bin/kafka-topics.sh --create --topic access_logs --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
4.1.2 生产者发布日志消息

在应用中,生成错误日志和访问日志的代码可能如下:

// 错误日志生产者
Producer<String, String> errorLogProducer = new KafkaProducer<>(errorLogProperties);
errorLogProducer.send(new ProducerRecord<>("error_logs", "Error message"));

// 访问日志生产者
Producer<String, String> accessLogProducer = new KafkaProducer<>(accessLogProperties);
accessLogProducer.send(new ProducerRecord<>("access_logs", "Access log message"));
4.1.3 消费者实时处理日志

创建独立的消费者来处理错误日志和访问日志:

// 错误日志消费者
Consumer<String, String> errorLogConsumer = new KafkaConsumer<>(errorLogProperties);
errorLogConsumer.subscribe(Collections.singletonList("error_logs"));

while (true) {
    ConsumerRecords<String, String> records = errorLogConsumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 处理错误日志
        System.out.printf("Error Log - Offset = %d, Value = %s%n", record.offset(), record.value());
    }
}

// 访问日志消费者
Consumer<String, String> accessLogConsumer = new KafkaConsumer<>(accessLogProperties);
accessLogConsumer.subscribe(Collections.singletonList("access_logs"));

while (true) {
    ConsumerRecords<String, String> records = accessLogConsumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 处理访问日志
        System.out.printf("Access Log - Offset = %d, Value = %s%n", record.offset(), record.value());
    }
}
4.1.4 实时监控和分析

消费者可以通过实时处理日志来进行监控和分析。例如,可以使用流处理框架(如Kafka Streams)对日志进行聚合、过滤或转换。以下是一个简化的示例:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> errorLogsStream = builder.stream("error_logs");
KStream<String, String> accessLogsStream = builder.stream("access_logs");

// 在这里进行实时处理,如聚合、过滤等

// 通过输出Topic将处理结果发送到下游系统
errorLogsStream.to("processed_error_logs");
accessLogsStream.to("processed_access_logs");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

通过这种设计,可以根据实际需要扩展不同类型的日志处理,同时确保系统具有高度的灵活性和可扩展性。在实际应用中,可能需要更详细的配置和处理逻辑,以满足具体的监控和分析需求。

4.2 事件溯源

在事件驱动的架构中,事件溯源是一种强大的方式,通过创建一个专门的Kafka Topic来记录每个业务事件的发生,以便随时追踪和回溯整个系统的状态。以下是一个基于Kafka的事件溯源的详细示例代码:

4.2.1 创建事件Topic

首先,为每个关键的业务事件创建一个专用的Kafka Topic,例如order_createdorder_shipped等:

# 创建订单创建事件Topic
bin/kafka-topics.sh --create --topic order_created --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092

# 创建订单发货事件Topic
bin/kafka-topics.sh --create --topic order_shipped --partitions 3 --replication-factor 2 --bootstrap-server localhost:9092
4.2.2 发布业务事件

在应用中,当业务事件发生时,将事件发布到相应的Topic。以下是一个订单创建事件和订单发货事件的示例:

// 订单创建事件生产者
Producer<String, String> orderCreatedProducer = new KafkaProducer<>(orderCreatedProperties);
orderCreatedProducer.send(new ProducerRecord<>("order_created", "order_id", "Order created - Order ID: 123"));

// 订单发货事件生产者
Producer<String, String> orderShippedProducer = new KafkaProducer<>(orderShippedProperties);
orderShippedProducer.send(new ProducerRecord<>("order_shipped", "order_id", "Order shipped - Order ID: 123"));
4.2.3 事件溯源消费者

为了实现事件溯源,我们需要一个专用的消费者来订阅所有的事件Topic,并将事件记录到一个持久化存储中(如数据库、日志文件等):

// 事件溯源消费者
Consumer<String, String> eventTraceConsumer = new KafkaConsumer<>(eventTraceProperties);
eventTraceConsumer.subscribe(Arrays.asList("order_created", "order_shipped"));

while (true) {
    ConsumerRecords<String, String> records = eventTraceConsumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // 处理事件,可以将事件记录到数据库或日志文件中
        System.out.printf("Event Trace - Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());
        // 持久化处理逻辑
    }
}
4.2.4 事件回溯和分析

通过上述设置,可以在任何时候回溯系统中的每个事件,了解事件的发生时间、顺序和内容。通过将事件存储到持久化存储中,可以建立一个事件溯源系统,支持系统状态的分析、回滚和审计。

还可以使用流处理来实时分析事件,例如计算每个订单的处理时间、统计每个事件类型的发生频率等。以下是一个简单的流处理示例:

KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> eventStream = builder.stream(Arrays.asList("order_created", "order_shipped"));

// 在这里进行实时处理,如计算处理时间、统计频率等

// 通过输出Topic将处理结果发送到下游系统
eventStream.to("processed_events");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

通过这种方式,可以在事件溯源系统中实现强大的监控、分析和管理功能,提高系统的可观察性和可维护性。

5. 消息处理语义

Kafka支持不同的消息处理语义,包括最多一次、最少一次和正好一次。这些语义由消费者的配置决定,可以根据应用的要求进行选择。以下是一个使用最多一次语义的消费者示例代码:

properties.put("enable.auto.commit", "false"); // 禁用自动提交偏移量
properties.put("auto.offset.reset", "earliest"); // 设置偏移量重置策略为最早

Consumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("my_topic"));

try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            // 处理消息
            System.out.printf("Offset = %d, Key = %s, Value = %s%n", record.offset(), record.key(), record.value());
        }
        consumer.commitSync(); // 手动提交偏移量
    }
} finally {
    consumer.close();
}

6. 安全性和权限控制

Kafka提供了安全性特性,包括SSL加密、SASL认证等。在生产环境中,确保适当的安全性设置是至关重要的。

以下是一个使用SSL连接的生产者示例:

properties.put("security.protocol", "SSL");
properties.put("ssl.truststore.location", "/path/to/truststore");
properties.put("ssl.truststore.password", "truststore_password");

Producer<String, String> producer = new KafkaProducer<>(properties);

7. 故障容忍和可伸缩性

7.1 多节点分布和分区

在Kafka中,分布式的设计允许数据分布在多个节点上,这提供了高度的可伸缩性。每个Topic可以分成多个分区,而这些分区可以分布在不同的服务器上。这种分布式设计使得Kafka可以轻松地处理大规模数据,并实现水平扩展。

7.1.1 增加分区数

要增加Topic的分区数,可以使用以下命令:

bin/kafka-topics.sh --alter --topic my_topic --partitions 5 --bootstrap-server localhost:9092

这将把my_topic的分区数增加到5,从而提高系统的吞吐量和可伸缩性。

7.2 复制因子

Kafka通过数据的复制来实现容错性。每个分区可以有多个副本,这些副本分布在不同的节点上。在节点发生故障时,其他副本可以继续提供服务。

7.2.1 增加复制因子

要增加Topic的复制因子,可以使用以下命令:

bin/kafka-topics.sh --alter --topic my_topic --partitions 3 --replication-factor 3 --bootstrap-server localhost:9092

这将把my_topic的复制因子增加到3,确保每个分区有3个副本。增加复制因子提高了系统的容错性,因为每个分区都有多个副本,即使一个节点发生故障,其他节点上的副本仍然可用。

7.3 节点故障处理

Kafka能够处理节点故障,确保系统的可用性。当一个节点发生故障时,Kafka会自动将该节点上的分区重新分配到其他可用节点上,以保持分区的复制因子。

7.3.1 节点故障模拟

为了模拟节点故障,你可以通过停止一个Kafka broker进程来模拟。Kafka会自动感知到该节点的故障,并进行分区的重新分配。

# 停止一个Kafka broker进程
bin/kafka-server-stop.sh config/server-1.properties

7.4 性能调优

在实际应用中,通过监控系统的性能指标,你可以调整Kafka的配置以满足不同的性能需求。例如,调整日志刷写频率、调整内存和磁盘的配置等,都可以对系统的性能产生影响。

总结

Kafka的Topic是构建实时流数据处理系统的核心组件之一。通过深入了解Topic的创建、配置、生产者和消费者,以及实际应用中的示例代码,可以更好地理解和应用Kafka。在实际项目中,根据具体需求和场景进行灵活配置,以确保系统的可靠性、性能和安全性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1287914.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SpringBoot集成mail发送邮件

前言 发送邮件功能&#xff0c;借鉴 刚果商城&#xff0c;根据文档及项目代码实现。整理总结便有了此文&#xff0c;文章有不对的点&#xff0c;请联系博主指出&#xff0c;请多多点赞收藏&#xff0c;您的支持是我最大的动力~ 发送邮件功能主要借助 mail、freemarker以及rocke…

MQTT框架和使用

目录 MQTT框架 1. MQTT概述 1.1 形象地理解三个角色 1.2 消息的传递 2. 在Windows上体验MQTT 2.1 安装APP 2.2 启动服务器 2.3 使用MQTTX 2.3.1 建立连接 2.3.2 订阅主题 2.3.3 发布主题 2.4 使用mosquitto 2.4.1 发布消息 2.4.2 订阅消息 3. kawaii-mqtt源码分析…

STM32下载程序的五种方法

刚开始学习 STM32 的时候&#xff0c;很多小伙伴满怀热情买好了各种设备&#xff0c;但很快就遇到了第一个拦路虎——如何将写好的代码烧进去这个黑乎乎的芯片&#xff5e; STM32 的烧录方式多样且灵活&#xff0c;可以根据实际需求选择适合的方式来将程序烧录到芯片中。本文将…

ESP32-Web-Server编程-在网页中插入图片

ESP32-Web-Server编程-在网页中插入图片 概述 图胜与言&#xff0c;在网页端显示含义清晰的图片&#xff0c;可以使得内容更容易理解。 需求及功能解析 本节演示在 ESP32 Web 服务器上插入若干图片。在插入图片时还可以对图片设置一个超链接&#xff0c;用户点击该图片时&a…

go-fastfds部署心得

我是windows系统安装 Docker Desktop部署 docker run --name go-fastdfs&#xff08;任意的一个名称&#xff09; --privilegedtrue -t -p 3666:8080 -v /data/fasttdfs_data:/data -e GO_FASTDFS_DIR/data sjqzhang/go-fastdfs:lastest docker run&#xff1a;该命令用于运…

常见测试技术都有哪些?

测试技术是用于评估系统或组件的方法&#xff0c;目的是发现它是否满足给定的要求。系统测试有助于识别缺口、错误&#xff0c;或与实际需求不同的任何类型的缺失需求。测试技术是测试团队根据给定的需求评估已开发软件所使用的最佳实践。这些技术可以确保产品或软件的整体质量…

我想修改vCenter IP地址

部署vCenter Server Appliance后&#xff0c;您可以在vCenter修改DNS设置并选择域名服务器使用。您可以编辑vCenter Server Appliance的IP地址设置。从vSphere 6.5开始正式支持vCenter修改IP地址。因此可以更改vCenter Server Appliance的IP地址和DNS设置。 注意&#xff1a;更…

AI助力智慧农业,基于YOLOv3开发构建农田场景下的庄稼作物、田间杂草智能检测识别系统

智慧农业随着数字化信息化浪潮的演变有了新的定义&#xff0c;在前面的系列博文中&#xff0c;我们从一些现实世界里面的所见所想所感进行了很多对应的实践&#xff0c;感兴趣的话可以自行移步阅读即可&#xff1a; 《自建数据集&#xff0c;基于YOLOv7开发构建农田场景下杂草…

Javaweb之前端工程打包部署的详细解析

6 打包部署 我们的前端工程开发好了&#xff0c;但是我们需要发布&#xff0c;那么如何发布呢&#xff1f;主要分为2步&#xff1a; 前端工程打包 通过nginx服务器发布前端工程 6.1 前端工程打包 接下来我们先来对前端工程进行打包 我们直接通过VS Code的NPM脚本中提供的…

Linux gtest单元测试

1 安装git sudo apt-get install git2 下载googletest git clone https://github.com/google/googletest.git3 安装googletest 注意1: 如果在 make 过程中报错,可在 CMakeLists.txt 中增加如下行,再执行下面的命令: SET(CMAKE_CXX_FLAGS “-std=c++11”) 注意2: CMakeLists…

AI助力智慧农业,基于YOLOv5全系列模型【n/s/m/l/x】开发构建不同参数量级农田场景下庄稼作物、杂草智能检测识别系统

紧接前文&#xff0c;本文是农田场景下庄稼作物、杂草检测识别的第二篇文章&#xff0c;前文是基于YOLOv3这一网络模型实现的目标检测&#xff0c;v3相对来说比较早期的网络模型了&#xff0c;本文是基于最为经典的YOLOv5来开发不同参数量级的检测端模型。 首先看下实例效果&a…

【QT】Qt常用数值输入和显示控件

目录 1.QAbstractslider 1.1主要属性 2.QSlider 2.1专有属性 2.2 常用函数 3.QScrollBar 4.QProgressBar 5.QDial 6.QLCDNumber 7.上述控件应用示例 1.QAbstractslider 1.1主要属性 QSlider、QScrollBar和Qdial3个组件都从QAbstractSlider继承而来&#xff0c;有一些共有的属性…

精准定位安全续航 无人机解决方案打造交通巡逻新模式

现代城市交通管理是城市现代化的重要组成部分&#xff0c;但传统的交通管理系统存在一系列复杂繁琐的问题&#xff0c;同时&#xff0c;交警执勤也存在较大的安全隐患。为应对这一挑战&#xff0c;复亚智能深入研究无人机技术及应用&#xff0c;推出了一套全面的无人机解决方案…

[BPE]论文实现:Neural Machine Translation of Rare Words with Subword Units

文章目录 一、完整代码二、论文解读2.1 模型架构2.2 BPE 三、过程实现四、整体总结 论文&#xff1a;Neural Machine Translation of Rare Words with Subword Units 作者&#xff1a;Rico Sennrich, Barry Haddow, Alexandra Birch 时间&#xff1a;2016 一、完整代码 这里我…

uniapp踩坑之项目:使用过滤器将时间格式化为特定格式

利用filters过滤器对数据直接进行格式化&#xff0c;注意&#xff1a;与method、onLoad、data同层级 <template><div><!-- orderInfo.time的数据为&#xff1a;2023-12-12 12:10:23 --><p>{{ orderInfo.time | formatDate }}</p> <!-- 2023-1…

D7292 双向直流电机驱动电路 ( 速度可控 ) 7V~20V 400mA,峰值电流可达1.2A 采用DIP8、SOP8的封装形式

D7292是一块带有制动和速度控制功能的双向直流电机单片电路。它可以用来驱动CDP、VCR 和 TOY等负载。该电路通过两个逻辑输入管脚的电压&#xff0c;可以控制电机正反 个方向转动以及制动。并且可以通过改变速度控制管脚的电压&#xff0c;从而方便的改变电机的速度。D7292采用…

搞笑视频无水印下载,高清无水印视频网站!

搞笑视频无水印下载这件事情一直困扰了广大网友&#xff0c;每当看见好玩好笑的搞笑视频然而下载下来的时候&#xff0c;要么画质模糊就带有水印今天分享大家几个搞笑视频无水印下载方法。 这是一个非常良心的搞笑视频无水印下载小程序水印云&#xff0c;它支持图片去水印、视…

【matlab程序】matlab画太极图|阴阳

【matlab程序】matlab画太极图|阴阳 %% 海洋与大气科学; % 时间:20231205; % clear;clc;close all; t=0:1/100000:2pi+0.00001; t1=-pi/2:1/100000:pi/2+0.00001; t2=pi/2:1/100000:3pi/2+0.00001; R=10; r=1; figure plot(Rcos(t),Rsin(t),‘color’,‘k’,‘lin…

Python下TCP编程

​ 在Python中使用socket模块的socket函数可以完成&#xff0c;语法格式如下&#xff1a; ssocket.socket(AddressFamily, Type)函数socket.socket创建一个socket&#xff0c;返回该socket的描述符。该函数带有两个参数。 Address Family&#xff1a;可以选择AF_INET&#xf…

软件测试方法之等价类测试

01 等价类划分法 1、应用场合 有数据输入的地方&#xff0c;可以使用等价类划分法。 从大量数据中挑选少量代表数据进行测试。 2、测试思想 穷举测试&#xff1a;把所有可能的数据全部测试一遍叫穷举测试。穷举测试是最全面的测试&#xff0c;但是在实际工作中不能采用&am…