ZMQ之面向服务的可靠队列(管家模式)

news2024/11/24 22:35:33

        管家模式协议(MDP)在扩展PPP协议时引入了一个有趣的特性:client发送的每一个请求都有一个“服务名称”,而worker在像队列装置注册时需要告知自己的服务类型。MDP的优势在于它来源于现实编程,协议简单,且容易提升。

        引入“服务名称”的机制,是对偏执海盗队列的一个简单补充,而结果是让其成为一个面向服务的代理。

        在实施管家模式之前,我们需要为client和worker编写一个框架。如果程序员可以通过简单的API来实现这种模式,那就没有必要让他们去了解管家模式的协议内容和实现方法了。
        所以,我们第一个协议(即管家模式协议)定义了分布式架构中节点是如何互相交互的,第二个协议则要定义应用程序应该如何通过框架来使用这一协议。
        管家模式有两个端点,客户端和服务端。因为我们要为client和worker都撰写框架,所以就需要提供两套API。以下是用简单的面向对象方法设计的client端API雏形,使用的是C语言的ZFL library。

mdcli_t *mdcli_new     (char *broker);
void     mdcli_destroy (mdcli_t **self_p);
zmsg_t  *mdcli_send    (mdcli_t *self, char *service, zmsg_t **request_p);

        就这么简单。我们创建了一个会话来和代理通信,发送并接收一个请求,最后关闭连接。以下是worker端API的雏形。

mdwrk_t *mdwrk_new     (char *broker,char *service);
void     mdwrk_destroy (mdwrk_t **self_p);
zmsg_t  *mdwrk_recv    (mdwrk_t *self, zmsg_t *reply);

        上面两段代码看起来差不多,但是worker端API略有不同。worker第一次执行recv()后会传递一个空的应答,之后才传递当前的应答,并获得新的请求。

        两段的API都很容易开发,只需在偏执海盗模式代码的基础上修改即可。以下是client API:

        mdcliapi: Majordomo client API in C

/*  =====================================================================
    mdcliapi.c
 
    Majordomo Protocol Client API
    Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
 
    ---------------------------------------------------------------------
    Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
    Copyright other contributors as noted in the AUTHORS file.
 
    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
 
    This is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or (at
    your option) any later version.
 
    This software is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.
 
    You should have received a copy of the GNU Lesser General Public
    License along with this program. If not, see
    <http://www.gnu.org/licenses/>.
    =====================================================================
*/
 
#include "mdcliapi.h"
 
//  类结构
//  我们会通过成员方法来访问这些属性
 
struct _mdcli_t {
    zctx_t *ctx;                //  上下文
    char *broker;
    void *client;               //  连接至代理的套接字
    int verbose;                //  使用标准输出打印当前活动
    int timeout;                //  请求超时时间
    int retries;                //  请求重试次数
};
 
 
//  ---------------------------------------------------------------------
//  连接或重连代理
 
void s_mdcli_connect_to_broker (mdcli_t *self)
{
    if (self->client)
        zsocket_destroy (self->ctx, self->client);
    self->client = zsocket_new (self->ctx, ZMQ_REQ);
    zmq_connect (self->client, self->broker);
    if (self->verbose)
        zclock_log ("I: 正在连接至代理 %s...", self->broker);
}
 
 
//  ---------------------------------------------------------------------
//  构造函数
 
mdcli_t *
mdcli_new (char *broker, int verbose)
{
    assert (broker);
 
    mdcli_t *self = (mdcli_t *) zmalloc (sizeof (mdcli_t));
    self->ctx = zctx_new ();
    self->broker = strdup (broker);
    self->verbose = verbose;
    self->timeout = 2500;           //  毫秒
    self->retries = 3;              //  尝试次数
 
    s_mdcli_connect_to_broker (self);
    return self;
}
 
 
//  ---------------------------------------------------------------------
//  析构函数
 
void
mdcli_destroy (mdcli_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        mdcli_t *self = *self_p;
        zctx_destroy (&self->ctx);
        free (self->broker);
        free (self);
        *self_p = NULL;
    }
}
 
 
//  ---------------------------------------------------------------------
//  设定请求超时时间
 
void
mdcli_set_timeout (mdcli_t *self, int timeout)
{
    assert (self);
    self->timeout = timeout;
}
 
 
//  ---------------------------------------------------------------------
//  设定请求重试次数
 
void
mdcli_set_retries (mdcli_t *self, int retries)
{
    assert (self);
    self->retries = retries;
}
 
 
//  ---------------------------------------------------------------------
//  向代理发送请求,并尝试获取应答;
//  对消息保持所有权,发送后销毁;
//  返回应答消息,或NULL。
 
zmsg_t *
mdcli_send (mdcli_t *self, char *service, zmsg_t **request_p)
{
    assert (self);
    assert (request_p);
    zmsg_t *request = *request_p;
 
    //  用协议前缀包装消息
    //  Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
    //  Frame 2: 服务名称 (可打印字符串)
    zmsg_pushstr (request, service);
    zmsg_pushstr (request, MDPC_CLIENT);
    if (self->verbose) {
        zclock_log ("I: 发送请求给 '%s' 服务:", service);
        zmsg_dump (request);
    }
 
    int retries_left = self->retries;
    while (retries_left && !zctx_interrupted) {
        zmsg_t *msg = zmsg_dup (request);
        zmsg_send (&msg, self->client);
 
        while (TRUE) {
            //  轮询套接字以接收应答,有超时时间
            zmq_pollitem_t items [] = {
                { self->client, 0, ZMQ_POLLIN, 0 } };
            int rc = zmq_poll (items, 1, self->timeout * ZMQ_POLL_MSEC);
            if (rc == -1)
                break;          //  中断
 
            //  收到应答后进行处理
            if (items [0].revents & ZMQ_POLLIN) {
                zmsg_t *msg = zmsg_recv (self->client);
                if (self->verbose) {
                    zclock_log ("I: received reply:");
                    zmsg_dump (msg);
                }
                //  不要尝试处理错误,直接报错即可
                assert (zmsg_size (msg) >= 3);
 
                zframe_t *header = zmsg_pop (msg);
                assert (zframe_streq (header, MDPC_CLIENT));
                zframe_destroy (&header);
 
                zframe_t *reply_service = zmsg_pop (msg);
                assert (zframe_streq (reply_service, service));
                zframe_destroy (&reply_service);
 
                zmsg_destroy (&request);
                return msg;     //  成功
            }
            else
            if (--retries_left) {
                if (self->verbose)
                    zclock_log ("W: no reply, reconnecting...");
                //  重连并重发消息
                s_mdcli_connect_to_broker (self);
                zmsg_t *msg = zmsg_dup (request);
                zmsg_send (&msg, self->client);
            }
            else {
                if (self->verbose)
                    zclock_log ("W: 发生严重错误,放弃重试。");
                break;          //  放弃
            }
        }
    }
    if (zctx_interrupted)
        printf ("W: 收到中断消息,结束client进程...\n");
    zmsg_destroy (&request);
    return NULL;
}

        以下测试程序会执行10万次请求应答:

        mdclient: Majordomo client application in C

//
//  管家模式协议 - 客户端示例
//  使用mdcli API隐藏管家模式协议的内部实现
//
 
//  让我们直接编译这段代码,不生成类库
#include "mdcliapi.c"
 
int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));
    mdcli_t *session = mdcli_new ("tcp://localhost:5555", verbose);
 
    int count;
    for (count = 0; count < 100000; count++) {
        zmsg_t *request = zmsg_new ();
        zmsg_pushstr (request, "Hello world");
        zmsg_t *reply = mdcli_send (session, "echo", &request);
        if (reply)
            zmsg_destroy (&reply);
        else
            break;              //  中断或停止
    }
    printf ("已处理 %d 次请求-应答\n", count);
    mdcli_destroy (&session);
    return 0;
}

        下面是worker的API:

        mdwrkapi: Majordomo worker API in C

/*  =====================================================================
    mdwrkapi.c
 
    Majordomo Protocol Worker API
    Implements the MDP/Worker spec at http://rfc.zeromq.org/spec:7.
 
    ---------------------------------------------------------------------
    Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
    Copyright other contributors as noted in the AUTHORS file.
 
    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
 
    This is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or (at
    your option) any later version.
 
    This software is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.
 
    You should have received a copy of the GNU Lesser General Public
    License along with this program. If not, see
    <http://www.gnu.org/licenses/>.
    =====================================================================
*/
 
#include "mdwrkapi.h"
 
//  可靠性参数
#define HEARTBEAT_LIVENESS  3       //  合理值:3-5
 
//  类结构
//  使用成员函数访问属性
 
struct _mdwrk_t {
    zctx_t *ctx;                //  上下文
    char *broker;
    char *service;
    void *worker;               //  连接至代理的套接字
    int verbose;                //  使用标准输出打印活动
 
    //  心跳设置
    uint64_t heartbeat_at;      //  发送心跳的时间
    size_t liveness;            //  尝试次数
    int heartbeat;              //  心跳延时,单位:毫秒
    int reconnect;              //  重连延时,单位:毫秒
 
    //  内部状态
    int expect_reply;           //  初始值为0
 
    //  应答地址,如果存在的话
    zframe_t *reply_to;
};
 
 
//  ---------------------------------------------------------------------
//  发送消息给代理
//  如果没有提供消息,则内部创建一个
 
static void
s_mdwrk_send_to_broker (mdwrk_t *self, char *command, char *option,
                        zmsg_t *msg)
{
    msg = msg? zmsg_dup (msg): zmsg_new ();
 
    //  将协议信封压入消息顶部
    if (option)
        zmsg_pushstr (msg, option);
    zmsg_pushstr (msg, command);
    zmsg_pushstr (msg, MDPW_WORKER);
    zmsg_pushstr (msg, "");
 
    if (self->verbose) {
        zclock_log ("I: sending %s to broker",
            mdps_commands [(int) *command]);
        zmsg_dump (msg);
    }
    zmsg_send (&msg, self->worker);
}
 
 
//  ---------------------------------------------------------------------
//  连接或重连代理
 
void s_mdwrk_connect_to_broker (mdwrk_t *self)
{
    if (self->worker)
        zsocket_destroy (self->ctx, self->worker);
    self->worker = zsocket_new (self->ctx, ZMQ_DEALER);
    zmq_connect (self->worker, self->broker);
    if (self->verbose)
        zclock_log ("I: 正在连接代理 %s...", self->broker);
 
    //  向代理注册服务类型
    s_mdwrk_send_to_broker (self, MDPW_READY, self->service, NULL);
 
    //  当心跳健康度为零,表示代理已断开连接
    self->liveness = HEARTBEAT_LIVENESS;
    self->heartbeat_at = zclock_time () + self->heartbeat;
}
 
 
//  ---------------------------------------------------------------------
//  构造函数
 
mdwrk_t *
mdwrk_new (char *broker,char *service, int verbose)
{
    assert (broker);
    assert (service);
 
    mdwrk_t *self = (mdwrk_t *) zmalloc (sizeof (mdwrk_t));
    self->ctx = zctx_new ();
    self->broker = strdup (broker);
    self->service = strdup (service);
    self->verbose = verbose;
    self->heartbeat = 2500;     //  毫秒
    self->reconnect = 2500;     //  毫秒
 
    s_mdwrk_connect_to_broker (self);
    return self;
}
 
 
//  ---------------------------------------------------------------------
//  析构函数
 
void
mdwrk_destroy (mdwrk_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        mdwrk_t *self = *self_p;
        zctx_destroy (&self->ctx);
        free (self->broker);
        free (self->service);
        free (self);
        *self_p = NULL;
    }
}
 
 
//  ---------------------------------------------------------------------
//  设置心跳延迟
 
void
mdwrk_set_heartbeat (mdwrk_t *self, int heartbeat)
{
    self->heartbeat = heartbeat;
}
 
 
//  ---------------------------------------------------------------------
//  设置重连延迟
 
void
mdwrk_set_reconnect (mdwrk_t *self, int reconnect)
{
    self->reconnect = reconnect;
}
 
 
//  ---------------------------------------------------------------------
//  若有应答则发送给代理,并等待新的请求
 
zmsg_t *
mdwrk_recv (mdwrk_t *self, zmsg_t **reply_p)
{
    //  格式化并发送请求传入的应答
    assert (reply_p);
    zmsg_t *reply = *reply_p;
    assert (reply || !self->expect_reply);
    if (reply) {
        assert (self->reply_to);
        zmsg_wrap (reply, self->reply_to);
        s_mdwrk_send_to_broker (self, MDPW_REPLY, NULL, reply);
        zmsg_destroy (reply_p);
    }
    self->expect_reply = 1;
 
    while (TRUE) {
        zmq_pollitem_t items [] = {
            { self->worker,  0, ZMQ_POLLIN, 0 } };
        int rc = zmq_poll (items, 1, self->heartbeat * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  中断
 
        if (items [0].revents & ZMQ_POLLIN) {
            zmsg_t *msg = zmsg_recv (self->worker);
            if (!msg)
                break;          //  中断
            if (self->verbose) {
                zclock_log ("I: 从代理处获得消息:");
                zmsg_dump (msg);
            }
            self->liveness = HEARTBEAT_LIVENESS;
 
            //  不要处理错误,直接报错即可
            assert (zmsg_size (msg) >= 3);
 
            zframe_t *empty = zmsg_pop (msg);
            assert (zframe_streq (empty, ""));
            zframe_destroy (&empty);
 
            zframe_t *header = zmsg_pop (msg);
            assert (zframe_streq (header, MDPW_WORKER));
            zframe_destroy (&header);
 
            zframe_t *command = zmsg_pop (msg);
            if (zframe_streq (command, MDPW_REQUEST)) {
                //  这里需要将消息中空帧之前的所有地址都保存起来,
                //  但在这里我们暂时只保存一个
                self->reply_to = zmsg_unwrap (msg);
                zframe_destroy (&command);
                return msg;     //  处理请求
            }
            else
            if (zframe_streq (command, MDPW_HEARTBEAT))
                ;               //  不对心跳做任何处理
            else
            if (zframe_streq (command, MDPW_DISCONNECT))
                s_mdwrk_connect_to_broker (self);
            else {
                zclock_log ("E: 消息不合法");
                zmsg_dump (msg);
            }
            zframe_destroy (&command);
            zmsg_destroy (&msg);
        }
        else
        if (--self->liveness == 0) {
            if (self->verbose)
                zclock_log ("W: 失去与代理的连接 - 正在重试...");
            zclock_sleep (self->reconnect);
            s_mdwrk_connect_to_broker (self);
        }
        //  适时地发送消息
        if (zclock_time () > self->heartbeat_at) {
            s_mdwrk_send_to_broker (self, MDPW_HEARTBEAT, NULL, NULL);
            self->heartbeat_at = zclock_time () + self->heartbeat;
        }
    }
    if (zctx_interrupted)
        printf ("W: 收到中断消息,中止worker...\n");
    return NULL;
}

        以下测试程序实现了名为echo的服务:

        mdworker: Majordomo worker application in C

//
//  管家模式协议 - worker示例
//  使用mdwrk API隐藏MDP协议的内部实现
//
 
//  让我们直接编译代码,而不创建类库
#include "mdwrkapi.c"
 
int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));
    mdwrk_t *session = mdwrk_new (
        "tcp://localhost:5555", "echo", verbose);
 
    zmsg_t *reply = NULL;
    while (1) {
        zmsg_t *request = mdwrk_recv (session, &reply);
        if (request == NULL)
            break;              //  worker被中止
        reply = request;        //  echo服务……其实很复杂:)
    }
    mdwrk_destroy (&session);
    return 0;
}

        几点说明:

                1、API是单线程的,所以说worker不会再后台发送心跳,而这也是我们所期望的:如果worker应用程序停止了,心跳就会跟着中止,代理便会停止向该worker发送新的请求。

                2、wroker API没有做回退算法的设置,因为这里不值得使用这一复杂的机制。

                3、API没有提供任何报错机制,如果出现问题,它会直接报断言(或异常,依语言而定)。这一做法对实验性的编程是有用的,这样可以立刻看到执行结果。但在真实编程环境中,API应该足够健壮,合适地处理非法消息。

        也许你会问,worker API为什么要关闭它的套接字并新开一个呢?特别是ZMQ是有重连机制的,能够在节点归来后进行重连。我们可以回顾一下简单海盗模式中的worker,以及偏执海盗模式中的worker来加以理解。ZMQ确实会进行自动重连,但如果代理死亡并重连,worker并不会重新进行注册。这个问题有两种解决方案:一是我们这里用到的较为简便的方案,即当worker判断代理已经死亡时,关闭它的套接字并重头来过;另一个方案是当代理收到未知worker的心跳时要求该worker对其提供的服务类型进行注册,这样一来就需要在协议中说明这一规则。

        下面让我们设计管家模式的代理,它的核心代码是一组队列,每种服务对应一个队列。我们会在worker出现时创建相应的队列(worker消失时应该销毁对应的队列,不过我们这里暂时不考虑)。额外的,我们会为每种服务维护一个worker的队列。

        为了让C语言代码更为易读易写,我使用了ZFL项目提供的哈希和链表容器,并命名为zhash和zlist。如果使用现代语言编写,那自然可以使用其内置的容器。

//
//  管家模式协议 - 代理
//  协议 http://rfc.zeromq.org/spec:7 和 spec:8 的最简实现
//
#include "czmq.h"
#include "mdp.h"
 
//  一般我们会从配置文件中获取以下值
 
#define HEARTBEAT_LIVENESS  3       //  合理值:3-5
#define HEARTBEAT_INTERVAL  2500    //  单位:毫秒
#define HEARTBEAT_EXPIRY    HEARTBEAT_INTERVAL * HEARTBEAT_LIVENESS
 
//  定义一个代理
typedef struct {
    zctx_t *ctx;                //  上下文
    void *socket;               //  用于连接client和worker的套接字
    int verbose;                //  使用标准输出打印活动信息
    char *endpoint;             //  代理绑定到的端点
    zhash_t *services;          //  已知服务的哈希表
    zhash_t *workers;           //  已知worker的哈希表
    zlist_t *waiting;           //  正在等待的worker队列
    uint64_t heartbeat_at;      //  发送心跳的时间
} broker_t;
 
//  定义一个服务
typedef struct {
    char *name;                 //  服务名称
    zlist_t *requests;          //  客户端请求队列
    zlist_t *waiting;           //  正在等待的worker队列
    size_t workers;             //  可用worker数
} service_t;
 
//  定义一个worker,状态为空闲或占用
typedef struct {
    char *identity;             //  worker的标识
    zframe_t *address;          //  地址帧
    service_t *service;         //  所属服务
    int64_t expiry;             //  过期时间,从未收到心跳起计时
} worker_t;
 
 
//  ---------------------------------------------------------------------
//  代理使用的函数
static broker_t *
    s_broker_new (int verbose);
static void
    s_broker_destroy (broker_t **self_p);
static void
    s_broker_bind (broker_t *self, char *endpoint);
static void
    s_broker_purge_workers (broker_t *self);
 
//  服务使用的函数
static service_t *
    s_service_require (broker_t *self, zframe_t *service_frame);
static void
    s_service_destroy (void *argument);
static void
    s_service_dispatch (broker_t *self, service_t *service, zmsg_t *msg);
static void
    s_service_internal (broker_t *self, zframe_t *service_frame, zmsg_t *msg);
 
//  worker使用的函数
static worker_t *
    s_worker_require (broker_t *self, zframe_t *address);
static void
    s_worker_delete (broker_t *self, worker_t *worker, int disconnect);
static void
    s_worker_destroy (void *argument);
static void
    s_worker_process (broker_t *self, zframe_t *sender, zmsg_t *msg);
static void
    s_worker_send (broker_t *self, worker_t *worker, char *command,
                   char *option, zmsg_t *msg);
static void
    s_worker_waiting (broker_t *self, worker_t *worker);
 
//  客户端使用的函数
static void
    s_client_process (broker_t *self, zframe_t *sender, zmsg_t *msg);
 
 
//  ---------------------------------------------------------------------
//  主程序
 
int main (int argc, char *argv [])
{
    int verbose = (argc > 1 && streq (argv [1], "-v"));
 
    broker_t *self = s_broker_new (verbose);
    s_broker_bind (self, "tcp://*:5555");
 
    //  接受并处理消息,直至程序被中止
    while (TRUE) {
        zmq_pollitem_t items [] = {
            { self->socket,  0, ZMQ_POLLIN, 0 } };
        int rc = zmq_poll (items, 1, HEARTBEAT_INTERVAL * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  中断
 
        //  Process next input message, if any
        if (items [0].revents & ZMQ_POLLIN) {
            zmsg_t *msg = zmsg_recv (self->socket);
            if (!msg)
                break;          //  中断
            if (self->verbose) {
                zclock_log ("I: 收到消息:");
                zmsg_dump (msg);
            }
            zframe_t *sender = zmsg_pop (msg);
            zframe_t *empty  = zmsg_pop (msg);
            zframe_t *header = zmsg_pop (msg);
 
            if (zframe_streq (header, MDPC_CLIENT))
                s_client_process (self, sender, msg);
            else
            if (zframe_streq (header, MDPW_WORKER))
                s_worker_process (self, sender, msg);
            else {
                zclock_log ("E: 非法消息:");
                zmsg_dump (msg);
                zmsg_destroy (&msg);
            }
            zframe_destroy (&sender);
            zframe_destroy (&empty);
            zframe_destroy (&header);
        }
        //  断开并删除过期的worker
        //  适时地发送心跳给worker
        if (zclock_time () > self->heartbeat_at) {
            s_broker_purge_workers (self);
            worker_t *worker = (worker_t *) zlist_first (self->waiting);
            while (worker) {
                s_worker_send (self, worker, MDPW_HEARTBEAT, NULL, NULL);
                worker = (worker_t *) zlist_next (self->waiting);
            }
            self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
        }
    }
    if (zctx_interrupted)
        printf ("W: 收到中断消息,关闭中...\n");
 
    s_broker_destroy (&self);
    return 0;
}
 
 
//  ---------------------------------------------------------------------
//  代理对象的构造函数
 
static broker_t *
s_broker_new (int verbose)
{
    broker_t *self = (broker_t *) zmalloc (sizeof (broker_t));
 
    //  初始化代理状态
    self->ctx = zctx_new ();
    self->socket = zsocket_new (self->ctx, ZMQ_ROUTER);
    self->verbose = verbose;
    self->services = zhash_new ();
    self->workers = zhash_new ();
    self->waiting = zlist_new ();
    self->heartbeat_at = zclock_time () + HEARTBEAT_INTERVAL;
    return self;
}
 
//  ---------------------------------------------------------------------
//  代理对象的析构函数
 
static void
s_broker_destroy (broker_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        broker_t *self = *self_p;
        zctx_destroy (&self->ctx);
        zhash_destroy (&self->services);
        zhash_destroy (&self->workers);
        zlist_destroy (&self->waiting);
        free (self);
        *self_p = NULL;
    }
}
 
//  ---------------------------------------------------------------------
//  将代理套接字绑定至端点,可以重复调用该函数
//  我们使用一个套接字来同时处理client和worker
 
void
s_broker_bind (broker_t *self, char *endpoint)
{
    zsocket_bind (self->socket, endpoint);
    zclock_log ("I: MDP broker/0.1.1 is active at %s", endpoint);
}
 
//  ---------------------------------------------------------------------
//  删除空闲状态中过期的worker
 
static void
s_broker_purge_workers (broker_t *self)
{
    worker_t *worker = (worker_t *) zlist_first (self->waiting);
    while (worker) {
        if (zclock_time () < worker->expiry)
            continue;              //  该worker未过期,停止搜索
        if (self->verbose)
            zclock_log ("I: 正在删除过期的worker: %s",
                worker->identity);
 
        s_worker_delete (self, worker, 0);
        worker = (worker_t *) zlist_first (self->waiting);
    }
}
 
//  ---------------------------------------------------------------------
//  定位或创建新的服务项
 
static service_t *
s_service_require (broker_t *self, zframe_t *service_frame)
{
    assert (service_frame);
    char *name = zframe_strdup (service_frame);
 
    service_t *service =
        (service_t *) zhash_lookup (self->services, name);
    if (service == NULL) {
        service = (service_t *) zmalloc (sizeof (service_t));
        service->name = name;
        service->requests = zlist_new ();
        service->waiting = zlist_new ();
        zhash_insert (self->services, name, service);
        zhash_freefn (self->services, name, s_service_destroy);
        if (self->verbose)
            zclock_log ("I: 收到消息:");
    }
    else
        free (name);
 
    return service;
}
 
//  ---------------------------------------------------------------------
//  当服务从broker->services中移除时销毁该服务对象
 
static void
s_service_destroy (void *argument)
{
    service_t *service = (service_t *) argument;
    //  销毁请求队列中的所有项目
    while (zlist_size (service->requests)) {
        zmsg_t *msg = zlist_pop (service->requests);
        zmsg_destroy (&msg);
    }
    zlist_destroy (&service->requests);
    zlist_destroy (&service->waiting);
    free (service->name);
    free (service);
}
 
//  ---------------------------------------------------------------------
//  可能时,分发请求给等待中的worker
 
static void
s_service_dispatch (broker_t *self, service_t *service, zmsg_t *msg)
{
    assert (service);
    if (msg)                    //  将消息加入队列
        zlist_append (service->requests, msg);
 
    s_broker_purge_workers (self);
    while (zlist_size (service->waiting)
        && zlist_size (service->requests))
    {
        worker_t *worker = zlist_pop (service->waiting);
        zlist_remove (self->waiting, worker);
        zmsg_t *msg = zlist_pop (service->requests);
        s_worker_send (self, worker, MDPW_REQUEST, NULL, msg);
        zmsg_destroy (&msg);
    }
}
 
//  ---------------------------------------------------------------------
//  使用8/MMI协定处理内部服务
 
static void
s_service_internal (broker_t *self, zframe_t *service_frame, zmsg_t *msg)
{
    char *return_code;
    if (zframe_streq (service_frame, "mmi.service")) {
        char *name = zframe_strdup (zmsg_last (msg));
        service_t *service =
            (service_t *) zhash_lookup (self->services, name);
        return_code = service && service->workers? "200": "404";
        free (name);
    }
    else
        return_code = "501";
 
    zframe_reset (zmsg_last (msg), return_code, strlen (return_code));
 
    //  移除并保存返回给client的信封,插入协议头信息和服务名称,并重新包装信封
    zframe_t *client = zmsg_unwrap (msg);
    zmsg_push (msg, zframe_dup (service_frame));
    zmsg_pushstr (msg, MDPC_CLIENT);
    zmsg_wrap (msg, client);
    zmsg_send (&msg, self->socket);
}
 
//  ---------------------------------------------------------------------
//  按需创建worker
 
static worker_t *
s_worker_require (broker_t *self, zframe_t *address)
{
    assert (address);
 
    //  self->workers使用wroker的标识为键
    char *identity = zframe_strhex (address);
    worker_t *worker =
        (worker_t *) zhash_lookup (self->workers, identity);
 
    if (worker == NULL) {
        worker = (worker_t *) zmalloc (sizeof (worker_t));
        worker->identity = identity;
        worker->address = zframe_dup (address);
        zhash_insert (self->workers, identity, worker);
        zhash_freefn (self->workers, identity, s_worker_destroy);
        if (self->verbose)
            zclock_log ("I: 正在注册新的worker: %s", identity);
    }
    else
        free (identity);
    return worker;
}
 
//  ---------------------------------------------------------------------
//  从所有数据结构中删除wroker,并销毁worker对象
 
static void
s_worker_delete (broker_t *self, worker_t *worker, int disconnect)
{
    assert (worker);
    if (disconnect)
        s_worker_send (self, worker, MDPW_DISCONNECT, NULL, NULL);
 
    if (worker->service) {
        zlist_remove (worker->service->waiting, worker);
        worker->service->workers--;
    }
    zlist_remove (self->waiting, worker);
    //  以下方法间接调用了s_worker_destroy()方法
    zhash_delete (self->workers, worker->identity);
}
 
//  ---------------------------------------------------------------------
//  当worker从broker->workers中移除时,销毁worker对象
 
static void
s_worker_destroy (void *argument)
{
    worker_t *worker = (worker_t *) argument;
    zframe_destroy (&worker->address);
    free (worker->identity);
    free (worker);
}
 
//  ---------------------------------------------------------------------
//  处理worker发送来的消息
 
static void
s_worker_process (broker_t *self, zframe_t *sender, zmsg_t *msg)
{
    assert (zmsg_size (msg) >= 1);     //  消息中至少包含命令帧
 
    zframe_t *command = zmsg_pop (msg);
    char *identity = zframe_strhex (sender);
    int worker_ready = (zhash_lookup (self->workers, identity) != NULL);
    free (identity);
    worker_t *worker = s_worker_require (self, sender);
 
    if (zframe_streq (command, MDPW_READY)) {
        //  若worker队列中已有该worker,但仍收到了它的“已就绪”消息,则删除这个worker。
        if (worker_ready)
            s_worker_delete (self, worker, 1);
        else
        if (zframe_size (sender) >= 4  //  服务名称为保留的服务
        &&  memcmp (zframe_data (sender), "mmi.", 4) == 0)
            s_worker_delete (self, worker, 1);
        else {
            //  将worker对应到服务,并置为空闲状态
            zframe_t *service_frame = zmsg_pop (msg);
            worker->service = s_service_require (self, service_frame);
            worker->service->workers++;
            s_worker_waiting (self, worker);
            zframe_destroy (&service_frame);
        }
    }
    else
    if (zframe_streq (command, MDPW_REPLY)) {
        if (worker_ready) {
            //  移除并保存返回给client的信封,插入协议头信息和服务名称,并重新包装信封
            zframe_t *client = zmsg_unwrap (msg);
            zmsg_pushstr (msg, worker->service->name);
            zmsg_pushstr (msg, MDPC_CLIENT);
            zmsg_wrap (msg, client);
            zmsg_send (&msg, self->socket);
            s_worker_waiting (self, worker);
        }
        else
            s_worker_delete (self, worker, 1);
    }
    else
    if (zframe_streq (command, MDPW_HEARTBEAT)) {
        if (worker_ready)
            worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;
        else
            s_worker_delete (self, worker, 1);
    }
    else
    if (zframe_streq (command, MDPW_DISCONNECT))
        s_worker_delete (self, worker, 0);
    else {
        zclock_log ("E: 非法消息");
        zmsg_dump (msg);
    }
    free (command);
    zmsg_destroy (&msg);
}
 
//  ---------------------------------------------------------------------
//  发送消息给worker
//  如果指针指向了一条消息,发送它,但不销毁它,因为这是调用者的工作
 
static void
s_worker_send (broker_t *self, worker_t *worker, char *command,
               char *option, zmsg_t *msg)
{
    msg = msg? zmsg_dup (msg): zmsg_new ();
 
    //  将协议信封压入消息顶部
    if (option)
        zmsg_pushstr (msg, option);
    zmsg_pushstr (msg, command);
    zmsg_pushstr (msg, MDPW_WORKER);
 
    //  在消息顶部插入路由帧
    zmsg_wrap (msg, zframe_dup (worker->address));
 
    if (self->verbose) {
        zclock_log ("I: 正在发送消息给worker %s",
            mdps_commands [(int) *command]);
        zmsg_dump (msg);
    }
    zmsg_send (&msg, self->socket);
}
 
//  ---------------------------------------------------------------------
//  正在等待的worker
 
static void
s_worker_waiting (broker_t *self, worker_t *worker)
{
    //  将worker加入代理和服务的等待队列
    zlist_append (self->waiting, worker);
    zlist_append (worker->service->waiting, worker);
    worker->expiry = zclock_time () + HEARTBEAT_EXPIRY;
    s_service_dispatch (self, worker->service, NULL);
}
 
//  ---------------------------------------------------------------------
//  处理client发来的请求
 
static void
s_client_process (broker_t *self, zframe_t *sender, zmsg_t *msg)
{
    assert (zmsg_size (msg) >= 2);     //  服务名称 + 请求内容
 
    zframe_t *service_frame = zmsg_pop (msg);
    service_t *service = s_service_require (self, service_frame);
 
    //  为应答内容设置请求方的地址
    zmsg_wrap (msg, zframe_dup (sender));
    if (zframe_size (service_frame) >= 4
    &&  memcmp (zframe_data (service_frame), "mmi.", 4) == 0)
        s_service_internal (self, service_frame, msg);
    else
        s_service_dispatch (self, service, msg);
    zframe_destroy (&service_frame);
}

        这个例子应该是我们见过最复杂的一个示例了,大约有500行代码。编写这段代码并让其变的健壮,大约花费了两天的时间。但是,这也仅仅是一个完整的面向服务代理的一部分。

        几点说明:

                1、管家模式协议要求我们在一个套接字中同时处理client和worker,这一点对部署和管理代理很有益处:它只会在一个ZMQ端点上收发请求,而不是两个。

                2、代理很好地实现了MDP/0.1协议中规范的内容,包括当代理发送非法命令和心跳时断开的机制。

                3、可以将这段代码扩充为多线程,每个线程管理一个套接字、一组client和worker。这种做法在大型架构的拆分中显得很有趣。C语言代码已经是这样的格式了,因此很容易实现。

                4、还可以将这段代码扩充为主备模式、双在线模式,进一步提高可靠性。因为从本质上来说,代理是无状态的,只是保存了服务的存在与否,因此client和worker可以自行选择除此之外的代理来进行通信。

                5、示例代码中心跳的间隔为5秒,主要是为了减少调试时的输出。现实中的值应该设得低一些,但是,重试的过程应该设置得稍长一些,让服务有足够的时间启动,如10秒钟。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/53276.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

孩子到底是食物过敏?还是食物不耐受?

虾蟹过敏&#xff0c;牛奶过敏&#xff0c;鸡蛋过敏&#xff0c;甚至花生过敏……近年来&#xff0c;儿童食物过敏的发病率逐年上升。食物过敏对儿童危害很大&#xff0c;其临床症状会使儿童患病&#xff0c;影响身心健康&#xff1b;长期禁食&#xff0c;影响均衡营养摄入&…

Segger RTT深度使用说明-移植-Jlink rtt viewer显示-输出到Secure CRT

简介 RTT&#xff08; Real Time Terminal&#xff09;是SEGGER公司新出的可以在嵌入式应用中与用户进行交互的实时终端。J-Link驱动4.90之后的版本都支持RTT。RTT既可以从MCU上输出信息、也可以向应用程序发送信息&#xff0c;由于其高速的特性&#xff0c;所以不影响MCU的实…

重磅消息:Lazada和Shopee通过中国执照就可以开通本地店铺,享受更多的流量和资源扶持

目前Lazada平台是可以做跨境店和本地店铺的&#xff0c;跨境店铺指的是中国卖家通过国内的营业执照开店&#xff0c;本地店指的是东南亚本地的商家提供个人身份证或当地的营业执照开的店铺&#xff0c;那么有什么区别和优劣势 1.跨境店/本地店开店所需资料 跨境店&#xff1a…

横向的Excel输出为pdf自动分成两页怎么办?不分页,铺满整张纸的方法来了

我们工作中有时候会需要把Excel转换成pdf&#xff0c;一般我们用WPS的“输出为pdf”功能就可以转了。但是有些横向的Excel转换的时候&#xff0c;会自动分成两页&#xff0c;这并不是我们想要的效果。怎么才能不分成两页呢&#xff1f; 首先我们有一个Excel&#xff1a; 使用…

什么牌子蓝牙耳机通话质量好?通话质量好的蓝牙耳机推荐

蓝牙耳机作为手机的最佳伴侣&#xff0c;已经成为老百姓日常生活必备。每次有大品牌发布新款蓝牙耳机&#xff0c;几乎都能够得到很好的反响&#xff0c;蓝牙耳机不仅在音质上有了很大的提升&#xff0c;并且在其他功能也在不断的提升&#xff0c;使用蓝牙耳机通话避免不了电话…

m基于matlab的光通信误码率仿真,分别对比了OFDM+BPSK和OFDM+4QAM的误码率和星座图

目录 1.算法描述 2.matlab算法仿真效果 3.MATLAB核心程序 4.完整MATLAB 1.算法描述 光通信系统中&#xff0c;QAM调制应用广泛&#xff0c;因为其利用幅度和相位同时传递i信息&#xff0c;提高了频带利用率。目前&#xff0c;最高阶数的QAM已达到1024-QAM即星座图有1024个点…

网站风格变黑白的方法,用css或javascript方法将网站改为灰色

如何把网站颜色去掉&#xff1f;用下面的方法即可 1.通用方法 body { -webkit-filter: grayscale(100%);-moz-filter: grayscale(100%);-ms-filter: grayscale(100%);-o-filter: grayscale(100%);filter: grayscale(100%);filter: gray; }html {-webkit-filter: grayscale(10…

harbor私有仓库部署

一、Harbor的概念 Harbor是VMware公司开源的企业级Docker Registry项目&#xff0c;其目标是帮助用户迅速搭建一个企业级的Docker Registry服务 Harbor以 Docker 公司开源的Registry 为基础&#xff0c;提供了图形管理UI、基于角色的访问控制(Role Based AccessControl)、AD/LD…

python习题004--使用python实现ATM机效果

相信每一个人在日常生活中使用ATM机的次数不少&#xff0c;尤其是现在微信支付&#xff0c;支付宝支付的普及&#xff0c;就需要用到ATM机并将现金存入银行卡内。 接下来我就使用python并结合面向对象的知识编写一个简易的ATM机存取款的效果【仅供参考】。 题目 简易的ATM机…

K8S-1.18.20高可用集群之部署集群监控系统kube-prometheus插件

K8S-1.18.20高可用集群之部署集群插件-KUBE-PROMETHEUS插件 一、简介 kube-prometheus 是一整套监控解决方案&#xff0c;它使用 Prometheus 采集集群指标&#xff0c;Grafana 做展示&#xff0c;包含如下组件&#xff1a; The Prometheus Operator Highly available Promet…

某车app登录参数分析

目标app: Y29tLmNoZTE2OC51Y2RlYWxlcg== 抓取登录包如下: 提交的data参数: _appid (app标识,固定值) _sign(加密) appversion(app版本,固定值) channelid(固定值) pass (密码加密) udid (加密) user (手机号) 共有_sign、 pass、udid三处加密, 其中_sign预估md5加密, pa…

XShell与XFtp的安装及简单使用

目录 XShell的安装 XFtp的安装 测试XShell和XFtp XShell的安装 1、完成解压后如下&#xff0c;所示 2、双击exe文件进行安装 3、选中免费为家庭/学校 &#xff08;因为XShell的商业版和免费版功能是一样的&#xff0c;区别在于可以开启的窗口数量的不同&#xff09; 4、勾选 …

RK3588移植-ffmpeg交叉编译

文章目录1.下载ffmpeg2.交叉编译3.修改cmakelist.txt4.将lib文件复制到install目录下的lib目录5.测试文件6.运行测试样例7.错误n.测试文件源码1.下载ffmpeg git clone https://git.ffmpeg.org/ffmpeg.git ffmpeg 2.交叉编译 进入下载目录&#xff0c;将ffmpeg编译成arm64平台…

【菜菜的sklearn课堂笔记】聚类算法Kmeans-重要参数n_clusters

视频作者&#xff1a;菜菜TsaiTsai 链接&#xff1a;【技术干货】菜菜的机器学习sklearn【全85集】Python进阶_哔哩哔哩_bilibili n_clusters是KMeans中的k&#xff0c;表示着我们告诉模型我们要分几类。这是KMeans当中唯一一个必填的参数&#xff0c;默认为8类&#xff0c;但通…

glxy_阿里云存储

阿里云OSS储存 讲师的添加实现&#xff1a;oss服务 访问并登陆阿里云&#xff0c;&#xff0c;实名认证 产品分类---->对象储存OSS 开通OSS 进入管理控制台 使用OSS前先创建bucket java 代码实现 准备工作&#xff1a;创建操作阿里云oss许可证&#xff08;阿里云颁发…

1545_AURIX_TC275_CPU子系统_存储以及性能

全部学习汇总&#xff1a; GreyZhang/g_TC275: happy hacking for TC275! (github.com) 章节的标题不是我写的标题&#xff0c;但是从功能分类看的话基本是&#xff0c;暂且如此总结。 1. DSYNC的命令之前看过&#xff0c;因为cache是具有不可见性的&#xff0c;通过这个命令可…

mysql分区表的增删改查操作

mysql分区表的增删改查操作一、mysql创建表分区二、基本分区信息查询2.1 查看mysql版本是否支持分区2.2 查看表是否为分区表2.2.1 查询表分区信息2.2.2 查看表的所有分区三、分区表的查询操作四、分区表的增删改操作4.1 新增分区4.1.1 给已有的表加上分区4.1.2 新增分区4.2 重新…

面试字节,过关斩将到 3 面,结果找了个架构师来吊打我?

人人都有大厂梦&#xff0c;对于程序员来说&#xff0c;BAT 为首的一线互联网公司肯定是自己的心仪对象&#xff0c;毕竟能到这些大厂工作&#xff0c;不仅薪资高待遇好&#xff0c;而且能力技术都能够得到提升&#xff0c;最关键的是还能够给自己镀上一层金&#xff0c;让人瞻…

物联网卡与共享饮水机的关系

近些年来居民更加注重饮水健康&#xff0c;对水质&#xff0c;口感的要求明显提升&#xff0c;饮水机市场高速发展&#xff0c;很多小区&#xff0c;校园&#xff0c;公园等公共场所都出现了共享饮水机。共享饮水机随处可见&#xff0c;马路边、商场里、社区里的社区直饮水机等…

Java+JSP+MySQL基于SSM的扶贫信息管理系统-计算机毕业设计

项目介绍 扶贫信息管理系统采用了B/S(浏览器/服务器)体系结构&#xff0c;JAVA作为开发语言&#xff0c;MySQL作为数据存储进行开发&#xff0c;Tomcat作为WEB服务器&#xff0c;开发平台windows&#xff0c;开发工具采用比elicpse更好用的myeclipse。系统角色分为用户和管理员…