Gossip协议

news2024/11/28 3:32:12

Gossip协议

  • 一、Gossip协议
    • 1.1 工作原理
    • 1.2 Gossip优点
    • 1.3 Gossip传播方式
      • 1.3.1 Anti-Entropy(反熵)
      • 1.3.2 Rumor-Mongering(谣言传播)
      • 1.3.3 结合
    • 1.4 Gossip协议的通信方式
      • 1.4.1 Push
      • 1.4.2 Pull
      • 1.4.3 Push&Pull
  • 二、手撸简易版Gossip协议
    • 2.1 传播过程
    • 2.2 代码实现

一、Gossip协议

Gossip协议是一个通信协议,一种传播消息的方式,灵感来自于:瘟疫、社交网络等,在分布式系统中被广泛使用,主要通过 Gossip 协议来保证网络中所有节点的数据一致性,这一点就显得尤为重要。

使用Gossip协议的有:Redis Cluster、Consul、Apache Cassandra等。

废话不多说,我们先上一些百度百科下的Gossip工作原理,优势等。

1.1 工作原理

Gossip 单词意思为“流言蜚语”,都说技术来源于生活,那我们不妨可以想想,生活中的流言传播方式,下面我简单画了一个图:在这里插入图片描述
假如小明的同学听说小明昨晚尿床了,然后就悄悄告诉了他的 3 个好朋友,他的3个好朋友听说,觉得也非常有意思,于是也告诉了他们各自的好朋友(实际情况好朋友可能会交叉重复),最后,进过多伦传播(指数增长),是不是“小明昨晚尿床”已经到达了全校皆知的地方。
传播速度:假设每人每次传播人数为 gnum,经过 n 轮传播,最终人数如下:

s = 1 + gnum(1) + gnum(2) + gunm(3) + … + gunm(n)

Gossip协议的工作原理就类似与上面的“流言蜚语”传播。Gossip协议利用一种随机的方式将信息传播到整个网络汇总,并在一定时间内,使得系统内的所有节点数据一致。

Gossip是一种去中心化的分布式一致性协议,保证了数据在集群中的传播和状态一致性。

1.2 Gossip优点

  • 可扩展性:Gossip协议是可扩展的,一般只需要 O(logN) 轮就可以将信息传播到所有的节点,其中 N 代表节点的个数。每个节点仅发送固定数量的消息,在数据传输时,由于交叉重复传播的性质,节点并不需要等待消息的 ack,及时消息传输失败,也可以通过其他节点将信息传递给失败节点,系统可以轻松扩展到数百万个进程。看到这里是不是觉得非常牛逼了。
  • 容错性:任何节点的宕机或重启,都不影响整个协议的运行。
  • 异步性:相比其他分布式一致性协议,如Raft、ZAB协议,都需要等待节点ack。
  • 健壮性:Gossip协议是去中心化的,集群中节点都是对等的,任何节点都可以随时加入或离开。
  • 最终一致性:Gossip协议实现信息指数级的传播,在有限的时间内能够是的所有节点拥有最新的数据。

1.3 Gossip传播方式

1.3.1 Anti-Entropy(反熵)

以固定的概率传播所有的数据。

工作方式:每个节点周期性地随机选择其他节点,然后通过相互交换自己的所有数据来消除两者之间的差异。

节点状态:使用 Anti-Entropy 的传播方式,包含两种状态(SI model)

  1. Infective:节点有数据更新,并且会将数据传播给其他节点
  2. susceptible:节点没有收到来自其他节点的更新

优点:Anti-Entropy 非常可靠

缺点:每次节点两两交换所有数据,会给节点带来非常大的通信负载,以及降低集群节点数据收敛速度。

1.3.2 Rumor-Mongering(谣言传播)

仅传播新到达的数据。

工作方式:当某一个节点存在数据更新后,节点将变为活跃状态,并周期性的联系其他节点对齐传播消息,直到所有的节点都接收该信息。

Rumor-Mongering 的消息只包含最新 update,谣言消息在某个时间点之后会被标记为 removed,并且不再被传播。

节点状态:使用 Rumor-Mongering 的传播方式,相比于 Anti-Entropy 多了一种状态(SIR model)

  1. Infective:节点有数据更新,并且会将数据传播给其他节点
  2. susceptible:节点没有收到来自其他节点的更新
  3. removed:表示已经接收到来自其他节点的更新,但不会将这个更新分享给其他节点。

优点:每次信息传播,只传播新的信息,大大减少了节点通信的负担,加速集群节点数据收敛速度。

缺点:因为消息会在某个时间标记为 removed 状态,之后就不会同步到其他节点,所以 Rumor-Mongering 类型的 Gossip 协议有极小概率会使得数据更新不会到达所有节点。

1.3.3 结合

一般来说,为了在通信代价和可靠性之前取得泽中,实际生产中,会将两种传播方式进行结合使用,使得在保证传播时效的同时,数据也满足一致性。

1.4 Gossip协议的通信方式

不管是 Anti-Entropy 还是 Rumor-Mongering都涉及到节点间的通信,节点间的通信方式主要有三种:Push、Pull、Push&Pull。

1.4.1 Push

发起信息交换的节点A随机选择联系节点B,并向其发送信息,节点B在接收到信息后更新比自己新的数据。

一般拥有新信息的节点才会作为发起节点。

1.4.2 Pull

发起信息交换的节点A随机选取联系节点B,并从对方获取信息。

一般无新信息的节点才会作为发起节点。

1.4.3 Push&Pull

发起信息交换的节点A向选择的节点B发送信息,同时从对方获取数据,用于更新本地数据。

二、手撸简易版Gossip协议

2.1 传播过程

在这里插入图片描述

Gossip协议传播过程如上:

  • 节点A存在信息更新,每次随机选取两个未感染过的节点进行传播,选取B、C两个节点;
  • B、C节点接收到update msg后,分别选取两个未感染过的节点进行传播,选取节点B -> D、E,C -> B、F;
  • 最后,节点D、E、F再进行传播,使得整个集群内所有节点都接受到节点A的update msg,达到最终一致性;

2.2 代码实现

数据存储结构:

@Data
@AllArgsConstructor
public class DataItem implements Serializable {
    private static final long serialVersionUID = 8820238286107945662L;
    // key
    private String key;
    // 数据
    private String value;
    // 时间戳:通过timestamp可判断消息是否为最新
    private Long timestamp;
}

更新消息数据结构:

@Data
public class UpdateMsg implements Serializable {
    private static final long serialVersionUID = -1338552478428807702L;
    // 更新数据
    private DataItem updateData;
    // 消息已经感染集群节点id
    private Set<Long> gossipServerIds;
}

服务节点

@Data
@Slf4j
public class ServerNode {
    // 节点id
    private Long serverId;
    // 传播个数,也叫扇出
    private Integer fanout;
    // 集群节点
    private List<ServerNode> serverNodes;
    // 集群数据:存储节点数据
    private Map<String, DataItem> dataMap = new ConcurrentHashMap<>(16);
    // 扇出
    private BlockingQueue<DataItem> fanoutQueue = new LinkedBlockingQueue<>();
    private Thread fanoutThread = new Thread(() -> {
        // 负责update msg扇出
        while (true) {
            try {
                DataItem dataItem = fanoutQueue.take();
                sendUpdateMsg(dataItem, Collections.singletonList(this.serverId));
            } catch (Exception e) {
                log.error("fanout exception", e);
            }
        }
    }, "fanout-thread");
    // 消息接收
    private BlockingQueue<UpdateMsg> receiveQueue = new LinkedBlockingQueue<>();
    private Thread receiveThread = new Thread(() -> {
        // 负责接收update msg消息处理
        while (true) {
            try {
                UpdateMsg updateMsg = receiveQueue.take();
                new Thread(() -> {
                    receiveUpdateMsg(updateMsg);
                }).start();
            } catch (Exception e) {
                log.error("fanout exception", e);
            }
        }
    }, "receive-thread");

    public ServerNode(Long serverId, Integer fanout) {
        this.serverId = serverId;
        this.fanout = fanout;
        this.serverNodes = new ArrayList<>();
        this.fanoutThread.start();
        this.receiveThread.start();
    }

    public void setData(String key, String value) {
        log.info("server id={} key={}, value={}", this.serverId, key, value);
        dataMap.put(key, new DataItem(key, value, System.currentTimeMillis()));
        // 发送消息同步
        fanoutQueue.offer(this.dataMap.get(key));
    }

    /**
     * 发送消息同步
     */
    private void sendUpdateMsg(DataItem dataItem, List<Long> excludeIds) {
        // 随机获取集群中fanout个节点
        List<ServerNode> serverNodes = getFanoutServerNodes(excludeIds);
        if (CollectionUtils.isEmpty(serverNodes)) {
            log.info("==========集群数据保持一致,停止广播========");
            return;
        }
        List<Long> serverIds = serverNodes.stream().map(ServerNode::getServerId).collect(Collectors.toList());

        for (ServerNode serverNode : serverNodes) {
            UpdateMsg updateMsg = new UpdateMsg();
            updateMsg.setUpdateData(dataItem);

            Set<Long> gossipServerIds = new HashSet<>(excludeIds);
            gossipServerIds.add(this.serverId);
            gossipServerIds.addAll(serverIds);
            updateMsg.setGossipServerIds(gossipServerIds);
            // 信息传播
            // TODO 进程之间,可通过socket进行传播
            serverNode.getReceiveQueue().offer(updateMsg);
        }
    }

    /**
     * 接收 UpdateMsg
     */
    public void receiveUpdateMsg(UpdateMsg updateMsg) {
        DataItem updateData = updateMsg.getUpdateData();
        // 更新本地消息
        boolean updateFlag = updateLocalMsg(updateData);
        if (updateFlag) {
            Set<Long> gossipServerIds = updateMsg.getGossipServerIds();
            gossipServerIds.add(this.serverId);
            // 发送消息
            sendUpdateMsg(updateData, new ArrayList<>(gossipServerIds));
        }
    }

    /**
     * 更新本地消息
     */
    private synchronized boolean updateLocalMsg(DataItem updateData) { // 加锁:TODO 优化空间,锁的粒度可以设置的更小
        DataItem dataItem = this.dataMap.get(updateData.getKey());
        if (dataItem == null) {
            this.dataMap.putIfAbsent(updateData.getKey(), updateData);
            return true;
        }
        // 比较谁的数据更新
        if (dataItem.getTimestamp() > updateData.getTimestamp()) {
            // 并发情况下,历史数据直接丢弃
            return false;
        }
        this.dataMap.put(updateData.getKey(), updateData);
        return true;
    }

    /**
     * 随机获取集群中fanout个节点
     */
    private List<ServerNode> getFanoutServerNodes(List<Long> excludeIds) {
        List<ServerNode> includeServerNodes = this.serverNodes;
        if (excludeIds != null && excludeIds.size() > 0) {
            includeServerNodes = new ArrayList<>();
            for (ServerNode serverNode : this.serverNodes) {
                if (excludeIds.contains(serverNode.getServerId()) || Objects.equals(serverNode.getServerId(), this.getServerId())) {
                    continue;
                }
                includeServerNodes.add(serverNode);
            }
        }
        if (CollectionUtils.isEmpty(includeServerNodes)) {
            return null;
        }
        // 随机获取 fanout 个节点下标
        List<Integer> randomIndexList = getRandomIndexList(includeServerNodes.size(), this.fanout);
        List<ServerNode> fanoutList = new ArrayList<>();
        for (Integer index : randomIndexList) {
            fanoutList.add(includeServerNodes.get(index));
        }
        return fanoutList;
    }

    private List<Integer> getRandomIndexList(int size, int num) {
        List<Integer> result = new ArrayList<>();
        if (num >= size) {
            for (int i = 0; i < size; i++) {
                result.add(i);
            }
            return result;
        }
        SecureRandom random = new SecureRandom();
        List<Integer> indexs = new ArrayList<>();
        for (int i = 0; i < size; i++) {
            indexs.add(i);
        }
        for (int j = 0; j < num; j++) {
            // 随机生成一个下标
            int index = random.nextInt(indexs.size());
            // 根据下标去取list中的值
            result.add(indexs.get(index));
            // 从list移除该值
            indexs.remove(index);
        }
        return result;
    }
}

测试:

@Slf4j
public class GossipTest {
    // 集群节点个数
    private static final Integer SERVER_NUM = 100;
    // 扇出
    private static final Integer FANOUT = 50;

    public static void main(String[] args) throws Exception {
        log.info("=========start time");
        // 初始化
        List<ServerNode> serverNodes = initAndGetServerNodes();
        // 随机选取节点进行数据更新
        Random random = new Random();
        String key = "key1";
        int updateNum = 1;
        for (int i = 0; i < updateNum; i++) {
            int index = random.nextInt(serverNodes.size());
            serverNodes.get(index).setData(key, "value" + i);
            Thread.sleep(50);
        }
        // wait
        Thread.sleep(1000 * 10);
        Set<String> valueSet = new HashSet<>();
        serverNodes.stream().map(ServerNode::getDataMap).forEach(dataMap -> {
            valueSet.add(dataMap.get(key).getValue());
        });
        log.info("server data value set={}", valueSet);
    }

    private static List<ServerNode> initAndGetServerNodes() {
        List<ServerNode> serverNodes = new ArrayList<>(SERVER_NUM);
        for (int i = 0; i < SERVER_NUM; i++) {
            serverNodes.add(i, new ServerNode(10000L + i, FANOUT));
            serverNodes.get(i).setServerNodes(serverNodes);
        }
        return serverNodes;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/940256.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

媒介盒子:医疗软文怎么写才能实现营销效果?

随着互联网的快速发展,医疗行业也逐渐意识到了网络营销的重要性。而作为网络营销的一种形式,医疗软文在传播医疗知识、宣传医疗品牌方面具有独特的优势。本文将从选题、内容、形式等多个方面进行探讨&#xff0c;如何写一篇有效的医疗营销软文&#xff1f; 1、选题非常关键 首…

Python“牵手”天猫商品列表数据,关键词搜索天猫API接口数据,天猫API接口申请指南

天猫平台API接口是为开发电商类应用程序而设计的一套完整的、跨浏览器、跨平台的接口规范&#xff0c;天猫API接口是指通过编程的方式&#xff0c;让开发者能够通过HTTP协议直接访问天猫平台的数据&#xff0c;包括商品信息、店铺信息、物流信息等&#xff0c;从而实现天猫平台…

MySql014——分组的GROUP BY子句排序ORDER BYSELECT子句顺序

前提&#xff1a;使用《MySql006——检索数据&#xff1a;基础select语句&#xff08;使用products表、查询单列、多列、所有列、DISTINCT去除重复行、LIMIT限制返回结果的行数、了解完全限定&#xff09;》中创建的products表 一、GROUP BY子句基础用法 SELECT vend_id, COU…

【Debug】解决RecursionError: maximum recursion depth exceeded in comparison报错

&#x1f680;Debug专栏 目录 &#x1f680;Debug专栏 ❓❓问题&#xff1a; &#x1f527;&#x1f527;分析&#xff1a; &#x1f3af;&#x1f3af;解决方案&#xff1a; ❓❓问题&#xff1a; 循环中报错RecursionError: maximum recursion depth exceeded in compari…

IC设计各岗位收入水平对比,看看哪个更适合你?

根据人才招聘平台对于2023年已有数据统计&#xff0c;芯片工程师岗位均薪为26012元&#xff0c;位列全行业第一。 这里需要说明一下&#xff0c;这里的“芯片工程师”涵盖了设计、制造、封测等多环节岗位。并非只有芯片设计岗。 从行业招聘薪酬同比增速来看&#xff0c;电子技…

Bigemap在路桥行业是怎么应用的?

选择Bigemap的原因&#xff1a; 奥维下架了&#xff0c;后来了解到的bigemap&#xff0c;于是测试了这款软件 使用场景&#xff1a; 下载影像、矢量路网做前期策划&#xff0c;下载完数据后导出cad ,做一些标注&#xff0c;最终出图下载等高线&#xff0c;作为前期选址依据 …

Linux环境下SVN服务器的搭建与公网访问:使用cpolar端口映射的实现方法

文章目录 前言1. Ubuntu安装SVN服务2. 修改配置文件2.1 修改svnserve.conf文件2.2 修改passwd文件2.3 修改authz文件 3. 启动svn服务4. 内网穿透4.1 安装cpolar内网穿透4.2 创建隧道映射本地端口 5. 测试公网访问6. 配置固定公网TCP端口地址6.1 保留一个固定的公网TCP端口地址6…

在日本做程序员能攒到钱吗?

如果你就是无欲无求&#xff0c;和人合租&#xff0c;自己做饭&#xff0c;不买高级食材&#xff0c;没有业余爱好&#xff0c;那我可以肯定告诉你一定能攒下钱&#xff0c;问题你是吗&#xff1f;能不能攒下钱丰俭由人&#xff0c;拿的少也有人能攒下钱&#xff0c;拿的多的也…

安防视频监控平台EasyNVR视频监控汇聚平台页面无法上传授权文件的问题解决方案

TSINGSEE青犀视频安防监控平台EasyNVR可支持设备通过RTSP/Onvif协议接入&#xff0c;并能对接入的视频流进行处理与多端分发&#xff0c;包括RTSP、RTMP、HTTP-FLV、WS-FLV、HLS、WebRTC等多种格式。在智慧安防等视频监控场景中&#xff0c;EasyNVR可提供视频实时监控直播、云端…

Viobot硬件控制

一.使用上位机控制 TOF版本设备点击TOF ON即可开启TOF&#xff0c;开启后按键会变成TOF OFF&#xff0c;点击TOF OFF即可关闭TOF 补光灯版本设备点击LED ON即可开启LED &#xff0c;开启后按键会变成LED OFF&#xff0c;点击LED OFF即可关闭LED 设置页面的viobot栏&#xff0c…

13.redis集群、主从复制、哨兵

1.redis主从复制 主从复制是指将一台redis服务器&#xff08;主节点-master&#xff09;的数据复制到其他的redis服务器&#xff08;从节点-slave&#xff09;&#xff0c;默认每台redis服务器都是主节点&#xff0c;每个主节点可以有多个或没有从节点&#xff0c;但一个从节点…

代码随想录算法训练营第五十二天|LeetCode 84.柱状图中最大的矩形

LeetCode 84.柱状图中最大的矩形 代码如下&#xff08;Java&#xff09;&#xff1a;暴力解法 class Solution {public int largestRectangleArea(int[] heights) {int length heights.length;int[] minLeftIndex new int[length];int[] minRightIndex new int[length];min…

Java实现根据关键词搜索1688商品新品数据方法,1688API节课申请指南

要通过1688的API获取商品新品数据&#xff0c;您可以使用1688开放平台提供的接口来实现。以下是一种使用Java编程语言实现的示例&#xff0c;展示如何通过1688开放平台API获取商品新品数据&#xff1a; 首先&#xff0c;确保您已注册成为1688开放平台的开发者&#xff0c;并创…

【猿灰灰赠书活动 - 03期】- 【RHCSA/RHCE 红帽Linux认证学习指南(第7版) EX200 EX300】

说明&#xff1a;博文为大家争取福利&#xff0c;与清华大学出版社合作进行送书活动 图书&#xff1a;《RHCSA/RHCE 红帽Linux认证学习指南(第7版) EX200 & EX300》 一、好书推荐 图书介绍 《RHCSA/RHCE 红帽Linux认证学习指南&#xff08;第7版&#xff09; EX200 & E…

亚马逊云科技CEO分享企业内决策流程、领导力原则与重组下的思考

亚马逊云科技首席执行官Adam Selipsky几乎从一开始就在那里&#xff1a;他于2005年加入&#xff0c;在效力亚马逊11年后于2016年离开&#xff0c;转而经营Tableau&#xff0c;并于2021年成为亚马逊云科技首席执行官。当时亚马逊云科技前首席执行官安迪贾西(Andy Jassy)接替杰夫…

从数据孤岛到企业xPA的演化

“数据孤岛”一直以来是企业在信息化进程中面临的比较头疼的问题&#xff0c;由于数据独立存在于不同部门之中&#xff0c;无法进行相互联动&#xff0c;致使数据库无法兼容&#xff0c;这无形中加大了跨部门合作的沟通成本。在此背景下&#xff0c;一种新兴的规划方法——扩展…

知虾shopee数据分析工具:shopee出单的商机利器

当今数字化时代&#xff0c;数据已经成为商业成功的关键要素之一。而Shopee作为东南亚最大的电商平台之一&#xff0c;其强大的数据分析工具正为商家提供了宝贵的市场洞察和决策支持。本文将深入探讨Shopee数据分析工具如何帮助商家抓住商机并取得成功。 洞察消费者需求&#x…

Android网络请求,全方位优雅解析

网络请求的基本流程 网络请求步骤&#xff08;用户输入一个网址到网页最终展现到用户面前&#xff09;大致流程总结如下&#xff1a; 在客户端浏览器中输入网址URL。发送到DNS(域名服务器)获得域名对应的WEB服务器的IP地址。客户端浏览器与WEB服务器建立TCP(传输控制协议)连接…

【MySQL】引擎类型

与其他DBMS一样&#xff0c;MySQL有一个 具体管理和处理数据的内部引擎 。在使用create table语句时&#xff0c;该引擎具体创建表&#xff0c;而在使用select或进行其他数据库处理时&#xff0c;该引擎在内部处理你的请求。多数时候&#xff0c;引擎都隐藏在DBMS内&#xff0…

苹果为 Vision Pro 头显申请游戏手柄专利

苹果Vision Pro 推出后&#xff0c;美国专利局公布了两项苹果公司申请的游戏手柄专利&#xff0c;其中一项的专利图如下图所示。据 PatentlyApple 报道&#xff0c;虽然申请专利并不能保证苹果公司会推出游戏手柄&#xff0c;但是苹果公司同时也为游戏手柄申请了商标&#xff0…