分布式异步任务处理组件(七)

news2025/1/12 10:39:05

分布式异步任务处理组件底层网络通信模型的设计--如图:

  1. 使用Java原生NIO来实现TCP通信模型
  2. 普通节点维护一个网络IO线程,负责和主节点的网络数据通信连接--这里的网络数据是指组件通信协议之下的直接面对字节流的数据读写,上层会有另一个线程负责网络通信协议的实现;---也就是说维护一个selector线程,负责处理socketchannel的IO事件;
  3. Leader节点网络通信层有多个线程--一个selector线程负责接受其他节点的连接请求,然后为每个连接建立一个线程并分配单独的selector来处理各自连接上的IO事件--如此设计的原因是各节点的状态严格依赖与主节点的心跳和其他通信,防止主节点线程阻塞导致心跳失败;从而引发节点下线带来的大量同步工作--后续会聊到;
  4. 各节点网络通信线程之上会有一个线程专门负责组件的网络通信协议,就是将网络传输的字节流解码成组件的通信协议包,因为NIO的buffer是数据块,所以首先通过读写队列将字节转化为字节流,通过协议转化为网络通信命令包,同时解决粘包半包等问题;
  5. 网络通信线程和协议实现线程之间通过读写两个队列来实现(网络IO线程的读队列就是协议线程的写队列,反过来一样,所以这里读写队列是相对的;),为了保证性能,避免重复创建对象和对象回收,设计了ByteBuffer缓存机制和异步读写队列数据结构--详细结构如图--
  6. 说一下三个队列--读写队列和缓存队列,用来实现IO通信线程和协议通信线程之间的数据通信--两个线程基本上会轮训处理网络IO事件,和上层协议事件,基本过程如下--
    1. 从网络IO线程角度出发--
      1. 当产生可读事件时,网络IO线程会从缓存队列中获取一个空的ByteBuffer,这里设计为当没有可用的缓存Buffer对象时会新建一个--具体在队列实现里讲,可能会产生写扩张现象,后期性能优化时考虑加入回收机制;
      2. 将socket缓冲区中的网络数据read进Buffer中,然后将Buffer对象入队到IO写队列中;
      3. 然后检查IO读队列不为空时,对IO读队列出队,获取要发送的数据Buffer对象,发送到其他节点中;
  7. 异步多线程队列,支持两个线程同时出队入队操作;原理和代码贴下来,基本实现:
package org.example.web.buffer;

import org.example.web.api.SocketBufferQueue;

import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;

public class AsynchronousQueue<T extends AbstractBuffer> implements SocketBufferQueue {
    //异步读写队列实现原理;
    /*
    * 当队列中的元素个数>1时,读线程和写线程可以同时进行,因为这时候不涉及操作共享变量
    *当队列中的元素个数<=1时,读写队列中只能有一个线程操作读或者写,因为此时会涉及队列头尾指针的操作;
    * 实现原理,写线程在获取写锁时可以正常做写操作:此时有两种情况--
    *     1,获取写锁之后队列为空,此时不会有读线程做读操作,只有获得写锁的该线程可以put,put完成之后将头尾指针同时指向改为以元素即可;此时队列元素个数为1;
    *     2,获取写锁之后队列中只有一个元素,这时也可以保证只有该线程在做写入,因为只有一个元素的情况下,读线程要读取该元素必须同时获得读锁和写锁;此时队列元素个数为2;
    *     3,读线程获取读锁之后有三种情况;size>1;size=1;size=0;
    *     4, 重点是保证不能多个线程同时进入队列元素为零的状态;就是读线程消费了最后一个元素,正好此时写线程在队列为空的时候写入,读写线程会同时操作头尾指针,造成错乱,所以在元素数量为1
    * 的时候就要进行同步操作;原理:
    *           1.读线程获取读锁之后如果size=1,此时不会先消费,而是试图获取写锁,防止此时有写线程同时操作,获取写锁之后再判断size是否为1,如果为1则做出队操作,然后释放写锁,如果为2则直接释放写锁--再进行出队操作;
    *           2,这里读线程获取读锁之后判断size=1,再获取读锁成功之后有两种情况--
    *                   1,有写线程在读线程之前获取到了写锁,则读线程获取到写锁的时候size>=2了(可能不止一个),
    *                   2,判断size=1之后直接获取到了写锁,此时就应该阻塞其他写线程做入队操作,等待自己完成出队操作之后再释放写锁;
    *     5,再说一下size怎么保证同步,
    *           1,在size<=1的时候严格保证线程同步操作,保证size;
    *           2,在size>1的时候,此时可以理解为队列同时在出队和入队,size在两个线程操作的时候先出队-1还是先入队+1其实是没有关系的,因为原子操作保证了最后结果是没有问题的就行;
    * */
    private AtomicInteger size;
    protected T head;
    protected T tail;
    private Object readLock;
    private Object writeLock;
    //这里考虑使用cas还是Synchronized


    AsynchronousQueue(){
        this.writeLock=new Object();
        this.readLock=new Object();
    }
    AsynchronousQueue(int initSize){
        this();
        this.size=new AtomicInteger(initSize);
    }
    //空队列初始化要创建一个node
    AsynchronousQueue(T node){
        this(1);
        this.head=node;
        this.tail=this.head;
    }
    public boolean offerFirstOne(T node){
        synchronized (this.writeLock){
            if(this.size.get()>0){
                return false;
            }
            this.head=this.tail=node;
            return this.size.compareAndSet(0,1);
        }
    }

    public boolean offer(T node){
        preOfferElement(node);
        synchronized (this.writeLock){
            if(this.size.get()==0){
                return this.offerFirstOne(node);
            }else{
                T temp=this.head;
                node.next=temp;
                temp.pre=node;
                this.head=node;
            }
            return this.size.incrementAndGet() > 1;
        }
    }
    private void preOfferElement(T bufferNode){
        bufferNode.next=null;
        bufferNode.pre=null;
    }
    public T pollLastOne(){
        return this.size.compareAndSet(1,0)?this.tail:null;
    }

    public T poll(){
        synchronized (this.readLock){
            if(this.size.get()==0){
                return null;
            }
            if(this.size.get()==1){
                synchronized (this.writeLock){
                    if(this.size()>1){
                        return this.getTailElement();
                    }
                    if(this.size()==1){
                        this.pollLastOne();
                    }
                }
            }
            return this.getTailElement();
        }
    }

    private T getTailElement(){
        if(this.size()>1){
            this.tail= (T) this.tail.pre;
            this.size.decrementAndGet();
            return (T) this.tail.next;
        }
        return null;
    }

    public int size(){
        return this.size.get();
    }
    public int increamentSize(){
        return this.size.incrementAndGet();
    }
    public int decrementSize(){
        return this.size.decrementAndGet();
    }
    private class BufferNode{
        private ByteBuffer buffer;
        private BufferNode pre;
        private BufferNode next;
        BufferNode(ByteBuffer byteBuffer){
            this.buffer=byteBuffer;
        }
        BufferNode(){
        }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/839033.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

装饰器模式(C++)

定义 动态(组合)地给一个对象增加一些额外的职责。就增加功能而言&#xff0c;Decorator模式比生成子类(继承)更为灵活(消除重复代码&减少子类个数)。 一《设计模式》 GoF 装饰器模式&#xff08;Decorator Pattern&#xff09;允许向一个现有的对象添加新的功能&#xf…

视频安防监控EasyCVR平台海康大华设备国标GB28181告警布防的报文说明

TSINGSEE青犀视频监控综合管理平台EasyCVR基于云边端协同&#xff0c;可支持海量视频的轻量化接入与汇聚管理。平台既具备传统安防视频监控的能力&#xff0c;比如&#xff1a;视频监控直播、云端录像、云存储、录像检索与回看、告警上报、平台级联、云台控制、语音对讲等&…

第一课-前提-Stable Diffusion 教程

学习 SD 的前提是电脑配置! SD 参考配置: 建议选择台式机 i5 CPU, 内存16GB,N卡 RTX3060, 8G显存以上的配置(最低配) 在此基础上的配置越高越好。 比如,cpu i7 更好,显卡能有 RTX4090 更好,32显存要能有最好,嘿嘿嘿。 如何查看自己的显卡配置? Win+R 输入 “dxdiag…

vue2-v-if和v-for的优先级是什么?

1、v-if和v-for的区别 作用&#xff1a; v-if指令用于条件性地渲染一块内容&#xff0c;这块内容只会在指令的表达式返回true值的时候被渲染。 v-for指令基于一个数组来渲染一个列表&#xff0c;v-for指令需要使用item in items 形式的特殊语法&#xff0c;其中&#xff0c;it…

自学(黑客)技术,从入门到精通!

1.网络安全是什么 网络安全可以基于攻击和防御视角来分类&#xff0c;我们经常听到的 “红队”、“渗透测试” 等就是研究攻击技术&#xff0c;而“蓝队”、“安全运营”、“安全运维”则研究防御技术。 2.网络安全市场 一、是市场需求量高&#xff1b; 二、则是发展相对成熟入…

Vue3 模板语法简单应用

去官网学习-》 模板语法 | Vue.js 运行示例&#xff1a; 代码&#xff1a;HelloWorld.vue <template><div class"hello"><h1>Vue 模板语法</h1><h2>{{ msg }}</h2><h2>{{ textHtml }}</h2><h2 v-html"text…

【单运放RC振荡器1负反馈方波2正反馈正弦波方波】2021-12-9

缘由multisim运放芯片给一个连上电源另一个引脚打叉不能仿真-测试-CSDN问答 正反馈电阻影响转化阀值,负反馈电阻影响频率 固定负反馈为2时无法起震荡,电位器调节到2%可振荡且波形失真,电位器调节到0%且固定负反馈为2时才能正弦波输出

Spring Boot 集成Seata

Seata的集成方式有&#xff1a; 1. Seata-All 2. Seata-Spring-Boot-Starter 3. Spring-Cloud-Starter-Seata 本案例使用Seata-Spring-Boot-Starter演示&#xff1a; 第一步&#xff1a;下载Seata 第二步&#xff1a;为了更好看到效果&#xff0c;我们将Seata的数据存储改…

MyBatis 查询数据库之二(增、删、改、查操作)

目录 1. 配置打印 MyBatis 执行的SQL 2. 查询操作 2.1 通过用户 ID 查询用户信息、查询所有用户信息 (1) Mapper 接口 (2)UserMapper.xml 查询所有用户的具体实现 SQL (3)进行单元测试 3. 增加操作 3.1 在 mapper&#xff08;interface&#xff09;里面添加增加方法的声…

git仓库与本地暂存区的同步问题

向下同步 对于远程仓库的项目&#xff0c;初始化一个配置文件&#xff0c;配置远程仓库及相关信息&#xff0c;赋值远程仓库的地址&#xff0c;使用git pull命令即可拉取仓库代码。 git pull [remote_addr] 该部分完成向下同步 向上同步 向上同步时会遇到很多的问题&#xf…

viewerjs 如何新增下载图片功能(npm包补丁)

文章目录 先实现正常的效果实现下载图片改变viewerjs的build函数源码改变之后&#xff0c;执行npm i 之后node_modules源码又变回了原样 1、viwerjs所有功能都很完善&#xff0c;但唯独缺少了图片的下载 2、需求&#xff1a;在用viwerjs旋转图片后&#xff0c;可以直接下载旋转…

计算机视觉与图形学-神经渲染专题-Seal-3D(基于NeRF的像素级交互式编辑)

摘要 随着隐式神经表示或神经辐射场 (NeRF) 的流行&#xff0c;迫切需要与隐式 3D 模型交互的编辑方法&#xff0c;以完成后处理重建场景和 3D 内容创建等任务。虽然之前的作品从不同角度探索了 NeRF 编辑&#xff0c;但它们在编辑灵活性、质量和速度方面受到限制&#xff0c;无…

【CI/CD】图解六种分支管理模型

图解六种分支管理模型 任何一家公司乃至于一个小组织&#xff0c;只要有写代码的地方&#xff0c;就有代码版本管理的主场&#xff0c;初入职场&#xff0c;总会遇到第一个拦路虎 git 管理流程&#xff0c;但是每一个企业似乎都有自己的 git 管理流程&#xff0c;倘若我们能掌握…

深度学习之双线性插值

1、单线性插值 单线性插值是一种用于估计两个已知数据点之间未知点的方法。它基于线性关系&#xff0c;通过计算目标位置的值&#xff0c;使用已知点之间的线性函数进行插值。这在图像处理中常用于放缩、旋转等操作&#xff0c;计算简单&#xff0c;产生平滑结果&#xff0c;但…

zookeeper集群和kafka的相关概念就部署

目录 一、Zookeeper概述 1、Zookeeper 定义 2、Zookeeper 工作机制 3、Zookeeper 特点 4、Zookeeper 数据结构 5、Zookeeper 应用场景 &#xff08;1&#xff09;统一命名服务 &#xff08;2&#xff09;统一配置管理 &#xff08;3&#xff09;统一集群管理 &#xff08;4&a…

关于openwrt的802.11w 管理帧保护使用

目录 关于openwrt的802.11w 管理帧保护使用先来看看界面上的提示 实际遇到的问题总结 关于openwrt的802.11w 管理帧保护使用 先来看看界面上的提示 注意&#xff1a;有些无线驱动程序不完全支持 802.11w。例如&#xff1a;mwlwifi 可能会有一些问题 实际遇到的问题 802.11w 管…

如何维护你的电脑:打造IT人的重要武器

文章目录 方向一&#xff1a;介绍我的电脑方向二&#xff1a;介绍我的日常维护措施1. 定期清理和优化2. 保持良好的上网习惯和安全防护3. 合理安排软件和硬件的使用4. 数据备份和系统还原 方向三&#xff1a;推荐的维护技巧1. 数据分区和多系统安装2. 内部清洁和散热优化3. 安全…

vue2商城项目-01-总结

1.配置相关 1.1eslint关闭 说明&#xff1a;根目录创建vue.config.js module.exports {// 关闭eslintlintOnSave: false,};1.2src配置别名 说明&#xff1a;根目录创建jsconfig.json文件 {"compilerOptions": {"baseUrl": "./","path…

通俗理解大模型的各大微调方法:从LoRA、QLoRA到P-Tuning V1/V2

前言 PEFT 方法仅微调少量(额外)模型参数&#xff0c;同时冻结预训练 LLM 的大部分参数 第一部分 高效参数微调的发展史 1.1 Google之Adapter Tuning&#xff1a;嵌入在transformer里 原有参数不变 只微调新增的Adapter 谷歌的研究人员首次在论文《Parameter-Efficient Tran…

代码随想录算法训练营day25 | 216. 组合总和 III,17. 电话号码的字母组合

目录 216. 组合总和 III 17. 电话号码的字母组合 216. 组合总和 III 难度&#xff1a;medium 类型&#xff1a;回溯 思路&#xff1a; 与77组合类似的题目。 代码随想录算法训练营day24 | 回溯问题&#xff0c;77. 组合_Chamberlain T的博客-CSDN博客 注意两处剪枝。 代码…