RabbitMQ3.13.x之十_流过滤的内部结构设计与实现

news2025/1/2 3:47:48

RabbitMQ3.13.x之十_流过滤的内部结构设计与实现

文章目录

  • RabbitMQ3.13.x之十_流过滤的内部结构设计与实现
  • 1. 概念
    • 1. 消息发布
    • 2. 消息消费
  • 2. 流的结构
    • 1. 在代理端进行过滤
    • 2. 客户端筛选
    • 3. JavaAPI示例
    • 4. 流过滤配置
    • 5. AMQP上的流过滤
    • 6. 总结
  • 3. 相关链接

1. 概念

流过滤的思想是在代理端提供第一级的高效过滤,而无需代理解释消息。 这样,只需要流子集的使用者就不需要自己获取所有数据并处理所有过滤。 这可以大大减少传输给消费者的数据。

通过筛选,可以将筛选器值与每条消息相关联。 它可以是地理信息,例如每条消息来自的世界区域,如下图所示:

在这里插入图片描述

因此,我们的流有 1 条(绿色)消息、1 条(深蓝色)消息、2 条(紫色)消息,然后是 2 条消息。AMER``APAC``EMEA``AMER

1. 消息发布

发布者可以将每封出站邮件与其筛选器值相关联:

在这里插入图片描述

在上图中,发布者发布了 1 条(绿色)消息和 2 条(紫色)消息,这些消息将添加到流中。AMER``EMEA

2. 消息消费

当使用者订阅时,它可以指定一个或多个过滤器值,并且代理应仅发送具有此或这些过滤器值的消息。 我们很快就会看到这在实践中有点不同,但这足以理解这些概念。

在下图中,顶部的使用者指定它只想要(绿色)消息,而代理只发送这些消息。 中间的消费者和底部的消费者也是如此。AMER``EMEA``APAC

在这里插入图片描述

概念就到这里了,现在让我们来了解一下实现细节。

2. 流的结构

我们需要知道流是如何构建的,以便了解流过滤的内部结构。 流是包含段文件的目录。 每个区段文件都有一个关联的索引文件(用于了解在区段文件中给定偏移量处附加使用者的位置等)。 拥有多个“小”段文件比为整个流拥有一个大的整体文件要好:例如,删除“旧”段文件以截断流比删除大文件的开头更有效、更安全。

区段文件由包含消息的块组成。 区块中的消息数取决于入口速率(高入口速率表示一个块中的消息较多,低入口速率表示块中的消息较少)。 块中的消息数量从几条(甚至 1 条)到几千条不等。

块是怎么回事? 块是流中的工作单元:它们用于复制,更重要的是,对于我们的主题,用于消费者交付。 代理使用 sendfile 系统调用(将整个块从文件系统发送到网络套接字,而不将数据复制到用户空间)向使用者发送块,一次一个。

下图说明了流的结构:

在这里插入图片描述

有了这个,让我们看看代理如何知道是否要调度一个区块。

1. 在代理端进行过滤

想象一下,我们有一个只想要(绿色)消息的消费者。 当代理要调度一个区块时,它需要知道该区块是否包含消息。 如果是这样,它可以将块发送给消费者,如果没有,代理可以跳过该块,转到下一个块,然后重新迭代。AMER``AMER

每个区块都有一个标头,该标头可以包含一个 Bloom 过滤器,该标头告诉代理该块是否包含具有给定过滤器值的消息。 Bloom 过滤器是一种节省空间的概率数据结构,用于测试元素是否是集合的成员。 在我们的示例中,集合包含 、 和 ,元素是 。AMER``EMEA``APAC``AMER

下图说明了 3 个块的代理端过滤过程:

在这里插入图片描述

如上图所示,筛选器可能会返回误报,即不包含具有预期筛选器值的消息的块。 这是正常的,因为 Bloom 过滤器是概率性的。 不过,它们不会返回假阴性:如果过滤器显示没有(绿色)消息,我们可以确定它是真的。 我们必须忍受这种不确定性:有时我们可能会无缘无故地调度一些块,但这总比调度所有块要好。AMER

可以肯定的是,消费者可以接收到它不想要的消息:看看我们左边的第一个块,它包含消费者要求的(绿色)消息,但也包含(紫色)和(深蓝色)消息。 这就是为什么客户端也必须进行过滤的原因。AMER``EMEA``APAC

2. 客户端筛选

代理在传递消息时处理第一级过滤,但由于传递单位是块,因此使用者仍然可以接收它不想要的消息。 因此,客户端还必须执行一些筛选,这显然必须与订阅时设置的筛选值一致。

下图说明了一个消费者,它只需要(绿色)消息,并且必须执行最后一步的筛选:AMER

在这里插入图片描述

让我们看看这如何转化为应用程序代码。

3. JavaAPI示例

筛选不是侵入性的,可以作为跨领域问题进行处理,从而最大限度地减少对应用程序代码的影响。 以下是在使用流 Java 客户端(方法)声明生产者时设置从消息中提取过滤器值的逻辑:filterValue(Function<Message,String>)

Producer producer = environment.producerBuilder()
  .stream("invoices")
  .filterValue(msg -> msg.getApplicationProperties().get("region").toString())  
  .build();

在消费端,流 Java 客户端提供了设置过滤器值的方法和设置客户端过滤逻辑的方法。 声明使用者时,必须调用这两种方法:filter().values(String... filterValues)``filter().postFilter(Predicate<Message> filter)

Consumer consumer = environment.consumerBuilder()
  .stream("invoices")
  .filter()
    .values("AMER")  
    .postFilter(msg -> "AMER".equals(msg.getApplicationProperties().get("region")))  
  .builder()
  .messageHandler((ctx, msg) -> {
    // message processing code
  })
  .build();

如您所见,筛选不会更改发布和使用代码,而只是更改生产者和使用者的声明。

现在让我们看看如何以最合适的方式为用例配置流过滤。

4. 流过滤配置

关于流过滤的第一篇文章提供了一些数字(与不过滤相比,过滤节省了大约 80% 的带宽)。 流过滤的好处很大程度上取决于用例:入口速率、基数和过滤器值的分布,以及过滤器大小。 过滤器越大越好(错误率越小)。 可以为块中使用的筛选器大小设置一个介于 16 到 255 字节之间的值,默认值为 16 字节。

流 Java 客户端提供了在创建流时设置过滤器大小的方法(它在内部设置参数):filterSize(int)``stream-filter-size-bytes

environment.streamCreator()
  .stream("invoices")
  .filterSize(32)
  .create()

如何估算过滤器的尺寸? 网上有许多 Bloom 滤镜计算器。 参数包括哈希函数的数量(RabbitMQ 流过滤为 2 个)、预期元素的数量、错误率和大小。 您通常对元素的数量有所了解,因此您需要在错误率和过滤器大小之间找到权衡。

以下是一些示例:

  • 10 个值,16 个字节 => 2 % 错误率
  • 30 个值,16 个字节 => 14 % 错误率
  • 200 个值,128 个字节 => 10 % 的错误率

那么,过滤器越大越好? 不完全是:尽管 Bloom 过滤器在存储方面非常有效,因为它不存储元素,只是元素是否在集合中,过滤器大小是预先分配的。 如果将筛选器大小设置为 255,并且每个块至少包含一条具有筛选器值的消息,则每个块标头中将分配 255 个字节。 如果块包含许多大消息,这很好,因为与块大小相比,筛选器大小可以忽略不计。 但是,对于退化的情况,例如具有 10 字节消息和 10 字节筛选器值的单消息块,您最终会得到一个比实际数据更大的筛选器。

您必须尝试自己的用例,以估计过滤器大小对流大小的影响。 关于流过滤的第一篇文章提供了一个使用 Stream PerfTest 估计流大小的技巧(在不过滤的情况下读取整个流并查阅指标)。rabbitmq_stream_read_bytes_total

5. AMQP上的流过滤

尽管访问流的首选方式是流协议,但支持其他协议,例如 AMQP。 任何 AMQP 客户端库也支持流筛选:

  • 声明:将参数设置为 并使用 在声明流时设置筛选器大小。x-queue-type``stream``x-stream-filter-size-bytes
  • 发布:使用标头设置出站邮件的筛选器值。x-stream-filter-value
  • 使用:使用 consumer 参数设置预期的筛选器值(字符串或字符串数组),并使用 consumer 参数(可选)接收没有任何筛选值的消息(默认值为 )。客户端过滤仍然是必要的!x-stream-filter``x-stream-match-unfiltered``false

6. 总结

流过滤易于使用并从中受益,但有关内部的一些知识可用于优化其使用,尤其是对于棘手的用例。 请记住,客户端筛选是必需的,并且必须与配置的筛选器值一致。 这通常很容易实现。 还可以为给定的用例以最合适的方式设置过滤器大小。

3. 相关链接

参考:

Stream Filtering Internals | RabbitMQ

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1570922.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

linux练习-交互式传参

在shell脚本中&#xff0c;read 向用户显示一行文本并接受用户输入 #!/bin/bash read -p 依次输入你的姓名、年龄、家乡 name age home echo 我是$name,年龄$age,我来自$home

C++数据结构与算法——二叉树公共祖先问题

C第二阶段——数据结构和算法&#xff0c;之前学过一点点数据结构&#xff0c;当时是基于Python来学习的&#xff0c;现在基于C查漏补缺&#xff0c;尤其是树的部分。这一部分计划一个月&#xff0c;主要利用代码随想录来学习&#xff0c;刷题使用力扣网站&#xff0c;不定时更…

优衣库门店可视化与顾客拜访数据分组-Python数据分析项目

文章目录 项目背景1 引言2 数据说明 一、数据导入及预处理1 数据导入2 数据观察2.1 查看数据形状2.2 检查缺失值2.3 有无重复值 3 数据预处理3.1 获取详细地址3.2 批量获取经纬度3.2.1 安装geopy包3.2.2 批量获取经纬度 二、优衣库门店可视化1 数据获取1.1 读取地点数据1.2 筛选…

c语言数据结构(10)——冒泡排序、快速排序

欢迎来到博主的专栏——C语言数据结构 博主ID&#xff1a;代码小豪 文章目录 冒泡排序冒泡排序的代码及原理快速排序快速排序的代码和原理快速排序的其他排序方法非递归的快速排序 冒泡排序 相信冒泡排序是绝大多数计科学子接触的第一个排序算法。作为最简单、最容易理解的排序…

创新性的智慧公厕技术研发与应用

智慧公厕&#xff0c;作为城市基础设施的重要组成部分&#xff0c;扮演着提供舒适便捷卫生服务的角色。智慧公厕源头实力厂家广州中期科技有限公司&#xff0c;通过技术创新与应用升级&#xff0c;打造标杆性的智慧公厕整体解决方案。通过创新性的技术应用&#xff0c;智慧公厕…

字节新作:图像生成质量超越DiT

&#x1f31f;每日更新最新高质量论文&#xff0c;关注我&#xff0c;时刻关注最新大模型进展。&#x1f31f; &#x1f4cc; 元数据概览&#xff1a; 标题&#xff1a;Visual Autoregressive Modeling: Scalable Image Generation via Next-Scale Prediction作者&#xff1a…

Jupyter IPython帮助文档及其魔法命令

1.IPython 的帮助文档 使用 help() 使用 ? 使用 &#xff1f;&#xff1f; tab 自动补全 shift tab 查看参数和函数说明 2.运行外部 Python 文件 使用下面命令运行外部 Python 文件&#xff08;默认是当前目录&#xff0c;也可以使用绝对路径&#xff09; %run *.py …

数据湖概述:大数据演进阶段-数据湖

文章目录 一. 大数据发展过程1. 离线大数据平台2. Lambda架构&#xff1a;速度层批层3. Kappa架构&#xff1a;流批一体4. 大数据架构痛点总结 二. 数据湖助力于解决数据仓库痛点问题1. 数据湖特点2. 开源数据湖的架构 三. 数据湖和数据仓库理念的对比1. 数据湖和数据仓库对比2…

LeetCode 热题 100 | 贪心算法

目录 1 121. 买卖股票的最佳时机 2 55. 跳跃游戏 3 45. 跳跃游戏 II 4 763. 划分字母区间 菜鸟做题&#xff0c;语言是 C 1 121. 买卖股票的最佳时机 解题思路&#xff1a; 维护一个变量 max_pricemax_price 用于存储排在 i 天之后的股票最高价格第 i 天的最高利润 …

Day5-Hive的结构和优化、数据文件存储格式

Hive 窗口函数 案例 需求&#xff1a;连续三天登陆的用户数据 步骤&#xff1a; -- 建表 create table logins (username string,log_date string ) row format delimited fields terminated by ; -- 加载数据 load data local inpath /opt/hive_data/login into table log…

armlinux-外部中断

s3c2440的中断框图 如果我们单纯配置一个按键的外部中断&#xff0c;就不存在子中断与优先级的问题。 由于是按键的外部中断&#xff0c;通过引脚的高低电平来触发。所以我们要先配置引脚的功能。 我们使用按键1&#xff0c;终端源为EINT8&#xff0c;对应引脚GPG0 通过用户手…

Stable diffusion 加载扩展列表报错解决方法

项目场景&#xff1a; 在使用Stable diffusion webui时&#xff0c;使用扩展列表出现错误 问题描述 点击loadfrom后&#xff0c;出现加载扩展列表报错 原因分析&#xff1a; 下载的扩展的时候&#xff0c;都是github 的url&#xff0c;需要科学上网&#xff0c;如果不能科学…

深澜计费管理系统 任意文件读取漏洞复现

0x01 产品简介 深澜计费管理系统是是一套完善的领先的具有复杂生物型特征的弹性认证计费系统。系统主要由 AAA 认证计费平台、系统运营维护管理平台、用户及策略管理平台、用户自助服务平台、智能客户端模块、消息推送模块、数据统计模块组成。目前在全球为超过 2500 家客户提…

MicroPython 树莓派 RP2 入门教程

系列文章目录 前言 Raspberry Pi Pico 开发板&#xff08;图片来源&#xff1a;Raspberry Pi 基金会&#xff09;。 以下是 Raspberry Pi RP2xxx 板的快速参考资料。如果您是第一次使用该开发板&#xff0c;了解微控制器的概况可能会对您有所帮助&#xff1a; 一、关于 RP2xxx…

【项目实战】——商品管理的制作完整代码

&#x1f468;‍&#x1f4bb;个人主页&#xff1a;开发者-曼亿点 &#x1f468;‍&#x1f4bb; hallo 欢迎 点赞&#x1f44d; 收藏⭐ 留言&#x1f4dd; 加关注✅! &#x1f468;‍&#x1f4bb; 本文由 曼亿点 原创 &#x1f468;‍&#x1f4bb; 收录于专栏&#xff1a…

数据结构:详解【树和二叉树】

1. 树的概念及结构&#xff08;了解&#xff09; 1.1 树的概念 树是一种非线性的数据结构&#xff0c;它是由n&#xff08;n>0&#xff09;个有限结点组成一个具有层次关系的集合。把它叫做树是因为它看起来像一棵倒挂的树&#xff0c;也就是说它是根朝上&#xff0c;而叶朝…

Vue项目登录页实现获取短信验证码的功能

之前我们写过不需要调后端接口就获取验证码的方法,具体看《无需后端接口,用原生js轻松实现验证码》这个文章。现在我们管理后台有个需求,就是登录页面需要获取验证码,用户可以输入验证码后进行登录。效果如下,当我点击获取验证码后能获取短信验证码: 这里在用户点击获取…

Cortex-M7 异常处理与返回

1 前言 当CM3开始响应一个中断时&#xff0c;会在它小小的体内奔涌起三股暗流&#xff1a;  入栈&#xff1a; 把8个寄存器的值压入栈;  取向量&#xff1a;从向量表中找出对应的服务程序入口地址;  选择堆栈指针MSP/PSP&#xff0c;更新堆栈指针…

LeetCode 19.删除链表的倒数第N个结点

给你一个链表&#xff0c;删除链表的倒数第 n 个结点&#xff0c;并且返回链表的头结点。 示例 1&#xff1a; 输入&#xff1a;head [1,2,3,4,5], n 2 输出&#xff1a;[1,2,3,5] 示例 2&#xff1a; 输入&#xff1a;head [1], n 1 输出&#xff1a;[] 示例 3&#x…

Leetcode 39.组合总和

题目 思路 1.确定递归函数的返回值及参数&#xff1a; 返回值是void,参数这里还是先设定两个全局变量。一个是path存放符合条件单一结果。如&#xff1a;&#xff08;1&#xff0c;2&#xff09;。一个是result&#xff0c;是所有path的集合[(1,2),(1,3)…]。 此外再设定一个…