ZMQ之克隆模式的可靠性

news2025/1/15 13:00:24

克隆服务器的可靠性

        克隆模型1至5相对比较简单,下面我们会探讨一个非常复杂的模型。可以发现,为了构建可靠的消息队列,我们需要花费非常多的精力。所以我们经常会问:有必要这么做吗?如果说你能够接受可靠性不够高的、或者说已经足够好的架构,那恭喜你,你在成本和收益之间找到了平衡。虽然我们会偶尔丢失一些消息,但从经济的角度来说还是合理的。不管怎样,下面我们就来介绍这个复杂的模型。

        在模型3中,你会关闭和重启服务,这会导致数据的丢失。任何后续加入的客户端只能得到重启之后的那些数据,而非所有的。下面就让我们想办法让克隆模式能够承担服务器重启的故障。

        以下列举我们需要处理的问题:

                1、克隆服务器进程崩溃并自动或手工重启。进程丢失了所有数据,所以必须从别处进行恢复。

                2、克隆服务器硬件故障,长时间不能恢复。客户端需要切换至另一个可用的服务端。

                3、克隆服务器从网络上断开,如交换机发生故障等。它会在某个时点重连,但期间的数据就需要替代的服务器负责处理。

        第一步我们需要增加一个服务器。我们可以使用第四章中提到的双子星模式,它是一个反应堆,而我们的程序经过整理后也是一个反应堆,因此可以互相协作。

        我们需要保证更新事件在主服务器崩溃时仍能保留,最简单的机制就是同时发送给两台服务器。

        备机就可以当做一台客户端来运行,像其他客户端一样从主机获取更新事件。同时它又能从客户端获取更新事件——虽然不应该以此更新数据,但可以先暂存起来。

        所以,相较于模型5,模型6中引入了以下特性:

                1、客户端发送更新事件改用PUB-SUB套接字,而非PUSH-PULL。原因是PUSH套接字会在没有接收方时阻塞,且会进行负载均衡——我们需要两台服务器都接收到消息。我们会在服务器端绑定SUB套接字,在客户端连接PUB套接字。

                2、我们在服务器发送给客户端的更新事件中加入心跳,这样客户端可以知道主机是否已死,然后切换至备机。

                3、我们使用双子星模式的bstar反应堆类来创建主机和备机。双子星模式中需要有一个“投票”套接字,来协助判定对方节点是否已死。这里我们使用快照请求来作为“投票”。

                4、我们将为所有的更新事件添加UUID属性,它由客户端生成,服务端会将其发布给所有客户端。

                5、备机将维护一个“待处理列表”,保存来自客户端、尚未由服务端发布的更新事件;或者反过来,来自服务端、尚未从客户端收到的更新事件。这个列表从旧到新排列,这样就能方便地从顶部删除消息。

        我们可以为客户端设计一个有限状态机,它有三种状态:

                1、客户端打开并连接了套接字,然后向服务端发送快照请求。为了避免消息风暴,它只会请求两次。

                2、客户端等待快照应答,如果获得了则保存它;如果没有获得,则向第二个服务器发送请求。

                3、客户端收到快照,便开始等待更新事件。如果在一定时间内没有收到服务端响应,则会连接第二个服务端。

        客户端会一直循环下去,可能在程序刚启动时,部分客户端会试图连接主机,部分连接备机,相信双子星模式会很好地处理这一情况的。

        我们可以将客户端状态图绘制出来:

40676bb282b244e3a5a84415dfb6ea0a.png

        故障恢复的步骤如下:

                1、客户端检测到主机不再发送心跳,因此转而连接备机,并请求一份新的快照;

                2、备机开始接收快照请求,并检测到主机死亡,于是开始作为主机运行;

                3、备机将待处理列表中的更新事件写入自身状态中,然后开始处理快照请求。

        当主机恢复连接时:

                1、启动为slave状态,并作为克隆模式客户端连接备机;

                2、同时,使用SUB套接字从客户端接收更新事件。

        我们做两点假设:

                1、至少有一台主机会继续运行。如果两台主机都崩溃了,那我们将丢失所有的服务端数据,无法恢复。

                2、不同的客户端不会同时更新同一个键值对。客户端的更新事件会先后到达两个服务器,因此更新的顺序可能会不一致。单个客户端的更新事件到达两台服务器的顺序是相同的,所以不用担心。

        下面是整体架构图:

ced5fd07772c447f90c6eb52e7b305c4.png

        开始编程之前,我们需要将客户端重构成一个可复用的类。在ZMQ中写异步类有时是为了练习如何写出优雅的代码,但这里我们确实是希望克隆模式可以成为一种易于使用的程序。上述架构的伸缩性来源于客户端的正确行为,因此有必要将其封装成一份API。要在客户端中进行故障恢复还是比较复杂的,试想一下自由者模式和克隆模式结合起来会是什么样的吧。

        按照我的习惯,我会先写出一份API的列表,然后加以实现。让我们假想一个名为clone的API,在其基础之上编写克隆模式客户端API。将代码封装为API显然会提升代码的稳定性,就以模型5为例,客户端需要打开三个套接字,端点名称直接写在了代码里。我们可以创建这样一组API:

    //  为每个套接字指定端点
    clone_subscribe (clone, "tcp://localhost:5556");
    clone_snapshot  (clone, "tcp://localhost:5557");
    clone_updates   (clone, "tcp://localhost:5558");

    //  由于有两个服务端,因此再执行一次
    clone_subscribe (clone, "tcp://localhost:5566");
    clone_snapshot  (clone, "tcp://localhost:5567");
    clone_updates   (clone, "tcp://localhost:5568");

        但这种写法还是比较啰嗦的,因为没有必要将API内部的一些设计暴露给编程人员。现在我们会使用三个套接字,而将来可能就会使用两个,或者四个。我们不可能让所有的应用程序都相应地修改吧?让我们把这些信息包装到API中:

    //  指定主备服务器端点
    clone_connect (clone, "tcp://localhost:5551");
    clone_connect (clone, "tcp://localhost:5561");

        这样一来代码就变得非常简洁,不过也会对现有代码的内部就够造成影响。我们需要从一个端点中推算出三个端点。一种方法是假设客户端和服务端使用三个连续的端点通信,并将这个规则写入协议;另一个方法是向服务器索取缺少的端点信息。我们使用第一种较为简单的方法:

                1、服务器状态ROUTER在端点P;

                2、服务器更新事件PUB在端点P + 1;

                3、服务器更新事件SUB在端点P + 2。

        clone类和第四章的flcliapi类很类似,由两部分组成:

                1、一个在后台运行的异步克隆模式代理。该代理处理所有的I/O操作,实时地和服务器进行通信;

                2、一个在前台应用程序中同步运行的clone类。当你创建了一个clone对象后,它会自动创建后台的clone线程;当你销毁clone对象,该后台线程也会被销毁。

        前台的clone类会使用inproc管道和后台的代理进行通信。C语言中,czmq线程会自动为我们创建这个管道。这也是ZMQ多线程编程的常规方式。

        如果没有ZMQ,这种异步的设计将很难处理高压工作,而ZMQ会让其变得简单。编写出来额代码会相对比较复杂。我们可以用反应堆的模式来编写,但这会进一步增加复杂度,且影响应用程序的使用。因此,我们的设计的API将更像是一个能够和服务器进行通信的键值表:

clone_t *clone_new (void);
void clone_destroy (clone_t **self_p);
void clone_connect (clone_t *self, char *address, char *service);
void clone_set (clone_t *self, char *key, char *value);
char *clone_get (clone_t *self, char *key);

        下面就是克隆模式客户端模型6的代码,因为调用了API,所以非常简短:
        clonecli6: Clone client, Model Six in C

//
//  克隆模式 - 客户端 - 模型6
//
 
//  直接编译,不建类库
#include "clone.c"
 
#define SUBTREE "/client/"
 
int main (void)
{
    //  创建分布式哈希表
    clone_t *clone = clone_new ();
 
    //  配置
    clone_subtree (clone, SUBTREE);
    clone_connect (clone, "tcp://localhost", "5556");
    clone_connect (clone, "tcp://localhost", "5566");
 
    //  插入随机键值
    while (!zctx_interrupted) {
        //  生成随机值
        char key [255];
        char value [10];
        sprintf (key, "%s%d", SUBTREE, randof (10000));
        sprintf (value, "%d", randof (1000000));
        clone_set (clone, key, value, randof (30));
        sleep (1);
    }
    clone_destroy (&clone);
    return 0;
}

        以下是clone类的实现:
        clone: Clone class in C

/*  =====================================================================
    clone - client-side Clone Pattern class
 
    ---------------------------------------------------------------------
    Copyright (c) 1991-2011 iMatix Corporation <www.imatix.com>
    Copyright other contributors as noted in the AUTHORS file.
 
    This file is part of the ZeroMQ Guide: http://zguide.zeromq.org
 
    This is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or (at
    your option) any later version.
 
    This software is distributed in the hope that it will be useful, but
    WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
    Lesser General Public License for more details.
 
    You should have received a copy of the GNU Lesser General Public
    License along with this program. If not, see
    <http://www.gnu.org/licenses/>.
    =====================================================================
*/
 
#include "clone.h"
 
//  请求超时时间
#define GLOBAL_TIMEOUT  4000    //  msecs
//  判定服务器死亡的时间
#define SERVER_TTL      5000    //  msecs
//  服务器数量
#define SERVER_MAX      2
 
 
//  =====================================================================
//  同步部分,在应用程序线程中工作
 
//  ---------------------------------------------------------------------
//  类结构
 
struct _clone_t {
    zctx_t *ctx;                //  上下文
    void *pipe;                 //  和后台代理间的通信套接字
};
 
//  该线程用于处理真正的clone类
static void clone_agent (void *args, zctx_t *ctx, void *pipe);
 
 
//  ---------------------------------------------------------------------
//  构造函数
 
clone_t *
clone_new (void)
{
    clone_t
        *self;
 
    self = (clone_t *) zmalloc (sizeof (clone_t));
    self->ctx = zctx_new ();
    self->pipe = zthread_fork (self->ctx, clone_agent, NULL);
    return self;
}
 
//  ---------------------------------------------------------------------
//  析构函数
 
void
clone_destroy (clone_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        clone_t *self = *self_p;
        zctx_destroy (&self->ctx);
        free (self);
        *self_p = NULL;
    }
}
 
//  ---------------------------------------------------------------------
//  在链接之前指定快照和更新事件的子树
//  发送给后台代理的消息内容为[SUBTREE][subtree]
 
void clone_subtree (clone_t *self, char *subtree)
{
    assert (self);
    zmsg_t *msg = zmsg_new ();
    zmsg_addstr (msg, "SUBTREE");
    zmsg_addstr (msg, subtree);
    zmsg_send (&msg, self->pipe);
}
 
//  ---------------------------------------------------------------------
//  连接至新的服务器端点
//  消息内容:[CONNECT][endpoint][service]
 
void
clone_connect (clone_t *self, char *address, char *service)
{
    assert (self);
    zmsg_t *msg = zmsg_new ();
    zmsg_addstr (msg, "CONNECT");
    zmsg_addstr (msg, address);
    zmsg_addstr (msg, service);
    zmsg_send (&msg, self->pipe);
}
 
//  ---------------------------------------------------------------------
//  设置新值
//  消息内容:[SET][key][value][ttl]
 
void
clone_set (clone_t *self, char *key, char *value, int ttl)
{
    char ttlstr [10];
    sprintf (ttlstr, "%d", ttl);
 
    assert (self);
    zmsg_t *msg = zmsg_new ();
    zmsg_addstr (msg, "SET");
    zmsg_addstr (msg, key);
    zmsg_addstr (msg, value);
    zmsg_addstr (msg, ttlstr);
    zmsg_send (&msg, self->pipe);
}
 
//  ---------------------------------------------------------------------
//  取值
//  消息内容:[GET][key]
//  如果没有clone可用,会返回NULL
 
char *
clone_get (clone_t *self, char *key)
{
    assert (self);
    assert (key);
    zmsg_t *msg = zmsg_new ();
    zmsg_addstr (msg, "GET");
    zmsg_addstr (msg, key);
    zmsg_send (&msg, self->pipe);
 
    zmsg_t *reply = zmsg_recv (self->pipe);
    if (reply) {
        char *value = zmsg_popstr (reply);
        zmsg_destroy (&reply);
        return value;
    }
    return NULL;
}
 
 
//  =====================================================================
//  异步部分,在后台运行
 
//  ---------------------------------------------------------------------
//  单个服务端信息
 
typedef struct {
    char *address;              //  服务端地址
    int port;                   //  端口
    void *snapshot;             //  快照套接字
    void *subscriber;           //  接收更新事件的套接字
    uint64_t expiry;            //  服务器过期时间
    uint requests;              //  收到的快照请求数
} server_t;
 
static server_t *
server_new (zctx_t *ctx, char *address, int port, char *subtree)
{
    server_t *self = (server_t *) zmalloc (sizeof (server_t));
 
    zclock_log ("I: adding server %s:%d...", address, port);
    self->address = strdup (address);
    self->port = port;
 
    self->snapshot = zsocket_new (ctx, ZMQ_DEALER);
    zsocket_connect (self->snapshot, "%s:%d", address, port);
    self->subscriber = zsocket_new (ctx, ZMQ_SUB);
    zsocket_connect (self->subscriber, "%s:%d", address, port + 1);
    zsockopt_set_subscribe (self->subscriber, subtree);
    return self;
}
 
static void
server_destroy (server_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        server_t *self = *self_p;
        free (self->address);
        free (self);
        *self_p = NULL;
    }
}
 
//  ---------------------------------------------------------------------
//  后台代理类
 
//  状态
#define STATE_INITIAL       0   //  连接之前
#define STATE_SYNCING       1   //  正在同步
#define STATE_ACTIVE        2   //  正在更新
 
typedef struct {
    zctx_t *ctx;                //  上下文
    void *pipe;                 //  与主线程通信的套接字
    zhash_t *kvmap;             //  键值表
    char *subtree;              //  子树
    server_t *server [SERVER_MAX];
    uint nbr_servers;           //  范围:0 - SERVER_MAX
    uint state;                 //  当前状态
    uint cur_server;            //  当前master,0/1
    int64_t sequence;           //  键值对编号
    void *publisher;            //  发布更新事件的套接字
} agent_t;
 
static agent_t *
agent_new (zctx_t *ctx, void *pipe)
{
    agent_t *self = (agent_t *) zmalloc (sizeof (agent_t));
    self->ctx = ctx;
    self->pipe = pipe;
    self->kvmap = zhash_new ();
    self->subtree = strdup ("");
    self->state = STATE_INITIAL;
    self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
    return self;
}
 
static void
agent_destroy (agent_t **self_p)
{
    assert (self_p);
    if (*self_p) {
        agent_t *self = *self_p;
        int server_nbr;
        for (server_nbr = 0; server_nbr < self->nbr_servers; server_nbr++)
            server_destroy (&self->server [server_nbr]);
        zhash_destroy (&self->kvmap);
        free (self->subtree);
        free (self);
        *self_p = NULL;
    }
}
 
//  若线程被中断则返回-1
static int
agent_control_message (agent_t *self)
{
    zmsg_t *msg = zmsg_recv (self->pipe);
    char *command = zmsg_popstr (msg);
    if (command == NULL)
        return -1;
 
    if (streq (command, "SUBTREE")) {
        free (self->subtree);
        self->subtree = zmsg_popstr (msg);
    }
    else
    if (streq (command, "CONNECT")) {
        char *address = zmsg_popstr (msg);
        char *service = zmsg_popstr (msg);
        if (self->nbr_servers < SERVER_MAX) {
            self->server [self->nbr_servers++] = server_new (
                self->ctx, address, atoi (service), self->subtree);
            //  广播更新事件
            zsocket_connect (self->publisher, "%s:%d",
                address, atoi (service) + 2);
        }
        else
            zclock_log ("E: too many servers (max. %d)", SERVER_MAX);
        free (address);
        free (service);
    }
    else
    if (streq (command, "SET")) {
        char *key = zmsg_popstr (msg);
        char *value = zmsg_popstr (msg);
        char *ttl = zmsg_popstr (msg);
        zhash_update (self->kvmap, key, (byte *) value);
        zhash_freefn (self->kvmap, key, free);
 
        //  向服务端发送键值对
        kvmsg_t *kvmsg = kvmsg_new (0);
        kvmsg_set_key  (kvmsg, key);
        kvmsg_set_uuid (kvmsg);
        kvmsg_fmt_body (kvmsg, "%s", value);
        kvmsg_set_prop (kvmsg, "ttl", ttl);
        kvmsg_send     (kvmsg, self->publisher);
        kvmsg_destroy (&kvmsg);
puts (key);
        free (ttl);
        free (key);             //  键值对实际由哈希表对象控制
    }
    else
    if (streq (command, "GET")) {
        char *key = zmsg_popstr (msg);
        char *value = zhash_lookup (self->kvmap, key);
        if (value)
            zstr_send (self->pipe, value);
        else
            zstr_send (self->pipe, "");
        free (key);
        free (value);
    }
    free (command);
    zmsg_destroy (&msg);
    return 0;
}
 
 
//  ---------------------------------------------------------------------
//  异步的后台代理会维护一个服务端池,并处理来自应用程序的请求或应答。
 
static void
clone_agent (void *args, zctx_t *ctx, void *pipe)
{
    agent_t *self = agent_new (ctx, pipe);
 
    while (TRUE) {
        zmq_pollitem_t poll_set [] = {
            { pipe, 0, ZMQ_POLLIN, 0 },
            { 0,    0, ZMQ_POLLIN, 0 }
        };
        int poll_timer = -1;
        int poll_size = 2;
        server_t *server = self->server [self->cur_server];
        switch (self->state) {
            case STATE_INITIAL:
                //  该状态下,如果有可用服务,会发送快照请求
                if (self->nbr_servers > 0) {
                    zclock_log ("I: 正在等待服务器 %s:%d...",
                        server->address, server->port);
                    if (server->requests < 2) {
                        zstr_sendm (server->snapshot, "ICANHAZ?");
                        zstr_send  (server->snapshot, self->subtree);
                        server->requests++;
                    }
                    server->expiry = zclock_time () + SERVER_TTL;
                    self->state = STATE_SYNCING;
                    poll_set [1].socket = server->snapshot;
                }
                else
                    poll_size = 1;
                break;
            case STATE_SYNCING:
                //  该状态下我们从服务器端接收快照内容,若失败则尝试其他服务器
                poll_set [1].socket = server->snapshot;
                break;
            case STATE_ACTIVE:
                //  该状态下我们从服务器获取更新事件,失败则尝试其他服务器
                poll_set [1].socket = server->subscriber;
                break;
        }
        if (server) {
            poll_timer = (server->expiry - zclock_time ())
                       * ZMQ_POLL_MSEC;
            if (poll_timer < 0)
                poll_timer = 0;
        }
        //  ------------------------------------------------------------
        //  poll循环
        int rc = zmq_poll (poll_set, poll_size, poll_timer);
        if (rc == -1)
            break;              //  上下文已被关闭
 
        if (poll_set [0].revents & ZMQ_POLLIN) {
            if (agent_control_message (self))
                break;          //  中断
        }
        else
        if (poll_set [1].revents & ZMQ_POLLIN) {
            kvmsg_t *kvmsg = kvmsg_recv (poll_set [1].socket);
            if (!kvmsg)
                break;          //  中断
 
            //  任何服务端的消息将重置它的过期时间
            server->expiry = zclock_time () + SERVER_TTL;
            if (self->state == STATE_SYNCING) {
                //  保存快照内容
                server->requests = 0;
                if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
                    self->sequence = kvmsg_sequence (kvmsg);
                    self->state = STATE_ACTIVE;
                    zclock_log ("I: received from %s:%d snapshot=%d",
                        server->address, server->port,
                        (int) self->sequence);
                    kvmsg_destroy (&kvmsg);
                }
                else
                    kvmsg_store (&kvmsg, self->kvmap);
            }
            else
            if (self->state == STATE_ACTIVE) {
                //  丢弃过期的更新事件
                if (kvmsg_sequence (kvmsg) > self->sequence) {
                    self->sequence = kvmsg_sequence (kvmsg);
                    kvmsg_store (&kvmsg, self->kvmap);
                    zclock_log ("I: received from %s:%d update=%d",
                        server->address, server->port,
                        (int) self->sequence);
                }
                else
                    kvmsg_destroy (&kvmsg);
            }
        }
        else {
            //  服务端已死,尝试其他服务器
            zclock_log ("I: 服务器 %s:%d 无响应",
                    server->address, server->port);
            self->cur_server = (self->cur_server + 1) % self->nbr_servers;
            self->state = STATE_INITIAL;
        }
    }
    agent_destroy (&self);
}

        最后是克隆服务器的模型6代码:

        clonesrv6: Clone server, Model Six in C

//
// 克隆模式 - 服务端 - 模型6
//
 
//  直接编译,不建类库
#include "bstar.c"
#include "kvmsg.c"
 
//  bstar反应堆API
static int s_snapshots  (zloop_t *loop, void *socket, void *args);
static int s_collector  (zloop_t *loop, void *socket, void *args);
static int s_flush_ttl  (zloop_t *loop, void *socket, void *args);
static int s_send_hugz  (zloop_t *loop, void *socket, void *args);
static int s_new_master (zloop_t *loop, void *unused, void *args);
static int s_new_slave  (zloop_t *loop, void *unused, void *args);
static int s_subscriber (zloop_t *loop, void *socket, void *args);
 
//  服务端属性
typedef struct {
    zctx_t *ctx;                //  上下文
    zhash_t *kvmap;             //  存放键值对
    bstar_t *bstar;             //  bstar反应堆核心
    int64_t sequence;           //  更新事件编号
    int port;                   //  主端口
    int peer;                   //  同伴端口
    void *publisher;            //  发布更新事件的端口
    void *collector;            //  接收客户端更新事件的端口
    void *subscriber;           //  接受同伴更新事件的端口
    zlist_t *pending;           //  延迟的更新事件
    Bool primary;               //  是否为主机
    Bool master;                //  是否为master
    Bool slave;                 //  是否为slave
} clonesrv_t;
 
 
int main (int argc, char *argv [])
{
    clonesrv_t *self = (clonesrv_t *) zmalloc (sizeof (clonesrv_t));
    if (argc == 2 && streq (argv [1], "-p")) {
        zclock_log ("I: 作为主机master运行,正在等待备机slave连接。");
        self->bstar = bstar_new (BSTAR_PRIMARY, "tcp://*:5003",
                                 "tcp://localhost:5004");
        bstar_voter (self->bstar, "tcp://*:5556", ZMQ_ROUTER,
                     s_snapshots, self);
        self->port = 5556;
        self->peer = 5566;
        self->primary = TRUE;
    }
    else
    if (argc == 2 && streq (argv [1], "-b")) {
        zclock_log ("I: 作为备机slave运行,正在等待主机master连接。");
        self->bstar = bstar_new (BSTAR_BACKUP, "tcp://*:5004",
                                 "tcp://localhost:5003");
        bstar_voter (self->bstar, "tcp://*:5566", ZMQ_ROUTER,
                     s_snapshots, self);
        self->port = 5566;
        self->peer = 5556;
        self->primary = FALSE;
    }
    else {
        printf ("Usage: clonesrv4 { -p | -b }\n");
        free (self);
        exit (0);
    }
    //  主机将成为master
    if (self->primary)
        self->kvmap = zhash_new ();
 
    self->ctx = zctx_new ();
    self->pending = zlist_new ();
    bstar_set_verbose (self->bstar, TRUE);
 
    //  设置克隆服务端套接字
    self->publisher = zsocket_new (self->ctx, ZMQ_PUB);
    self->collector = zsocket_new (self->ctx, ZMQ_SUB);
    zsocket_bind (self->publisher, "tcp://*:%d", self->port + 1);
    zsocket_bind (self->collector, "tcp://*:%d", self->port + 2);
 
    //  作为克隆客户端连接同伴
    self->subscriber = zsocket_new (self->ctx, ZMQ_SUB);
    zsocket_connect (self->subscriber, "tcp://localhost:%d", self->peer + 1);
 
    //  注册状态事件处理器
    bstar_new_master (self->bstar, s_new_master, self);
    bstar_new_slave (self->bstar, s_new_slave, self);
 
    //  注册bstar反应堆其他事件处理器
    zloop_reader (bstar_zloop (self->bstar), self->collector, s_collector, self);
    zloop_timer  (bstar_zloop (self->bstar), 1000, 0, s_flush_ttl, self);
    zloop_timer  (bstar_zloop (self->bstar), 1000, 0, s_send_hugz, self);
 
    //  开启bstar反应堆
    bstar_start (self->bstar);
 
    //  中断,终止。
    while (zlist_size (self->pending)) {
        kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop (self->pending);
        kvmsg_destroy (&kvmsg);
    }
    zlist_destroy (&self->pending);
    bstar_destroy (&self->bstar);
    zhash_destroy (&self->kvmap);
    zctx_destroy (&self->ctx);
    free (self);
 
    return 0;
}
 
 
//  ---------------------------------------------------------------------
//  发送快照内容
 
static int s_send_single (char *key, void *data, void *args);
 
//  请求方信息
typedef struct {
    void *socket;           //  ROUTER套接字
    zframe_t *identity;     //  请求放标识
    char *subtree;          //  子树
} kvroute_t;
 
static int
s_snapshots (zloop_t *loop, void *snapshot, void *args)
{
    clonesrv_t *self = (clonesrv_t *) args;
 
    zframe_t *identity = zframe_recv (snapshot);
    if (identity) {
        //  请求在消息的第二帧中
        char *request = zstr_recv (snapshot);
        char *subtree = NULL;
        if (streq (request, "ICANHAZ?")) {
            free (request);
            subtree = zstr_recv (snapshot);
        }
        else
            printf ("E: 错误的请求,正在退出……\n");
 
        if (subtree) {
            //  发送状态快照
            kvroute_t routing = { snapshot, identity, subtree };
            zhash_foreach (self->kvmap, s_send_single, &routing);
 
            //  发送终止消息,以及消息编号
            zclock_log ("I: 正在发送快照,版本号:%d", (int) self->sequence);
            zframe_send (&identity, snapshot, ZFRAME_MORE);
            kvmsg_t *kvmsg = kvmsg_new (self->sequence);
            kvmsg_set_key  (kvmsg, "KTHXBAI");
            kvmsg_set_body (kvmsg, (byte *) subtree, 0);
            kvmsg_send     (kvmsg, snapshot);
            kvmsg_destroy (&kvmsg);
            free (subtree);
        }
    }
    return 0;
}
 
 
//  每次发送一个快照键值对
static int
s_send_single (char *key, void *data, void *args)
{
    kvroute_t *kvroute = (kvroute_t *) args;
    kvmsg_t *kvmsg = (kvmsg_t *) data;
    if (strlen (kvroute->subtree) <= strlen (kvmsg_key (kvmsg))
    &&  memcmp (kvroute->subtree,
                kvmsg_key (kvmsg), strlen (kvroute->subtree)) == 0) {
        //  先发送接收方的地址
        zframe_send (&kvroute->identity,
            kvroute->socket, ZFRAME_MORE + ZFRAME_REUSE);
        kvmsg_send (kvmsg, kvroute->socket);
    }
    return 0;
}
 
 
//  ---------------------------------------------------------------------
//  从客户端收集更新事件
//  如果我们是master,则将该事件写入kvmap对象;
//  如果我们是slave,则将其写入延迟队列
 
static int s_was_pending (clonesrv_t *self, kvmsg_t *kvmsg);
 
static int
s_collector (zloop_t *loop, void *collector, void *args)
{
    clonesrv_t *self = (clonesrv_t *) args;
 
    kvmsg_t *kvmsg = kvmsg_recv (collector);
    kvmsg_dump (kvmsg);
    if (kvmsg) {
        if (self->master) {
            kvmsg_set_sequence (kvmsg, ++self->sequence);
            kvmsg_send (kvmsg, self->publisher);
            int ttl = atoi (kvmsg_get_prop (kvmsg, "ttl"));
            if (ttl)
                kvmsg_set_prop (kvmsg, "ttl",
                    "%" PRId64, zclock_time () + ttl * 1000);
            kvmsg_store (&kvmsg, self->kvmap);
            zclock_log ("I: 正在发布更新事件:%d", (int) self->sequence);
        }
        else {
            //  如果我们已经从master中获得了该事件,则丢弃该消息
            if (s_was_pending (self, kvmsg))
                kvmsg_destroy (&kvmsg);
            else
                zlist_append (self->pending, kvmsg);
        }
    }
    return 0;
}
 
//  如果消息已在延迟队列中,则删除它并返回TRUE
 
static int
s_was_pending (clonesrv_t *self, kvmsg_t *kvmsg)
{
    kvmsg_t *held = (kvmsg_t *) zlist_first (self->pending);
    while (held) {
        if (memcmp (kvmsg_uuid (kvmsg),
                    kvmsg_uuid (held), sizeof (uuid_t)) == 0) {
            zlist_remove (self->pending, held);
            return TRUE;
        }
        held = (kvmsg_t *) zlist_next (self->pending);
    }
    return FALSE;
}
 
 
//  ---------------------------------------------------------------------
//  删除带有过期时间的瞬间值
 
static int s_flush_single (char *key, void *data, void *args);
 
static int
s_flush_ttl (zloop_t *loop, void *unused, void *args)
{
    clonesrv_t *self = (clonesrv_t *) args;
    zhash_foreach (self->kvmap, s_flush_single, args);
    return 0;
}
 
//  如果键值对过期,则进行删除操作,并广播该事件
static int
s_flush_single (char *key, void *data, void *args)
{
    clonesrv_t *self = (clonesrv_t *) args;
 
    kvmsg_t *kvmsg = (kvmsg_t *) data;
    int64_t ttl;
    sscanf (kvmsg_get_prop (kvmsg, "ttl"), "%" PRId64, &ttl);
    if (ttl && zclock_time () >= ttl) {
        kvmsg_set_sequence (kvmsg, ++self->sequence);
        kvmsg_set_body (kvmsg, (byte *) "", 0);
        kvmsg_send (kvmsg, self->publisher);
        kvmsg_store (&kvmsg, self->kvmap);
        zclock_log ("I: 正在发布删除事件:%d", (int) self->sequence);
    }
    return 0;
}
 
 
//  ---------------------------------------------------------------------
//  发送心跳
 
static int
s_send_hugz (zloop_t *loop, void *unused, void *args)
{
    clonesrv_t *self = (clonesrv_t *) args;
 
    kvmsg_t *kvmsg = kvmsg_new (self->sequence);
    kvmsg_set_key  (kvmsg, "HUGZ");
    kvmsg_set_body (kvmsg, (byte *) "", 0);
    kvmsg_send     (kvmsg, self->publisher);
    kvmsg_destroy (&kvmsg);
 
    return 0;
}
 
 
//  ---------------------------------------------------------------------
//  状态改变事件处理函数
//  我们将转变为master
//
//  备机先将延迟列表中的事件更新到自己的快照中,
//  并开始接收客户端发来的快照请求。
 
static int
s_new_master (zloop_t *loop, void *unused, void *args)
{
    clonesrv_t *self = (clonesrv_t *) args;
 
    self->master = TRUE;
    self->slave = FALSE;
    zloop_cancel (bstar_zloop (self->bstar), self->subscriber);
 
    //  应用延迟列表中的事件
    while (zlist_size (self->pending)) {
        kvmsg_t *kvmsg = (kvmsg_t *) zlist_pop (self->pending);
        kvmsg_set_sequence (kvmsg, ++self->sequence);
        kvmsg_send (kvmsg, self->publisher);
        kvmsg_store (&kvmsg, self->kvmap);
        zclock_log ("I: 正在发布延迟列表中的更新事件:%d", (int) self->sequence);
    }
    return 0;
}
 
//  ---------------------------------------------------------------------
//  正在切换为slave
 
static int
s_new_slave (zloop_t *loop, void *unused, void *args)
{
    clonesrv_t *self = (clonesrv_t *) args;
 
    zhash_destroy (&self->kvmap);
    self->master = FALSE;
    self->slave = TRUE;
    zloop_reader (bstar_zloop (self->bstar), self->subscriber,
                  s_subscriber, self);
 
    return 0;
}
 
//  ---------------------------------------------------------------------
//  从同伴主机(master)接收更新事件;
//  接收该类更新事件时,我们一定是slave。
 
static int
s_subscriber (zloop_t *loop, void *subscriber, void *args)
{
    clonesrv_t *self = (clonesrv_t *) args;
    //  获取快照,如果需要的话。
    if (self->kvmap == NULL) {
        self->kvmap = zhash_new ();
        void *snapshot = zsocket_new (self->ctx, ZMQ_DEALER);
        zsocket_connect (snapshot, "tcp://localhost:%d", self->peer);
        zclock_log ("I: 正在请求快照:tcp://localhost:%d",
                    self->peer);
        zstr_send (snapshot, "ICANHAZ?");
        while (TRUE) {
            kvmsg_t *kvmsg = kvmsg_recv (snapshot);
            if (!kvmsg)
                break;          //  中断
            if (streq (kvmsg_key (kvmsg), "KTHXBAI")) {
                self->sequence = kvmsg_sequence (kvmsg);
                kvmsg_destroy (&kvmsg);
                break;          //  完成
            }
            kvmsg_store (&kvmsg, self->kvmap);
        }
        zclock_log ("I: 收到快照,版本号:%d", (int) self->sequence);
        zsocket_destroy (self->ctx, snapshot);
    }
    //  查找并删除
    kvmsg_t *kvmsg = kvmsg_recv (subscriber);
    if (!kvmsg)
        return 0;
 
    if (strneq (kvmsg_key (kvmsg), "HUGZ")) {
        if (!s_was_pending (self, kvmsg)) {
            //  如果master的更新事件比客户端的事件早到,则将master的事件存入延迟列表,
            //  当收到客户端更新事件时会将其从列表中清除。
            zlist_append (self->pending, kvmsg_dup (kvmsg));
        }
        //  如果更新事件比kvmap版本高,则应用它
        if (kvmsg_sequence (kvmsg) > self->sequence) {
            self->sequence = kvmsg_sequence (kvmsg);
            kvmsg_store (&kvmsg, self->kvmap);
            zclock_log ("I: 收到更新事件:%d", (int) self->sequence);
        }
        else
            kvmsg_destroy (&kvmsg);
    }
    else
        kvmsg_destroy (&kvmsg);
 
    return 0;
}

        这段程序只有几百行,但还是花了一些时间来进行调通的。这个模型中包含了故障恢复,瞬间值,子树等等。虽然我们前期设计得很完备,但要在多个套接字之间进行调试还是很困难的。以下是我的工作方式:

                1、由于使用了反应堆(bstar,建立在zloop之上),我们节省了大量的代码,让程序变得简洁明了。整个服务以一个线程运行,因此不会出现跨线程的问题。只需将结构指针(self)传递给所有的处理器即可。此外,使用发应堆后可以让代码更为模块化,易于重用。

                2、我们逐个模块进行调试,只有某个模块能够正常运行时才会进入下一步。由于使用了四五个套接字,因此调试的工作量是很大的。我直接将调试信息输出到了屏幕上,因为实在没有必要专门开一个调试器来工作。

                3、因为一直在使用valgrind工具进行测试,因此我能确定程序没有内存泄漏的问题。在C语言中,内存泄漏是我们非常关心的问题,因为没有什么垃圾回收机制可以帮你完成。正确地使用像kvmsg、czmq之类的抽象层可以很好地避免内存泄漏。

        这段程序肯定还会存在一些BUG,部分读者可能会帮助我调试和修复,我在此表示感谢。

        测试模型6时,先开启主机和备机,再打开一组客户端,顺序随意。随机地中止某个服务进程,如果程序设计得是正确的,那客户端获得的数据应该都是一致的。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/59842.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Docker三大核心概念(镜像、容器和仓库)与虚拟化

目录 1. Docker是什么 2. Docker与虚拟化 3. Docker虚拟化的好处 4. Docker核心概念 4.1.镜像 4.2.容器 4.3.仓库 5. CentOS7 安装docker(在线方式) 5.1.内核版本信息检查 5.2 卸载可能存在的旧版本 5.3 安装必要的系统工具 5.4 添加docker-ce安装源 5.5 更新yum缓存 5.…

web前端期末大作业:个人网站设计——响应式个人小站网站HTML+CSS+JavaScript

&#x1f389;精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业…

STC 51单片机54——气压水压计HX710B 串口显示均值滤波+滑窗滤波

//气压模块为红色模块&#xff0c;传感器型号未知&#xff0c;其信号放大器型号为HX710B // STC15W408AS 11.0592MHz 波特率9600&#xff0c;串口输出大气压强值 // STC15W408AS没有定时器1&#xff0c;所以用定时器2做波特率发生器 // 采用电脑USB供电会有很大的干扰&#xff…

Unity工具 - 工具聚合页(UEWindow)

随着项目工程的推进&#xff0c;开发者们会根据工作内容的需要在Unity内开发众多的工具。随着工具的增多&#xff0c;Unity 的Menu菜单也会逐渐臃肿&#xff0c;过于分散&#xff0c;工具代码也难以查找。在此问题的基础上&#xff0c;开发了工具聚合页(UEWindow) 这一功能来管…

R绘图案例|基于分面的面积图

简介 最近参加一个统计建模的比赛。模型建模后&#xff0c;需要展示不同模型的性能指标&#xff0c;数据如下所示&#xff1a; 其中&#xff0c;第 1 列是不同样本&#xff0c;共376条。第 2-4 列是随机森林得到的结果&#xff0c;第 5-7 列是XGBoost的结果。一共使用了三种评…

数字验证学习笔记——UVM学习3 核心基类

一、核心基类 UVM世界中的类最初都是从一个uvm_void根类&#xff08;root class&#xff09;继承来的&#xff0c;而实际上这个类并没有成员变量和方法。 uvm_void只是一个虚类&#xff08;virtual class&#xff09;&#xff0c;还在等待将来继承于它的子类去开垦。在继承与u…

适合新手的Pytorch的中文文档

&#x1f50e;大家好&#xff0c;我是Sonhhxg_柒&#xff0c;希望你看完之后&#xff0c;能对你有所帮助&#xff0c;不足请指正&#xff01;共同学习交流&#x1f50e; &#x1f4dd;个人主页&#xff0d;Sonhhxg_柒的博客_CSDN博客 &#x1f4c3; &#x1f381;欢迎各位→点赞…

牛客练习赛106

牛客练习赛106 C D 脑筋急转弯的构造题 E 染色法判断二分图 结论&#xff0c;这个图是二分图说明不存在奇环 设左边是x&#xff0c;右边是y 则有xyn,xyn, xyn,且x∗y>边数n∗(n−1)/2−mx*y>边数n*(n-1)/2-m x∗y>边数n∗(n−1)/2−m 也就是说左式最大是n∗n/4(xn/…

【1774. 最接近目标价格的甜点成本】

来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 描述&#xff1a; 你打算做甜点&#xff0c;现在需要购买配料。目前共有 n 种冰激凌基料和 m 种配料可供选购。而制作甜点需要遵循以下几条规则&#xff1a; 必须选择 一种 冰激凌基料。可以添加 一种或多种 配料&…

[附源码]计算机毕业设计ssm校园一卡通服务平台

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

Apache+PHP8+MYSQL的配置(目前最新版本)

已有很多年没有WEB开发了&#xff0c;本机都没了测试的服务环境&#xff0c;前几天GO语言的一个测试用例需要用到WEB&#xff0c;于是快速搭建一个Apche环境&#xff0c;也顺便将PHP和MYSQL的环境也配置好&#xff0c;贴出来方便自己和他人&#xff0c;临时需要的时候就更快了&…

机器人控制算法八之路径规划算法:RRT、RRT-Connect、Dynamic-Domain RRTs*

机器人控制算法八之路径规划算法&#xff1a;RRT、RRT-Connect、Dynamic-Domain RRTs* 本文主要介绍基于RRT快速搜索随机树的路径规划算法及其改进&#xff0c;主要参考以下论文&#xff1a; 1.1998 Rapidly-exploring random trees: A new tool for path planning2.IEEE2000 R…

[附源码]计算机毕业设计基于springboot的家政服务平台

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

java毕业设计项目_第167期ssm多用户博客个人网站_计算机毕业设计

java毕业设计项目_第167期ssm多用户博客个人网站_计算机毕业设计 【源码请到资源专栏下载】 今天分享的项目是《ssm多用户博客个人网站》 该项目分为2个角色&#xff0c;管理员和用户。 用户可以浏览前台,包含功能有&#xff1a; 首页、博文类型、学生博客、论坛信息 、新闻资讯…

[附源码]Python计算机毕业设计Django框架的食品安全监督平台的设计与实现

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

Jsp+MySQL学生学籍信息管理系统 Java毕业设计

中学生学籍信息管理系统主要有三个访问权限。首先是管理员,管理员是整个系统的所有功能的管理者,有对学科、班级、教师、学生学籍信息、课表、学生身体素质以及教务公告的管理权限,可以对以上所有信息进行增删改查&#xff1a;教师角色则有查看教务公告和修改所教授科目对应班级…

大数据:Sqoop 简介与安装

一、Sqoop 简介 Sqoop 是一个常用的数据迁移工具&#xff0c;主要用于在不同存储系统之间实现数据的导入与导出&#xff1a; 导入数据&#xff1a;从 MySQL&#xff0c;Oracle 等关系型数据库中导入数据到 HDFS、Hive、HBase 等分布式文件存储系统中&#xff1b; 导出数据&am…

C++ Primer Plus第五版笔记(p101-150)

1 数组和vector类似&#xff0c;数组的大小确定不变&#xff0c;不能随意向数组中增加元素。 2 数组维度必须是一个常量表达式 3 不允许用auto关键字由初始值列表推断类型&#xff0c;另外和vector一样&#xff0c;数组的元素应该为对象&#xff0c;因此不存在引用的数组 4 字符…

7. TTL 延迟队列

二八佳人体似酥&#xff0c;腰间仗剑斩愚夫。虽然不见人头落&#xff0c;暗里教君骨髓枯。 创建两个队列 QA和 QB&#xff0c;两者队列 TTL 分别设置为 10S 和 40S&#xff0c;然后在创建一个交换机 X和死信交 换机 Y&#xff0c;它们的类型都是 direct&#xff0c;创建一个死信…

SpringBoot_项目打包部署

SpringBoot项目可以是jar类型的maven项目&#xff0c;也可以是一个war类型的maven项目&#xff0c;取决于我们要不要整合jsp使用。但是不管是哪种项目类型&#xff0c;已经不是我们传统意义上的项目结构了 在本地使用SpringBoot的启动器即可访问我们开发的项目。如果我们将项目…