中间件之MQ-Kafka

news2024/10/23 1:06:08

一、引言

Apache Kafka是一个分布式消息队列系统,最初由LinkedIn开发,并于2011年开源。Kafka以其高吞吐量、低延迟和容错能力而著名,广泛应用于日志收集、实时流处理、事件驱动架构等领域。本文将详细介绍Kafka的基本概念、特点、应用场景以及如何使用,同时与另一个流行的消息中间件RocketMQ进行对比,以帮助读者更好地理解和应用Kafka。

二、Kafka的基本概念

1. 主题(Topic)

Kafka中的主题是一个逻辑上的消息分类,类似于数据库中的表。每条消息都属于一个特定的主题。生产者将消息发送到特定的主题,而消费者则从主题中订阅并消费消息。

2. 分区(Partition)

每个主题可以被分成一个或多个分区,每个分区是一个有序的、不可变的消息序列,这些消息被顺序地追加到分区日志中。分区是Kafka实现并行处理的关键,每个分区可以独立地被消费。

3. 副本(Replica)

为了提高数据的可靠性和容错性,每个分区可以有多个副本,这些副本分布在不同的Kafka服务器上。Kafka会自动处理副本之间的数据同步,确保数据的一致性。

4. 生产者(Producer)

生产者负责将消息发送到Kafka集群。生产者可以指定消息的主题和键(Key),Kafka会根据键和分区策略将消息发送到相应的分区。

5. 消费者(Consumer)

消费者从Kafka集群中订阅并消费消息。每个消费者都属于一个特定的消费者组(Consumer Group),同一个组内的消费者共同消费一个主题的所有分区,而不同的组则可以消费相同的主题。

6. 消费者组(Consumer Group)

消费者组允许你将消息流分成多个并行流,每个消费者组内的消费者实例可以独立地处理消息。Kafka通过消费者组实现了消息的负载均衡。

三、Kafka的特点

1. 高吞吐量

Kafka的设计目标是处理高吞吐量的消息流。通过顺序写磁盘、零拷贝技术和批量处理等技术手段,Kafka能够实现每秒数十万到数百万条消息的处理能力。

2. 低延迟

Kafka提供了低延迟的消息传递,这对于实时流处理和事件驱动架构至关重要。Kafka的消息传递延迟通常在几毫秒到几百毫秒之间。

3. 高容错性

Kafka通过分区和副本机制实现了数据的高容错性。即使部分Kafka服务器出现故障,也不会导致数据的丢失或服务的中断。

4. 可扩展性

Kafka的架构是高度可扩展的,可以轻松地增加更多的Kafka服务器和分区来处理更多的消息流。

5. 持久化

Kafka将消息持久化到磁盘上,即使服务器重启也不会丢失数据。同时,Kafka还支持消息的压缩和清理策略,以节省磁盘空间。

四、Kafka的应用场景

1. 日志收集

Kafka可以作为日志收集系统的一部分,将各种日志信息发送到Kafka集群,然后由专门的日志处理服务进行处理和分析。

2. 实时流处理

Kafka可以与实时流处理框架(如Apache Flink、Apache Storm)集成,实现实时的数据流处理和分析。

3. 事件驱动架构

Kafka可以作为事件驱动架构的核心组件,将各种事件发送到Kafka集群,然后由不同的消费者处理这些事件。

4. 用户活动跟踪

Kafka可以用来记录web用户或app用户的各种活动(如浏览网页、搜索、点击等),然后订阅者可以通过订阅这些活动信息来做实时的监控分析或离线分析。

五、Kafka与RocketMQ的对比

1. 基本概念

RocketMQ是一个纯Java、分布式、队列模型的开源消息中间件,具有高性能、高可靠、高实时、分布式特点。与Kafka类似,RocketMQ也支持生产者、消费者、主题、队列等概念。但RocketMQ在消息模型、存储机制、消费模型等方面与Kafka有所不同。

2. 消息模型

Kafka主要支持发布/订阅(Pub/Sub)模型,即消息被发布到一个主题上,然后由多个消费者订阅并消费这些消息。而RocketMQ支持多种消息模型,包括发布/订阅模型、点对点(P2P)模型等。发布/订阅模型适用于需要广播消息的场景,而点对点模型则适用于需要严格顺序消息的场景。

3. 存储机制

Kafka采用顺序写磁盘的方式存储消息,这种方式比随机写入快得多,显著提高了消息存储的效率。RocketMQ则采用基于磁盘的存储方式,同时支持消息的持久化和快速重放。

4. 消费模型

Kafka的消费者通过拉取(Pull)的方式从主题中消费消息,这种方式给消费者提供了更大的灵活性。而RocketMQ则支持推送(Push)和拉取(Pull)两种消费模式,推送模式可以实时地将消息推送给消费者,而拉取模式则允许消费者按照自己的节奏消费消息。

5. 性能和可靠性

Kafka和RocketMQ在性能和可靠性方面都有出色的表现。Kafka以其高吞吐量和低延迟著称,而RocketMQ则提供了多种机制来保证消息的可靠性,如消息持久化、消息确认机制、消息重试和死信队列等。

六、Kafka的使用示例

1. 环境准备

在使用Kafka之前,你需要先准备好Kafka环境。你可以从Apache Kafka的官方网站下载并安装Kafka,也可以使用Docker等容器化技术来部署Kafka。

2. 创建主题

在Kafka中创建主题通常使用kafka-topics.sh脚本。以下是一个创建名为test-topic的主题的示例命令:

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-topic

这个命令会在本地运行的ZooKeeper上创建一个名为test-topic的主题,该主题有1个分区和1个副本。

3. 生产者示例

以下是一个简单的Kafka生产者示例,使用Java编写:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 0; i < 100; i++) {
            ProducerRecord<String, String> record = new ProducerRecord<>("test-topic", Integer.toString(i), "message-" + i);
            producer.send(record);
        }

        producer.close();
    }
}

这个示例创建了一个Kafka生产者,并向test-topic主题发送了100条消息。

4. 消费者示例

以下是一个简单的Kafka消费者示例,使用Java编写:

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test-topic"));

        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
                }
            }
        } finally {
            consumer.close();
        }
    }
}

这个示例创建了一个Kafka消费者,并订阅了test-topic主题。消费者会不断地从该主题中拉取消息并打印出来。

七、结论

本文介绍了Apache Kafka,一个由LinkedIn开发并于2011年开源的分布式消息队列系统。Kafka以高吞吐量、低延迟和容错能力著称,广泛应用于日志收集、实时流处理等领域。文章详细阐述了Kafka的基本概念,包括主题、分区、副本、生产者和消费者等,并总结了Kafka的特点,如高吞吐量、低延迟、高容错性等。此外,还介绍了Kafka在日志收集、实时流处理等场景中的应用,并与RocketMQ进行了对比。最后,通过Java示例展示了如何使用Kafka创建主题、生产消息和消费消息。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2221199.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Leetcode—192. 统计词频【中等】(Shell)

2024每日刷题&#xff08;188&#xff09; Leetcode—192. 统计词频 实现代码 # Read from the file words.txt and output the word frequency list to stdout. cat words.txt | tr -s \n | sort | uniq -c | sort -nr | awk {print $2, $1}运行结果 之后我会持续更新&…

学习threejs,通过THREE.Raycaster给模型绑定点击事件

&#x1f468;‍⚕️ 主页&#xff1a; gis分享者 &#x1f468;‍⚕️ 感谢各位大佬 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍⚕️ 收录于专栏&#xff1a;threejs gis工程师 文章目录 一、&#x1f340;前言1.1 ☘️THREE.Raycaster光线投射概…

DirectX11:Position Based Fluid

前言 这是我本科毕业设计项目&#xff0c;使用DirectX11实现一个基于PBD的流体模拟仿真&#xff0c;同时也算是补了Games101的大作业了。 阅读本文假设你对以下内容比较熟悉&#xff1a; 流体模拟&#xff1a;Smoothed Particle Hydrodynamics 流体模拟&#xff1a;Neighbor…

UNIX网络编程-传输层

概述 传输层主要包括&#xff1a;TCP、UDP、SCTP&#xff08;流控制传输协议&#xff09;&#xff01; 绝大多数客户端/服务器网络应用都使用TCP/UDP。SCTP是一个较新的协议&#xff0c;最初设计用于跨因特网传输电话信令。 这些传输协议都转而使用网络协议IP&#xff1a;或是…

windows中命令行批处理脚本学习

目录 一 基础知识二 常见命令1. 输出 echo2. 注释 rem .... %...% :: goto if (10) ()3. 变量 set4. 获取参数 %数字 %*5. 退出 exit6. 复制 copy7.读取输出文件内容 type8. 帮助 命令xxx /?9.等待当前命令运行结束后,才执行下一条命令 call10. 修改字体编码 chcp11. 特殊变量…

集合框架16:HashMap的使用

视频链接&#xff1a;13.35 HashMap使用&#xff08;1&#xff09;_哔哩哔哩_bilibilihttps://www.bilibili.com/video/BV1zD4y1Q7Fw?spm_id_from333.788.videopod.episodes&vd_sourceb5775c3a4ea16a5306db9c7c1c1486b5&p35 1.创建Student类&#xff0c;添加无参构造…

UML之用例图详解

~犬&#x1f4f0;余~ “我欲贱而贵&#xff0c;愚而智&#xff0c;贫而富&#xff0c;可乎&#xff1f; 曰&#xff1a;其唯学乎” 零、什么是用例图 用例图&#xff08;Use Case Diagram&#xff09;是UML中一种重要的图表类型&#xff0c;它主要用于描述系统的功能性需求&am…

Java使用HttpClient5实现发送HTTP请求

Java 实现发送 HTTP 请求&#xff0c;系列文章&#xff1a; 《Java使用原生HttpURLConnection实现发送HTTP请求》 《Java使用HttpClient5实现发送HTTP请求》 《SpringBoot使用RestTemplate实现发送HTTP请求》 1、HttpClient5 的介绍 HttpClient5 是 Apache HttpComponents 项目…

文件处理新纪元:微信小程序的‘快递员’与‘整理师’

嗨&#xff0c;我是中二青年阿佑&#xff0c;今天阿佑将带领大家如何通过巧妙的文件处理功能&#xff0c;让用户体验从‘杂乱无章’到‘井井有条’的转变&#xff01; 文章目录 微信小程序的文件处理文件上传&#xff1a;小程序的“快递服务”文件下载&#xff1a;小程序的“超…

植物大战僵尸杂交版游戏分享

植物大战僵尸杂交版游戏下载&#xff1a;夸克网盘分享 无捆绑之类的隐形消费&#xff0c;下载即玩

vue3 解决背景图与窗口留有间隙的问题

需要实现一个登录界面&#xff0c;login.vue的代码如下&#xff1a; <script> import { ref } from vue;export default {setup() {return {};}, }; </script><template><div id"login-container" class"login-container"><di…

Taro构建的H5页面路由切换返回上一页存在白屏页面过渡

目录 项目背景&#xff1a;Taro与Hybrid开发问题描述&#xff1a;白屏现象可能的原因包括&#xff1a; 解决方案解决后的效果图 其他优化方案可参考&#xff1a; 项目背景&#xff1a;Taro与Hybrid开发 项目使用Taro框架同时开发微信小程序和H5页面&#xff0c;其中H5页面被嵌…

Nodes 节点

Goto Tree List 树列表 Nodes 节点 Tree List 节点是组织成树状层次结构的数据行。 Add New Nodes 添加新节点 如果 Tree List 具有数据源&#xff0c;则会自动生成节点&#xff08;TreeListNode 类对象&#xff09;。要在未绑定模式下添加节点&#xff0c;请调用“树列表设…

【K8S系列】Kubernetes Pod节点Pending状态及解决方案详解【已解决】

在 Kubernetes 中&#xff0c;Pod 的状态为 Pending 表示 Pod 已被创建&#xff0c;但尚未被调度到节点上&#xff0c;或者已调度到节点上但容器尚未开始运行。这一状态常常是因为某些条件未满足&#xff0c;导致 Pod 无法正常启动。以下是对 Pending 状态的详细分析及解决方案…

自由学习记录(12)

综合实践 2D的Shape&#xff0c;Tilemap都要导包的&#xff0c;编辑器也要导包&#xff0c;。。和2d沾边的可能3d都要主动导包 应该综合的去运用&#xff0c;不见得Tilemap就很万能&#xff0c;如果要做什么顶方块的有交互反应的物体&#xff0c; 那直接拖Sprite会更方便一些…

APIJSON 为零代码提供了新的思路

APIJSON 核心概念 APIJSON 是一种用于构建 RESTful API 的 JSON 格式&#xff0c;它提供了一种标准化的方式来定义和处理 API 请求和响应。APIJSON 的设计目标是简化前端和后端之间的数据交互&#xff0c;减少开发工作量&#xff0c;提高开发效率。 在线解析 自动生成文档&am…

【SpringBoot】16 文件上传(Thymeleaf + MySQL)

Gitee仓库 https://gitee.com/Lin_DH/system 介绍 文件上传是指将本地的图片、视频、音频等文件上传到服务器&#xff0c;供其他用户浏览下载的过程&#xff0c;文件上传在日常项目中用的非常广泛。 实现代码 第一步&#xff1a;在配置文件新增如下配置 application.yml s…

docker-compose-lnmp-wordpress

使用 docker-compose 在 CentOS 7 上编写并部署 LNMP (Linux, Nginx, MySQL, PHP) 环境的 YAML 文章目录 部署步骤&#xff1a;1. 安装 Docker 和 Docker Compose1.1安装 Docker&#xff1a;1.2安装 Docker Compose&#xff1a; 2.创建目录结构3.编写docker-compose.yml4.ngin…

Java项目-基于springboot框架的财务管理系统项目实战(附源码+文档)

作者&#xff1a;计算机学长阿伟 开发技术&#xff1a;SpringBoot、SSM、Vue、MySQL、ElementUI等&#xff0c;“文末源码”。 开发运行环境 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringBoot、Vue、Mybaits Plus、ELementUI工具&#xff1a;IDEA/…