SDN控制器-ONOS中的最终一致性存储

news2024/7/6 18:14:46

ONOS中的数据存储基本上都是以KV进行存储的。按照一致性强弱类型可以分为强一致性存储(strong consistency)与弱一致性存储(eventually consistency)。

比较典型的,如ONOS中对于设备接口的存储,使用的是强一致类型存储,调用的是

org.onosproject.store.service.StorageService#consistentMapBuilder

所构建出来的ConsistentMap,由atomix项目提供封装,底层使用raft协议实现。遗憾的是atomix的JAVA版本目前已停止维护。

如ONOS中对于设备信息的存储,使用的是最终一致性(弱一致性)存储,实现机制使用的是gossip协议实现。

本文以gossip在ONOS中实现机制详细过程为目标,并使用理论+代码实践的方式进行综合性梳理记录。

gossip协议(理论)

A gossip protocol or epidemic protocol is a procedure or process of computer peer-to-peer communication that is based on the way epidemics spread.

Gossip protocol 或 Epidemic Protocol (流行病协议),是基于流行病传播方式的节点或者进程之间信息交换的协议。gossip协议在分布式系统中处理数据最终一致性的场景得到了广泛应用。

goosip协议中数据的传播方式有两种,分别为反熵(Anti-Entropy)和传谣(Anti-Entropy)。

传播方式

反熵

先回顾一下熵(Entropy) 的概念。
在信息论中,熵一般指的是对不确定程度的一种度量。如用熵来描述数据的随机性,熵越大则数据的有序性也就越差。对随机数来讲熵越大,随机数的随机性越好。

所谓反熵(Anti-Entropy) 即是熵(Entropy) 的反派,——和熵对着干

提炼一下:在集群的多个节点中,某个节点的数据在向集群中其他节点传播的过程中,在反熵作用下会同步节点的全部数据,最终达到集群中各节点数据完全的一致。

反熵传播使用“simple epidemics(SI model)”的方式,以固定的概率传播所有的数据。所有参与节点只有两种状态:

Suspective(病原):处于 susceptible 状态的节点代表其并没有收到来自其他节点的更新。
Infective(感染):处于 infective 状态的节点代表其有数据更新,并且会将这个数据分享给其他节点。

反熵传播方法每次节点两两交换自己的所有数据会带来非常大的通信负担,因此不会频繁使用,通常只用于新加入节点的数据初始化。

传谣

Rumor-Mongering(谣言传播)是gossip中的另一种数据传播机制。

使用“complex epidemics”(SIR model)的方式,以固定的概率仅传播新到达的数据。所有参与节点有三种状态:Suspective(病原)、Infective(感染)、Removed(愈除)。

Removed(愈除):其已经接收到来自其他节点的更新,但是其并不会将这个更新分享给其他节点。

谣言传播过程是消息只包含最新 update,谣言消息在某个时间点之后会被标记为removed,并且不再被传播。缺点是系统有一定的概率会不一致,通常用于节点间数据增量同步。

通信方式

推送push、拉pull、推拉模式(push/pull)

推送push

节点A随机选择联系节点B,并向其发送自己的信息,节点B在收到信息后比较/更新自己的数据。

拉取模式(pull)

节点A随机选择联系节点B,从对方获取信息,节点A在收到信息后比较/更新自己的数据。

推/拉模式(push/pull)

节点A向选择的节点B发送信息,同时从对方获取信息,节点A和节点B在收到信息后各自比较/更新自己的数据。

GOSSIP在ONOS中的应用(实践)

在了解了GOSSIP的理论知识后,再结合ONOS中的设备信息存储的原理,学习一下GOSSIP在ONOS中的具体实现细节。

ONOS集群管理

在一个onos集群中,由多个node节点组成了一个onos集群。

配置时通过修改onos根路径下的config/cluster.json配置文件配置集群信息。ONOS官方也提供了脚本可自动生成集群所需的配置信息,链接如下:https://wiki.onosproject.org/display/ONOS/Notes+on+cluster+formation+for+Docker+instances

配置示例:

{   
    "node": { 
        "ip": "192.168.1.11",
        "id": "onos-1",
        "port": 9876
    }, 
    "storage": [
        {   
            "ip": "192.168.2.21",
            "id": "atomix-1",
            "port": 5679
        }
    ],
    "name": "onos"
}

在onos集群中各onos节点一般使用9876端口进行通信(东西向通信),具体实现方式也由atomix框架提供。

集群信息的维护由ClusterManager调用AtomixClusterStore实现,集群中的节点信息保存在atomix中。在ONOS中如要获取集群中的节点信息,可调用如下方法获取:

atomixManager.getAtomix().getMembershipService().getMembers()

ONOS中也提供了ClusterCommunicationService用于集群中的节点与其他节点进行通信,如向集群中的所有节点发送消息(广播),直接调用ClusterCommunicationService中的broadcast方法即可,节点间发送和处理消息变得简单又方便。

atomix集群

生产环境中,为提高系统可用性onos和atomix都以集群方式运行。atomix各节点一般使用5679端口进行通信,底层使用RAFT协议实现数据的强一致性。

RAFT协议详细介绍:https://raft.github.io/

atomix成员管理机制

atomix集群中各节点的状态由atomix中的成员管理协议(MembershipProtocol)决定。
当前版本中的atomix支持以下两种协议:

  • HeartbeatMembershipProtocol
  • SwimMembershipProtocol

heartbeat与swim的区别可用如下两张图对比:
heartbeat方式通信示意图:
heartbeat
swim方式通信示意图:
swimProtocol

SWIM在每个周期的检测流程如下:

  1. 一个node A发送ping给list中的随机一个node(比如就叫B吧),如果B收到了就返回ack给A。
  2. 如果A没在预定的时间(小于周期T)内收到这个ack,它会在list中随机挑选 k 个node并发送 ping-req(B)请求它们帮助自己来确认B是否活着,若没有任何一个node告诉A说B活着,那A认为B挂了,并把它从list中移除,然后广播到整个网络中去。

总结一下
heartbeat方式下集群中进行一次数据交互的发包次数为N2,适用于小规模集群。反之如果集群中的主机较多,则使用swim协议可提升通信性能。

需要注意的是,在atomix协议中swim的实现使用到了UDP协议,一定条件下存在丢包出现不稳定的情况。如果集群中节点数较少还是推荐使用heartbeat的方式。以下atomix中使用hearbeat方式的某个示例:

cluster {
  clusterId: onos
  discovery {
    type: bootstrap
    nodes.1 {
      id: atomix-1
      address: "192.168.2.21:5679"
    }
    nodes.2 {
      id: atomix-2
      address: "192.168.2.22:5679"
    }
    nodes.3 {
      id: atomix-3
      address: "192.168.2.23:5679"
    }
  }
 
  protocol {
    type: heartbeat
    heartbeatInterval: 1s
    phiFailureThreshold: 10
    failureTimeout: 10s
  }
}
managementGroup {
  type: raft
  partitions: 1
  storage.level: disk
  members: [atomix-1, atomix-2, atomix-3]
}
partitionGroups.raft {
  type: raft
  partitions: 3
  partitionSize: 3
  storage.level: disk
  members: [atomix-1, atomix-2, atomix-3]
}

设备信息存储源码分析

下面进入正题,——以ONOS中的Device信息存储为例,分析整理一下GOSSIP在设备信息存储中的实现细节。

ONOS中设备信息的管理由实现了DeviceService的DeviceManager进行承载,在DeviceManger中对设备信息的操作使用的是面向接口的方式最终调用的是DeviceStore接口。

实现了DeviceStore的具体类有ECDeviceStore和GossipDeviceStore。当前较新版本的ONOS中使用的是GossipDeviceStore,也就是下文中要详细跟进的重点——Gossip协议实现的DeviceStore。

GossipDeviceStore#activate

ONOS中的类主要都是构建在OSGI上的,GossipDeviceStore也不例外。按照OSGI对象管理的方式,一个组件(Component)会执行被 @Activate 注解修饰的方法进行初始化时,执行被 @Deactivate 注解修饰的方法进行资源释放。

GossipDeviceStore作为Gossip协议实现的存储类,许多的核心步骤也主要在activate方法中进行整体的管理。

上源码:

    @Activate
    public void activate() {
        //资源申请,获得线程池
        executor = newCachedThreadPool(groupedThreads("onos/device", "fg-%d", log));

        backgroundExecutor =
                newSingleThreadScheduledExecutor(minPriority(groupedThreads("onos/device", "bg-%d", log)));

        //注册回调方法,接收处理节点发来的消息
        addSubscriber(DEVICE_UPDATE, this::handleDeviceEvent);
        addSubscriber(DEVICE_STATUS_CHANGE, this::handleDeviceStatusChangeEvent);
        addSubscriber(DEVICE_REMOVE_REQ, this::handleRemoveRequest);
        addSubscriber(DEVICE_REMOVED, this::handleDeviceRemovedEvent);
        addSubscriber(PORT_UPDATE, this::handlePortEvent);
        addSubscriber(PORT_STATUS_UPDATE, this::handlePortStatusEvent);
        addSubscriber(DEVICE_ADVERTISE, this::handleDeviceAdvertisement);

        // start anti-entropy thread;启动反熵定时任务
        backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(),
                                               initialDelaySec, periodSec, TimeUnit.SECONDS);

        ……
    }
    
    private <M> void addSubscriber(MessageSubject subject, Consumer<M> handler) {
        clusterCommunicator.addSubscriber(subject, SERIALIZER::decode, handler, executor);
    }

如上面所示,在GossipDeviceStore的activate方法中主要做了两件事:

  1. 申请线程池资源,启动反熵定时任务,5秒执行一次
  2. 添加消息处理器,接收其他节点的发来的消息并进行处理

createOrUpdateDevice

在有了activate方法的准备工作后,再看一下数据在进行变更时的具体过程。进入createOrUpdateDevice方法。

 @Override
    public synchronized DeviceEvent createOrUpdateDevice(ProviderId providerId,
                                                         DeviceId deviceId,
                                                         DeviceDescription deviceDescription) {
        NodeId localNode = clusterService.getLocalNode().id();
        NodeId deviceNode = mastershipService.getMasterFor(deviceId);
        boolean isMaster = localNode.equals(deviceNode);

        // Process device update only if we're the master,
        // otherwise signal the actual master.
        DeviceEvent deviceEvent = null;

        // If this node is the master for the device, acquire a new timestamp. Otherwise,
        // use a 0,0 or tombstone timestamp to create the device if it doesn't already exist.
        Timestamp newTimestamp;
        try {
            newTimestamp = isMaster
                    ? deviceClockService.getTimestamp(deviceId)
                    : removalRequest.getOrDefault(deviceId, DEFAULT_TIMESTAMP);
        } catch (IllegalStateException e) {
            newTimestamp = removalRequest.getOrDefault(deviceId, DEFAULT_TIMESTAMP);
            isMaster = false;
        }
        final Timestamped<DeviceDescription> deltaDesc = new Timestamped<>(deviceDescription, newTimestamp);
        final Timestamped<DeviceDescription> mergedDesc;
        final Map<ProviderId, DeviceDescriptions> device = getOrCreateDeviceDescriptionsMap(deviceId);

        synchronized (device) {
            //对device信息进行数据更新,操作节点中的MAP
            deviceEvent = createOrUpdateDeviceInternal(providerId, deviceId, deltaDesc);
            if (deviceEvent == null) {
                return null;
            }
            mergedDesc = device.get(providerId).getDeviceDesc();
        }

        // If this node is the master for the device, update peers.
        if (isMaster) {
            //此节点为主节点时,向集群中的所有节点发送通知
            log.debug("Notifying peers of a device update topology event for providerId: {} and deviceId: {}",
                    providerId, deviceId);
            notifyPeers(new InternalDeviceEvent(providerId, deviceId, mergedDesc));
        }
        notifyDelegateIfNotNull(deviceEvent);

        return deviceEvent;
    }

围绕gossip的主流程,createOrUpdateDevice方法中先是调用了节点中的方法,对device信息存储的map进行了数据更新,数据更新后调用了notifyPeers向集群中的所有节点发送了广播消息,进行数据变更通知。

通知消息发送代码片段如下:

    private void notifyPeers(InternalDeviceEvent event) {
        broadcastMessage(DEVICE_UPDATE, event);
    }
    
    private void broadcastMessage(MessageSubject subject, Object event) {
        clusterCommunicator.broadcast(event, subject, SERIALIZER::encode);
    }

看到这里再结合一下gossip的理论知识:数据在发生变更时会进行传播。

handleDeviceEvent

顺着消息的流程再看一下处理消息的实现。
根据初始化时的事件注册代码得到DEVICE_UPDATE的处理代码:

addSubscriber(DEVICE_UPDATE, this::handleDeviceEvent);

也就是GossipDeviceStore中的handleDeviceEvent方法

    private void handleDeviceEvent(InternalDeviceEvent event) {
        ProviderId providerId = event.providerId();
        DeviceId deviceId = event.deviceId();
        //从消息中获取出设备描述信息
        Timestamped<DeviceDescription> deviceDescription = event.deviceDescription();

        try {
            //调用内部方法更新本节点中的维护的设备信息
            notifyDelegateIfNotNull(createOrUpdateDeviceInternal(providerId, deviceId,
                    deviceDescription));
        } catch (Exception e) {
            log.warn("Exception thrown handling device update", e);
        }
    }

节点收到DEVICE_UPDATE消息后会直接调用本节点中的方法对数据进行更新,源码如下:

    private DeviceEvent createOrUpdateDeviceInternal(ProviderId providerId,
                                                     DeviceId deviceId,
                                                     Timestamped<DeviceDescription> deltaDesc) {

        // Collection of DeviceDescriptions for a Device
        Map<ProviderId, DeviceDescriptions> device
                = getOrCreateDeviceDescriptionsMap(deviceId);
        
        //对要操作的设备加锁操作
        synchronized (device) {
            // locking per device
            //已删除的设备不处理
            if (isDeviceRemoved(deviceId, deltaDesc.timestamp())) {
                log.debug("Ignoring outdated event: {}", deltaDesc);
                return null;
            }
            //获取出本节点维护的设备信息,没有就用传入的数据创建一个
            DeviceDescriptions descs = getOrCreateProviderDeviceDescriptions(device, providerId, deltaDesc);

            final Device oldDevice = devices.get(deviceId);
            final Device newDevice;
    
            //发来的DeviceDesc较新,则进行替换并组装到newDevice中
            if (deltaDesc == descs.getDeviceDesc() ||
                    deltaDesc.isNewer(descs.getDeviceDesc())) {
                // on new device or valid update
                descs.putDeviceDesc(deltaDesc);
                newDevice = composeDevice(deviceId, device);
            } else {
                // outdated event, ignored.
                return null;
            }
            
            
            
            if (oldDevice == null) {
                //本节点以前没有此设备的任何信息,就进行创建
                // REGISTER
                if (!deltaDesc.value().isDefaultAvailable()) {
                    return registerDevice(providerId, newDevice, deltaDesc.timestamp());
                }
                // ADD
                return createDevice(providerId, newDevice, deltaDesc.timestamp());
            } else {
                //更新设备信息为newDevice
                // UPDATE or ignore (no change or stale)
                return updateDevice(providerId, oldDevice, newDevice, deltaDesc.timestamp(),
                                    deltaDesc.value().isDefaultAvailable());
            }
        }
    }
    
    /**
     * Checks if given timestamp is superseded by removal request
     * with more recent timestamp.
     *
     * @param deviceId         identifier of a device
     * @param timestampToCheck timestamp of an event to check
     * @return true if device is already removed
     */
    private boolean isDeviceRemoved(DeviceId deviceId, Timestamp timestampToCheck) {
        Timestamp removalTimestamp = removalRequest.get(deviceId);
        if (removalTimestamp != null &&
                removalTimestamp.compareTo(timestampToCheck) > 0) {
            // removalRequest is more recent
            return true;
        }
        return false;
    }

createOrUpdateDeviceInternal方法中主要对传入的Timestamped与本节点存储的信息进行对比,根据时间戳进行对比,保持本节点中所存储的信息一直是最新的

SendAdvertisementTask

上面的DEVICE_UPDATE消息是设备信息变更后主动发送通知的处理流程。如仅有这一种变更通知,对于集群中节点会动态增删的场景则还不一定能保证数据的最终一致性。

这时,基于Gossip协议实现的SendAdvertisementTask方法则是核心。再回顾一下它的触发条件:

private long initialDelaySec = 5;
private long periodSec = 5;
backgroundExecutor.scheduleAtFixedRate(new SendAdvertisementTask(), initialDelaySec, periodSec, TimeUnit.SECONDS);

每5秒执行一次SendAdvertisementTask,也就是前文理论知识中提及到的传播。看一下具体实现:

    private final class SendAdvertisementTask implements Runnable {

        @Override
        public void run() {
            if (Thread.currentThread().isInterrupted()) {
                log.debug("Interrupted, quitting");
                return;
            }

            try {
                //本节点
                final NodeId self = clusterService.getLocalNode().id();
                //集群中的所有节点
                Set<ControllerNode> nodes = clusterService.getNodes();

                ImmutableList<NodeId> nodeIds = FluentIterable.from(nodes)
                        .transform(toNodeId())
                        .toList();
                
                //集群中只有一个节点,不进行传播
                if (nodeIds.size() == 1 && nodeIds.get(0).equals(self)) {
                    log.trace("No other peers in the cluster.");
                    return;
                }

                //随机选择一个集群中的其它节点
                NodeId peer;
                do {
                    int idx = RandomUtils.nextInt(0, nodeIds.size());
                    peer = nodeIds.get(idx);
                } while (peer.equals(self));
                
                //创建一个公告消息
                DeviceAntiEntropyAdvertisement ad = createAdvertisement();

                if (Thread.currentThread().isInterrupted()) {
                    log.debug("Interrupted, quitting");
                    return;
                }
                
                //向选择的节点进行单播,传递公告消息
                try {
                    unicastMessage(peer, DEVICE_ADVERTISE, ad);
                } catch (IOException e) {
                    log.debug("Failed to send anti-entropy advertisement to {}", peer);
                    return;
                }
            } catch (Exception e) {
                // catch all Exception to avoid Scheduled task being suppressed.
                log.error("Exception thrown while sending advertisement", e);
            }
        }
    }

将以上代码总结一下:
SendAdvertisementTask每间隔5秒执行一次,主要功能为:根据当前的设备信息生成一条公告消息,并在集群中随机选择一个节点,将公告消息进行传递

公告消息生成代码如下:

    private DeviceAntiEntropyAdvertisement createAdvertisement() {
        //当前节点
        final NodeId self = clusterService.getLocalNode().id();
        
        final int numDevices = deviceDescs.size();
        //生成公告消息map
        Map<DeviceFragmentId, Timestamp> adDevices = new HashMap<>(numDevices);
        final int portsPerDevice = 8; // random factor to minimize reallocation
        Map<PortFragmentId, Timestamp> adPorts = new HashMap<>(numDevices * portsPerDevice);
        Map<DeviceId, Timestamp> adOffline = new HashMap<>(numDevices);

        deviceDescs.forEach((deviceId, devDescs) -> {
            //遍历每个设备生成公告消息
            // for each Device...
            synchronized (devDescs) {
                
                //填充离线设备ID
                // send device offline timestamp
                Timestamp lOffline = this.offline.get(deviceId);
                if (lOffline != null) {
                    adOffline.put(deviceId, lOffline);
                }
                
                
                for (Entry<ProviderId, DeviceDescriptions>
                        prov : devDescs.entrySet()) {

                    // for each Provider Descriptions...
                    final ProviderId provId = prov.getKey();
                    final DeviceDescriptions descs = prov.getValue();
                    //放入设备描述信息的时间戳
                    adDevices.put(new DeviceFragmentId(deviceId, provId),
                                  descs.getDeviceDesc().timestamp());
                    
                    for (Entry<PortNumber, Timestamped<PortDescription>>
                            portDesc : descs.getPortDescs().entrySet()) {
                        //放入设备端口信息的时间戳
                        final PortNumber number = portDesc.getKey();
                        adPorts.put(new PortFragmentId(deviceId, provId, number),
                                    portDesc.getValue().timestamp());
                    }
                }
            }
        });
        //生成反熵公告消息
        return new DeviceAntiEntropyAdvertisement(self, adDevices, adPorts, adOffline);
    }

与前文理论知识部分提到的一样,为了提高反熵消息在节点中传播的性能,传播的消息一般仅传播摘要消息,这里的createAdvertisement方法中仅传递时间戳也是一种很好的实现方法。

handleDeviceAdvertisement

再看一下节点收到公告消息的处理细节,即GossipDeviceStore初始化部分的DEVICE_ADVERTISE消息,处理方法为handleDeviceAdvertisement

addSubscriber(DEVICE_ADVERTISE, this::handleDeviceAdvertisement);

源码如下:

    /**
     * Responds to anti-entropy advertisement message.
     * <p>
     * Notify sender about out-dated information using regular replication message.
     * Send back advertisement to sender if not in sync.
     *
     * @param advertisement to respond to
     */
    private void handleAdvertisement(DeviceAntiEntropyAdvertisement advertisement) {
        /*
         * NOTE that when an instance rejoins the cluster, it will generate
         * device events and send to the local apps through the delegate. This
         * approach might be not the best if the apps are not enough robust or
         * if there is no proper coordination in the cluster. Also, note that
         * any ECMap will act on the same way during the bootstrap process
         */
        final NodeId sender = advertisement.sender();

        Map<DeviceFragmentId, Timestamp> devAds = new HashMap<>(advertisement.deviceFingerPrints());
        Map<PortFragmentId, Timestamp> portAds = new HashMap<>(advertisement.ports());
        Map<DeviceId, Timestamp> offlineAds = new HashMap<>(advertisement.offline());

        // Fragments to request
        Collection<DeviceFragmentId> reqDevices = new ArrayList<>();
        Collection<PortFragmentId> reqPorts = new ArrayList<>();
        
        //遍历本节点中的设备
        for (Entry<DeviceId, Map<ProviderId, DeviceDescriptions>> de : deviceDescs.entrySet()) {
            final DeviceId deviceId = de.getKey();
            final Map<ProviderId, DeviceDescriptions> lDevice = de.getValue();

            //对遍历的设备加锁处理
            synchronized (lDevice) {
                // latestTimestamp across provider
                // Note: can be null initially
                // 从下线设备MAP中获取到此设备,可能为空
                Timestamp localLatest = offline.get(deviceId);

                // handle device Ads
                for (Entry<ProviderId, DeviceDescriptions> prov : lDevice.entrySet()) {
                    final ProviderId provId = prov.getKey();
                    final DeviceDescriptions lDeviceDescs = prov.getValue();

                    final DeviceFragmentId devFragId = new DeviceFragmentId(deviceId, provId);


                    Timestamped<DeviceDescription> lProvDevice = lDeviceDescs.getDeviceDesc();
                    Timestamp advDevTimestamp = devAds.get(devFragId);

                    if (advDevTimestamp == null || lProvDevice.isNewerThan(
                            advDevTimestamp)) {
                        // remote does not have it or outdated, suggest
                        log.trace("send to {} device update {} for {}", sender, lProvDevice, deviceId);
                        // 对端节点的device信息已过期,使用DEVICE_UPDATE将设备信息推送给对端
                        notifyPeer(sender, new InternalDeviceEvent(provId, deviceId, lProvDevice));
                    } else if (!lProvDevice.timestamp().equals(advDevTimestamp)) {
                        // local is outdated, request
                        log.trace("need update {} < {} for device {} from {}", lProvDevice.timestamp(),
                                advDevTimestamp, deviceId, sender);
                        // 本地节点的数据已过期,添加到请求集合中以拉取新设备信息
                        reqDevices.add(devFragId);
                    }

                    // handle port Ads
                    ……
                    ……
                    ……
                    // remove device Ad already processed
                    devAds.remove(devFragId);

                    // find latest and update
                    final Timestamp providerLatest = lDeviceDescs.getLatestTimestamp();
                    if (localLatest == null ||
                            providerLatest.compareTo(localLatest) > 0) {
                        localLatest = providerLatest;
                    }
                } // end local provider loop

                // checking if remote timestamp is more recent.
                Timestamp rOffline = offlineAds.get(deviceId);
                if (localLatest == null || (rOffline != null && rOffline.compareTo(localLatest) > 0)) {
                    // 对比对端节点与本节点的离线设备数据,识别出已过期数据
                    // remote offline timestamp suggests that the
                    // device is off-line
                    log.trace("remote offline timestamp from {} suggests that the device {} is off-line",
                            sender, deviceId);
                    // 本地节点数据已过期,本节点数据将其标注为已下线
                    markOfflineInternal(deviceId, rOffline);
                }

                Timestamp lOffline = offline.get(deviceId);
                if (lOffline != null && rOffline == null) {
                    // locally offline, but remote is online, suggest offline
                    log.trace("suggest to {} sthat the device {} is off-line", sender, deviceId);
                    // 本节点中device已离线,对端节点device仍在线,发送DEVICE_STATUS_CHANGE消息通知对端节点设备已离线
                    notifyPeer(sender, new InternalDeviceStatusChangeEvent(deviceId, lOffline, false));
                }

                // remove device offline Ad already processed
                offlineAds.remove(deviceId);
            } // end local device loop
        } // device lock

        // If there is any Ads left, request them
        log.trace("Ads left {}, {}", devAds, portAds);
        reqDevices.addAll(devAds.keySet());
        reqPorts.addAll(portAds.keySet());

        if (reqDevices.isEmpty() && reqPorts.isEmpty()) {
            log.trace("Nothing to request to remote peer {}", sender);
            //没有需要向外发送的pull消息,处理流程结束
            return;
        }

        log.debug("Need to sync {} {}", reqDevices, reqPorts);

        // 2-way Anti-Entropy for now
        try {
            // 本节点有需要pull的消息,向对端信息发送DEVICE_ADVERTISE消息以被动获取最新数据
            unicastMessage(sender, DEVICE_ADVERTISE, createAdvertisement());
        } catch (IOException e) {
            log.error("Failed to send response advertisement to " + sender, e);
        }

    }

在handleDeviceAdvertisement方法中定义了处理DEVICE_ADVERTISE消息的详细过程,即:节点间进行数据交换的具体过程。

收到远程节点发送的DEVICE_ADVERTISE消息后,本节点会与远程节点中的数据做对比,如本节点中的数据较新,本节点会将它所拥有的较新设备数据发送给对端节点;如本节点中的数据较旧,本节点也会向对端节点发送一个DEVICE_ADVERTISE消息以获得对端节点中的较新数据。

总结

ONOS中设备信息的存储基于GOSSIP协议实现,节点间进行数据交换主要有定时随机传播数据变更后主动广播两种方式。

通信的实现上基于atomix组件(底层基于NETTY)使得用户无需关心具体的通信过程。在节点数据交换的过程中,主要使用PUSH的方式进行数据交换。首次PUSH时本节点仅传递本节点的摘要信息(基于时间戳)推送到对端节点,对端节点收到摘要数据后进行对比,将新的数据PUSH到源节点。

基于gossip实现的设备信息存储,数据的落地都在各节点的内存中,这种方式可以向本节点的服务提供快速的访问,通过节点间数据的不定时传播也达到了数据的最终一致性。结合ONOS中某一个设备仅能拥有一个主节点的背景,设备信息使用gossip协议实现确实是一种比较好的选择。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1317617.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

XXE漏洞 [NCTF2019]Fake XML cookbook1

打开题目 查看源代码 发现我们post传入的数据都被放到了doLogin.php下面 访问一下看看 提示加载外部xml实体 bp抓包一下看看 得到flag 或者这样 但是很明显这样是不行的&#xff0c;因为资源是在admin上&#xff0c;也就是用户名那里 PHP引用外部实体&#xff0c;常见的利用…

【23-24 秋学期】NNDL 作业11 LSTM

目录 习题6-4 推导LSTM网络中参数的梯度&#xff0c; 并分析其避免梯度消失的效果 习题6-3P 编程实现下图LSTM运行过程 &#xff08;一&#xff09;numpy实现 &#xff08;二&#xff09;使用nn.LSTMCell实现 &#xff08;三&#xff09; 使用nn.LSTM实现 总结 &#x…

PMI相关证书的获取步骤及注意内容

近几年很多行业的从业人员都在考取PMI项目管理相关证书&#xff0c;可在中国大陆地区参加考试的认证主要有&#xff1a;PMP, PgMP, PMI-RMP, PMI-ACP, PMI-PBA, CAPM。PfMP, PMI-SP尚未在中国大陆地区开放考试。 现整理该类证书的相关获取步骤及注意内容 一、证书获取步骤 S…

动态规划(Dynamic Programming)

动态规划&#xff08;Dynamic Programming&#xff09;&#xff1a;是运筹学的一种最优化方法&#xff0c;只不过在计算机问题上应用比较多 DP常见步骤&#xff1a; 暴力递归/穷举记忆化搜索&#xff08;傻缓存 递归&#xff09;,使用备忘录/ DP Table 来优化穷举过程严格表结…

锁--07_2---- index merge(索引合并)引起的死锁

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 案例分析生产背景死锁日志表结构执行计划 EXPLAN为什么会用 index_merge&#xff08;索引合并&#xff09;为什么用了 index_merge就死锁了解决方案注&#xff1a;M…

SQL基础:操作环境搭建

在上一节中&#xff0c;我们简单讲述了数据库和SQL的基本概念。 本节我们讲述一下环境搭建&#xff0c;为下一节讲表的基本操作做下铺垫。 环境搭建 具体到操作&#xff0c;我们就要准备一些环境了。如果不进行练习&#xff0c;我们学习的知识将很快被遗忘。 MySQL安装&…

如何使用Lychee结合内网穿透搭建本地私人图床网站并实现远程访问

文章目录 1.前言2. Lychee网站搭建2.1. Lychee下载和安装2.2 Lychee网页测试2.3 cpolar的安装和注册 3.本地网页发布3.1 Cpolar云端设置3.2 Cpolar本地设置 4.公网访问测试5.结语 1.前言 图床作为图片集中存放的服务网站&#xff0c;可以看做是云存储的一部分&#xff0c;既可…

四舍五入浮点数

1.题目如下&#xff1a; 2.方法一&#xff1a; 直接取出小数部分第一位来判断。 1. 先乘以10。 2. 强制类型转换为整型&#xff0c;去掉小数部分。 3. 再模10&#xff0c;相当于取出原数的小数第一位。 代码实现&#xff1a; int way1(double n) {int a (int)(n * 10);int b…

kafka学习笔记--Kafka副本

本文内容来自尚硅谷B站公开教学视频&#xff0c;仅做个人总结、学习、复习使用&#xff0c;任何对此文章的引用&#xff0c;应当说明源出处为尚硅谷&#xff0c;不得用于商业用途。 如有侵权、联系速删 视频教程链接&#xff1a;【尚硅谷】Kafka3.x教程&#xff08;从入门到调优…

一些关于fMRI脑数据的预处理工具

一些关于fMRI脑数据的预处理工具 前言概述SPM12工具箱FSL工具箱FreeSurfer工具箱BrainNet Viewer工具箱circularGraph工具箱Nipype集成框架fMRIPrep集成框架参考文献 前言 March 25, 2022 这里是关于fMRI脑数据的预处理工具的相关调研 主要是关于数据的预处理&#xff0c;数据…

万兆网络之屏蔽线序接法(中)

在介绍优质网线选购之前&#xff0c;先简单介绍一下水晶头 1毛钱一颗跟1元一颗的水晶头&#xff0c;往往是金手指厚度差别&#xff0c;你可以想象压制的时候可能会有什么情况 另外&#xff0c;一些3元一颗的镀金水晶头会有15U、30U之类的是电镀厚度单位&#xff0c;数值越大镀…

【数据挖掘】国科大苏桂平老师数据库新技术课程作业 —— 第四次作业

云数据库研究 云计算与云数据库背景 云计算&#xff08;cloud computing&#xff09;是 IT 技术发展的最新趋势&#xff0c;正受到业界和学术界的广泛关注。云计算是在分布式处理、并行处理和网格计算等技术的基础上发展起来的&#xff0c;是一种新兴的共享基础架构的方法。它…

java内置的数据结构

Java语言提供了许多内置的数据结构&#xff0c;包括&#xff1a; 1. 数组&#xff08;Array&#xff09;&#xff1a;数组是最基本的数据结构之一&#xff0c;它是一个有序的元素集合&#xff0c;每个元素都有一个对应的索引。在Java中&#xff0c;数组可以通过声明和初始化来创…

2023年金属非金属矿山(地下矿山)安全管理人员证模拟考试题库及金属非金属矿山(地下矿山)安全管理人员理论考试试题

题库来源&#xff1a;安全生产模拟考试一点通公众号小程序 2023年金属非金属矿山&#xff08;地下矿山&#xff09;安全管理人员证模拟考试题库及金属非金属矿山&#xff08;地下矿山&#xff09;安全管理人员理论考试试题是由安全生产模拟考试一点通提供&#xff0c;金属非金…

《软件方法(下)》第8章2023版8.1 分析工作流概述

DDD领域驱动设计批评文集 做强化自测题获得“软件方法建模师”称号 《软件方法》各章合集 第8章 分析 之 分析类图——知识篇 墙上挂了根长藤&#xff0c;长藤上面挂铜铃 《长藤挂铜铃》&#xff1b;词&#xff1a;元庸&#xff0c;曲&#xff1a;梅翁&#xff08;姚敏&…

手麻、腿麻、麻痛…背后竟隐藏7大疾病!多一个人知道,少一个悲剧!

手脚麻木背后的7大病症&#xff1a;骨病、脑梗、肿瘤…… 1、神经问题 上图四只手上橙色的区域代表了麻木感&#xff0c;如果您的手麻集中在无名指和小指的区域&#xff0c;您可以拿一张纸&#xff0c;用五个手指分别试着夹住&#xff0c;检验您的五个手指力量&#xff1b;您还…

软件测试之鲁棒性测试

文章目录 前言一、鲁棒性测试是什么&#xff1f;二、鲁棒性测试的目的三、测试原理3.1 错误数据处理3.2 异常情况处理 前言 Bootloader软件刷写鲁棒性(Robustness)测试是指对Bootloader软件进行连续多次的刷写测试&#xff0c;且一次Fail都没发生&#xff0c;以此验证Bootload…

MySql的增、删、改、查(MySql数据库学习——五)

增&#xff08;数据添加/插入数据&#xff09; 使用 INSERT INTO SQL 语句来插入数据。我们可以通过 mysql> 命令提示窗口中向数据表中插入数据&#xff0c;或者 通过PHP 脚本来插入数据。 sql语句&#xff1a; INSERT INTO table_name ( field1, field2,...fieldN ) …

系列九、事务

一、事务 1.1、概述 事务是一组操作的集合&#xff0c;它是一个不可分割的工作单位&#xff0c;事务会把所有的操作作为一个整体一起向系统提交或者撤销操作请求&#xff0c;即&#xff1a;这些操作要么同时成功&#xff0c;要么同时失败。 例如: 张三给李四转账1000块钱&…

UI自动化Selenium 测试报告BeautifulReport使用及修改

一、BeautifulReport安装 pip安装 pip install BeautifulReport Pycharm中安装 二、原生报告样式 原生报告&#xff0c;因为我使用ddtunittest数据驱动模式&#xff0c;所以Excel中所有参数都会被拼接出来&#xff0c;导致测试方法里面有太多不需要展示的内容&#xff1b; …