【开源项目】ShenYu网关中Disruptor的使用

news2025/1/9 2:20:43

模块封装

shenyu-disruptor定义了DisruptorProviderDisruptorProviderManageDataEventQueueConsumerFactoryDisrutporThreadFactory等一系列通用接口
该模块的搭建了一个disruptor的初始化框架,
DisruptorProviderManage提供Disruptor的初始化,可以在初始化是自定义参数,而初始化参数中,包含消费者工厂,初始化会将消费者工厂放置到QueueConsumer的成员变量当中,有QueueConsumer进行消息的侦听,一旦有消息,则由消费者工厂QueueConsumerFactory创建QueueConsumerExecutor进行消息的处理,QueueConsumerExecutor可以拿到消息,是具体的操作。而在DisruptorProviderManage对象中,成员变量provide是此次初始化的disruptor的生产者,由此provider进行消息的发布。
所以,这个模块是对disruptor的通用封装,可以使用任何类型的数据,外界使用该模块需要进行的操作是,继承QueueConsumerExecutor其executor方法用来写具体的逻辑操作,实现QueueConsumerFactory接口,用来创建自己的实现的QueueConsumerExecutor,将工厂类用做DisruptorProviderManage的构造参数,获得对象,之后调用DisruptorProviderManage对象的start方法进行disruptor的初始化,disruptor便启动了,启动之后,就可以正常使用disruptor了,之后发布消息,则使用DisruptorProviderManage对象获取provider,进行消息的发布和disruptor的关闭。

项目启动

RegisterClientServerDisruptorPublisher#start,启动DisruptorProviderManage

    public void start(final Map<String, ShenyuClientRegisterService> shenyuClientRegisterService) {
        RegisterServerExecutorFactory factory = new RegisterServerExecutorFactory();
        factory.addSubscribers(new URIRegisterExecutorSubscriber(shenyuClientRegisterService));
        factory.addSubscribers(new MetadataExecutorSubscriber(shenyuClientRegisterService));
        factory.addSubscribers(new ApiDocExecutorSubscriber(shenyuClientRegisterService));
        providerManage = new DisruptorProviderManage<>(factory);
        providerManage.startup();
    }

DisruptorProviderManage#startup(boolean),初始化Disruptor配置。

    public void startup(final boolean isOrderly) {
        OrderlyExecutor executor = new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                DisruptorThreadFactory.create("shenyu_disruptor_consumer_", false), new ThreadPoolExecutor.AbortPolicy());
        int newConsumerSize = this.consumerSize;
        EventFactory<DataEvent<T>> eventFactory;
        if (isOrderly) {
            newConsumerSize = 1;
            eventFactory = new OrderlyDisruptorEventFactory<>();
        } else {
            eventFactory = new DisruptorEventFactory<>();
        }
        Disruptor<DataEvent<T>> disruptor = new Disruptor<>(eventFactory,
                size,
                DisruptorThreadFactory.create("shenyu_disruptor_provider_" + consumerFactory.fixName(), false),
                ProducerType.MULTI,
                new BlockingWaitStrategy());
        @SuppressWarnings("all")
        QueueConsumer<T>[] consumers = new QueueConsumer[newConsumerSize];
        for (int i = 0; i < newConsumerSize; i++) {
            consumers[i] = new QueueConsumer<>(executor, consumerFactory);
        }
        disruptor.handleEventsWithWorkerPool(consumers);
        disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
        disruptor.start();
        RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();
        provider = new DisruptorProvider<>(ringBuffer, disruptor, isOrderly);
    }

发布事件

ShenyuClientRegisterEventPublisher#publishEvent,发布事件

    public void publishEvent(final DataTypeParent data) {
        DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();
        provider.onData(data);
    }

DisruptorProvider#onData,调用ringBuffer处理数据

    public void onData(final T data) {
        if (isOrderly) {
            throw new IllegalArgumentException("The current provider is  of orderly type. Please use onOrderlyData() method.");
        }
        try {
            ringBuffer.publishEvent(translatorOneArg, data);
        } catch (Exception ex) {
            logger.error("ex", ex);
        }
    }

QueueConsumer#onEvent,处理数据

    @Override
    public void onEvent(final DataEvent<T> t) {
        if (t != null) {
            ThreadPoolExecutor executor = orderly(t);
            QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
            queueConsumerExecutor.setData(t.getData());
            // help gc
            t.setData(null);
            executor.execute(queueConsumerExecutor);
        }
    }

创建QueueConsumerExecutor,获取所有的getSubscribers,进行分组。

        @Override
        public QueueConsumerExecutor<Collection<DataTypeParent>> create() {
            Map<DataType, ExecutorTypeSubscriber<DataTypeParent>> maps = getSubscribers()
                    .stream()
                    .map(e -> (ExecutorTypeSubscriber<DataTypeParent>) e)
                    .collect(Collectors.toMap(ExecutorTypeSubscriber::getType, Function.identity()));
            return new RegisterServerConsumerExecutor(maps);
        }

处理事件

RegisterServerConsumerExecutor#run,线程执行,获取对应的ExecutorSubscriber,调用executor

    @Override
    public void run() {
        Collection<DataTypeParent> results = getData()
                .stream()
                .filter(this::isValidData)
                .collect(Collectors.toList());
        if (CollectionUtils.isEmpty(results)) {
            return;
        }
        selectExecutor(results).executor(results);
    }


    private ExecutorSubscriber<DataTypeParent> selectExecutor(final Collection<DataTypeParent> list) {
        final Optional<DataTypeParent> first = list.stream().findFirst();
        return subscribers.get(first.orElseThrow(() -> new RuntimeException("the data type is not found")).getType());
    }

相关博客

  • 【开源项目】Disruptor框架介绍及快速入门

  • 【源码解析】Disruptor框架的源码解析

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/566568.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

分布式事务的21种武器 - 4

在分布式系统中&#xff0c;事务的处理分布在不同组件、服务中&#xff0c;因此分布式事务的ACID保障面临着一些特殊难点。本系列文章介绍了21种分布式事务设计模式&#xff0c;并分析其实现原理和优缺点&#xff0c;在面对具体分布式事务问题时&#xff0c;可以选择合适的模式…

软件设计师数据结构速过

加法规则&#xff1a;多项相加&#xff0c;保留最高阶项&#xff0c;并将系数化为 1 乘法规则&#xff1a;多项相乘都保留&#xff0c;并将系数化为 1 加法乘法混合规则&#xff1a;先小括号再乘法规则最后加法规则 时间复杂度估算看最内层循环&#xff0c;如若没有循环和递归则…

终于!我们把 CEO 炒了,让 ChatGPT 出任 CEO

⚠️ FBI Warning&#xff1a;本文纯属作者自娱自乐&#xff0c;数字人的观点不代表 CEO 本人的观点&#xff0c;请大家不要上当受骗&#xff01;&#xff01; 哪个公司的 CEO 不想拥有一个自己的数字克隆&#xff1f; 想象&#x1f914;一下&#xff0c;如果 CEO 数字克隆上线…

【初识django】——django——如桃花来

目录索引 django引入&#xff1a;常见的web框架&#xff1a;下载问题&#xff1a;*下载Django之前确保工具不会发生版本问题*下载django:*检查是否下载成功&#xff1a;**注意事项&#xff1a;* 创建django项目&#xff1a;在cmd中创建&#xff1a;*整个命令流程&#xff1a;**…

React学习笔记六-refs

此文章是本人在学习React的时候&#xff0c;写下的学习笔记&#xff0c;在此纪录和分享。此为第六篇&#xff0c;主要介绍react中的refs。 目录 1.refs基本使用 1.1字符串类型ref小案例 2.回调形式的ref 2.1回调形式ref小案例 2.2回调ref中调用次数问题 3.createRef 3.…

SpringBoot 插件 spring-boot-maven-plugin 原理,以及SpringBoo工程部署的 jar 包瘦身实战

spring-boot-maven-plugin 我们直接使用 maven package &#xff08;maven自带的package打包功能&#xff09;&#xff0c;打包Jar包的时候&#xff0c;不会将该项目所依赖的Jar包一起打进去&#xff0c;在使用java -jar命令启动项目时会报错&#xff0c;项目无法正常启动。这…

TOP RPA·脱普×实在丨日用品企业脱普签约实在智能,构建全域数据智能运营系统

近日&#xff0c;实在智能与脱普日用化学品&#xff08;中国&#xff09;有限公司&#xff08;简称“脱普企业”&#xff09;在脱普企业上海总部举行“全域数据智能运营”项目启动会&#xff0c;双方领导及项目组关键成员共同参会&#xff0c;就项目目标、实施进程、沟通机制等…

Spring Boot中使用Spring Data Elasticsearch访问Elasticsearch

Spring Boot中使用Spring Data Elasticsearch访问Elasticsearch Elasticsearch是一个分布式的全文搜索和分析引擎&#xff0c;它可以将海量数据进行快速的查询和聚合。Spring Data Elasticsearch是Spring Data家族中的一个成员&#xff0c;它提供了与Elasticsearch的集成&…

一起来学习怎样识别表格文件吧

你有没有经历过手头有一堆纸质表格&#xff0c;但是又不想手动输入数据的烦恼&#xff1f;现在&#xff0c;表格识别计数的出现&#xff0c;可以帮助你轻松解决这个问题。它通过拍照扫描&#xff0c;来自动提取表格中的信息&#xff0c;并将其转化为可编辑的电子文档。那么&…

c# 动态表达式

准备&#xff1a; 创建一个空项目&#xff0c;nuget查找并安装ExpressionEvaluator 示例&#xff1a; using ExpressionEvaluator; using System; 一、计算简单表达式 public string Test1() { return SimpleEval("0.1*(Math.Pow(10,2)20)"); …

AI小作文搞崩科大讯飞股价 科技“魔法”反噬科企

5月24日午后&#xff0c;A股公司科大讯飞的股价突然走出深V造型&#xff0c;闪崩8%。科大讯飞回应称&#xff0c;股价下跌系某生成式AI写作虚假小作文导致&#xff0c;谣传风险为不实消息。 网传的一篇“小作文”谣称“科大讯飞被曝采集用户隐私数据研究人工智能引发争议”&am…

Windows下编写的shell脚本无法在Linux上执行

这通常是由于回车换行符不兼容导致的。 出现无法执行&#xff0c;提示诸如“ 未预期的符号“$\r”附近有语法错误”&#xff0c;“syntax error near unexpected token in”之类的错误&#xff0c;可尝试此文方法。 1.查看shell脚本的换行符格式 vi/vim进入文件&#xff0c;…

2023年湖北建筑起重信号司索工报名流程是什么?个人可以报名吗?

2023年湖北建筑起重信号司索工报名流程是什么&#xff1f;个人可以报名吗&#xff1f; 建筑起重信号司索工是特种作业人员工种即是建设厅特种工。证书全国通用&#xff0c;两年需要年审一次&#xff0c;六年需要换一次证。报考有一定的条件和要求。搜一下启程别就知道啦。 湖北…

【Leetcode】697. 数组的度

[哈希表] Given a non-empty array of non-negative integers nums, the degree of this array is defined as the maximum frequency of any one of its elements. Your task is to find the smallest possible length of a (contiguous) subarray of nums, that has the sa…

20230525下载youtube的字幕的方法

20230525下载youtube的字幕的方法 百度&#xff1a;youtube 字幕 (英语自动生成)下载 【可以直接下载字幕&#xff01;】 https://zhuanlan.zhihu.com/p/349506890?ivk_sa1025883i 下载YouTube油管字幕的2种方法 二&#xff0e;使用在线网站下载YouTube字幕文件 二&#xff0e…

【剧前爆米花--爪哇岛寻宝】TCP保证效率,应对特殊情况等相关机制

作者&#xff1a;困了电视剧 专栏&#xff1a;《JavaEE初阶》 文章分布&#xff1a;这是一篇关于网络编程的文章&#xff0c;在这篇文章中我会着重介绍TCP保证效率&#xff0c;应对特殊情况等相关机制&#xff0c;希望对你有所帮助&#xff01; 目录 效率 批量传输 滑动窗口 …

【九章刷题录】C/C++:数组中重复的数字(JZ3)

精品题解 &#x1f449; 九章刷题录&#x1f448; 猛戳订阅 JZ3 - 数组中重复的数字 &#x1f4dc; 目录&#xff1a; 「 法一 」暴力大法&#xff08;BF&#xff09; 「 法二 」排序 遍历 「 法三 」哈希集合 「 法四 」哈希无序集 「 法五 」原地哈希 「 法六 」Map …

北京筑龙作为软件服务商出席《国企阳光采购标准》研讨会

近日&#xff0c;由中国企业国有产权交易机构协会市场创新专业委员会主办、青岛产权交易所有限公司承办的《国企阳光采购标准》研讨会在青岛召开&#xff0c;该会议共有19家已开展采购业务的各地产权交易机构参加&#xff0c;北京筑龙作为软件服务商出席会议。 《国企阳光采购标…

【Linux】线程同步

文章目录 条件变量相关函数初始化条件变量-pthread_cond_init销毁条件变量-pthread_cond_destroy等待条件变量-pthread_cond_wait唤醒等待条件变量pthread_cond_broadcastpthread_cond_signal 小例子关于等待函数的补充条件变量使用规范 条件变量相关函数 初始化条件变量-pthr…

多台linux设备之间设置免密登陆

1、首先&#xff0c;先搞一个公钥&#xff0c;如果已经有公钥了&#xff0c;请自行跳过 我这是有了&#xff0c;如果没有&#xff0c;也没关系&#xff0c;造一个就好&#xff0c;命令为&#xff1a; ssh-keygen -t rsa 一路回车就行&#xff0c;看下执行结果&#xff1a; 2、…