消息队列篇--原理篇--Pulsar(Namespace,BookKeeper,类似Kafka甚至更好的消息队列)

news2025/1/22 12:57:26

Apache Pulusar是一个分布式、多租户、高性能的发布/订阅(Pub/Sub)消息系统,最初由Yahoo开发并开源。它结合了Kafka和传统消息队列的优点,提供高吞吐量、低延迟、强一致性和可扩展的消息传递能力,适用于大规模分布式系统的实时数据处理和异步通信。
Pulsar的架构设计结合了消息队列和流处理的特点,既可以作为传统消息队列使用,也可以作为流处理平台支持实时数据处理。

主要特点:

  • 分布式架构:Pulsar采用分层架构,将消息存储与代理服务分离,提供了更好的水平扩展能力和故障隔离。
  • 多租户支持:Pulsar支持多租户部署,不同租户可以共享同一集群,同时保证资源隔离和安全性。
  • 持久化和一致性:Pulsar支持消息的持久化存储,并通过BookKeeper提供强一致性保障。
  • 灵活的消息模型:Pulsar支持多种消息传递模式,包括Pub/Sub、P2P和Key_Shared订阅模式。
  • 多语言支持:Pulsar提供了多种编程语言的客户端库,如Java、Python、Go、C++等。
  • 丰富的生态:Pulsar拥有活跃的社区和丰富的生态系统,支持与其他工具和服务集成,如Kafka Connect、Flink、Spark等。

1、核心概念

(1)、命名空间(Namespace)

命名空间是Pulsar中的一个逻辑单元,用于组织和管理主题(Topic)。每个命名空间可以包含多个主题,并且可以为不同的命名空间设置不同的配置,例如保留策略、订阅类型等。命名空间通常用于实现多租户隔离。

(2)、主题(Topic)

主题是Pulsar中的消息通道,生产者(Producer)将消息发送到主题,消费者(Consumer)从主题中消费消息。

Pulsar支持两种类型的主题:

  • 持久化主题(Persistent Topic):消息会被持久化存储,确保即使在broker故障的情况下也不会丢失。
  • 非持久化主题(Non-Persistent Topic):消息不会被持久化存储,适用于对延迟敏感但对可靠性要求较低的场景。

(3)、订阅(Subscription)

订阅是消费者与主题之间的绑定关系。Pulsar支持多种订阅类型,每种订阅类型决定了消息的分发方式:

  • 独占订阅(Exclusive Subscription):只有一个消费者可以订阅该主题,其他消费者无法订阅。
  • 共享订阅(Shared Subscription):多个消费者可以订阅同一个主题,消息会被轮询分发给不同的消费者。
  • 故障转移订阅(Failover Subscription):多个消费者可以订阅同一个主题,但只有一个是活跃的消费者,其他消费者作为备用。当活跃消费者失败时,备用消费者会接管消息消费。
  • Key_Shared 订阅:基于消息的key进行分区,确保相同key的消息总是被分发给同一个消费者。

(4)、消息(Message)

消息是Pulsar中的基本数据单位,由生产者发送到主题。

每个消息可以包含以下属性:

  • 消息体(Payload):消息的实际内容,可以是任意二进制数据。
  • 消息ID(Message ID):唯一标识每条消息的ID,用于确认消息的消费状态。
  • 属性(Properties):用户可以为消息添加自定义的键值对属性,方便后续处理。
  • 时间戳(Timestamp):消息的创建时间或发送时间。

(5)、分区(Partition)

Pulsar支持主题分区,即将一个主题划分为多个分区,每个分区可以独立地处理消息。分区可以提高主题的吞吐量和并发性,特别是在高负载场景下。Pulsar会自动将消息均匀分布到不同的分区中。

(6)、Broker

Broker是Pulsar的核心组件之一,负责接收生产者的消息并将其分发给消费者。注意,Broker不直接存储消息,而是将消息委托给BookKeeper进行持久化存储。Broker负责管理主题、订阅和消费者的连接,并处理消息的路由和分发。

(7)、BookKeeper

BookKeeper是Pulsar的持久化存储层,负责将消息持久化到磁盘。BookKeeper采用分布式日志存储机制,提供了高可用性和强一致性保障。每个消息会被写入多个BookKeeper节点,确保即使部分节点故障也不会丢失数据。

(8)、ZooKeeper

ZooKeeper是Pulsar的元数据管理组件,用于存储集群的配置信息、主题和命名空间的元数据、以及Broker和BookKeeper的状态信息。ZooKeeper提供了分布式协调服务,确保Pulsar集群的一致性和可靠性。

2、架构设计

Pulsar的架构设计采用了分层结构,将消息存储与代理服务分离,使得系统更加模块化和可扩展。

结构示例图:
在这里插入图片描述

Pulsar的主要组件及其作用:

  • Broker:负责接收生产者的消息并将其分发给消费者。Broker不直接存储消息,而是将消息委托给BookKeeper进行持久化存储。Broker还负责管理主题、订阅和消费者的连接。

  • BookKeeper:即上图BK Client。负责将消息持久化到磁盘,提供高可用性和强一致性保障。BookKeeper采用分布式日志存储机制,确保消息的安全性和可靠性。

  • Bookie:Bookie是BookKeeper的存储节点组成,持久化地存储消息。BookKeeper采用分布式日志存储的方式,将消息以日志的形式存储在多个Bookie节点上。这种设计确保了消息的可靠性和持久性,即使在节点故障的情况下也能保证消息不丢失。

  • ZooKeeper:负责存储集群的元数据,包括主题、命名空间、Broker和BookKeeper的状态信息。ZooKeeper提供了分布式协调服务,确保集群的一致性和可靠性。

  • Proxy(可选):Pulsar提供了一个可选的代理层(Proxy),允许客户端通过HTTP或WebSocket协议与Pulsar集群进行通信。Proxy可以简化客户端的连接管理,并提供跨区域访问的能力。

  • Function:Pulsar提供了一个轻量级的流处理框架(Pulsar Functions),允许用户编写简单的流处理逻辑并将其部署到Pulsar集群中。Pulsar Functions可以用于实时数据处理、事件驱动计算等场景。

  • SQL:Pulsar提供了一个SQL查询引擎(Pulsar SQL),允许用户通过SQL语句查询Pulsar中的消息数据。Pulsar SQL可以用于数据分析、监控和告警等场景。

3、特性与优势

(1)、高吞吐量和低延迟

Pulsar采用了分层架构,将消息存储与代理服务分离,使得系统能够同时具备高吞吐量和低延迟。Broker负责处理消息的路由和分发,而BookKeeper负责持久化存储,两者相互协作,确保消息的高效传递。

(2)、多租户支持

Pulsar支持多租户部署,不同租户可以共享同一集群,同时保证资源隔离和安全性。每个租户可以拥有自己的命名空间,并可以根据需要设置不同的配置,例如保留策略、订阅类型等。
即:类似Nacos的命名空间,实现配置,服务等隔离。

(3)、持久化和一致性

Pulsar支持消息的持久化存储,并通过BookKeeper提供强一致性保障。每个消息会被写入多个Bookie节点,确保即使部分节点故障也不会丢失数据。Pulsar还支持事务和幂等性,确保消息的可靠传递。

(4)、灵活的消息模型

Pulsar支持多种消息传递模式,包括Pub/Sub、P2P和Key_Shared订阅模式。用户可以根据实际需求选择合适的订阅类型,满足不同的业务场景。Pulsar还支持消息的重播、回溯和跳过等功能,方便用户进行调试和故障排查。

(5)、多语言支持

Pulsar提供了多种编程语言的客户端库,包括Java、Python、Go、C++等。用户可以根据自己的技术栈选择合适的客户端库,快速集成Pulsar到应用程序中。

(6)、丰富的生态

Pulsar拥有活跃的社区和丰富的生态系统,支持与其他工具和服务集成。例如,Pulsar可以与Kafka Connect、Flink、Spark等工具集成,实现数据的实时处理和分析。Pulsar还提供了Pulsar Functions和Pulsar SQL等功能,进一步扩展了其应用场景。

4、应用场景

(1)、实时数据处理

Pulsar的高吞吐量和低延迟特性使其非常适合用于实时数据处理场景。例如,电商网站可以使用Pulsar来处理订单、支付、库存等实时数据,确保数据的及时性和准确性。

(2)、物联网(IoT)

Pulsar的分布式架构和多租户支持使其非常适合用于物联网场景。物联网设备可以将传感器数据发送到Pulsar,Pulsar可以将这些数据分发给不同的消费者进行处理。Pulsar还支持消息的重播和回溯功能,方便用户进行历史数据分析。

(3)、微服务架构

Pulsar可以作为微服务之间的消息总线,实现服务间的异步通信。微服务可以通过Pulsar发送和接收消息,避免阻塞主线程,提高系统的响应速度和稳定性。

(4)、日志收集和监控

Pulsar可以用于日志收集和监控场景,将应用的日志数据发送到Pulsar,Pulsar可以将这些数据分发给不同的消费者进行处理。Pulsar还支持消息的持久化存储,确保日志数据不会丢失。

(5)、事件驱动架构

Pulsar支持事件驱动架构,用户可以将事件发送到Pulsar,Pulsar可以将这些事件分发给不同的消费者进行处理。Pulsar还支持消息的重播和回溯功能,方便用户进行事件的回放和调试。

5、代码示例

(1)、生产者示例

import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.MessageId;

public class PulsarProducerExample {

    public static void main(String[] args) throws Exception {
        // 1、创建Pulsar客户端
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build()) {

            // 2、创建生产者
            try (Producer<byte[]> producer = client.newProducer()
                    .topic("persistent://public/default/example-topic")    // 指定主题
                    .create()) {

                // 3、发送消息
                for (int i = 0; i < 10; i++) {
                    String message = "Hello, Pulsar! " + i;
                    MessageId msgId = producer.send(message.getBytes());    // 发送消息
                    System.out.println(" [x] Sent message: " + message + ", msgId: " + msgId);
                }
            }
        }
    }
}

(2)、消费者示例

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class PulsarConsumerExample {

    public static void main(String[] args) throws Exception {
        // 1、创建Pulsar客户端
        try (PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build()) {

            // 2、创建消费者
            try (Consumer<byte[]> consumer = client.newConsumer()
                    .topic("persistent://public/default/example-topic")   // 监听的主题
                    .subscriptionName("example-subscription")
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe()) {

                // 3、接收和消费消息
                while (true) {       // 利用循环接收消息
                    Message<byte[]> msg = consumer.receive();      // 具体接收消息
                    try {
                        System.out.println(" [x] Received message: " + new String(msg.getData()));
                        consumer.acknowledge(msg);  // 4、确认消息已消费
                    } catch (Exception e) {
                        consumer.negativeAcknowledge(msg);  // 5、处理失败,重新投递
                    }
                }
            }
        }
    }
}

6、Pulsar总结

Apache Pulsar是一个功能强大、架构灵活的消息系统,特别适合大规模分布式系统的实时数据处理和异步通信。它的分层架构、多租户支持、持久化和一致性保障、灵活的消息模型等特点,使其在性能、可靠性和可扩展性方面表现出色。Pulsar还拥有丰富的生态系统,支持与其他工具和服务集成,适用于多种应用场景。

乘风破浪会有时,直挂云帆济沧海!!!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2280373.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python配置MITMPROXY中间人监听配置

1、安装python 环境&#xff0c;此处可以使用conda安装:conda create --name my_new_env python3.12 2、pip安装mitmproxy&#xff1a;pip install mitmproxy&#xff0c;安装后如果使用mitmproxy --version 成功返回结果&#xff0c;说明已经在环境变量路径中&#xff0c;如果…

Java-数据结构-二叉树习题(2)

第一题、平衡二叉树 ① 暴力求解法 &#x1f4da; 思路提示&#xff1a; 该题要求我们判断给定的二叉树是否为"平衡二叉树"。 平衡二叉树指&#xff1a;该树所有节点的左右子树的高度相差不超过 1。 也就是说需要我们会求二叉树的高&#xff0c;并且要对节点内所…

【网络原理】万字详解 HTTP 协议

&#x1f970;&#x1f970;&#x1f970;来都来了&#xff0c;不妨点个关注叭&#xff01; &#x1f449;博客主页&#xff1a;欢迎各位大佬!&#x1f448; 文章目录 1. HTTP 前置知识1.1 HTTP 是什么1.2 HTPP 协议应用场景1.3 HTTP 协议工作过程 2. HTTP 协议格式2.1 fiddler…

基于STM32的智能寝室控制系统设计(论文+源码)

1 .系统整体设计 通过需求分析&#xff0c;本设计基于STM32的智能寝室控制系统整体架构如图2.1所示&#xff0c;整系统利用DHT11温湿度传感器获取室内环境数据&#xff0c;并通过OLED显示&#xff0c;提供用户实时信息&#xff0c;火焰传感器和烟雾传感器用于监测火灾情况&…

日历热力图,月度数据可视化图表(日活跃图、格子图)vue组件

日历热力图&#xff0c;月度数据可视化图表&#xff0c;vue组件 先看效果&#x1f447; 在线体验https://www.guetzjb.cn/calanderViewGraph/ 日历图简单划分为近一年时间&#xff0c;开始时间是 上一年的今天&#xff0c;例如2024/01/01 —— 2025/01/01&#xff0c;跨度刚…

铁电存储器FM25CL64B简介及其驱动编写(基于STM32 hal库)

铁电存储器FM25CL64B简介及其驱动编写&#xff08;基于STM32 hal库&#xff09; 文章目录 铁电存储器FM25CL64B简介及其驱动编写&#xff08;基于STM32 hal库&#xff09;前言一、FM25CL64B简介二、驱动代码1.头文件2.c文件 总结 前言 FM25CL64B是赛普拉斯cypress出品的一款铁…

基于微信小程序的科创微应用平台设计与实现(LW+源码+讲解)

专注于大学生项目实战开发,讲解,毕业答疑辅导&#xff0c;欢迎高校老师/同行前辈交流合作✌。 技术范围&#xff1a;SpringBoot、Vue、SSM、HLMT、小程序、Jsp、PHP、Nodejs、Python、爬虫、数据可视化、安卓app、大数据、物联网、机器学习等设计与开发。 主要内容&#xff1a;…

HarmonyOS Next 最强AI智能辅助编程工具 CodeGenie介绍

随着大模型的兴起&#xff0c;在智能编码领域首先获得了应用。 市面上从Microsoft Copilot到国内阿里通义&#xff0c;字节marscode等&#xff0c;都提供了copilot方式的智能编码工具。HarmonyOS Next作为诞生一年的新事物&#xff0c;由于代码量和文档迭代原因&#xff0c;在智…

WPF2-1在xaml为对象的属性赋值.md

1. AttributeValue方式 1.1. 简单属性赋值1.2. 对象属性赋值 2. 属性标签的方式给属性赋值3. 标签扩展 (Markup Extensions) 3.1. StaticResource3.2. Binding 3.2.1. 普通 Binding3.2.2. ElementName Binding3.2.3. RelativeSource Binding3.2.4. StaticResource Binding (带参…

Appium(四)

一、app页面元素定位 1、通过id定位元素: resrouce-id2、通过ClassName定位&#xff1a;classname3、通过AccessibilityId定位&#xff1a;content-desc4、通过AndroidUiAutomator定位5、通过xpath定位xpath、id、class、accessibility id、android uiautomatorUI AutomatorUI自…

Windows图形界面(GUI)-QT-C/C++ - Qt List Widget详解与应用

公开视频 -> 链接点击跳转公开课程博客首页 -> ​​​链接点击跳转博客主页 目录 QListWidget概述 使用场景 常见样式 QListWidget属性设置 显示方式 (Display) 交互行为 (Interaction) 高级功能 (Advanced) QListWidget常见操作 内容处理 增加项目 删除项目…

Oracle 创建并使用外部表

目录 一. 什么是外部表二. 创建外部表所在的文件夹对象三. 授予访问外部表文件夹的权限3.1 DBA用户授予普通用户访问外部表文件夹的权限3.2 授予Win10上的Oracle用户访问桌面文件夹的权限 四. 普通用户创建外部表五. 查询六. 删除 一. 什么是外部表 在 Oracle 数据库中&#x…

靠右行驶数学建模分析(2014MCM美赛A题)

笔记 题目 要求分析&#xff1a; 比较规则的性能&#xff0c;分为light和heavy两种情况&#xff0c;性能指的是 a.流量与安全 b. 速度限制等分析左侧驾驶分析智能系统 论文 参考论文 两类规则分析 靠右行驶&#xff08;第一条&#xff09;2. 无限制&#xff08;去掉了第一条…

Kafka 源码分析(一) 日志段

首先我们的 kafka 的消息本身是存储在日志段中的, 对应的源码是下面这段代码: class LogSegment private[log] (val log: FileRecords,val lazyOffsetIndex: LazyIndex[OffsetIndex],val lazyTimeIndex: LazyIndex[TimeIndex],val txnIndex: TransactionIndex,val baseOffset:…

【番外篇】实现排列组合算法(Java版)

一、说明 在牛客网的很多算法试题中&#xff0c;很多试题底层都是基于排列组合算法实现的&#xff0c;比如最优解、最大值等常见问题。排列组合算法有一定的难度&#xff0c;并不能用一般的多重嵌套循环解决&#xff0c;没有提前做针对性的学习和研究&#xff0c;考试时候肯定…

Linux - 线程池

线程池 什么是池? 池化技术的核心就是"提前准备并重复利用资源". 减少资源创建和销毁的成本. 那么线程池就是提前准备好一些线程, 当有任务来临时, 就可以直接交给这些线程运行, 当线程完成这些任务后, 并不会被销毁, 而是继续等待任务. 那么这些线程在程序运行过程…

【K8S系列】K8s 领域深度剖析:年度技术、工具与实战总结

引言 Kubernetes作为容器编排领域的行业标准&#xff0c;在过去一年里持续进化&#xff0c;深刻推动着云原生应用开发与部署模式的革新。本文我将深入总结在使用K8s特定技术领域的进展&#xff0c;分享在过去一年中相关技术工具及平台的使用体会&#xff0c;并展示基于K8s的技术…

C++《AVL树》

在之前的学习当中我们已经了解了二叉搜索树&#xff0c;并且我们知道二叉搜索树的查找效率是无法满足我们的要求&#xff0c;当二叉树为左或者右斜树查找的效率就很低下了&#xff0c;那么这本篇当中我们就要来学习对二叉搜索树进行优化的二叉树——AVL树。在此会先来了解AVL树…

【MySQL】存储引擎有哪些?区别是什么?

频率难度60%⭐⭐⭐⭐ 这个问题其实难度并不是很大&#xff0c;只是涉及到的相关知识比较繁杂&#xff0c;比如事务、锁机制等等&#xff0c;都和存储引擎有关系。有时还会根据场景选择不同的存储引擎。 下面笔者将会根据几个部分尽可能地讲清楚 MySQL 中的存储引擎&#xff0…

王道数据结构day1

2.1线性表的定义和基本操作 1.线性表的定义 相同数据类型的数据元素的有限序列 位序(从1开始&#xff09; 表头元素&#xff0c;表尾元素 直接钱去&#xff0c;直接后继 2.线性表的基本操作 基本操作&#xff1a;创销&#xff0c;增删改查 优化插入&#xff1a; 查找