ZMQ中请求-应答模式的可靠性设计

news2025/1/22 16:00:17

一、什么是可靠性?

        要给可靠性下定义,我们可以先界定它的相反面——故障。如果我们可以处理某些类型的故障,那么我们的模型对于这些故障就是可靠的。下面我们就来列举分布式ZMQ应用程序中可能发生的问题,从可能性高的故障开始:

                1、应用程序代码是最大的故障来源。程序会崩溃或中止,停止对数据来源的响应,或是响应得慢,耗尽内存等。

                2、系统代码,如使用ZMQ编写的中间件,也会意外中止。系统代码应该要比应用程序代码更为可靠,但毕竟也有可能崩溃。特别是当系统代码与速度过慢的客户端交互时,很容易耗尽内存。

                3、消息队列溢出,典型的情况是系统代码中没有对蛮客户端做积极的处理,任由消息队列溢出。

                4、网络临时中断,造成消息丢失。这类错误ZMQ应用程序是无法及时发现的,因为ZMQ会自动进行重连。

                5、硬件系统崩溃,导致所有进程中止。

                6、网络会出现特殊情形的中断,如交换机的某个端口发生故障,导致部分网络无法访问。

                7、数据中心可能遭受雷击、地震、火灾、电压过载、冷却系统失效等。

        想要让软件系统规避上述所有的风险,需要大量的人力物力,故不在本指南的讨论范围之内。

        由于前五个故障类型涵盖了99.9%的情形(这一数据源自我近期进行的一项研究),所以我们会深入探讨。如果你的公司大到足以考虑最后两种情形,那请及时联系我,因为我正愁没钱将我家后院的大坑建成游泳池。

二、可靠性设计

        简单地来说,可靠性就是当程序发生故障时也能顺利地运行下去,这要比搭建一个消息系统来得困难得多。我们会根据ZMQ提供的每一种核心消息模式,来看看如何保障代码的持续运行。

                1、请求-应答模式:当服务端在处理请求是中断,客户端能够得知这一信息,并停止接收消息,转而选择等待重试、请求另一服务端等操作。这里我们暂不讨论客户端发生问题的情形。

                2、发布-订阅模式:如果客户端收到一些消息后意外中止,服务端是不知道这一情况的。发布-订阅模式中的订阅者不会返回任何消息给发布者。但是,订阅者可以通过其他方式联系服务端,如请求-应答模式,要求服务端重发消息。这里我们暂不讨论服务端发生问题的情形。此外,订阅者可以通过某些方式检查自身是否运行得过慢,并采取相应措施(向操作者发出警告、中止等)。

                3、管道模式:如果worker意外终止,任务分发器将无从得知。管道模式和发布-订阅模式类似,只朝一个方向发送消息。但是,下游的结果收集器可以检测哪项任务没有完成,并告诉任务分发器重新分配该任务。如果任务分发器或结果收集器意外中止了,那客户端发出的请求只能另作处理。所以说,系统代码真的要减少出错的几率,因为这很难处理。

        本章主要讲解请求-应答模式中的可靠性设计,其他模式将在后续章节中讲解。

        最基本的请求应答模式是REQ客户端发送一个同步的请求至REP服务端,这种模式的可靠性很低。如果服务端在处理请求时中止,那客户端会永远处于等待状态。

        相比TCP协议,ZMQ提供了自动重连机制、消息分发的负载均衡等。但是,在真实环境中这也是不够的。唯一可以完全信任基本请求-应答模式的应用场景是同一进程的两个线程之间进行通信,没有网络问题或服务器失效的情况。

        但是,只要稍加修饰,这种基本的请求-应答模式就能很好地在现实环境中工作了。我喜欢将其称为“海盗”模式。

        粗略地讲,客户端连接服务端有三种方式,每种方式都需要不同的可靠性设计:

                1、多个客户端直接和单个服务端进行通信。使用场景:只有一个单点服务器,所有客户端都需要和它通信。需处理的故障:服务器崩溃和重启;网络连接中断。

                2、多个客户端和单个队列装置通信,该装置将请求分发给多个服务端。使用场景:任务分发。需处理的故障:worker崩溃和重启,死循环,过载;队列装置崩溃和重启;网络中断。

                3、多个客户端直接和多个服务端通信,无中间件。使用场景:类似域名解析的分布式服务。需处理的故障:服务端崩溃和重启,死循环,过载;网络连接中断。

        以上每种设计都必须有所取舍,很多时候会混合使用。下面我们详细说明。

2.1、懒惰海盗模式

        我们可以通过在客户端进行简单的设置,来实现可靠的请求-应答模式。我暂且称之为“懒惰的海盗”(Lazy Pirate)模式。

        在接收应答时,我们不进行同步等待,而是做以下操作:

                1、对REQ套接字进行轮询,当消息抵达时才进行接收。

                2、请求超时后重发消息,循环多次。

                3、若仍无消息,则结束当前事务。

        使用REQ套接字时必须严格遵守发送-接收过程,因为它内部采用了一个有限状态机来限定状态,这一特性会让我们应用“海盗”模式时遇上一些麻烦。最简单的做法是将REQ套接字关闭重启,从而打破这一限定。

        lpclient: Lazy Pirate client in C

//
//  Lazy Pirate client
//  使用zmq_poll轮询来实现安全的请求-应答
//  运行时可随机关闭或重启lpserver程序
//
#include "czmq.h"
 
#define REQUEST_TIMEOUT     2500    //  毫秒, (> 1000!)
#define REQUEST_RETRIES     3       //  尝试次数
#define SERVER_ENDPOINT     "tcp://localhost:5555"
 
int main (void)
{
    zctx_t *ctx = zctx_new ();
    printf ("I: 正在连接服务器...\n");
    void *client = zsocket_new (ctx, ZMQ_REQ);
    assert (client);
    zsocket_connect (client, SERVER_ENDPOINT);
 
    int sequence = 0;
    int retries_left = REQUEST_RETRIES;
    while (retries_left && !zctx_interrupted) {
        //  发送一个请求,并开始接收消息
        char request [10];
        sprintf (request, "%d", ++sequence);
        zstr_send (client, request);
 
        int expect_reply = 1;
        while (expect_reply) {
            //  对套接字进行轮询,并设置超时时间
            zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
            int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
            if (rc == -1)
                break;          //  中断
 
            //  如果接收到回复则进行处理
            if (items [0].revents & ZMQ_POLLIN) {
                //  收到服务器应答,必须和请求时的序号一致
                char *reply = zstr_recv (client);
                if (!reply)
                    break;      //  Interrupted
                if (atoi (reply) == sequence) {
                    printf ("I: 服务器返回正常 (%s)\n", reply);
                    retries_left = REQUEST_RETRIES;
                    expect_reply = 0;
                }
                else
                    printf ("E: 服务器返回异常: %s\n",
                        reply);
 
                free (reply);
            }
            else
            if (--retries_left == 0) {
                printf ("E: 服务器不可用,取消操作\n");
                break;
            }
            else {
                printf ("W: 服务器没有响应,正在重试...\n");
                //  关闭旧套接字,并建立新套接字
                zsocket_destroy (ctx, client);
                printf ("I: 服务器重连中...\n");
                client = zsocket_new (ctx, ZMQ_REQ);
                zsocket_connect (client, SERVER_ENDPOINT);
                //  使用新套接字再次发送请求
                zstr_send (client, request);
            }
        }
    }
    zctx_destroy (&ctx);
    return 0;
}

        lpserver: Lazy Pirate server in C

//
//  Lazy Pirate server
//  将REQ套接字连接至 tcp://*:5555
//  和hwserver程序类似,除了以下两点:
//   - 直接输出请求内容
//   - 随机地降慢运行速度,或中止程序,模拟崩溃
//
#include "zhelpers.h"
 
int main (void)
{
    srandom ((unsigned) time (NULL));
 
    void *context = zmq_init (1);
    void *server = zmq_socket (context, ZMQ_REP);
    zmq_bind (server, "tcp://*:5555");
 
    int cycles = 0;
    while (1) {
        char *request = s_recv (server);
        cycles++;
 
        //  循环几次后开始模拟各种故障
        if (cycles > 3 && randof (3) == 0) {
            printf ("I: 模拟程序崩溃\n");
            break;
        }
        else
        if (cycles > 3 && randof (3) == 0) {
            printf ("I: 模拟CPU过载\n");
            sleep (2);
        }
        printf ("I: 正常请求 (%s)\n", request);
        sleep (1);              //  耗时的处理过程
        s_send (server, request);
        free (request);
    }
    zmq_close (server);
    zmq_term (context);
    return 0;
}

        运行这个测试用例时,可以打开两个控制台,服务端会随机发生故障,你可以看看客户端的反应。服务端的典型输出如下:

I: normal request (1)
I: normal request (2)
I: normal request (3)
I: simulating CPU overload
I: normal request (4)
I: simulating a crash

        客户端的输出是: 

I: connecting to server...
I: server replied OK (1)
I: server replied OK (2)
I: server replied OK (3)
W: no response from server, retrying...
I: connecting to server...
W: no response from server, retrying...
I: connecting to server...
E: server seems to be offline, abandoning

        客户端为每次请求都加上了序列号,并检查收到的应答是否和序列号一致,以保证没有请求或应答丢失,同一个应答收到多次或乱序。多运行几次实例,看看是否真的能够解决问题。现实环境中你不需要使用到序列号,那只是为了证明这一方式是可行的。

        客户端使用REQ套接字进行请求,并在发生问题时打开一个新的套接字来,绕过REQ强制的发送/接收过程。可能你会想用DEALER套接字,但这并不是一个好主意。首先,DEALER并不会像REQ那样处理信封(如果你不知道信封是什么,那更不能用DEALER了)。其次,你可能会获得你并不想得到的结果。

        这一方案的优劣是:

                1、优点:简单明了,容易实施;

                2、优点:可以方便地应用到现有的客户端和服务端程序中;

                3、优点:ZMQ有自动重连机制;

                4、缺点:单点服务发生故障时不能定位到新的可用服务。

2.2、简单海盗模式

        在第二种模式中,我们使用一个队列装置来扩展上述的“懒惰的海盗”模式,使客户端能够透明地和多个服务端通信。这里的服务端可以定义为worker。我们可以从最基础的模型开始,分阶段实施这个方案。

        在所有的海盗模式中,worker是无状态的,或者说存在着一个我们所不知道的公共状态,如共享数据库。队列装置的存在意味着worker可以在client毫不知情的情况下随意进出。一个worker死亡后,会有另一个worker接替它的工作。这种拓扑结果非常简洁,但唯一的缺点是队列装置本身会难以维护,可能造成单点故障。

        在第三章中,队列装置的基本算法是最近最少使用算法。那么,如果worker死亡或阻塞,我们需要做些什么?答案是很少很少。我们已经在client中加入了重试的机制,所以,使用基本的LRU队列就可以运作得很好了。这种做法也符合ZMQ的逻辑,所以我们可以通过在点对点交互中插入一个简单的队列装置来扩展它:

         我们可以直接使用“懒惰的海盗”模式中的client,以下是队列装置的代码:

        spqueue: Simple Pirate queue in C

//
//  简单海盗队列
//  
//  这个装置和LRU队列完全一致,不存在任何可靠性机制,依靠client的重试来保证装置的运行
//
#include "czmq.h"
 
#define LRU_READY   "\001"      //  消息:worker准备就绪
 
int main (void)
{
    //  准备上下文和套接字
    zctx_t *ctx = zctx_new ();
    void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
    void *backend = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (frontend, "tcp://*:5555");    //  client端点
    zsocket_bind (backend,  "tcp://*:5556");    //  worker端点
 
    //  存放可用worker的队列
    zlist_t *workers = zlist_new ();
 
    while (1) {
        zmq_pollitem_t items [] = {
            { backend,  0, ZMQ_POLLIN, 0 },
            { frontend, 0, ZMQ_POLLIN, 0 }
        };
        //  当有可用的woker时,轮询前端端点
        int rc = zmq_poll (items, zlist_size (workers)? 2: 1, -1);
        if (rc == -1)
            break;              //  中断
 
        //  处理后端端点的worker消息
        if (items [0].revents & ZMQ_POLLIN) {
            //  使用worker的地址进行LRU排队
            zmsg_t *msg = zmsg_recv (backend);
            if (!msg)
                break;          //  中断
            zframe_t *address = zmsg_unwrap (msg);
            zlist_append (workers, address);
 
            //  如果消息不是READY,则转发给client
            zframe_t *frame = zmsg_first (msg);
            if (memcmp (zframe_data (frame), LRU_READY, 1) == 0)
                zmsg_destroy (&msg);
            else
                zmsg_send (&msg, frontend);
        }
        if (items [1].revents & ZMQ_POLLIN) {
            //  获取client请求,转发给第一个可用的worker
            zmsg_t *msg = zmsg_recv (frontend);
            if (msg) {
                zmsg_wrap (msg, (zframe_t *) zlist_pop (workers));
                zmsg_send (&msg, backend);
            }
        }
    }
    //  程序运行结束,进行清理
    while (zlist_size (workers)) {
        zframe_t *frame = (zframe_t *) zlist_pop (workers);
        zframe_destroy (&frame);
    }
    zlist_destroy (&workers);
    zctx_destroy (&ctx);
    return 0;
}

         以下是worker的代码,用到了“懒惰的海盗”服务,并将其调整为LRU模式(使用REQ套接字传递“已就绪”信号):

        spworker: Simple Pirate worker in C

//
//  简单海盗模式worker
//  
//  使用REQ套接字连接tcp://*:5556,使用LRU算法实现worker
//
#include "czmq.h"
#define LRU_READY   "\001"      //  消息:worker已就绪
 
int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *worker = zsocket_new (ctx, ZMQ_REQ);
 
    //  使用随机符号来指定套接字标识,方便追踪
    srandom ((unsigned) time (NULL));
    char identity [10];
    sprintf (identity, "%04X-%04X", randof (0x10000), randof (0x10000));
    zmq_setsockopt (worker, ZMQ_IDENTITY, identity, strlen (identity));
    zsocket_connect (worker, "tcp://localhost:5556");
 
    //  告诉代理worker已就绪
    printf ("I: (%s) worker准备就绪\n", identity);
    zframe_t *frame = zframe_new (LRU_READY, 1);
    zframe_send (&frame, worker, 0);
 
    int cycles = 0;
    while (1) {
        zmsg_t *msg = zmsg_recv (worker);
        if (!msg)
            break;              //  中断
 
        //  经过几轮循环后,模拟各种问题
        cycles++;
        if (cycles > 3 && randof (5) == 0) {
            printf ("I: (%s) 模拟崩溃\n", identity);
            zmsg_destroy (&msg);
            break;
        }
        else
        if (cycles > 3 && randof (5) == 0) {
            printf ("I: (%s) 模拟CPU过载\n", identity);
            sleep (3);
            if (zctx_interrupted)
                break;
        }
        printf ("I: (%s) 正常应答\n", identity);
        sleep (1);              //  进行某些处理
        zmsg_send (&msg, worker);
    }
    zctx_destroy (&ctx);
    return 0;
}

        运行上述事例,启动多个worker,一个client,以及一个队列装置,顺序随意。你可以看到worker最终都会崩溃或死亡,client则多次重试并最终放弃。装置从来不会停止,你可以任意重启worker和client,这个模型可以和任意个worker、client交互。

2.3、偏执海盗模式

        “简单海盗队列”模式工作得非常好,主要是因为它只是两个现有模式的结合体。不过,它也有一些缺点:

                1、该模式无法处理队列的崩溃或重启。client会进行重试,但worker不会重启。虽然ZMQ会自动重连worker的套接字,但对于新启动的队列装置来说,由于worker并没有发送“已就绪”的消息,所以它相当于是不存在的。为了解决这一问题,我们需要从队列发送心跳给worker,这样worker就能知道队列是否已经死亡。

                2、队列没有检测worker是否已经死亡,所以当worker在处于空闲状态时死亡,队列装置只有在发送了某个请求之后才会将该worker从队列中移除。这时,client什么都不能做,只能等待。这不是一个致命的问题,但是依然是不够好的。所以,我们需要从worker发送心跳给队列装置,从而让队列得知worker什么时候消亡。

        我们使用一个名为“偏执的海盗模式”来解决上述两个问题。

        之前我们使用REQ套接字作为worker的套接字类型,但在偏执海盗模式中我们会改用DEALER套接字,从而使我们能够任意地发送和接受消息,而不是像REQ套接字那样必须完成发送-接受循环。而DEALER的缺点是我们必须自己管理消息信封。

         我们仍会使用懒惰海盗模式的client,以下是偏执海盗的队列装置代码:

        ppqueue: Paranoid Pirate queue in C

//
//  偏执海盗队列
//
#include "czmq.h"
 
#define HEARTBEAT_LIVENESS  3       //  心跳健康度,3-5是合理的
#define HEARTBEAT_INTERVAL  1000    //  单位:毫秒
 
//  偏执海盗协议的消息代码
#define PPP_READY       "\001"      //  worker已就绪
#define PPP_HEARTBEAT   "\002"      //  worker心跳
 
 
//  使用以下结构表示worker队列中的一个有效的worker
 
typedef struct {
    zframe_t *address;          //  worker的地址
    char *identity;             //  可打印的套接字标识
    int64_t expiry;             //  过期时间
} worker_t;
 
//  创建新的worker
static worker_t *
s_worker_new (zframe_t *address)
{
    worker_t *self = (worker_t *) zmalloc (sizeof (worker_t));
    self->address = address;
    self->identity = zframe_strdup (address);
    self->expiry = zclock_time () + HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS;
    return self;
}
 
//  销毁worker结构,包括标识
static void
s_worker_destroy (worker_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        worker_t *self = *self_p;
        zframe_destroy (&self->address);
        free (self->identity);
        free (self);
        *self_p = NULL;
    }
}
 
//  worker已就绪,将其移至列表末尾
static void
s_worker_ready (worker_t *self, zlist_t *workers)
{
    worker_t *worker = (worker_t *) zlist_first (workers);
    while (worker) {
        if (streq (self->identity, worker->identity)) {
            zlist_remove (workers, worker);
            s_worker_destroy (&worker);
            break;
        }
        worker = (worker_t *) zlist_next (workers);
    }
    zlist_append (workers, self);
}
 
//  返回下一个可用的worker地址
static zframe_t *
s_workers_next (zlist_t *workers)
{
    worker_t *worker = zlist_pop (workers);
    assert (worker);
    zframe_t *frame = worker->address;
    worker->address = NULL;
    s_worker_destroy (&worker);
    return frame;
}
 
//  寻找并销毁已过期的worker。
//  由于列表中最旧的worker排在最前,所以当找到第一个未过期的worker时就停止。
static void
s_workers_purge (zlist_t *workers)
{
    worker_t *worker = (worker_t *) zlist_first (workers);
    while (worker) {
        if (zclock_time () < worker->expiry)
            break;              //  worker未过期,停止扫描
 
        zlist_remove (workers, worker);
        s_worker_destroy (&worker);
        worker = (worker_t *) zlist_first (workers);
    }
}
 
 
int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
    void *backend  = zsocket_new (ctx, ZMQ_ROUTER);
    zsocket_bind (frontend, "tcp://*:5555");    //  client端点
    zsocket_bind (backend,  "tcp://*:5556");    //  worker端点
    //  List of available workers
    zlist_t *workers = zlist_new ();
 
    //  规律地发送心跳
    uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
 
    while (1) {
        zmq_pollitem_t items [] = {
            { backend,  0, ZMQ_POLLIN, 0 },
            { frontend, 0, ZMQ_POLLIN, 0 }
        };
        //  当存在可用worker时轮询前端端点
        int rc = zmq_poll (items, zlist_size (workers)? 2: 1,
            HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  中断
 
        //  处理后端worker请求
        if (items [0].revents & ZMQ_POLLIN) {
            //  使用worker地址进行LRU路由
            zmsg_t *msg = zmsg_recv (backend);
            if (!msg)
                break;          //  中断
 
            //  worker的任何信号均表示其仍然存活
            zframe_t *address = zmsg_unwrap (msg);
            worker_t *worker = s_worker_new (address);
            s_worker_ready (worker, workers);
 
            //  处理控制消息,或者将应答转发给client
            if (zmsg_size (msg) == 1) {
                zframe_t *frame = zmsg_first (msg);
                if (memcmp (zframe_data (frame), PPP_READY, 1)
                &&  memcmp (zframe_data (frame), PPP_HEARTBEAT, 1)) {
                    printf ("E: invalid message from worker");
                    zmsg_dump (msg);
                }
                zmsg_destroy (&msg);
            }
            else
                zmsg_send (&msg, frontend);
        }
        if (items [1].revents & ZMQ_POLLIN) {
            //  获取下一个client请求,交给下一个可用的worker
            zmsg_t *msg = zmsg_recv (frontend);
            if (!msg)
                break;          //  中断
            zmsg_push (msg, s_workers_next (workers));
            zmsg_send (&msg, backend);
        }
 
        //  发送心跳给空闲的worker
        if (zclock_time () >= heartbeat_at) {
            worker_t *worker = (worker_t *) zlist_first (workers);
            while (worker) {
                zframe_send (&worker->address, backend,
                             ZFRAME_REUSE + ZFRAME_MORE);
                zframe_t *frame = zframe_new (PPP_HEARTBEAT, 1);
                zframe_send (&frame, backend, 0);
                worker = (worker_t *) zlist_next (workers);
            }
            heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
        }
        s_workers_purge (workers);
    }
 
    //  程序结束后进行清理
    while (zlist_size (workers)) {
        worker_t *worker = (worker_t *) zlist_pop (workers);
        s_worker_destroy (&worker);
    }
    zlist_destroy (&workers);
    zctx_destroy (&ctx);
    return 0;
}

        该队列装置使用心跳机制扩展了LRU模式,看起来很简单,但要想出这个主意还挺难的。下文会更多地介绍心跳机制。

        以下是偏执海盗的worker代码:

        ppworker: Paranoid Pirate worker in C

//
//  偏执海盗worker
//
#include "czmq.h"
 
#define HEARTBEAT_LIVENESS  3       //  合理值:3-5
#define HEARTBEAT_INTERVAL  1000    //  单位:毫秒
#define INTERVAL_INIT       1000    //  重试间隔
#define INTERVAL_MAX       32000    //  回退算法最大值
 
//  偏执海盗规范的常量定义
#define PPP_READY       "\001"      //  消息:worker已就绪
#define PPP_HEARTBEAT   "\002"      //  消息:worker心跳
 
//  返回一个连接至偏执海盗队列装置的套接字
 
static void *
s_worker_socket (zctx_t *ctx) {
    void *worker = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (worker, "tcp://localhost:5556");
 
    //  告知队列worker已准备就绪
    printf ("I: worker已就绪\n");
    zframe_t *frame = zframe_new (PPP_READY, 1);
    zframe_send (&frame, worker, 0);
 
    return worker;
}
 
int main (void)
{
    zctx_t *ctx = zctx_new ();
    void *worker = s_worker_socket (ctx);
 
    //  如果心跳健康度为零,则表示队列装置已死亡
    size_t liveness = HEARTBEAT_LIVENESS;
    size_t interval = INTERVAL_INIT;
 
    //  规律地发送心跳
    uint64_t heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
 
    srandom ((unsigned) time (NULL));
    int cycles = 0;
    while (1) {
        zmq_pollitem_t items [] = { { worker,  0, ZMQ_POLLIN, 0 } };
        int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  中断
 
        if (items [0].revents & ZMQ_POLLIN) {
            //  获取消息
            //  - 3段消息,信封+内容,表示一个请求
            //  - 1段消息,表示心跳
            zmsg_t *msg = zmsg_recv (worker);
            if (!msg)
                break;          //  中断
 
            if (zmsg_size (msg) == 3) {
                //  若干词循环后模拟各种问题
                cycles++;
                if (cycles > 3 && randof (5) == 0) {
                    printf ("I: 模拟崩溃\n");
                    zmsg_destroy (&msg);
                    break;
                }
                else
                if (cycles > 3 && randof (5) == 0) {
                    printf ("I: 模拟CPU过载\n");
                    sleep (3);
                    if (zctx_interrupted)
                        break;
                }
                printf ("I: 正常应答\n");
                zmsg_send (&msg, worker);
                liveness = HEARTBEAT_LIVENESS;
                sleep (1);              //  做一些处理工作
                if (zctx_interrupted)
                    break;
            }
            else
            if (zmsg_size (msg) == 1) {
                zframe_t *frame = zmsg_first (msg);
                if (memcmp (zframe_data (frame), PPP_HEARTBEAT, 1) == 0)
                    liveness = HEARTBEAT_LIVENESS;
                else {
                    printf ("E: 非法消息\n");
                    zmsg_dump (msg);
                }
                zmsg_destroy (&msg);
            }
            else {
                printf ("E: 非法消息\n");
                zmsg_dump (msg);
            }
            interval = INTERVAL_INIT;
        }
        else
        if (--liveness == 0) {
            printf ("W: 心跳失败,无法连接队列装置\n");
            printf ("W: %zd 毫秒后进行重连...\n", interval);
            zclock_sleep (interval);
 
            if (interval < INTERVAL_MAX)
                interval *= 2;
            zsocket_destroy (ctx, worker);
            worker = s_worker_socket (ctx);
            liveness = HEARTBEAT_LIVENESS;
        }
 
        //  适时发送心跳给队列
        if (zclock_time () > heartbeat_at) {
            heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
            printf ("I: worker心跳\n");
            zframe_t *frame = zframe_new (PPP_HEARTBEAT, 1);
            zframe_send (&frame, worker, 0);
        }
    }
    zctx_destroy (&ctx);
    return 0;
}

        几点说明:

                1、代码中包含了几处失败模拟,和先前一样。这会让代码极难维护,所以当投入使用时,应当移除这些模拟代码。

                2、偏执海盗模式中队列的心跳有时会不正常,下文会讲述这一点。

                3、worker使用了一种类似于懒惰海盗client的重试机制,但有两点不同:

                        1、回退算法设置;

                        2、永不言弃。

        尝试运行以下代码,跑通流程:

ppqueue &
for i in 1 2 3 4; do
    ppworker &
    sleep 1
done
lpclient &

        你会看到worker逐个崩溃,client在多次尝试后放弃。你可以停止并重启队列装置,client和worker会相继重连,并正确地发送、处理和接收请求,顺序不会混乱。所以说,整个通信过程只有两种情形:交互成功,或client最终放弃。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/59248.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

[附源码]计算机毕业设计springboot演唱会门票售卖系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

总结:原生servlet请求转发url与请求重定向url的使用区别

总结&#xff1a;原生servlet请求转发url与请求重定向url的使用区别一演示前提&#xff1a;1.演示案例的项目架构如图&#xff1a;2.设置web应用的映射根目录&#xff1a;/lmf&#xff0c;当然也可以不设置。二什么叫请求转发、请求重定向&#xff1f;1.请求转发解释图2. forwa…

Windows 文件共享功能使用方法, 局域网多台电脑之间传送文件

设想一下&#xff0c;家里或者公司有多台电脑&#xff0c;连接同一个Wifi&#xff0c;也就是处于同一个局域网中。 在不能使用微信、网盘的文件传输功能的情况下&#xff0c;这多台电脑之间&#xff0c;就只能用U盘传送数据吗&#xff1f; 不。Windows系统中已经提供了文件共享…

关于DDR协议一些操作的理解1

整体流程: 一些基本概念: 1.p_bank和l_bank 2.rank和bank 3.DIMM和SIMM 4.DLL概念: DDR控制器架构: 时钟频率对比: (1)

(1-线性回归问题)RBF神经网络

直接看公式&#xff0c;本质上就是非线性变换后的线性变化&#xff08;RBF神经网络的思想是将低维空间非线性不可分问题转换成高维空间线性可分问题&#xff09; Deeplearning Algorithms tutorial 谷歌的人工智能位于全球前列&#xff0c;在图像识别、语音识别、无人驾驶等技…

wy的leetcode刷题记录_Day56

wy的leetcode刷题记录_Day56 声明 本文章的所有题目信息都来源于leetcode 如有侵权请联系我删掉! 时间&#xff1a;2022-11-30 前言 目录wy的leetcode刷题记录_Day56声明前言895. 最大频率栈题目介绍思路代码收获236. 二叉树的最近公共祖先题目介绍思路代码收获895. 最大频率…

React项目中Manifest: Line: 1, column: 1, Syntax error的解决方法

大家好&#xff0c;今天和大家分享一个React项目中的一个小报错的解决方法。 在创建了一个项目后会有几个文件 public ---- 静态资源文件夹 favicon.ico ------ 网站页签图标 index.html -------- 主页面 logo192.png ------- logo图 logo512.png ------- logo图 manifest.js…

如何将C/C++代码转成webassembly

概述 WebAssembly/wasm WebAssembly 或者 wasm 是一个可移植、体积小、加载快并且兼容 Web 的全新格式 官网 &#xff1a; WebAssembly 快速上手&#xff1a; I want to… - WebAssemblyhttps://webassembly.org/getting-started/developers-guide/ 其实官网写的很详细&#xf…

局域网综合设计-----计算机网络

局域网综合设计 信息楼的配置 拓扑图 配置 全部在三层交换机配置 1.创建两个全局地址池vlan 52和valn53 全局地址池vlan52 全局地址池vlan53 2给vlan 52 和53 配置IP 地址 给vlan52配置ip并开启vlan52从全局地址池获取IP 子网 dns 给vlan53配置ip并开启vlan53从全局…

Android入门第37天-在子线程中调用Handler

简介 前一章我们以一个简单的小动画来解释了Handler。 这章我们会介绍在子线程里写Handler。如果是Handler写在了子线程中的话,我们就需要自己创建一个Looper对象了&#xff1a;创建的流程如下: 直接调用Looper.prepare()方法即可为当前线程创建Looper对象,而它的构造器会创…

Java并发编程—线程池

文章目录线程池什么是线程池线程池优点&#xff1a;线程复用技术线程池的实现原理是什么线程池执行任务的流程&#xff1f;线程池如何知道一个线程的任务已经执行完成线程池的核心参数拒绝策略线程池类型&#xff08;常用线程池&#xff09;阻塞队列执行execute()方法和submit(…

[附源码]计算机毕业设计springboot医疗纠纷处理系统

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

MySQL统计函数count详解

MySQL统计函数count详解1. count()概述2. count(1)和count(*)和count(列名)的区别3. count(*)的实现方式1. count()概述 count() 是一个聚合函数&#xff0c;返回指定匹配条件的行数。开发中常用来统计表中数据&#xff0c;全部数据&#xff0c;不为null数据&#xff0c;或者去…

yocto machine class解析之flashlayout-stm32mp

yocto machine class解析之flashlayout-stm32mp 上一篇文章中我们详细介绍了st-partitions-image class。里面根据配置生成了许多的分区镜像以及分区镜像的一些参数设置。本章节介绍的flashlayout class就会根据上面生成的这些参数来生成特定的.tsv刷机文件供ST的刷机工具使用…

Bootstrap5 容器

我们可以使用以下两个容器类&#xff1a; .container 类用于固定宽度并支持响应式布局的容器。.container-fluid 类用于 100% 宽度&#xff0c;占据全部视口&#xff08;viewport&#xff09;的容器。固定宽度 .container 类用于创建固定宽度的响应式页面。 注意&#xff1a…

[node文件的上传和下载]一.node实现文件上传;二、Express实现文件下载;三、遍历下载文件夹下的文件,拼接成一个下载的url,传递到前端

目录 一.node实现文件上传 1.FormData对象&#xff1a;以对象的方式来表示页面中的表单&#xff0c;又称为表单对象。以key:value的方式来保存数据&#xff0c;XMLHttpRequest对象可以轻松的将表单对象发送到服务器端 &#xff08;1&#xff09;是一个构造函数&#xff1a;ne…

LabVIEW在应用程序和接口中使用LabVIEW类和接口

LabVIEW在应用程序和接口中使用LabVIEW类和接口 LabVIEW类和接口是用户定义的数据类型。LabVIEW类和接口开发人员创建并发布这些数据类型。LabVIEW类或接口用户无需了解如何创建LabVIEW类或接口&#xff0c;但必须了解应用程序中通过类或接口定义的数据类型应当如何使用&#…

通过java代码实现对json字符串的格式美化(完整版)

一、前言 之前转载过一篇文章&#xff0c;也是有关于通过java代码实现对json字符串的格式美化&#xff0c;但是那篇文章的实现还不够完善&#xff0c;比如其对字符串中出现特殊字符时&#xff0c;会出现转换失败。因此博主本人也是闲暇时在那份代码的基础上做了完善和补充。好…

[附源码]计算机毕业设计校园租赁系统Springboot程序

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

Tomcat服务器的简介以及安装

文章目录1.概念1.1 什么是Web服务器&#xff1f;1.2 静态资源和动态资源1.3 常用服务器产品2. Tomcat的安装2.1 下载2.2 解压安装2.3 Tomcat的目录结构2.4 Tomcat服务器的启动和关闭2.5 tomcat启动失败的原因2.5.1 查看报错原因2.5.2 错误原因2.5.2.1 查看JAVA_HOME配置是正确2…