Kafka的入门及简单使用

news2024/9/17 8:48:39

文章目录

  • 前言
  • 一、Kafka 的基本架构?
    • 1. Producer(生产者)
    • 2. Broker(代理/服务器)
    • 3. Consumer(消费者)
    • 4. Consumer Group(消费者组)
    • 5. Topic(主题)
    • 6. Partition(分区)
    • 7. Replication(复制)
    • 8. ZooKeeper
  • 二、代码测试
    • 1.引入依赖
    • 2.启动
    • 3.创建生产者
    • 4.创建消费者
    • 5.结果


前言

Kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。该项目的目标是为处理实时数据提供一个统一、高吞吐、低延迟的平台。其持久化层本质上是一个“按照分布式事务日志架构的大规模发布/订阅消息队列”,这使它作为企业级基础设施来处理流式数据非常有价值。

Kafka 的核心特性可以总结为以下几个方面:

  1. 发布订阅模型

    • Kafka 使用类似于消息队列的发布订阅模型,但更侧重于消息持久化以及支持多消费者模型。
    • 生产者(Producer)将消息发送到主题(Topic),消费者(Consumer)则订阅这些主题来消费消息。
    • 消费者可以是多个消费者组成的消费者组(Consumer Group),这样可以实现消息的并行处理。
  2. 可扩展性

    • Kafka 能够水平扩展,通过增加更多的服务器节点可以提升系统的吞吐量。
    • Kafka 可以部署在分布式集群中,具有很强的容错能力。
  3. 持久性和可靠性

    • Kafka 将消息存储在磁盘上,并允许复制到多个服务器上以防止数据丢失。
    • Kafka 保证消息的顺序性,在一个分区内的消息会按照它们被发送的顺序存储和读取。
  4. 高性能

    • Kafka 设计为支持高吞吐量,即使在非常大的数据集上也能保持低延迟。
    • Kafka 利用零拷贝技术来提高性能,这意味着它可以在不复制数据的情况下直接从磁盘读取数据到网络栈。
  5. 存储

    • Kafka 的数据存储是基于日志文件的,这意味着它可以有效地存储大量数据。
    • Kafka 支持数据保留策略,可以根据时间和大小来决定何时删除旧数据。
  6. 流处理

    • Kafka Streams API 允许开发者创建复杂的流处理应用程序,如实时聚合、过滤和转换数据等操作。
    • Kafka 还与其他流处理框架(如 Apache Flink 和 Apache Spark Streaming)集成良好。

Apache Kafka 是一个非常强大的流处理平台,它被广泛应用于多种不同的场景中。以下是 Kafka 的一些典型应用场景:

1. 日志处理与分析

  • 日志收集:Kafka 可以收集来自不同服务的日志数据,如 Web 服务器、应用服务器、数据库服务器等。
  • 日志聚合:将收集的日志数据聚合起来,以便进一步分析。
  • 日志分析:通过集成工具(如 Apache Flink, Hadoop, Elasticsearch 等)进行实时或批量分析。

2. 消息队列

  • 异步通信:Kafka 可以作为消息中间件,实现服务间的异步通信,降低服务耦合度。
  • 消息缓冲:在消息生产者和消费者之间提供缓冲,帮助平衡负载。
  • 服务解耦:通过消息队列实现服务之间的解耦。

3. 实时数据流处理

  • 实时分析:处理实时数据流,执行复杂的事件处理、转换和分析操作。
  • 流式数据处理:构建实时数据处理流水线,例如实时计算、警报触发等。

4. 系统监控与报警

  • 指标收集:收集各种监控指标,并实时处理这些数据。
  • 异常检测:基于实时数据流检测异常行为,并及时发出警报。

5. 流量削峰

  • 负载均衡:通过设置消息队列的最大容量来控制客户端流量,避免后端服务过载。

6. 高可用性

  • 多副本冗余:Kafka 的多副本机制确保了数据的高可用性。
  • 容错性:即使部分节点发生故障,Kafka 仍然能够保证数据的可靠性和持续的服务。

7. 分布式任务调度

  • 任务管理:通过 Kafka 发布任务,多个消费者可以并发地处理这些任务。

8. 物联网 (IoT)

  • 传感器数据处理:处理来自 IoT 设备的大量数据流。

一、Kafka 的基本架构?

在这里插入图片描述
下面是 Kafka 架构的主要组成部分:

1. Producer(生产者)

  • 生产者是向 Kafka 发布消息的应用程序或服务。
  • 生产者可以选择将消息发送到特定的 topicpartition
  • 生产者还负责选择消息的 key,这会影响消息如何被分发到分区。

2. Broker(代理/服务器)

  • Kafka 集群由一个或多个服务器组成,这些服务器称为 Broker。
  • Broker 负责接收来自生产者的消息并将它们存储在磁盘上。
  • 每个 Broker 可以托管一个或多个 topic 的分区。
  • Broker 也负责将消息发送给消费者。

3. Consumer(消费者)

  • 消费者是从 Kafka 中读取消息的应用程序或服务。
  • 消费者可以订阅一个或多个 topic,并且只能从自己订阅的 topic 中读取数据。
  • 消费者可以从特定的时间点开始读取历史数据,也可以从最新的消息开始读取。

4. Consumer Group(消费者组)

  • 一个 consumer group 是一组具有相同 group id 的消费者。
  • 每个 topic 的分区只会被分配给同一个 consumer group 中的一个消费者。
  • 这种机制允许消息被并行处理,并且提供了容错性,因为如果一个消费者失败,它的分区会被重新分配给同一个组内的其他消费者。

5. Topic(主题)

  • Topic 是一种逻辑类别,用于组织和发布消息。
  • 一个 topic 可以包含多个 partition
  • 每个 topic 都有一个预定义的配置,比如分区数量、复制因子等。

6. Partition(分区)

  • Partitiontopic 的物理分割,每个 partition 是一个有序的不可变消息序列。
  • 分区可以分布在多个 Broker 上,这使得 Kafka 能够水平扩展。
  • 每个 partition 都有一个主 broker(leader)以及可能的一些副本 broker(follower)。

7. Replication(复制)

  • 为了确保高可用性和容错性,每个 partition 都可以有多个副本。
  • 除了一个主副本(leader)之外,还可以有从副本(follower),这些副本会在不同的 Broker 上存储同一份数据。
  • 如果主副本出现故障,其中一个从副本可以被提升为主副本。

8. ZooKeeper

  • Kafka 使用 ZooKeeper 来管理集群的元数据,包括:
    • broker 的注册和发现。
    • topic 的配置。
    • consumer group 的状态。
    • 分区的领导者选举。

二、代码测试

1.引入依赖

<!-- Spring Kafka Starter -->
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

2.启动

在Linux下安装、配置、启动
在IDEA中配置

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

3.创建生产者

@RestController
public class ProducerController {

    private static final Logger log = LoggerFactory.getLogger(ProducerController.class);

    private static final String TOPIC = "admin-messages";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @PostMapping("/sendMessage")
    public String sendMessage(@RequestBody FrontEvent frontEvent) {
        String json = JSON.toJSONString(frontEvent);
        kafkaTemplate.send(TOPIC, json);
        log.info("Send message=========={}",json);
        return "send success!";
    }
}

4.创建消费者

@Component
public class Consumer{

    private static final Logger log = LoggerFactory.getLogger(Consumer.class);
    
    @KafkaListener(topics = "admin-messages",groupId = "myGroupId")
    public void receiveAdminMessage(String message) {
        try {
            FrontEvent frontEvent = JSON.parseObject(message, FrontEvent.class);
            log.info("Received message=========={}", frontEvent.toString());

            // 保存到数据库
            //frontEventRepository.save(frontEvent);
        } catch (Exception e) {
            // 如果发生异常,则需要进行失败重试,需手动设置重试次数,死信队列 
            // 手动ack ack.acknowledge();
            log.error("Failed to process the message: {}", message, e);
        }
    }
}

5.结果

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1962234.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

docker前端部署

挂载&#xff0c;把自己的目录位置&#xff0c;挂载到容器内的HTML

万亿赛道!AI 大模型典型应用深度分析 2024

大模型由于其强大的自然语言与多模态信息处理能力&#xff0c;可以应对不同语义粒度下的任务,进行复杂的逻辑推理&#xff0c;还具有超强的迁移学习和少样本学习能力, 可以快速掌握新的任务, 实现对不同领域、不同数据模式的适配&#xff0c;这些特点使得大模型较容易的赋能其他…

吴恩达机器学习C1W2Lab05-使用Scikit-Learn进行线性回归

前言 有一个开源的、商业上可用的机器学习工具包&#xff0c;叫做scikit-learn。这个工具包包含了你将在本课程中使用的许多算法的实现。 目标 在本实验中&#xff0c;你将: 利用scikit-learn实现使用梯度下降的线性回归 工具 您将使用scikit-learn中的函数以及matplotli…

付费进群系统源码原版最新修复全开源版

付费进群&#xff0c;和平时所见到的别人拉你进群是不一样的&#xff0c;付费进群需要先缴费以后&#xff0c;才会看到群的二维码&#xff0c;扫码进群或者是长按二维码图片识别进群&#xff0c;付费进群这个功能广泛应用于拼多多的砍价群&#xff0c;活动的助力群&#xff0c;…

Chapter 21 深入理解JSON

欢迎大家订阅【Python从入门到精通】专栏&#xff0c;一起探索Python的无限可能&#xff01; 文章目录 前言一、JSON数据格式1. 什么是JSON&#xff1f;2. JSON数据的格式 二、JSON格式数据转化三、格式化JSON数据的在线工具 前言 在当今数据驱动的世界中&#xff0c;JSON&…

【大模型系列篇】Vanna-ai基于检索增强(RAG)的sql生成框架

简介 Vanna是基于检索增强(RAG)的sql生成框架 Vanna 使用一种称为 LLM&#xff08;大型语言模型&#xff09;的生成式人工智能。简而言之&#xff0c;这些模型是在大量数据&#xff08;包括一堆在线可用的 SQL 查询&#xff09;上进行训练的&#xff0c;并通过预测响应提示中最…

中间件安全:Nginx 解析漏洞测试.

中间件安全&#xff1a;Nginx 解析漏洞测试. Nginx 是一个高性能的 HTTP和 反向代理服务器&#xff0c;Nginx 解析漏洞是一个由于配置不当导致的安全问题&#xff0c;它不依赖于Nginx或PHP的特定版本&#xff0c;而是由于用户配置错误造成的。这个漏洞允许攻击者上传看似无害的…

通俗易懂,车载显示屏简单介绍!

2024年后&#xff0c;小汽车产量的年增长率预计将在1%至3%之间 2023年在COVID完全解封后&#xff0c;全球汽车销售数量提升至8千8百万台车。2024预估微幅增加到 9000万辆&#xff0c; 自2024起&#xff0c;年成长率预期将放缓至3%以下。全球汽车主要销售前三大市场 (比较2022年…

为什么阿里开发手册不建议使用Date类?

在日常编码中&#xff0c;基本上99%的项目都会有一个DateUtil工具类&#xff0c;而时间工具类里用的最多的就是java.util.Date。 大家都这么写&#xff0c;这还能有问题&#xff1f;&#xff1f; 当你的“默认常识”出现问题&#xff0c;这个打击&#xff0c;就是毁灭性的。 …

BUG解决(vue3+echart报错):Cannot read properties of undefined (reading ‘type‘)

这是 vue3echart5 遇到的报错&#xff1a;Cannot read properties of undefined (reading ‘type‘) 这个问题需要搞清楚两个关键方法&#xff1a; toRaw&#xff1a; 作用&#xff1a;将一个由reactive生成的响应式对象转为普通对象。 使用场景&#xff1a; 用于读取响应式…

idea2023 总报Low memory

idea2023 总报Low memory 问题背景问题处理 问题背景 在日常开发中&#xff0c;使用idea2023开发工具&#xff0c;开发过程中总会遇到idea提示Low memory的情况&#xff0c;并且每当提示出现的时候&#xff0c;整个idea页面便什么也不能操作了&#xff0c;如何处理这个情况呢&…

AI测试:人工智能模型的核心测试指标,分类判别、目标检测、图像分割、定量计算分别有哪些指标?

在前面的人工智能测试技术系列文章中&#xff0c;我们详细介绍了人工智能测试的技术方法和实践流程。在了解人工智能测试方法后&#xff0c;我们需要进一步学习和研究如何衡量这些方法的有效性&#xff0c;即人工智能模型测试指标的选择。测试指标的选择主要取决于模型的类型和…

借助大语言模型快速升级你的 Java 应用程序

大家都知道我爱小 Q。在我“转码”的征程中&#xff0c;它就像上帝之手&#xff0c;在我本该枯燥漫长的学习进程中拉满快进条。 不仅是我&#xff0c;最近 Amazon Q Developer 还帮助 Amazon 一个由 5 人组成的团队在短短两天内将 1,000 多个生产应用程序从 Java 8 升级到 Jav…

Spring Cloud 组件

1.eureka注册中心原理简述 1.服务注册: Eureka Client 会通过发送rest请求的方式向eureka服务端注册自身元数据:ip地址,端口,运行状况等信息,服务端会把注册信息存储在一个双层map中。 Eureka 的数据存储分了两层&#xff1a;数据存储层和缓存层。 Eureka Client 在拉取服务信息…

【STM32嵌入式系统设计与开发拓展】——13_PWM脉宽

目录 1、什么是PWM?用来做什么的&#xff1f;PWM&#xff08;Pulse Width Modulation&#xff09;脉冲宽度调制常见用到 PWM 的情况&#xff1a; 2、什么是输出比较&#xff1f;输出比较模式![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/42434920ca0940b1b1083215…

vue el-input 输入框下拉显示匹配数据

1、效果图&#xff1a; 2、需求&实现&#xff1a; 输入条件 下面匹配查询到的数据有多少个 需要调用后端接口展示&#xff0c;后端查询到之后返回条数 前端展示 3、具体代码实现&#xff1a; html&#xff1a; 图片需要自己根据实际情况增加 // 查询 重置 筛选 本文章…

【git】git常用命令提交规范

Git 是程序员工作中不可或缺的版本控制工具&#xff0c;以下是一些优化后的常用 Git 命令列表&#xff0c;旨在帮助你更高效地使用 Git 进行版本控制。 基础操作 拉取代码 git clone xxx.git创建分支 git branch dev切换分支 git checkout dev # 或者 git switch dev创建并切换…

Python酷库之旅-第三方库Pandas(056)

目录 一、用法精讲 211、pandas.Series.truncate方法 211-1、语法 211-2、参数 211-3、功能 211-4、返回值 211-5、说明 211-6、用法 211-6-1、数据准备 211-6-2、代码示例 211-6-3、结果输出 212、pandas.Series.where方法 212-1、语法 212-2、参数 212-3、功能…

论报文加密加签场景下如何高效的进行渗透测试

前言 最新的测试中&#xff0c;经常遇到HTTP报文加密/加签传输的情况&#xff0c;这导致想要查看和修改明文报文很不方便。 之前应对这种情况我们有几种常见的办法解决&#xff0c;比如使用burpy插件、在Burp上下游使用mitmproxy进行代理等&#xff0c;但这些使用起来不太方便…

LSTM详解总结

LSTM&#xff08;Long Short-Term Memory&#xff09;是一种用于处理和预测时间序列数据的递归神经网络&#xff08;RNN&#xff09;的改进版本。其设计初衷是为了解决普通RNN在长序列训练中出现的梯度消失和梯度爆炸问题。以下是对LSTM的详细解释&#xff0c;包括原理、公式、…