ZMQ之自杀的蜗牛模式和黑箱模式

news2024/9/23 1:41:00

一、检测慢订阅者(自杀的蜗牛模式)       

        在使用发布-订阅模式的时候,最常见的问题之一是如何处理响应较慢的订阅者。理想状况下,发布者能以全速发送消息给订阅者,但现实中,订阅者会需要对消息做较长时间的处理,或者写得不够好,无法跟上发布者的脚步。

        如何处理慢订阅者?最好的方法当然是让订阅者高效起来,不过这需要额外的工作。以下是一些处理慢订阅者的方法:

                1、在发布者中贮存消息。这是Gmail的做法,如果过去的几小时里没有阅读邮件的话,它会把邮件保存起来。但在高吞吐量的应用中,发布者堆积消息往往会导致内存溢出,最终崩溃。特别是当同是有多个订阅者时,或者无法用磁盘来做一个缓冲,情况就会变得更为复杂。

                2、在订阅者中贮存消息。这种做法要好的多,其实ZMQ默认的行为就是这样的。如果非得有一个人会因为内存溢出而崩溃,那也只会是订阅者,而非发布者,这挺公平的。然而,这种做法只对瞬间消息量很大的应用才合理,订阅者只是一时处理不过来,但最终会赶上进度。但是,这还是没有解决订阅者速度过慢的问题。

                3、暂停发送消息。这也是Gmail的做法,当我的邮箱容量超过7.554GB时,新的邮件就会被Gmail拒收或丢弃。这种做法对发布者来说很有益,ZMQ中若设置了阈值(HWM),其默认行为也就是这样的。但是,我们仍不能解决慢订阅者的问题,我们只是让消息变得断断续续而已。

                4、断开与满订阅者的连接。这是hotmail的做法,如果连续两周没有登录,它就会断开,这也是为什么我正在使用第十五个hotmail邮箱。不过这种方案在ZMQ里是行不通的,因为对于发布者而言,订阅者是不可见的,无法做相应处理。

        看来没有一种经典的方式可以满足我们的需求,所以我们就要进行创新了。我们可以让订阅者自杀,而不仅仅是断开连接。这就是“自杀的蜗牛”模式。当订阅者发现自身运行得过慢时(对于慢速的定义应该是一个配置项,当达到这个标准时就大声地喊出来吧,让程序员知道),它会哀嚎一声,然后自杀。

        订阅者如何检测自身速度过慢呢?一种方式是为消息进行编号,并在发布者端设置阈值。当订阅者发现消息编号不连续时,它就知道事情不对劲了。这里的阈值就是订阅者自杀的值。

        这种方案有两个问题:一、如果我们连接的多个发布者,我们要如何为消息进行编号呢?解决方法是为每一个发布者设定一个唯一的编号,作为消息编号的一部分。二、如果订阅者使用ZMQ_SUBSRIBE选项对消息进行了过滤,那么我们精心设计的消息编号机制就毫无用处了。

        有些情形不会进行消息的过滤,所以消息编号还是行得通的。不过更为普遍的解决方案是,发布者为消息标注时间戳,当订阅者收到消息时会检测这个时间戳,如果其差别达到某一个值,就发出警报并自杀。

        当订阅者有自身的客户端或服务协议,需要保证最大延迟时间时,自杀的蜗牛模式会很合适。撤销一个订阅者也许并不是最周全的方案,但至少不会引发后续的问题。如果订阅者收到了过时的消息,那可能会对数据造成进一步的破坏,而且很难被发现。

        以下是自杀的蜗牛模式的最简实现:

        suisnail: Suicidal Snail in C

//
//  自杀的蜗牛模式
//
#include "czmq.h"
 
//  ---------------------------------------------------------------------
//  该订阅者会连接至发布者,接收所有的消息,
//  运行过程中它会暂停一会儿,模拟复杂的运算过程,
//  当发现收到的消息超过1秒的延迟时,就自杀。
 
#define MAX_ALLOWED_DELAY   1000    //  毫秒
 
static void
subscriber (void *args, zctx_t *ctx, void *pipe)
{
    //  订阅所有消息
    void *subscriber = zsocket_new (ctx, ZMQ_SUB);
    zsocket_connect (subscriber, "tcp://localhost:5556");
 
    //  获取并处理消息
    while (1) {
        char *string = zstr_recv (subscriber);
        int64_t clock;
        int terms = sscanf (string, "%" PRId64, &clock);
        assert (terms == 1);
        free (string);
 
        //  自杀逻辑
        if (zclock_time () - clock > MAX_ALLOWED_DELAY) {
            fprintf (stderr, "E: 订阅者无法跟进, 取消中\n");
            break;
        }
        //  工作一定时间
        zclock_sleep (1 + randof (2));
    }
    zstr_send (pipe, "订阅者中止");
}
 
 
//  ---------------------------------------------------------------------
//  发布者每毫秒发送一条用时间戳标记的消息
 
static void
publisher (void *args, zctx_t *ctx, void *pipe)
{
    //  准备发布者
    void *publisher = zsocket_new (ctx, ZMQ_PUB);
    zsocket_bind (publisher, "tcp://*:5556");
 
    while (1) {
        //  发送当前时间(毫秒)给订阅者
        char string [20];
        sprintf (string, "%" PRId64, zclock_time ());
        zstr_send (publisher, string);
        char *signal = zstr_recv_nowait (pipe);
        if (signal) {
            free (signal);
            break;
        }
        zclock_sleep (1);            //  等待1毫秒
    }
}
 
 
//  下面的代码会启动一个订阅者和一个发布者,当订阅者死亡时停止运行
//
int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *pubpipe = zthread_fork (ctx, publisher, NULL);
    void *subpipe = zthread_fork (ctx, subscriber, NULL);
    free (zstr_recv (subpipe));
    zstr_send (pubpipe, "break");
    zclock_sleep (100);
    zctx_destroy (&ctx);
    return 0;
}

        几点说明:

                1、示例程序中的消息包含了系统当前的时间戳(毫秒)。在现实应用中,你应该使用时间戳作为消息头,并提供消息内容。

                2、示例程序中的发布者和订阅者是同一个进程的两个线程。在现实应用中,他们应该是两个不同的进程。示例中这么做只是为了演示的方便。

二、高速订阅者(黑箱模式)

        发布-订阅模式的一个典型应用场景是大规模分布式数据处理。如要处理从证券市场上收集到的数据,可以在证券交易系统上设置一个发布者,获取价格信息,并发送给一组订阅者。如果我们有很多订阅者,我们可以使用TCP。如果订阅者到达一定的量,那我们就应该使用可靠的广播协议,如pgm。

        假设我们的发布者每秒产生10万条100个字节的消息。在剔除了不需要的市场信息后,这个比率还是比较合理的。现在我们需要记录一天的数据(8小时约有250GB),再将其传入一个模拟网络,即一组订阅者。虽然10万条数据对ZMQ来说很容易处理,但我们需要更高的速度。

        假设我们有多台机器,一台做发布者,其他的做订阅者。这些机器都是8核的,发布者那台有12核。

        在我们开始发布消息时,有两点需要注意:

                1、即便只是处理很少的数据,订阅者仍有可能跟不上发布者的速度;

                2、当处理到6M/s的数据量时,发布者和订阅者都有可能达到极限。

        首先,我们需要将订阅者设计为一种多线程的处理程序,这样我们就能在一个线程中读取消息,使用其他线程来处理消息。一般来说,我们对每种消息的处理方式都是不同的。这样一来,订阅者可以对收到的消息进行一次过滤,如根据头信息来判别。当消息满足某些条件,订阅者会将消息交给worker处理。用ZMQ的语言来说,订阅者会将消息转发给worker来处理。

        这样一来,订阅者看上去就像是一个队列装置,我们可以用各种方式去连接队列装置和worker。如我们建立单向的通信,每个worker都是相同的,可以使用PUSH和PULL套接字,分发的工作就交给ZMQ吧。这是最简单也是最快速的方式:

        订阅者和发布者之间的通信使用TCP或PGM协议,订阅者和worker的通信由于是在同一个进程中完成的,所以使用inproc协议。

        下面我们看看如何突破瓶颈。由于订阅者是单线程的,当它的CPU占用率达到100%时,它无法使用其他的核心。单线程程序总是会遇到瓶颈的,不管是2M、6M还是更多。我们需要将工作量分配到不同的线程中去,并发地执行。

        很多高性能产品使用的方案是分片,就是将工作量拆分成独立并行的流。如,一半的专题数据由一个流媒体传输,另一半由另一个流媒体传输。我们可以建立更多的流媒体,但如果CPU核心数不变,那就没有必要了。
        让我们看看如何将工作量分片为两个流:

        要让两个流全速工作,需要这样配置ZMQ:

                1、使用两个I/O线程,而不是一个;

                2、使用两个独立的网络接口;

                3、每个I/O线程绑定至一个网络接口;

                4、两个订阅者线程,分别绑定至一个核心;

                5、使用两个SUB套接字;

                6、剩余的核心供worker使用;

                7、worker线程同时绑定至两个订阅者线程的PUSH套接字。

        创建的线程数量应和CPU核心数一致,如果我们建立的线程数量超过核心数,那其处理速度只会减少。另外,开放多个I/O线程也是没有必要的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/57588.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

springboot如何增加 application.yml配置文件

新建springboot 项目,默认项目的配置文件为application.properties。 需要将application.properties 修改为application.yml配置文件。 注意: 我发现直接将application.properties文件重命名为application.yml。 新的application.yml没有配置功能的属…

Compose 动画艺术探索之属性动画

本篇文章是此专栏的第三篇文章,如果想阅读前两篇文章的话请点击下方链接: Compose 动画艺术探索之瞅下 Compose 的动画Compose 动画艺术探索之可见性动画 Compose的属性动画 属性动画是通过不断地修改值来实现的,而初始值和结束值之间的过…

Java项目:ssm实验室设备管理系统

作者主页:源码空间站2022 简介:Java领域优质创作者、Java项目、学习资料、技术互助 文末获取源码 项目介绍 ssm实验室设备管理系统。前台jsplayuieasyui等框架渲染数据、后台java语言搭配ssm(spring、springmvc、mybatis、maven) 数据库mysql5.7、8.0版…

java - 数据结构,双向链表 - LinkedList

一、双向链表 (不带头) 无头双向链表:在Java的集合框架库中LinkedList底层实现就是无头双向循环链表 双向链表 和 单向链表的区别,就在于 双向 比 单向 多个 一个前驱地址。而且 你会发现 正因为有了前驱地址,所以所…

centos 安装和卸载 webmin

在centos里安装webmin 选择安装最新版本的安装包 官方下载路径可以查看下载版本http://download.webmin.com/download/yum/ wget http://download.webmin.com/download/yum/webmin-2.010-1.noarch.rpm如果安装提示 错误: 无法验证 prdownloads.sourceforge.net 的由 “/CUS…

15年架构师:再有面试官问你Kafka,就拿这篇学习笔记怼他

写在前面 Kafka是一个高度可扩展的消息系统,它在LinkedIn的中央数据库管理中扮演着十分重要的角色,因其可水平扩展和高吞吐率而被广泛使用,现在已经被多家不同类型的公司作为多种类型的数据管道和消息系统。 kafka的外在表现很像消息系统&a…

【图像分割】基于PCA结合模糊聚类算法FCM实现SAR图像分割附matlab代码

✅作者简介:热爱科研的Matlab仿真开发者,修心和技术同步精进,matlab项目合作可私信。 🍎个人主页:Matlab科研工作室 🍊个人信条:格物致知。 更多Matlab仿真内容点击👇 智能优化算法 …

[附源码]计算机毕业设计疫情网课管理系统Springboot程序

项目运行 环境配置: Jdk1.8 Tomcat7.0 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项目技术: SSM mybatis Maven Vue 等等组成,B/S模式 M…

代码随想录刷题Day53 | 1143. 最长公共子序列 | 1035. 不相交的线 | 53. 最大子数组和

代码随想录刷题Day53 | 1143. 最长公共子序列 | 1035. 不相交的线 | 53. 最大子数组和 1143. 最长公共子序列 题目: 给定两个字符串 text1 和 text2,返回这两个字符串的最长 公共子序列 的长度。如果不存在 公共子序列 ,返回 0 。 一个字…

创建Hibernate项目与实现一个例子(idea版)

文章目录创建Hibernate项目一、前提准备二、创建项目三、实现一个例子创建Hibernate项目 一、前提准备 准备Hibernate开发必需的jar包。准备数据库的驱动jar包。准备junit.jar包。 这些包你可以去官网下载,也可以下载我已下载好的(本人目前使用的)。 https://pan…

【机器学习】评价指标 : 准确率,查准率与查全率

引言 在机器学习中,有几个评价指标,今天专门来介绍一下。之前的学习中主要是看模型,学算法,突然有一天发现,机器学习中的一些基本概念还是有点模糊,导致不知道如何综合评价模型的好坏。 这篇文章主要介绍如…

HTML5期末考核大作业:基于Html+Css+javascript的网页制作(化妆品公司网站制作)

🎉精彩专栏推荐 💭文末获取联系 ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 💂 作者主页: 【主页——🚀获取更多优质源码】 🎓 web前端期末大作业: 【📚毕设项目精品实战案例 (10…

【强化学习论文合集 | 2018年合集】一. ICML-2018 强化学习论文

强化学习(Reinforcement Learning, RL),又称再励学习、评价学习或增强学习,是机器学习的范式和方法论之一,用于描述和解决智能体(agent)在与环境的交互过程中通过学习策略以达成回报最大化或实现特定目标的问题。 本专栏整理了近几年国际顶级会议中,涉及强化学习(Rein…

我的数学学习回忆录——一个数学爱好者的反思(二)

早点关注我,精彩不错过!上回说到我在数学学习过程中走的种种弯路,相关内容请戳:我的数学学习回忆录——一个数学爱好者的反思(一)那在这样坎坷的旅程中,有没有给我带来意外惊喜,是不…

C++中的类型转换

文章目录一、隐式类型转换二、显式类型转换三、c风格的类型转换一、隐式类型转换 隐式类型转换,顾名思义,就是没有明显的声明要进行类型转换,隐式类型转换有可能造成数据精度的丢失,所以通常所做的类型转换都是从size小的数据到si…

哈夫曼编码(Huffman coding)

哈夫曼编码哈夫曼编码简介发展历史思想示例不足哈夫曼编码 简介 哈夫曼编码(Huffman Coding),又称霍夫曼编码,是一种编码方式,哈夫曼编码是可变字长编码(VLC)的一种。Huffman于1952年提出一种编码方法,该方法完全依据字符出现概…

[附源码]JAVA毕业设计计算机在线学习管理系统-(系统+LW)

[附源码]JAVA毕业设计计算机在线学习管理系统-(系统LW) 目运行 环境项配置: Jdk1.8 Tomcat8.5 Mysql HBuilderX(Webstorm也行) Eclispe(IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持)。 项…

实现自定义Spring Boot Starter

实现自定义Spring Boot Starter一、原理二、实战1 自定义 Spring Boot Starter1.1 添加maven依赖1.2 属性类AuthorProperties1.3 自动配置类AuthorAutoConfiguration1.4 业务逻辑AuthorServer1.5 spring.factories2 测试自定义的 Spring Boot Starter2.1 新建module或者新建工程…

什么软件能识别软件?学会这几个软件就可以了

在日常学习或工作中,我们经常会因为各种各样的原因,导致资料无法记全。比如上课的时候老师讲课速度过快、或者开会时需要整理的资料太多,我们做不到一心二用,边听边记。你们遇到类似情况的时候,都是怎么解决的呢&#…

0x02. Spring Boot 3 之SpringBoot 版本升级最佳实践指南

Spring Boot 3 之SpringBoot低版本升级最佳实践0x01 前言0x02 升级Spring Boot2.1 从Spring Boot 1.5.x 升级到Spring Boot 2.x2.1.1 依赖检查2.1.2 检查自定义配置2.1.3 检查系统需要2.1.4 升级到Spring Boot 2.x2.1.5 配置属性迁移2.2 从Spring Boot 2.7.x 升级到Spring Boot…