ZMQ请求应答模式之无中间件的可靠性--自由者模式

news2024/11/24 4:22:40

一、引言

        我们讲了那么多关于中间件的示例,好像有些违背“ZMQ是无中间件”的说法。但要知道在现实生活中,中间件一直是让人又爱又恨的东西。实践中的很多消息架构能都在使用中间件进行分布式架构的搭建,所以说最终的决定还是需要你自己去权衡的。这也是为什么虽然我能驾车10分钟到一个大型商场里购买五箱音量,但我还是会选择走10分钟到楼下的便利店里去买。这种出于经济方面的考虑(时间、精力、成本等)不仅在日常生活中很常见,在软件架构中也很重要。

        这就是为什么ZMQ不会强制使用带有中间件的架构,但仍提供了像内置装置这样的中间件供编程人员自由选用。

        这一节我们会打破以往使用中间件进行可靠性设计的架构,转而使用点对点架构,即自由者模式,来进行可靠的消息传输。我们的示例程序会是一个名称解析服务。ZMQ中的一个常见问题是:我们如何得知需要连接的端点?在代码中直接写入TCP/IP地址肯定是不合适的;使用配置文件会造成管理上的不便。试想一下,你要在上百台计算机中进行配置,只是为了让它们知道google.com的IP地址是74.125.230.82。

        一个ZMQ的名称解析服务需要实现的功能有:

                1、将逻辑名称解析为一个或多个端点地址,包括绑定端和连接端。实际使用时,名称服务会提供一组端点。

                2、允许我们在不同的环境下,即开发环境和生产环境,进行解析。

                3、该服务必须是可靠的,否则应用程序将无法连接到网络。

        为管家模式提供名称解析服务会很有用,虽然将代理程序的端点对外暴露也很简单,但是如果用好名称解析服务,那它将成为唯一一个对外暴露的接口,将更便于管理。

        我们需要处理的故障类型有:服务崩溃或重启、服务过载、网络因素等。为获取可靠性,我们必须建立一个服务群,当某个服务端崩溃后,客户端可以连接其他的服务端。实践中,两个服务端就已经足够了,但事实上服务端的数量可以是任意个。

        在这个架构中,大量客户端和少量服务端进行通信,服务端将套接字绑定至单独的端口,这和管家模式中的代理有很大不同。对于客户端来说,它有这样几种选择:

                1、客户端可以使用REQ套接字和懒惰海盗模式,但需要有一个机制防止客户端不断地请求已停止的服务端。

                2、客户端可以使用DEALER套接字,向所有的服务端发送请求。很简单,但并不太妙;

                3、客户端使用ROUTER套接字,连接特定的服务端。但客户端如何得知服务端的套接字标识呢?一种方式是让服务端主动连接客户端(很复杂),或者将服务端标识写入代码进行固化(很混乱)。

二、模型一:简单重试

        让我们先尝试简单的方案,重写懒惰海盗模式,让其能够和多个服务端进行通信。启动服务端时用命令行参数指定端口。然后启动多个服务端。

        flserver1: Freelance server, Model One in C

//
//  自由者模式 - 服务端 - 模型1
//  提供echo服务
//
#include "czmq.h"
 
int main (int argc, char *argv [])
{
    if (argc < 2) {
        printf ("I: syntax: %s <endpoint>\n", argv [0]);
        exit (EXIT_SUCCESS);
    }
    zctx_t *ctx = zctx_new ();
    void *server = zsocket_new (ctx, ZMQ_REP);
    zsocket_bind (server, argv [1]);
 
    printf ("I: echo服务端点: %s\n", argv [1]);
    while (TRUE) {
        zmsg_t *msg = zmsg_recv (server);
        if (!msg)
            break;          //  中断
        zmsg_send (&msg, server);
    }
    if (zctx_interrupted)
        printf ("W: 中断\n");
 
    zctx_destroy (&ctx);
    return 0;
}

        启动客户端,指定一个或多个端点:

        flclient1: Freelance client, Model One in C

//
//  自由者模式 - 客户端 - 模型1
//  使用REQ套接字请求一个或多个服务端
//
#include "czmq.h"
 
#define REQUEST_TIMEOUT     1000
#define MAX_RETRIES         3       //  尝试次数
 
 
static zmsg_t *
s_try_request (zctx_t *ctx, char *endpoint, zmsg_t *request)
{
    printf ("I: 在端点 %s 上尝试请求echo服务...\n", endpoint);
    void *client = zsocket_new (ctx, ZMQ_REQ);
    zsocket_connect (client, endpoint);
 
    //  发送请求,并等待应答
    zmsg_t *msg = zmsg_dup (request);
    zmsg_send (&msg, client);
    zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
    zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
    zmsg_t *reply = NULL;
    if (items [0].revents & ZMQ_POLLIN)
        reply = zmsg_recv (client);
 
    //  关闭套接字
    zsocket_destroy (ctx, client);
    return reply;
}
 
 
int main (int argc, char *argv [])
{
    zctx_t *ctx = zctx_new ();
    zmsg_t *request = zmsg_new ();
    zmsg_addstr (request, "Hello world");
    zmsg_t *reply = NULL;
 
    int endpoints = argc - 1;
    if (endpoints == 0)
        printf ("I: syntax: %s <endpoint> ...\n", argv [0]);
    else
    if (endpoints == 1) {
        //  若只有一个端点,则尝试N次
        int retries;
        for (retries = 0; retries < MAX_RETRIES; retries++) {
            char *endpoint = argv [1];
            reply = s_try_request (ctx, endpoint, request);
            if (reply)
                break;          //  成功
            printf ("W: 没有收到 %s 的应答, 准备重试...\n", endpoint);
        }
    }
    else {
        //  若有多个端点,则每个尝试一次
        int endpoint_nbr;
        for (endpoint_nbr = 0; endpoint_nbr < endpoints; endpoint_nbr++) {
            char *endpoint = argv [endpoint_nbr + 1];
            reply = s_try_request (ctx, endpoint, request);
            if (reply)
                break;          //  Successful
            printf ("W: 没有收到 %s 的应答\n", endpoint);
        }
    }
    if (reply)
        printf ("服务运作正常\n");
 
    zmsg_destroy (&request);
    zmsg_destroy (&reply);
    zctx_destroy (&ctx);
    return 0;
}

        可用如下命令运行:

flserver1 tcp://*:5555 &
flserver1 tcp://*:5556 &
flclient1 tcp://localhost:5555 tcp://localhost:5556

        客户端的核心机制是懒惰海盗模式,即获得一次成功的应答后就结束。会有两种情况:

                1、如果只有一个服务端,客户端会再尝试N次后停止,这和懒惰海盗模式的逻辑一致。

                2、如果有多个服务端,客户端会每个尝试一次,收到应答后停止。

        这种机制补充了海盗模式,使其能够克服只有一个服务端的情况。

        但是,这种设计无法在现实程序中使用:当有很多客户端连接了服务端,而主服务端崩溃了,那所有客户端都需要在超时后才能继续执行。

三、模型二:批量发送

        下面让我们使用DEALER套接字。我们的目标是能再最短的时间里收到一个应答,不能受主服务端崩溃的影响。可以采取以下措施:

                1、连接所有的服务端。

                2、当有请求时,一次性发送给所有的服务端。

                3、等待第一个应答。

                4、忽略其他应答。

        这样设计客户端时,当发送请求后,所有的服务端都会收到这个请求,并返回应答。如果某个服务端断开连接了,ZMQ可能会将请求发给其他服务端,导致某些服务端会收到两次请求。

        更麻烦的是客户端无法得知应答的数量,容易发生混乱。

        我们可以为请求进行编号,忽略不匹配的应答。我们要对服务端进行改造,返回的消息中需要包含请求编号:
        flserver2: Freelance server, Model Two in C

//
//  自由者模式 - 服务端 - 模型2
//  返回带有请求编号的OK信息
//
#include "czmq.h"
 
int main (int argc, char *argv [])
{
    if (argc < 2) {
        printf ("I: syntax: %s <endpoint>\n", argv [0]);
        exit (EXIT_SUCCESS);
    }
    zctx_t *ctx = zctx_new ();
    void *server = zsocket_new (ctx, ZMQ_REP);
    zsocket_bind (server, argv [1]);
 
    printf ("I: 服务已就绪 %s\n", argv [1]);
    while (TRUE) {
        zmsg_t *request = zmsg_recv (server);
        if (!request)
            break;          //  中断
        //  判断请求内容是否正确
        assert (zmsg_size (request) == 2);
 
        zframe_t *address = zmsg_pop (request);
        zmsg_destroy (&request);
 
        zmsg_t *reply = zmsg_new ();
        zmsg_add (reply, address);
        zmsg_addstr (reply, "OK");
        zmsg_send (&reply, server);
    }
    if (zctx_interrupted)
        printf ("W: interrupted\n");
 
    zctx_destroy (&ctx);
    return 0;
}

        客户端代码:

        flclient2: Freelance client, Model Two in C

//
//  自由者模式 - 客户端 - 模型2
//  使用DEALER套接字发送批量消息
//
#include "czmq.h"
 
//  超时时间
#define GLOBAL_TIMEOUT 2500
 
//  将客户端API封装成一个类
 
#ifdef __cplusplus
extern "C" {
#endif
 
//  声明类结构
typedef struct _flclient_t flclient_t;
 
flclient_t *
    flclient_new (void);
void
    flclient_destroy (flclient_t **self_p);
void
    flclient_connect (flclient_t *self, char *endpoint);
zmsg_t *
    flclient_request (flclient_t *self, zmsg_t **request_p);
 
#ifdef __cplusplus
}
#endif
 
 
int main (int argc, char *argv [])
{
    if (argc == 1) {
        printf ("I: syntax: %s <endpoint> ...\n", argv [0]);
        exit (EXIT_SUCCESS);
    }
    //  创建自由者模式客户端
    flclient_t *client = flclient_new ();
 
    //  连接至各个端点
    int argn;
    for (argn = 1; argn < argc; argn++)
        flclient_connect (client, argv [argn]);
 
    //  发送一组请求,并记录时间
    int requests = 10000;
    uint64_t start = zclock_time ();
    while (requests--) {
        zmsg_t *request = zmsg_new ();
        zmsg_addstr (request, "random name");
        zmsg_t *reply = flclient_request (client, &request);
        if (!reply) {
            printf ("E: 名称解析服务不可用,正在退出\n");
            break;
        }
        zmsg_destroy (&reply);
    }
    printf ("平均请求时间: %d 微秒\n",
        (int) (zclock_time () - start) / 10);
 
    flclient_destroy (&client);
    return 0;
}
 
 
 
//  --------------------------------------------------------------------
//  类结构
 
struct _flclient_t {
    zctx_t *ctx;        //  上下文
    void *socket;       //  用于和服务端通信的DEALER套接字
    size_t servers;     //  以连接的服务端数量
    uint sequence;      //  已发送的请求数
};
 
 
//  --------------------------------------------------------------------
//  Constructor
 
flclient_t *
flclient_new (void)
{
    flclient_t
        *self;
 
    self = (flclient_t *) zmalloc (sizeof (flclient_t));
    self->ctx = zctx_new ();
    self->socket = zsocket_new (self->ctx, ZMQ_DEALER);
    return self;
}
 
//  --------------------------------------------------------------------
//  析构函数
 
void
flclient_destroy (flclient_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        flclient_t *self = *self_p;
        zctx_destroy (&self->ctx);
        free (self);
        *self_p = NULL;
    }
}
 
//  --------------------------------------------------------------------
//  连接至新的服务端端点
 
void
flclient_connect (flclient_t *self, char *endpoint)
{
    assert (self);
    zsocket_connect (self->socket, endpoint);
    self->servers++;
}
 
//  --------------------------------------------------------------------
//  发送请求,接收应答
//  发送后销毁请求
 
zmsg_t *
flclient_request (flclient_t *self, zmsg_t **request_p)
{
    assert (self);
    assert (*request_p);
    zmsg_t *request = *request_p;
 
    //  向消息添加编号和空帧
    char sequence_text [10];
    sprintf (sequence_text, "%u", ++self->sequence);
    zmsg_pushstr (request, sequence_text);
    zmsg_pushstr (request, "");
 
    //  向所有已连接的服务端发送请求
    int server;
    for (server = 0; server < self->servers; server++) {
        zmsg_t *msg = zmsg_dup (request);
        zmsg_send (&msg, self->socket);
    }
    //  接收来自任何服务端的应答
    //  因为我们可能poll多次,所以每次都进行计算
    zmsg_t *reply = NULL;
    uint64_t endtime = zclock_time () + GLOBAL_TIMEOUT;
    while (zclock_time () < endtime) {
        zmq_pollitem_t items [] = { { self->socket, 0, ZMQ_POLLIN, 0 } };
        zmq_poll (items, 1, (endtime - zclock_time ()) * ZMQ_POLL_MSEC);
        if (items [0].revents & ZMQ_POLLIN) {
            //  应答内容是 [empty][sequence][OK]
            reply = zmsg_recv (self->socket);
            assert (zmsg_size (reply) == 3);
            free (zmsg_popstr (reply));
            char *sequence = zmsg_popstr (reply);
            int sequence_nbr = atoi (sequence);
            free (sequence);
            if (sequence_nbr == self->sequence)
                break;
        }
    }
    zmsg_destroy (request_p);
    return reply;
}

        几点说明:

                1、客户端被封装成了一个API类,将复杂的代码都包装了起来。

                2、户端会在几秒之后放弃寻找可用的服务端。

                3、客户端需要创建一个合法的REP信封,所以需要添加一个空帧。

        程序中,客户端发出了1万次名称解析请求(虽然是假的),并计算平均耗费时间。在我的测试机上,有一个服务端时,耗时60微妙;三个时80微妙。

        该模型的优缺点是:

                1、优点:简单,容易理解和编写。

                2、优点:它工作迅速,有重试机制。

                3、缺点:占用了额外的网络带宽。

                4、缺点:我们不能为服务端设置优先级,如主服务、次服务等。

                5、缺点:服务端不能同时处理多个请求。

四、模型三:Complex and Nasty

        批量发送模型看起来不太真实,那就让我们来探索最后这个极度复杂的模型。很有可能在编写完之后我们又会转而使用批量发送,哈哈,这就是我的作风。

        我们可以将客户端使用的套接字更换为ROUTER,让我们能够向特定的服务端发送请求,停止向已死亡的服务端发送请求,从而做得尽可能地智能。我们还可以将服务端的套接字更换为ROUTER,从而突破单线程的瓶颈。

        但是,使用ROUTER-ROUTER套接字连接两个瞬时套接字是不可行的,节点只有在收到第一条消息时才会为对方生成套接字标识。唯一的方法是让其中一个节点使用持久化的套接字,比较好的方式是让客户端知道服务端的标识,即服务端作为持久化的套接字。

        为了避免产生新的配置项,我们直接使用服务端的端点作为套接字标识。

        回想一下ZMQ套接字标识是如何工作的。服务端的ROUTER套接字为自己设置一个标识(在绑定之前),当客户端连接时,通过一个握手的过程来交换双方的标识。客户端的ROUTER套接字会先发送一条空消息,服务端为客户端生成一个随机的UUID。然后,服务端会向客户端发送自己的标识。

        这样一来,客户端就可以将消息发送给特定的服务端了。不过还有一个问题:我们不知道服务端会在什么时候完成这个握手的过程。如果服务端是在线的,那可能几毫秒就能完成。如果不在线,那可能需要很久很久。

        这里有一个矛盾:我们需要知道服务端何时连接成功且能够开始工作。自由者模式不像中间件模式,它的服务端必须要先发送请求后才能的应答。所以在服务端发送消息给客户端之前,客户端必须要先请求服务端,这看似是不可能的。

        我有一个解决方法,那就是批量发送。这里发送的不是真正的请求,而是一个试探性的心跳(PING-PONG)。当收到应答时,就说明对方是在线的。

        下面让我们制定一个协议,来定义自由者模式是如何传递这种心跳的:10/FLP | ZeroMQ RFC

        实现这个协议的服务端很方便,下面就是经过改造的echo服务:

        flserver3: Freelance server, Model Three in C

//
//  自由者模式 - 服务端 - 模型3
//  使用ROUTER-ROUTER套接字进行通信;单线程。
//
#include "czmq.h"
 
int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));
 
    zctx_t *ctx = zctx_new ();
 
    //  准备服务端套接字,其标识和端点名相同
    char *bind_endpoint = "tcp://*:5555";
    char *connect_endpoint = "tcp://localhost:5555";
    void *server = zsocket_new (ctx, ZMQ_ROUTER);
    zmq_setsockopt (server,
        ZMQ_IDENTITY, connect_endpoint, strlen (connect_endpoint));
    zsocket_bind (server, bind_endpoint);
    printf ("I: 服务端已准备就绪 %s\n", bind_endpoint);
 
    while (!zctx_interrupted) {
        zmsg_t *request = zmsg_recv (server);
        if (verbose && request)
            zmsg_dump (request);
        if (!request)
            break;          //  中断
 
        //  Frame 0: 客户端标识
        //  Frame 1: 心跳,或客户端控制信息帧
        //  Frame 2: 请求内容
        zframe_t *address = zmsg_pop (request);
        zframe_t *control = zmsg_pop (request);
        zmsg_t *reply = zmsg_new ();
        if (zframe_streq (control, "PONG"))
            zmsg_addstr (reply, "PONG");
        else {
            zmsg_add (reply, control);
            zmsg_addstr (reply, "OK");
        }
        zmsg_destroy (&request);
        zmsg_push (reply, address);
        if (verbose && reply)
            zmsg_dump (reply);
        zmsg_send (&reply, server);
    }
    if (zctx_interrupted)
        printf ("W: 中断\n");
 
    zctx_destroy (&ctx);
    return 0;
}

        但是,自由者模式的客户端会变得大一写。为了清晰期间,我们将其拆分为两个类来实现。首先是在上层使用的程序:

        flclient3: Freelance client, Model Three in C

//
//  自由者模式 - 客户端 - 模型3
//  使用flcliapi类来封装自由者模式
//
//  直接编译,不建类库
#include "flcliapi.c"
 
int main (void)
{
    //  创建自由者模式实例
    flcliapi_t *client = flcliapi_new ();
 
    //  链接至服务器端点
    flcliapi_connect (client, "tcp://localhost:5555");
    flcliapi_connect (client, "tcp://localhost:5556");
    flcliapi_connect (client, "tcp://localhost:5557");
 
    //  发送随机请求,计算时间
    int requests = 1000;
    uint64_t start = zclock_time ();
    while (requests--) {
        zmsg_t *request = zmsg_new ();
        zmsg_addstr (request, "random name");
        zmsg_t *reply = flcliapi_request (client, &request);
        if (!reply) {
            printf ("E: 名称解析服务不可用,正在退出\n");
            break;
        }
        zmsg_destroy (&reply);
    }
    printf ("平均执行时间: %d usec\n",
        (int) (zclock_time () - start) / 10);
 
    flcliapi_destroy (&client);
    return 0;
}

        下面是该模式复杂的实现过程:

        flcliapi: Freelance client API in C

/*  =====================================================================
    flcliapi - Freelance Pattern agent class
    Model 3: uses ROUTER socket to address specific services
 
    ---------------------------------------------------------------------
    Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
    Copyright other contributors as noted in the AUTHORS file.
 
    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
 
    This is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or (at
    your option) any later version.
 
    This software is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.
 
    You should have received a copy of the GNU Lesser General Public
    License along with this program. If not, see
    <http://www.gnu.org/licenses/>.
    =====================================================================
*/
 
#include "flcliapi.h"
 
//  请求超时时间
#define GLOBAL_TIMEOUT  3000    //  msecs
//  心跳间隔
#define PING_INTERVAL   2000    //  msecs
//  判定服务死亡的时间
#define SERVER_TTL      6000    //  msecs
 
 
//  =====================================================================
//  同步部分,在应用程序层面运行
 
//  ---------------------------------------------------------------------
//  类结构
 
struct _flcliapi_t {
    zctx_t *ctx;        //  上下文
    void *pipe;         //  用于和主线程通信的套接字
};
 
//  这是运行后台代理程序的线程
static void flcliapi_agent (void *args, zctx_t *ctx, void *pipe);
 
 
//  ---------------------------------------------------------------------
//  构造函数
 
flcliapi_t *
flcliapi_new (void)
{
    flcliapi_t
        *self;
 
    self = (flcliapi_t *) zmalloc (sizeof (flcliapi_t));
    self->ctx = zctx_new ();
    self->pipe = zthread_fork (self->ctx, flcliapi_agent, NULL);
    return self;
}
 
//  ---------------------------------------------------------------------
//  析构函数
 
void
flcliapi_destroy (flcliapi_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        flcliapi_t *self = *self_p;
        zctx_destroy (&self->ctx);
        free (self);
        *self_p = NULL;
    }
}
 
//  ---------------------------------------------------------------------
//  连接至新服务器端点
//  消息内容:[CONNECT][endpoint]
 
void
flcliapi_connect (flcliapi_t *self, char *endpoint)
{
    assert (self);
    assert (endpoint);
    zmsg_t *msg = zmsg_new ();
    zmsg_addstr (msg, "CONNECT");
    zmsg_addstr (msg, endpoint);
    zmsg_send (&msg, self->pipe);
    zclock_sleep (100);      //  等待连接
}
 
//  ---------------------------------------------------------------------
//  发送并销毁请求,接收应答
 
zmsg_t *
flcliapi_request (flcliapi_t *self, zmsg_t **request_p)
{
    assert (self);
    assert (*request_p);
 
    zmsg_pushstr (*request_p, "REQUEST");
    zmsg_send (request_p, self->pipe);
    zmsg_t *reply = zmsg_recv (self->pipe);
    if (reply) {
        char *status = zmsg_popstr (reply);
        if (streq (status, "FAILED"))
            zmsg_destroy (&reply);
        free (status);
    }
    return reply;
}
 
 
//  =====================================================================
//  异步部分,在后台运行
 
//  ---------------------------------------------------------------------
//  单个服务端信息
 
typedef struct {
    char *endpoint;             //  服务端端点/套接字标识
    uint alive;                 //  是否在线
    int64_t ping_at;            //  下一次心跳时间
    int64_t expires;            //  过期时间
} server_t;
 
server_t *
server_new (char *endpoint)
{
    server_t *self = (server_t *) zmalloc (sizeof (server_t));
    self->endpoint = strdup (endpoint);
    self->alive = 0;
    self->ping_at = zclock_time () + PING_INTERVAL;
    self->expires = zclock_time () + SERVER_TTL;
    return self;
}
 
void
server_destroy (server_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        server_t *self = *self_p;
        free (self->endpoint);
        free (self);
        *self_p = NULL;
    }
}
 
int
server_ping (char *key, void *server, void *socket)
{
    server_t *self = (server_t *) server;
    if (zclock_time () >= self->ping_at) {
        zmsg_t *ping = zmsg_new ();
        zmsg_addstr (ping, self->endpoint);
        zmsg_addstr (ping, "PING");
        zmsg_send (&ping, socket);
        self->ping_at = zclock_time () + PING_INTERVAL;
    }
    return 0;
}
 
int
server_tickless (char *key, void *server, void *arg)
{
    server_t *self = (server_t *) server;
    uint64_t *tickless = (uint64_t *) arg;
    if (*tickless > self->ping_at)
        *tickless = self->ping_at;
    return 0;
}
 
 
//  ---------------------------------------------------------------------
//  后台处理程序信息
 
typedef struct {
    zctx_t *ctx;                //  上下文
    void *pipe;                 //  用于应用程序通信的套接字
    void *router;               //  用于服务端通信的套接字
    zhash_t *servers;           //  已连接的服务端
    zlist_t *actives;           //  在线的服务端
    uint sequence;              //  请求编号
    zmsg_t *request;            //  当前请求
    zmsg_t *reply;              //  当前应答
    int64_t expires;            //  请求过期时间
} agent_t;
 
agent_t *
agent_new (zctx_t *ctx, void *pipe)
{
    agent_t *self = (agent_t *) zmalloc (sizeof (agent_t));
    self->ctx = ctx;
    self->pipe = pipe;
    self->router = zsocket_new (self->ctx, ZMQ_ROUTER);
    self->servers = zhash_new ();
    self->actives = zlist_new ();
    return self;
}
 
void
agent_destroy (agent_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        agent_t *self = *self_p;
        zhash_destroy (&self->servers);
        zlist_destroy (&self->actives);
        zmsg_destroy (&self->request);
        zmsg_destroy (&self->reply);
        free (self);
        *self_p = NULL;
    }
}
 
//  当服务端从列表中移除时,回调该函数。
 
static void
s_server_free (void *argument)
{
    server_t *server = (server_t *) argument;
    server_destroy (&server);
}
 
void
agent_control_message (agent_t *self)
{
    zmsg_t *msg = zmsg_recv (self->pipe);
    char *command = zmsg_popstr (msg);
 
    if (streq (command, "CONNECT")) {
        char *endpoint = zmsg_popstr (msg);
        printf ("I: connecting to %s...\n", endpoint);
        int rc = zmq_connect (self->router, endpoint);
        assert (rc == 0);
        server_t *server = server_new (endpoint);
        zhash_insert (self->servers, endpoint, server);
        zhash_freefn (self->servers, endpoint, s_server_free);
        zlist_append (self->actives, server);
        server->ping_at = zclock_time () + PING_INTERVAL;
        server->expires = zclock_time () + SERVER_TTL;
        free (endpoint);
    }
    else
    if (streq (command, "REQUEST")) {
        assert (!self->request);    //  遵循请求-应答循环
        //  将请求编号和空帧加入消息顶部
        char sequence_text [10];
        sprintf (sequence_text, "%u", ++self->sequence);
        zmsg_pushstr (msg, sequence_text);
        //  获取请求消息的所有权
        self->request = msg;
        msg = NULL;
        //  设置请求过期时间
        self->expires = zclock_time () + GLOBAL_TIMEOUT;
    }
    free (command);
    zmsg_destroy (&msg);
}
 
void
agent_router_message (agent_t *self)
{
    zmsg_t *reply = zmsg_recv (self->router);
 
    //  第一帧是应答的服务端标识
    char *endpoint = zmsg_popstr (reply);
    server_t *server =
        (server_t *) zhash_lookup (self->servers, endpoint);
    assert (server);
    free (endpoint);
    if (!server->alive) {
        zlist_append (self->actives, server);
        server->alive = 1;
    }
    server->ping_at = zclock_time () + PING_INTERVAL;
    server->expires = zclock_time () + SERVER_TTL;
 
    //  第二帧是应答的编号
    char *sequence = zmsg_popstr (reply);
    if (atoi (sequence) == self->sequence) {
        zmsg_pushstr (reply, "OK");
        zmsg_send (&reply, self->pipe);
        zmsg_destroy (&self->request);
    }
    else
        zmsg_destroy (&reply);
}
 
 
//  ---------------------------------------------------------------------
//  异步的后台代理会维护一个服务端池,处理请求和应答。
 
static void
flcliapi_agent (void *args, zctx_t *ctx, void *pipe)
{
    agent_t *self = agent_new (ctx, pipe);
 
    zmq_pollitem_t items [] = {
        { self->pipe, 0, ZMQ_POLLIN, 0 },
        { self->router, 0, ZMQ_POLLIN, 0 }
    };
    while (!zctx_interrupted) {
        //  计算超时时间
        uint64_t tickless = zclock_time () + 1000 * 3600;
        if (self->request
        &&  tickless > self->expires)
            tickless = self->expires;
        zhash_foreach (self->servers, server_tickless, &tickless);
 
        int rc = zmq_poll (items, 2,
            (tickless - zclock_time ()) * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  上下文对象被关闭
 
        if (items [0].revents & ZMQ_POLLIN)
            agent_control_message (self);
 
        if (items [1].revents & ZMQ_POLLIN)
            agent_router_message (self);
 
        //  如果我们需要处理一项请求,将其发送给下一个可用的服务端
        if (self->request) {
            if (zclock_time () >= self->expires) {
                //  请求超时
                zstr_send (self->pipe, "FAILED");
                zmsg_destroy (&self->request);
            }
            else {
                //  寻找可用的服务端
                while (zlist_size (self->actives)) {
                    server_t *server =
                        (server_t *) zlist_first (self->actives);
                    if (zclock_time () >= server->expires) {
                        zlist_pop (self->actives);
                        server->alive = 0;
                    }
                    else {
                        zmsg_t *request = zmsg_dup (self->request);
                        zmsg_pushstr (request, server->endpoint);
                        zmsg_send (&request, self->router);
                        break;
                    }
                }
            }
        }
        //  断开并删除已过期的服务端
        //  发送心跳给空闲服务器
        zhash_foreach (self->servers, server_ping, self->router);
    }
    agent_destroy (&self);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/51144.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

3.8、集线器与交换机的区别

1、早期总线型以太网 最初使用粗同轴电缆作为传输媒体&#xff0c;后来是用相对便宜的细同轴电缆 普遍认为有源器件不可靠&#xff0c;无缘的电缆线最可靠&#xff08;并没有那么可靠&#xff09; 2、只用双绞线和集线器 HUB 的星型以太网 主机中的以太网卡及集线器个接口使…

Old money风盛行,柯罗芭KLOVA演绎中式奢华

Ralph Lauren先生说过&#xff1a;“奢侈是一种感性的生活方式。它和本季推出什么新品无关。它更关乎个人风格和舒适、轻松的环境。奢侈品是质量和永恒的优雅”。Ralph lauren以一己之力托起Old money风格的半壁江山&#xff0c;它属于带着一丝上流社会的雅痞绅士&#xff0c;优…

一起学时序分析之建立/保持时间裕量

何为裕量&#xff1f; 裕量&#xff0c;英文名称叫做“Slack”。我们在Vivado实现后的报告中常常能看到这样一栏&#xff1a; 因为都是缩写&#xff0c;所以我们来解释一下前四栏的含义&#xff1a; WNS&#xff0c;即Worst Negative Slack&#xff0c;最差负时序裕量。这个表…

leetcode:1579. 保证图可完全遍历【并查集思路】

目录题目截图题目分析ac code总结题目截图 题目分析 从删除比较难&#xff0c;考虑增加增加的过程中无用的边就可以删除考虑alice和bob各自的联通分量最后希望都是1&#xff0c;一开始都是n如果将两个独立的联通分量连起来了&#xff0c;那么连通分量个数减1这里很明显就是用并…

kubernetes-Pod详解2

kubernetes-Pod详解2 文章目录kubernetes-Pod详解2Pod生命周期创建和终止pod的创建过程pod的终止过程初始化容器钩子函数容器探测方式一&#xff1a;Exec方式二&#xff1a;TCPSocket方式三&#xff1a;HTTPGet重启策略Pod调度定向调度NodeSelector亲和性调度NodeAffinityPodAf…

Kamiya丨Kamiya艾美捷AREG酶联免疫吸附试验原理

Kamiya艾美捷AREG酶联免疫吸附试验预期用途&#xff1a; 该试剂盒是一种用于体外定量测量大鼠AREG的夹心酶免疫测定法血清、血浆和其他生物流体。仅供研究使用。不用于诊断程序。 存储&#xff1a; 所有试剂应按照小瓶上的标签保存。校准器、检测试剂A、检测试剂B和96孔带板应…

ZMQ之高可靠对称节点--双子星模式

一、概览 双子星模式是一对具有主从机制的高可靠节点。任一时间&#xff0c;某个节点会充当主机&#xff0c;接收所有客户端的请求&#xff1b;另一个则作为一种备机存在。两个节点会互相监控对方&#xff0c;当主机从网络中消失时&#xff0c;备机会替代主机的位置。 双子星模…

gateway网关聚合knife4j文档,同时兼容swagger2与swagger3

基于前两篇文章&#xff0c;进行整合 springcloud-gateway 聚合swagger3请求接口丢失appliactionName解决 springcloud-gateway聚合knife4j接口文档 为何要兼容&#xff1f;微服务开发者有的使用了swagger2版本&#xff0c;有的使用了swagger3版本&#xff0c;但暴露外部给前…

聊一聊我的第一个开源项目

项目地址&#xff1a;https://github.com/kpretty/hdd 我在21年的国庆写过一篇文章&#xff1a;《Docker 实战&#xff1a;部署hadoop集群》&#xff0c;当时也是刚接触docker&#xff0c;作为docker第一个练手项目对很多概念理解的不是很到位&#xff0c;因此那篇文章所使用的…

基于PHP+MySQL菜品食谱美食网站的设计与实现

美食是人类永恒的追求,现在有很多的美食爱好者,他们希望通过自己的各种方式来学习更多的美食制作方式,以及分享自己制作美食的一些过程,说让更多的人。享受到更加美味可口的饭菜。本系统也是基于这样的目的来进行开发的。 本系统是通过PHP&#xff1a;MySQL来进行开发,主要实现…

存储器扩展,画图题

目录 存储器与CPU的接口 地址线的连接 数据线的连接 控制线的连接&#xff08;读写和片选&#xff09; 考题 引出 第一题 第二题 第三题 计算地址范围&#xff08;这里用的38译码器&#xff09; 第四题 填空题 第五题 第六题&#xff08;2017&#xff09; 要求&…

【微信小程序】CSS模块化、使用缓存在本地模拟服务器数据库

&#x1f3c6;今日学习目标&#xff1a;第十五期——CSS模块化、使用缓存在本地模拟服务器数据库 &#x1f603;创作者&#xff1a;颜颜yan_ ✨个人主页&#xff1a;颜颜yan_的个人主页 ⏰预计时间&#xff1a;25分钟 &#x1f389;专栏系列&#xff1a;我的第一个微信小程序 文…

【这款神器可以有】3DMAX一键墙体门洞窗洞插件使用教程

3DMAX一键墙体门洞窗洞插件&#xff0c;只需导入户型图&#xff0c;单/双面墙体一键生成。 【主要功能】 --一键生成墙体 --一键门洞 --一键窗洞 --支持单/双面墙体生成 【安装方法】 无需安装&#xff0c;直接拖动插件脚本到3dmax窗口即可打开插件。 【快速开始】 将3dm…

11.我为 Netty 贡献源码 | 且看 Netty 如何应对 TCP 连接的正常关闭,异常关闭,半关闭场景

我为 Netty 贡献源码 | 且看 Netty 如何应对 TCP 连接的正常关闭&#xff0c;异常关闭&#xff0c;半关闭场景 本系列Netty源码解析文章基于 4.1.56.Final版本 写在前面..... 本文是笔者肉眼盯 Bug 系列的第三弹&#xff0c;前两弹分别是: 抓到Netty一个Bug&#xff0c;顺带来…

【Spring(七)】带你手写一个Spring容器

有关Spring的所有文章都收录于我的专栏&#xff1a;&#x1f449;Spring&#x1f448; 目录 前置准备 第一步、创建我们自定的注解 第二步、创建我们自己的容器类 测试 总结 相关文章 【Spring&#xff08;一&#xff09;】如何获取对象&#xff08;Bean&#xff09;【Spring&a…

CSS伪类使用详解

基本描述 CSS伪类是很常用的功能&#xff0c;主要应用于选择器的关键字&#xff0c;用来改变被选择元素的特殊状态下的样式。 伪类类似于普通CSS类的用法&#xff0c;是对CSS选择器的一种扩展&#xff0c;增强选择器的功能。 目前可用的伪类有大概40多个&#xff0c;少部分有兼…

Spring Bean的生命周期理解

一、Spring Bean的生命周期大的概括起来有四个阶段&#xff1a; 1、实例化 2、属性填充注入 3、初始化使用 4、Bean的销毁 二、如流程图所示 三、步骤说明 1、实例化 实例化一个Bean&#xff0c;即new 2、IOC依赖注入 按照Spring上下文对实例化的Bean进行属性填充注入 3、setB…

昆船智能上市:预计年营收19亿到22.5亿 市值48亿

雷递网 雷建平 11月30日昆船智能技术股份有限公司&#xff08;简称&#xff1a;“昆船智能”&#xff0c;证券代码&#xff1a;301311&#xff09;今日在深交所创业板上市。昆船智能本次发行股票6000万股&#xff0c;发行价为13.88元&#xff0c;募资8.33亿元。昆船智能开盘价为…

2022CTF培训(七)逆向专项练习

附件下载链接 babyre 首先是一个迷宫&#xff0c;由于答案不唯一&#xff0c;因此到 dfs 求出所有路径。 #include <bits/stdc.h>constexpr char s[] "**************.****.**s..*..******.****.***********..***..**..#*..***..***.********************.**..*…

springMVC01,springMVC的执行流程【第一个springMVC例子(XML配置版本):HelloWorld】

springMVC01,springMVC的执行流程【第一个springMVC项目&#xff1a;HelloWorld】springMVC的简介springMVC的执行流程第一个springMVC项目&#xff08;XML配置版本&#xff09;1.创建项目1.1 新建maven项目&#xff1a;1.2 添加web支持1.3 在pom.xml中导入依赖1.4 配置tomcat2…