ZMQ之高可靠对称节点--双子星模式

news2024/11/23 9:06:57

一、概览

        双子星模式是一对具有主从机制的高可靠节点。任一时间,某个节点会充当主机,接收所有客户端的请求;另一个则作为一种备机存在。两个节点会互相监控对方,当主机从网络中消失时,备机会替代主机的位置。

        双子星模式由Pieter Hintjens和Martin Sustrik设计,应用在iMatix的OpenAMQ服务器中。它的设计理念是:

                1、提供一种简明的高可靠性解决方案;

                2、易于理解和使用;

                3、能够进行可靠的故障切换。

        假设我们有一组双子星模式的服务器,以下是可能发生的故障:

                1、主机发生硬件故障(断电、失火等),应用程序发送后立刻使用备机进行连接。

                2、主机的网络环境发生故障,可能某个路由器被雷击了,立刻使用备机。

                3、主机上的服务被维护人员误杀,无法自动恢复。

        恢复步骤如下:

                1、维护人员排查主机故障。

                2、将备机关闭,造成短时间的服务不可用。

                3、待应用程序都连接到主机后,维护人员重启备机。

        恢复过程是人工进行的,惨痛的经验告诉我们自动恢复是很可怕的:

                1、故障的发生会造成10-30秒之间的服务暂停,如果这是一个真正的突发状况,那最好还是让主机暂停服务的好,因为立刻重启服务可能造成另一个10-30秒的暂停,不如让用户停止使用。

                2、当有紧急状况发生时,可以在修复的过程中记录故障发生原因,而不是让系统自动恢复,管理员因此无法用其经验抵御下一次突发状况。

                3、最后,如果自动恢复确实成功了,管理员将无从得知故障的发生原因,因而无法进行分析。

        双子星模式的故障恢复过程是:在修复了主机的问题后,将备机做关闭处理,稍后再重新开启:

        双子星模式的关闭过程有两种:

                1、先关闭备机,等待一段时间后再关闭主机。

                2、同时关闭主机和备机,间隔时间不超过几秒。

        关闭时,间隔时间要比故障切换时间短,否则会导致应用程序失去连接、重新连接、并再次失去连接,导致用户投诉。

二、详细要求

        双子星模式可以非常简单,但能工作得很出色。事实上,这里的实现方法已经历经三个版本了,之前的版本都过于复杂,想要做太多的事情,因而被我们抛弃。我们需要的只是最基本的功能,能够提供易理解、易开发、高可靠的解决方法就可以了。

        以下是该架构的详细需求:

                1、需要用到双子星模式的故障是:系统遭受灾难性的打击,如硬件崩溃、火灾、意外等。对于其他常规的服务器故障,可以用更简单的方法。

                2、故障恢复时间应该在60秒以内,理想情况下应该在10秒以内;

                3、故障恢复(failover)应该是自动完成的,而系统还原(recover)则是由人工完成的。我们希望应用程序能够在发生故障时自动从主机切换到备机,但不希望在问题解决之前自动切换回主机,因为这很有可能让主机再次崩溃。

                4、程序的逻辑应该尽量简单,易于使用,最好能封装在API中;

                5、需要提供一个明确的指示,哪台主机正在提供服务,以避免“精神分裂”的症状,即两台服务器都认为自己是主机;

                6、两台服务器的启动顺序不应该有限制;

                7、启动或关闭主从机时不需要更改客户端的配置,但有可能会中断连接;

                8、管理员需要能够同时监控两台机器;

                9、两台机器之间必须有专用的高速网络连接,必须能使用特定IP进行路由。

        我们做如下架假设:

                1、单台备机能够提供足够的保障,不需要再进行其他备份机制;

                2、主从机应该都能够提供完整的服务,承载相同的压力,不需要进行负载均衡;

                3、预算中允许有这样一台长时间闲置的备机。

        双子星模式不会用到:

                1、多台备机,或在主从机之间进行负载均衡。该模式中的备机将一直处于空闲状态,只有主机发生问题时才会工作。

                2、处理持久化的消息或事务。我们假设所连接的网络是不可靠的(或不可信的)。

                3、自动搜索网络。双子星模式是手工配置的,他们知道对方的存在,应用程序则知道双子星的存在。

                4、主从机之间状态的同步。所有服务端的状态必须能由应用程序进行重建。

        以下是双子星模式中的几个术语:

                1、主机 - 通常情况下作为master的机器;

                2、备机 - 通常情况下作为slave的机器,只有当主机从网络中消失时,备机才会切换成master状态,接收所有的应用程序请求;

                3、master - 双子星模式中接收应用程序请求的机器;同一时刻只有一台master;

                4、slave - 当master消失时用以顶替的机器。

        配置双子星模式的步骤:

                1、让主机知道备机的位置。

                2、让备机知道主机的位置。

                3、调整故障恢复时间,两台机器的配置必须相同。

        比较重要的配置是应让两台机器间隔多久检查一次对方的状态,以及多长时间后采取行动。在我们的示例中,故障恢复时间设置为2000毫秒,超过这个时间备机就会代替主机的位置。但若你将主机的服务包裹在一个shell脚本中进行重启,就需要延长这个时间,否则备机可能在主机恢复连接的过程中转换成master。

        要让客户端应用程序和双子星模式配合,你需要做的是:

                1、知道两台服务器的地址。

                2、尝试连接主机,若失败则连接备机。

                3、检测失效的连接,一般使用心跳机制。

                4、尝试重连主机,然后再连接备机,其间的间隔应比服务器故障恢复时间长。

                5、重建服务器端需要的所有状态数据。

                6、如果要保证可靠性,应重发故障期间的消息。

        这不是件容易的事,所以我们一般会将其封装成一个API,供程序员使用。

        双子星模式的主要限制有:

                1、服务端进程不能涉及到一个以上的双子星对称节点;

                2、主机只能有一个备机;

                3、当备机于slave状态时,它不会处理任何请求。

                4、备机必须能够承受所有的应用程序请求。

                5、故障恢复时间不能在运行时调整。

                6、客户端应用程序需要做一些重连的工作。

三、防止精神分裂

        “精神分裂”症状指的是一个集群中的不同部分同时认为自己是master,从而停止对对方的检测。双子星模式中的算法会降低这种症状的发生几率:主备机在决定自己是否为master时会检测自身是否收到了应用程序的请求,以及对方是否已经从网络中消失。

        但在某些情况下,双子星模式也会发生精神分裂。比如说,主备机被配置在两幢大楼里,每幢大楼的局域网中又分布了一些应用程序。这样,当两幢大楼的网络通信被阻断,双子星模式的主备机就会分别在两幢大楼里接受和处理请求。

        为了防止精神分裂,我们必须让主备机使用专用的网络进行连接,最简单的方法当然是用一根双绞线将他们相连。

        我们不能将双子星部署在两个不同的岛屿上,为各自岛屿的应用程序服务。这种情况下,我们会使用诸如联邦模式的机制进行可靠性设计。

        最好但最夸张的做法是,将两台机器之间的连接和应用程序的连接完全隔离开来,甚至是使用不同的网卡,而不仅仅是不同的端口。这样做也是为了日后排查错误时更为明确。

四、实现双子星模式

        闲话少说,下面是双子星模式的服务端代码:

        bstarsrv: Binary Star server in C

//
//  双子星模式 - 服务端
//
#include "czmq.h"
 
//  发送状态信息的间隔时间
//  如果对方在两次心跳过后都没有应答,则视为断开
#define HEARTBEAT 1000          //  In msecs
 
//  服务器状态枚举
typedef enum {
    STATE_PRIMARY = 1,          //  主机,等待同伴连接
    STATE_BACKUP = 2,           //  备机,等待同伴连接
    STATE_ACTIVE = 3,           //  激活态,处理应用程序请求
    STATE_PASSIVE = 4           //  被动态,不接收请求
} state_t;
 
//  对话节点事件
typedef enum {
    PEER_PRIMARY = 1,           //  主机
    PEER_BACKUP = 2,            //  备机
    PEER_ACTIVE = 3,            //  激活态
    PEER_PASSIVE = 4,           //  被动态
    CLIENT_REQUEST = 5          //  客户端请求
} event_t;
 
//  有限状态机
typedef struct {
    state_t state;              //  当前状态
    event_t event;              //  当前事件
    int64_t peer_expiry;        //  判定节点死亡的时限
} bstar_t;
 
 
//  执行有限状态机(将事件绑定至状态);
//  发生异常时返回TRUE。
 
static Bool
s_state_machine (bstar_t *fsm)
{
    Bool exception = FALSE;
    //  主机等待同伴连接
    //  该状态下接收CLIENT_REQUEST事件
    if (fsm->state == STATE_PRIMARY) {
        if (fsm->event == PEER_BACKUP) {
            printf ("I: 已连接至备机(slave),可以作为master运行。\n");
            fsm->state = STATE_ACTIVE;
        }
        else
        if (fsm->event == PEER_ACTIVE) {
            printf ("I: 已连接至备机(master),可以作为slave运行。\n");
            fsm->state = STATE_PASSIVE;
        }
    }
    else
    //  备机等待同伴连接
    //  该状态下拒绝CLIENT_REQUEST事件
    if (fsm->state == STATE_BACKUP) {
        if (fsm->event == PEER_ACTIVE) {
            printf ("I: 已连接至主机(master),可以作为slave运行。\n");
            fsm->state = STATE_PASSIVE;
        }
        else
        if (fsm->event == CLIENT_REQUEST)
            exception = TRUE;
    }
    else
    //  服务器处于激活态
    //  该状态下接受CLIENT_REQUEST事件
    if (fsm->state == STATE_ACTIVE) {
        if (fsm->event == PEER_ACTIVE) {
            //  若出现两台master,则抛出异常
            printf ("E: 严重错误:双master。正在退出。\n");
            exception = TRUE;
        }
    }
    else
    //  服务器处于被动态
    //  若同伴已死,CLIENT_REQUEST事件将触发故障恢复
    if (fsm->state == STATE_PASSIVE) {
        if (fsm->event == PEER_PRIMARY) {
            //  同伴正在重启 - 转为激活态,同伴将转为被动态。
            printf ("I: 主机(slave)正在重启,可作为master运行。\n");
            fsm->state = STATE_ACTIVE;
        }
        else
        if (fsm->event == PEER_BACKUP) {
            //  同伴正在重启 - 转为激活态,同伴将转为被动态。
            printf ("I: 备机(slave)正在重启,可作为master运行。\n");
            fsm->state = STATE_ACTIVE;
        }
        else
        if (fsm->event == PEER_PASSIVE) {
            //  若出现两台slave,集群将无响应
            printf ("E: 严重错误:双slave。正在退出\n");
            exception = TRUE;
        }
        else
        if (fsm->event == CLIENT_REQUEST) {
            //  若心跳超时,同伴将成为master;
            //  此行为由客户端请求触发。
            assert (fsm->peer_expiry > 0);
            if (zclock_time () >= fsm->peer_expiry) {
                //  同伴已死,转为激活态。
                printf ("I: 故障恢复,可作为master运行。\n");
                fsm->state = STATE_ACTIVE;
            }
            else
                //  同伴还在,拒绝请求。
                exception = TRUE;
        }
    }
    return exception;
}
 
 
int main (int argc, char *argv [])
{
    //  命令行参数可以为:
    //      -p  作为主机启动, at tcp://localhost:5001
    //      -b  作为备机启动, at tcp://localhost:5002
    zctx_t *ctx = zctx_new ();
    void *statepub = zsocket_new (ctx, ZMQ_PUB);
    void *statesub = zsocket_new (ctx, ZMQ_SUB);
    void *frontend = zsocket_new (ctx, ZMQ_ROUTER);
    bstar_t fsm = { 0 };
 
    if (argc == 2 && streq (argv [1], "-p")) {
        printf ("I: 主机master,等待备机(slave)连接。\n");
        zsocket_bind (frontend, "tcp://*:5001");
        zsocket_bind (statepub, "tcp://*:5003");
        zsocket_connect (statesub, "tcp://localhost:5004");
        fsm.state = STATE_PRIMARY;
    }
    else
    if (argc == 2 && streq (argv [1], "-b")) {
        printf ("I: 备机slave,等待主机(master)连接。\n");
        zsocket_bind (frontend, "tcp://*:5002");
        zsocket_bind (statepub, "tcp://*:5004");
        zsocket_connect (statesub, "tcp://localhost:5003");
        fsm.state = STATE_BACKUP;
    }
    else {
        printf ("Usage: bstarsrv { -p | -b }\n");
        zctx_destroy (&ctx);
        exit (0);
    }
    //  设定下一次发送状态的时间
    int64_t send_state_at = zclock_time () + HEARTBEAT;
 
    while (!zctx_interrupted) {
        zmq_pollitem_t items [] = {
            { frontend, 0, ZMQ_POLLIN, 0 },
            { statesub, 0, ZMQ_POLLIN, 0 }
        };
        int time_left = (int) ((send_state_at - zclock_time ()));
        if (time_left < 0)
            time_left = 0;
        int rc = zmq_poll (items, 2, time_left * ZMQ_POLL_MSEC);
        if (rc == -1)
            break;              //  上下文对象被关闭
 
        if (items [0].revents & ZMQ_POLLIN) {
            //  收到客户端请求
            zmsg_t *msg = zmsg_recv (frontend);
            fsm.event = CLIENT_REQUEST;
            if (s_state_machine (&fsm) == FALSE)
                //  返回应答
                zmsg_send (&msg, frontend);
            else
                zmsg_destroy (&msg);
        }
        if (items [1].revents & ZMQ_POLLIN) {
            //  收到状态消息,作为事件处理
            char *message = zstr_recv (statesub);
            fsm.event = atoi (message);
            free (message);
            if (s_state_machine (&fsm))
                break;          //  错误,退出。
            fsm.peer_expiry = zclock_time () + 2 * HEARTBEAT;
        }
        //  定时发送状态信息
        if (zclock_time () >= send_state_at) {
            char message [2];
            sprintf (message, "%d", fsm.state);
            zstr_send (statepub, message);
            send_state_at = zclock_time () + HEARTBEAT;
        }
    }
    if (zctx_interrupted)
        printf ("W: 中断\n");
 
    //  关闭套接字和上下文
    zctx_destroy (&ctx);
    return 0;
}

        下面是客户端代码:

        bstarcli: Binary Star client in C

//
//  双子星模式 - 客户端
//
#include "czmq.h"
 
#define REQUEST_TIMEOUT     1000    //  毫秒
#define SETTLE_DELAY        2000    //  超时时间
 
int main (void)
{
    zctx_t *ctx = zctx_new ();
 
    char *server [] = { "tcp://localhost:5001", "tcp://localhost:5002" };
    uint server_nbr = 0;
 
    printf ("I: 正在连接服务器 %s...\n", server [server_nbr]);
    void *client = zsocket_new (ctx, ZMQ_REQ);
    zsocket_connect (client, server [server_nbr]);
 
    int sequence = 0;
    while (!zctx_interrupted) {
        //  发送请求并等待应答
        char request [10];
        sprintf (request, "%d", ++sequence);
        zstr_send (client, request);
 
        int expect_reply = 1;
        while (expect_reply) {
            //  轮询套接字
            zmq_pollitem_t items [] = { { client, 0, ZMQ_POLLIN, 0 } };
            int rc = zmq_poll (items, 1, REQUEST_TIMEOUT * ZMQ_POLL_MSEC);
            if (rc == -1)
                break;          //  中断
 
            //  处理应答
            if (items [0].revents & ZMQ_POLLIN) {
                //  审核应答编号
                char *reply = zstr_recv (client);
                if (atoi (reply) == sequence) {
                    printf ("I: 服务端应答正常 (%s)\n", reply);
                    expect_reply = 0;
                    sleep (1);  //  每秒发送一个请求
                }
                else {
                    printf ("E: 错误的应答内容: %s\n",
                        reply);
                }
                free (reply);
            }
            else {
                printf ("W: 服务器无响应,正在重试\n");
                //  重开套接字
                zsocket_destroy (ctx, client);
                server_nbr = (server_nbr + 1) % 2;
                zclock_sleep (SETTLE_DELAY);
                printf ("I: 正在连接服务端 %s...\n",
                        server [server_nbr]);
                client = zsocket_new (ctx, ZMQ_REQ);
                zsocket_connect (client, server [server_nbr]);
 
                //  使用新套接字重发请求
                zstr_send (client, request);
            }
        }
    }
    zctx_destroy (&ctx);
    return 0;
}

        运行以下命令进行测试,顺序随意:

bstarsrv -p     # Start primary
bstarsrv -b     # Start backup
bstarcli

        可以将主机进程杀掉,测试故障恢复机制;再开启主机,杀掉备机,查看还原机制。要注意是由客户端触发这两个事件的。

        下图展现了服务进程的状态图。绿色状态下会接收客户端请求,粉色状态会拒绝请求。事件指的是同伴的状态,所以“同伴激活态”指的是同伴机器告知我们它处于激活态。“客户请求”表示我们从客户端获得了请求,“客户投票”则指我们从客户端获得了请求并且同伴已经超时死亡。

        需要注意的是,服务进程使用PUB-SUB套接字进行状态交换,其它类型的套接字在这里不适用。比如,PUSH和DEALER套接字在没有节点相连的时候会发生阻塞;PAIR套接字不会在节点断开后进行重连;ROUTER套接字需要地址才能发送消息。

        These are the main limitations of the Binary Star pattern:

                1、A server process cannot be part of more than one Binary Star pair.

                2、A primary server can have a single backup server, no more.

                3、The backup server cannot do useful work while in slave mode.

                4、The backup server must be capable of handling full application loads.

                5、Failover configuration cannot be modified at runtime.

                6、Client applications must do some work to benefit from failover.

五、双子星反应堆

        我们可以将双子星模式打包成一个类似反应堆的类,供以后复用。在C语言中,我们使用czmq的zloop类,其他语言应该会有相应的实现。以下是C语言版的bstar接口:

// 创建双子星模式实例,使用本地(绑定)和远程(连接)端点来设置节点对。
bstar_t *bstar_new (int primary, char *local, char *remote);
 
// 销毁实例
void bstar_destroy (bstar_t **self_p);
 
// 返回底层的zloop反应堆,用以添加定时器、读取器、注册和取消等功能。
zloop_t *bstar_zloop (bstar_t *self);
 
// 注册投票读取器
int bstar_voter (bstar_t *self, char *endpoint, int type,
zloop_fn handler, void *arg);
 
// 注册状态机处理器
void bstar_new_master (bstar_t *self, zloop_fn handler, void *arg);
void bstar_new_slave (bstar_t *self, zloop_fn handler, void *arg);
 
// 开启反应堆,当回调函数返回-1,或进程收到SIGINT、SIGTERM信号时中止。
int bstar_start (bstar_t *self);

        以下是类的实现:

        bstar: Binary Star core class in C

/*  =====================================================================
    bstar - Binary Star reactor
 
    ---------------------------------------------------------------------
    Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
    Copyright other contributors as noted in the AUTHORS file.
 
    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
 
    This is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or (at
    your option) any later version.
 
    This software is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.
 
    You should have received a copy of the GNU Lesser General Public
    License along with this program. If not, see
    <http://www.gnu.org/licenses/>.
    =====================================================================
*/
 
#include "bstar.h"
 
//  服务器状态枚举
typedef enum {
    STATE_PRIMARY = 1,          //  主机,等待同伴连接
    STATE_BACKUP = 2,           //  备机,等待同伴连接
    STATE_ACTIVE = 3,           //  激活态,处理应用程序请求
    STATE_PASSIVE = 4           //  被动态,不接收请求
} state_t;
 
//  对话节点事件
typedef enum {
    PEER_PRIMARY = 1,           //  主机
    PEER_BACKUP = 2,            //  备机
    PEER_ACTIVE = 3,            //  激活态
    PEER_PASSIVE = 4,           //  被动态
    CLIENT_REQUEST = 5          //  客户端请求
} event_t;
 
 
//  发送状态信息的间隔时间
//  如果对方在两次心跳过后都没有应答,则视为断开
#define BSTAR_HEARTBEAT     1000        //  In msecs
 
//  类结构
 
struct _bstar_t {
    zctx_t *ctx;                //  私有上下文
    zloop_t *loop;              //  反应堆循环
    void *statepub;             //  状态发布者
    void *statesub;             //  状态订阅者
    state_t state;              //  当前状态
    event_t event;              //  当前事件
    int64_t peer_expiry;        //  判定节点死亡的时限
    zloop_fn *voter_fn;         //  投票套接字处理器
    void *voter_arg;            //  投票处理程序的参数
    zloop_fn *master_fn;        //  成为master时回调
    void *master_arg;           //  参数
    zloop_fn *slave_fn;         //  成为slave时回调
    void *slave_arg;            //  参数
};
 
 
//  ---------------------------------------------------------------------
//  执行有限状态机(将事件绑定至状态);
//  发生异常时返回-1,正确时返回0。
 
static int
s_execute_fsm (bstar_t *self)
{
    int rc = 0;
    //  主机等待同伴连接
    //  该状态下接收CLIENT_REQUEST事件
    if (self->state == STATE_PRIMARY) {
        if (self->event == PEER_BACKUP) {
            zclock_log ("I: 已连接至备机(slave),可以作为master运行。");
            self->state = STATE_ACTIVE;
            if (self->master_fn)
                (self->master_fn) (self->loop, NULL, self->master_arg);
        }
        else
        if (self->event == PEER_ACTIVE) {
            zclock_log ("I: 已连接至备机(master),可以作为slave运行。");
            self->state = STATE_PASSIVE;
            if (self->slave_fn)
                (self->slave_fn) (self->loop, NULL, self->slave_arg);
        }
        else
        if (self->event == CLIENT_REQUEST) {
            zclock_log ("I: 收到客户端请求,可作为master运行。");
            self->state = STATE_ACTIVE;
            if (self->master_fn)
                (self->master_fn) (self->loop, NULL, self->master_arg);
        }
    }
    else
    //  备机等待同伴连接
    //  该状态下拒绝CLIENT_REQUEST事件
    if (self->state == STATE_BACKUP) {
        if (self->event == PEER_ACTIVE) {
            zclock_log ("I: 已连接至主机(master),可以作为slave运行。");
            self->state = STATE_PASSIVE;
            if (self->slave_fn)
                (self->slave_fn) (self->loop, NULL, self->slave_arg);
        }
        else
        if (self->event == CLIENT_REQUEST)
            rc = -1;
    }
    else
    //  服务器处于激活态
    //  该状态下接受CLIENT_REQUEST事件
    //  只有服务器死亡才会离开激活态
    if (self->state == STATE_ACTIVE) {
        if (self->event == PEER_ACTIVE) {
            //  若出现两台master,则抛出异常
            zclock_log ("E: 严重错误:双master。正在退出。");
            rc = -1;
        }
    }
    else
    //  服务器处于被动态
    //  若同伴已死,CLIENT_REQUEST事件将触发故障恢复
    if (self->state == STATE_PASSIVE) {
        if (self->event == PEER_PRIMARY) {
            //  同伴正在重启 - 转为激活态,同伴将转为被动态。
            zclock_log ("I: 主机(slave)正在重启,可作为master运行。");
            self->state = STATE_ACTIVE;
        }
        else
        if (self->event == PEER_BACKUP) {
            //  同伴正在重启 - 转为激活态,同伴将转为被动态。
            zclock_log ("I: 备机(slave)正在重启,可作为master运行。");
            self->state = STATE_ACTIVE;
        }
        else
        if (self->event == PEER_PASSIVE) {
            //  若出现两台slave,集群将无响应
            zclock_log ("E: 严重错误:双slave。正在退出");
            rc = -1;
        }
        else
        if (self->event == CLIENT_REQUEST) {
            //  若心跳超时,同伴将成为master;
            //  此行为由客户端请求触发。
            assert (self->peer_expiry > 0);
            if (zclock_time () >= self->peer_expiry) {
                //  同伴已死,转为激活态。
                zclock_log ("I: 故障恢复,可作为master运行。");
                self->state = STATE_ACTIVE;
            }
            else
                //  同伴还在,拒绝请求。
                rc = -1;
        }
        //  触发状态更改事件处理函数
        if (self->state == STATE_ACTIVE && self->master_fn)
            (self->master_fn) (self->loop, NULL, self->master_arg);
    }
    return rc;
}
 
 
//  ---------------------------------------------------------------------
//  反应堆事件处理程序
 
//  发送状态信息
int s_send_state (zloop_t *loop, void *socket, void *arg)
{
    bstar_t *self = (bstar_t *) arg;
    zstr_sendf (self->statepub, "%d", self->state);
    return 0;
}
 
//  接收状态信息,启动有限状态机
int s_recv_state (zloop_t *loop, void *socket, void *arg)
{
    bstar_t *self = (bstar_t *) arg;
    char *state = zstr_recv (socket);
    if (state) {
        self->event = atoi (state);
        self->peer_expiry = zclock_time () + 2 * BSTAR_HEARTBEAT;
        free (state);
    }
    return s_execute_fsm (self);
}
 
//  收到应用程序请求,判断是否接收
int s_voter_ready (zloop_t *loop, void *socket, void *arg)
{
    bstar_t *self = (bstar_t *) arg;
    //  如果能够处理请求,则调用函数
    self->event = CLIENT_REQUEST;
    if (s_execute_fsm (self) == 0) {
        puts ("CLIENT REQUEST");
        (self->voter_fn) (self->loop, socket, self->voter_arg);
    }
    else {
        //  销毁等待中的消息
        zmsg_t *msg = zmsg_recv (socket);
        zmsg_destroy (&msg);
    }
    return 0;
}
 
 
//  ---------------------------------------------------------------------
//  构造函数
 
bstar_t *
bstar_new (int primary, char *local, char *remote)
{
    bstar_t
        *self;
 
    self = (bstar_t *) zmalloc (sizeof (bstar_t));
 
    //  初始化双子星
    self->ctx = zctx_new ();
    self->loop = zloop_new ();
    self->state = primary? STATE_PRIMARY: STATE_BACKUP;
 
    //  创建状态PUB套接字
    self->statepub = zsocket_new (self->ctx, ZMQ_PUB);
    zsocket_bind (self->statepub, local);
 
    //  创建状态SUB套接字
    self->statesub = zsocket_new (self->ctx, ZMQ_SUB);
    zsocket_connect (self->statesub, remote);
 
    //  设置基本的反应堆事件处理器
    zloop_timer (self->loop, BSTAR_HEARTBEAT, 0, s_send_state, self);
    zloop_reader (self->loop, self->statesub, s_recv_state, self);
    return self;
}
 
 
//  ---------------------------------------------------------------------
//  析构函数
 
void
bstar_destroy (bstar_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        bstar_t *self = *self_p;
        zloop_destroy (&self->loop);
        zctx_destroy (&self->ctx);
        free (self);
        *self_p = NULL;
    }
}
 
 
//  ---------------------------------------------------------------------
//  返回底层zloop对象,用以添加额外的定时器、阅读器等。
 
zloop_t *
bstar_zloop (bstar_t *self)
{
    return self->loop;
}
 
 
//  ---------------------------------------------------------------------
//  创建套接字,连接至本地端点,注册成为阅读器;
//  只有当有限状态机允许时才会读取该套接字;
//  从该套接字获得的消息将作为一次“投票”;
//  我们要求双子星模式中只有一个“投票”套接字。
 
int
bstar_voter (bstar_t *self, char *endpoint, int type, zloop_fn handler,
             void *arg)
{
    //  保存原始的回调函数和参数,稍后使用
    void *socket = zsocket_new (self->ctx, type);
    zsocket_bind (socket, endpoint);
    assert (!self->voter_fn);
    self->voter_fn = handler;
    self->voter_arg = arg;
    return zloop_reader (self->loop, socket, s_voter_ready, self);
}
 
//  ---------------------------------------------------------------------
//  注册状态变化事件处理器
 
void
bstar_new_master (bstar_t *self, zloop_fn handler, void *arg)
{
    assert (!self->master_fn);
    self->master_fn = handler;
    self->master_arg = arg;
}
 
void
bstar_new_slave (bstar_t *self, zloop_fn handler, void *arg)
{
    assert (!self->slave_fn);
    self->slave_fn = handler;
    self->slave_arg = arg;
}
 
 
//  ---------------------------------------------------------------------
//  启用或禁止跟踪信息
void bstar_set_verbose (bstar_t *self, Bool verbose)
{
    zloop_set_verbose (self->loop, verbose);
}
 
 
//  ---------------------------------------------------------------------
//  开启反应堆,当回调函数返回-1,或进程收到SIGINT、SIGTERM信号时中止。
 
int
bstar_start (bstar_t *self)
{
    assert (self->voter_fn);
    return zloop_start (self->loop);
}

        这样一来,我们的服务端代码会变得非常简短:

        bstarsrv2: Binary Star server, using core class in C

//
//  双子星模式服务端,使用bstar反应堆
//
 
//  直接编译,不建类库
#include "bstar.c"
 
//  Echo service
int s_echo (zloop_t *loop, void *socket, void *arg)
{
    zmsg_t *msg = zmsg_recv (socket);
    zmsg_send (&msg, socket);
    return 0;
}
 
int main (int argc, char *argv [])
{
    //  命令行参数可以为:
    //      -p  作为主机启动, at tcp://localhost:5001
    //      -b  作为备机启动, at tcp://localhost:5002
    bstar_t *bstar;
    if (argc == 2 && streq (argv [1], "-p")) {
        printf ("I: 主机master,等待备机(slave)连接。\n");
        bstar = bstar_new (BSTAR_PRIMARY,
            "tcp://*:5003", "tcp://localhost:5004");
        bstar_voter (bstar, "tcp://*:5001", ZMQ_ROUTER, s_echo, NULL);
    }
    else
    if (argc == 2 && streq (argv [1], "-b")) {
        printf ("I: 备机slave,等待主机(master)连接。\n");
        bstar = bstar_new (BSTAR_BACKUP,
            "tcp://*:5004", "tcp://localhost:5003");
        bstar_voter (bstar, "tcp://*:5002", ZMQ_ROUTER, s_echo, NULL);
    }
    else {
        printf ("Usage: bstarsrvs { -p | -b }\n");
        exit (0);
    }
    bstar_start (bstar);
    bstar_destroy (&bstar);
    return 0;
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/51132.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

gateway网关聚合knife4j文档,同时兼容swagger2与swagger3

基于前两篇文章&#xff0c;进行整合 springcloud-gateway 聚合swagger3请求接口丢失appliactionName解决 springcloud-gateway聚合knife4j接口文档 为何要兼容&#xff1f;微服务开发者有的使用了swagger2版本&#xff0c;有的使用了swagger3版本&#xff0c;但暴露外部给前…

聊一聊我的第一个开源项目

项目地址&#xff1a;https://github.com/kpretty/hdd 我在21年的国庆写过一篇文章&#xff1a;《Docker 实战&#xff1a;部署hadoop集群》&#xff0c;当时也是刚接触docker&#xff0c;作为docker第一个练手项目对很多概念理解的不是很到位&#xff0c;因此那篇文章所使用的…

基于PHP+MySQL菜品食谱美食网站的设计与实现

美食是人类永恒的追求,现在有很多的美食爱好者,他们希望通过自己的各种方式来学习更多的美食制作方式,以及分享自己制作美食的一些过程,说让更多的人。享受到更加美味可口的饭菜。本系统也是基于这样的目的来进行开发的。 本系统是通过PHP&#xff1a;MySQL来进行开发,主要实现…

存储器扩展,画图题

目录 存储器与CPU的接口 地址线的连接 数据线的连接 控制线的连接&#xff08;读写和片选&#xff09; 考题 引出 第一题 第二题 第三题 计算地址范围&#xff08;这里用的38译码器&#xff09; 第四题 填空题 第五题 第六题&#xff08;2017&#xff09; 要求&…

【微信小程序】CSS模块化、使用缓存在本地模拟服务器数据库

&#x1f3c6;今日学习目标&#xff1a;第十五期——CSS模块化、使用缓存在本地模拟服务器数据库 &#x1f603;创作者&#xff1a;颜颜yan_ ✨个人主页&#xff1a;颜颜yan_的个人主页 ⏰预计时间&#xff1a;25分钟 &#x1f389;专栏系列&#xff1a;我的第一个微信小程序 文…

【这款神器可以有】3DMAX一键墙体门洞窗洞插件使用教程

3DMAX一键墙体门洞窗洞插件&#xff0c;只需导入户型图&#xff0c;单/双面墙体一键生成。 【主要功能】 --一键生成墙体 --一键门洞 --一键窗洞 --支持单/双面墙体生成 【安装方法】 无需安装&#xff0c;直接拖动插件脚本到3dmax窗口即可打开插件。 【快速开始】 将3dm…

11.我为 Netty 贡献源码 | 且看 Netty 如何应对 TCP 连接的正常关闭,异常关闭,半关闭场景

我为 Netty 贡献源码 | 且看 Netty 如何应对 TCP 连接的正常关闭&#xff0c;异常关闭&#xff0c;半关闭场景 本系列Netty源码解析文章基于 4.1.56.Final版本 写在前面..... 本文是笔者肉眼盯 Bug 系列的第三弹&#xff0c;前两弹分别是: 抓到Netty一个Bug&#xff0c;顺带来…

【Spring(七)】带你手写一个Spring容器

有关Spring的所有文章都收录于我的专栏&#xff1a;&#x1f449;Spring&#x1f448; 目录 前置准备 第一步、创建我们自定的注解 第二步、创建我们自己的容器类 测试 总结 相关文章 【Spring&#xff08;一&#xff09;】如何获取对象&#xff08;Bean&#xff09;【Spring&a…

CSS伪类使用详解

基本描述 CSS伪类是很常用的功能&#xff0c;主要应用于选择器的关键字&#xff0c;用来改变被选择元素的特殊状态下的样式。 伪类类似于普通CSS类的用法&#xff0c;是对CSS选择器的一种扩展&#xff0c;增强选择器的功能。 目前可用的伪类有大概40多个&#xff0c;少部分有兼…

Spring Bean的生命周期理解

一、Spring Bean的生命周期大的概括起来有四个阶段&#xff1a; 1、实例化 2、属性填充注入 3、初始化使用 4、Bean的销毁 二、如流程图所示 三、步骤说明 1、实例化 实例化一个Bean&#xff0c;即new 2、IOC依赖注入 按照Spring上下文对实例化的Bean进行属性填充注入 3、setB…

昆船智能上市:预计年营收19亿到22.5亿 市值48亿

雷递网 雷建平 11月30日昆船智能技术股份有限公司&#xff08;简称&#xff1a;“昆船智能”&#xff0c;证券代码&#xff1a;301311&#xff09;今日在深交所创业板上市。昆船智能本次发行股票6000万股&#xff0c;发行价为13.88元&#xff0c;募资8.33亿元。昆船智能开盘价为…

2022CTF培训(七)逆向专项练习

附件下载链接 babyre 首先是一个迷宫&#xff0c;由于答案不唯一&#xff0c;因此到 dfs 求出所有路径。 #include <bits/stdc.h>constexpr char s[] "**************.****.**s..*..******.****.***********..***..**..#*..***..***.********************.**..*…

springMVC01,springMVC的执行流程【第一个springMVC例子(XML配置版本):HelloWorld】

springMVC01,springMVC的执行流程【第一个springMVC项目&#xff1a;HelloWorld】springMVC的简介springMVC的执行流程第一个springMVC项目&#xff08;XML配置版本&#xff09;1.创建项目1.1 新建maven项目&#xff1a;1.2 添加web支持1.3 在pom.xml中导入依赖1.4 配置tomcat2…

【云享·人物】华为云AI高级专家白小龙:AI如何释放应用生产力,向AI工程化前行?

摘要&#xff1a;AI技术发展&#xff0c;正由应用落地阶段向效率化生产阶段演进&#xff0c;AI工程化能力将会不断深入业务&#xff0c;释放企业生产力。本文分享自华为云社区《【云享人物】华为云AI高级专家白小龙&#xff1a;AI如何释放应用生产力&#xff0c;向AI工程化前行…

[附源码]Python计算机毕业设计Django飞越青少儿兴趣培训机构管理系统

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

旅游景区地图导览系统,传统导览智慧新升级

地图在景区导览中一直扮演着重要角色。 从传统导览的纸质地图&#xff0c;再到智慧导览的电子地图&#xff0c;游客都可以从景区地图上了解到景点名称、游玩路线、服务设施等内容&#xff0c;帮助游客更好地游览景区。 相比传统的纸质地图导览&#xff0c;电子地图导览系统有哪…

计算机组成原理习题课第四章-4(唐朔飞)

计算机组成原理习题课第四章-4&#xff08;唐朔飞&#xff09; ✨欢迎关注&#x1f5b1;点赞&#x1f380;收藏⭐留言✒ &#x1f52e;本文由京与旧铺原创&#xff0c;csdn首发&#xff01; &#x1f618;系列专栏&#xff1a;java学习 &#x1f4bb;首发时间&#xff1a;&…

TIA博途中通用函数库指令FIFO先入先出的具体使用方法

TIA博途中通用函数库指令FIFO先入先出的具体使用方法 前些天发现了一个巨牛的人工智能学习网站,通俗易懂,风趣幽默,忍不住分享一下给大家。点击跳转到网站。 如下图所示,在TIA博途中添加通用函数库指令,然后在库指令中找到FIFO,鼠标直接拖拽到程序段中,系统会自动生成一…

【毕业设计】10-基于单片机的车站安检门_磁性霍尔传感器系统设计(原理图+源码+仿真工程+答辩论文)

【毕业设计】10-基于单片机的车站安检门/磁性霍尔传感器系统设计&#xff08;原理图源码仿真工程答辩论文&#xff09; 文章目录【毕业设计】10-基于单片机的车站安检门/磁性霍尔传感器系统设计&#xff08;原理图源码仿真工程答辩论文&#xff09;任务书设计说明书摘要设计框架…

https加密解密过程二、名词解析及文件生成

https加密解密过程二、名词解析及文件生成 密钥仓库keystore文件 Keytool是一个Java数据证书的管理工具 &#xff0c;Keytool将密钥(key)和证书(certificates)存在一个称为keystore的文件中 keystore文件的内容其实就是把私钥、公钥以及公钥对应的地址等信息输出为json格式的…