Apache 神禹(shenyu)源码阅读(一)——Admin向Gateway的数据同步(Admin端)

news2025/1/23 10:30:38

源码版本:2.6.1

单机源码启动项目

启动教程:社区新人开发者启动及开发防踩坑指南

源码阅读

前言

开了个新坑,也是第一次阅读大型项目源码,写文章记录。

在写文章前,已经跑了 Divide 插件体验了一下(体验教程:Http快速开始)。

由于 shenyu 默认使用 H2 数据,但是我因为 IDEA 连接内存模式下的数据库有 BUG,连接不到,改用 MySQL(改用MySQL教程:Apache-Shenyu入门教程(demo实战及遇到的坑))。

认识 shenyu 架构以及本文的内容

shenyu 官方的一个架构图,红色圈部分是本文和下一篇文章研究的内容:
在这里插入图片描述

在查看 PluginChain 的过程中,想看 shenyu-admin(以下称 Admin)是如何向 Gateway 同步数据的。

同步数据我把它划分为三个部分:

  1. 一个是 Gateway 是如何连接上 Admin 的(通过 Websocket——shenyu 默认的同步方式)
  2. 一个是 Admin 通过 Websocket发送要同步的数据。
  3. 一个是 Gateway 从 Websocket 接收同步的数据进行同步。

本文研究第一个部分和第二个部分,下一篇研究第三个部分。

有博主(Apache ShenYu 源码阅读系列 - 基于 WebSocket 的数据同步)已经研究了这部分的内容,不过是21年的文章了,有些源码已经更新迭代过了,所以这篇文章就以最新的源码解读。

正文

1. 第一部分:Gateway 是如何连接上 Admin 的?

shenyu-bootstrap/src/main/resources/application.yml 中进行配置 websocket 属性。
在这里插入图片描述

对应的属性解释(来自官网https://shenyu.apache.org/zh/docs/user-guide/property-config/gateway-property-config):
在这里插入图片描述
如此 Admin(作为Server) 和 Gateway(作为Client)建立连接

2. 第二部分:Admin 如何通过 Websocket发送要同步的数据?

以创建 Selector 为例,解释在 Admin 创建的 Selector 是如何同步到 Gateway 的。

2.1 在 Divide 插件里创建一个新的 Selector

第1步:
在这里插入图片描述

第2步:
在这里插入图片描述

2.2 在新增 Selector 点击 Sure 后

请求会发到 shenyu-admin/src/main/java/org/apache/shenyu/admin/controller/SelectorController.java 的 #createSelector 方法中:

SelectorController.java在这里插入图片描述

2.3 进入104 行的 #createOrUpdate,也就是 SelectorService 接口的一个默认实现:

SelectorService.java
在这里插入图片描述

2.4 继续进入该接口的另一个方法 #create 中,来到 SelectorServiceImpl:

SelectorServiceImpl.java
在这里插入图片描述

这里我加的第 198 行注释看不懂没关系,接下来会解释这些注释。

2.5 先是 194 行划红线部分:

SelectorServiceImpl.java
在这里插入图片描述

2.5.1 Mybatis mapper

一个 Mybatis 的 mapper 配置,路径在 shenyu-admin/src/main/resources/mappers/selector-sqlmap.xml

selector-sqlmap.xml

<insert id="insertSelective" parameterType="org.apache.shenyu.admin.model.entity.SelectorDO">
        INSERT INTO selector
        <trim prefix="(" suffix=")" suffixOverrides=",">
                id,
            <if test="dateCreated != null">
                date_created,
            </if>
            <if test="dateUpdated != null">
                date_updated,
            </if>
            <if test="pluginId != null">
                plugin_id,
            </if>
            <if test="name != null">
                name,
            </if>
            <if test="matchMode != null">
                match_mode,
            </if>
            <if test="type != null">
                type,
            </if>
            <if test="sort != null">
                sort,
            </if>
            <if test="enabled != null">
                enabled,
            </if>
            <if test="loged != null">
                loged,
            </if>
            <if test="continued != null">
                continued,
            </if>
            <if test="matchRestful != null">
                match_restful,
            </if>
            <if test="handle != null">
                handle,
            </if>
        </trim>
        <trim prefix="values (" suffix=")" suffixOverrides=",">
                #{id, jdbcType=VARCHAR},
            <if test="dateCreated != null">
                #{dateCreated, jdbcType=TIMESTAMP},
            </if>
            <if test="dateUpdated != null">
                #{dateUpdated, jdbcType=TIMESTAMP},
            </if>
            <if test="pluginId != null">
                #{pluginId, jdbcType=VARCHAR},
            </if>
            <if test="name != null">
                #{name, jdbcType=VARCHAR},
            </if>
            <if test="matchMode != null">
                #{matchMode, jdbcType=INTEGER},
            </if>
            <if test="type != null">
                #{type, jdbcType=INTEGER},
            </if>
            <if test="sort != null">
                #{sort, jdbcType=INTEGER},
            </if>
            <if test="enabled != null">
                #{enabled, jdbcType=TINYINT},
            </if>
            <if test="loged != null">
                #{loged, jdbcType=TINYINT},
            </if>
            <if test="continued != null">
                #{continued, jdbcType=TINYINT},
            </if>
            <if test="matchRestful != null">
                #{matchRestful, jdbcType=TINYINT},
            </if>
            <if test="handle != null">
                #{handle, jdbcType=VARCHAR},
            </if>
        </trim>
    </insert>

可以看到是哪个属性不为空就写入数据库。

2.6 进入 197 行的SelectorServiceImpl 的一个实例方法 #createCondition 方法

SelectorServiceImpl.java在这里插入图片描述

同样还是SelectorServiceImpl.java
在这里插入图片描述
这里 selectorConditionMapper 和上面的 selectorMapper 类似,都是将属性选择性地插入数据库。

2.7 201 行的 #publishEvent

SelectorServiceImpl.java
在这里插入图片描述

2.7.1 进入该服务的 #publishEvent 后,方法如下:

/**
 * Implementation of the {@link org.apache.shenyu.admin.service.SelectorService}.
 * Maintain {@link SelectorDO} and {@link SelectorConditionDO} related data.
 */
@Service
public class SelectorServiceImpl implements SelectorService {
	// ...
	// Spring 框架的一个事件发布机制,事件发布者
	private final ApplicationEventPublisher eventPublisher;
	private final SelectorEventPublisher selectorEventPublisher;
	// ...
    private void publishEvent(final SelectorDO selectorDO, final List<SelectorConditionDTO> selectorConditions, final List<SelectorConditionDO> beforeSelectorCondition) {
        PluginDO pluginDO = pluginMapper.selectById(selectorDO.getPluginId());
        List<ConditionData> conditionDataList = ListUtil.map(selectorConditions, ConditionTransfer.INSTANCE::mapToSelectorDTO);
        List<ConditionData> beforeConditionDataList = ListUtil.map(beforeSelectorCondition, ConditionTransfer.INSTANCE::mapToSelectorDO);
        // build selector data.
        SelectorData selectorData = SelectorDO.transFrom(selectorDO, pluginDO.getName(), conditionDataList, beforeConditionDataList);
        // publish change event.
        // 将数据变动 DataChangedEvent 对象发布出去
        eventPublisher.publishEvent(new DataChangedEvent(ConfigGroupEnum.SELECTOR, DataEventTypeEnum.UPDATE,
                Collections.singletonList(selectorData)));
    }
}

小 tips:可以点击 publisher.publishEvent 旁边的带耳机的小图标,会跳转到监听这个事件的类中,如下图:
在这里插入图片描述

2.7.2 跳转到 DataChangedEventDispatcher,是这个分发器来监听 DatachangedEvent 的

DataChangedEventDispatcher.java

/**
 * Event forwarders, which forward the changed events to each ConfigEventListener.
 */
@Component
public class DataChangedEventDispatcher implements ApplicationListener<DataChangedEvent>, InitializingBean {
	// ...	
    @Override
    @SuppressWarnings("unchecked")
    public void onApplicationEvent(final DataChangedEvent event) {
        for (DataChangedListener listener : listeners) {
            switch (event.getGroupKey()) {
				// ...
                case SELECTOR:
                    listener.onSelectorChanged((List<SelectorData>) event.getSource(), event.getEventType());
                    break;
				// ...
                default:
                    throw new IllegalStateException("Unexpected value: " + event.getGroupKey());
            }
        }
    }
}
2.7.3 追踪 listener.onSelectorChanged() 方法,找到一个实现类 WebsocketDataChangedListener。

WebsocketDataChangedListener.java

public class WebsocketDataChangedListener implements DataChangedListener {
	// ...
    @Override
    public void onSelectorChanged(final List<SelectorData> selectorDataList, final DataEventTypeEnum eventType) {
        WebsocketData<SelectorData> websocketData =
                new WebsocketData<>(ConfigGroupEnum.SELECTOR.name(), eventType.name(), selectorDataList);
        // 由套接字收集器发送要同步的数据
        WebsocketCollector.send(GsonUtils.getInstance().toJson(websocketData), eventType);
    }
2.7.4 继续追踪 WebsocketCollector#send 方法,

WebsocketCollector.java

@ServerEndpoint(value = "/websocket", configurator = WebsocketConfigurator.class)
public class WebsocketCollector {
    // ...
    public static void send(final String message, final DataEventTypeEnum type) {
        if (StringUtils.isBlank(message)) {
            return;
        }
        // 如果是 MYSELF,是全量数据,从 ThreadLocal 中拿到 session,主动发消息 push
        if (DataEventTypeEnum.MYSELF == type) {
            Session session = (Session) ThreadLocalUtils.get(SESSION_KEY);
            if (Objects.nonNull(session)) {
                sendMessageBySession(session, message);
            }
        } else {
            // 否则向所有 session 发要同步的数据
            SESSION_SET.forEach(session -> sendMessageBySession(session, message));
        }
    }
}

通过 Websocket 发送要同步的数据,这里和官方介绍的是用 Websocket 作为默认的同步方法一致。

2.8 205 行的 SelectorEventPublisher#onCreated方法

SelectorServiceImpl.java
在这里插入图片描述
如果插入 selectorDO 进数据库成功,则发布出去这个创建成功的消息

SelectorEventPublisher.java

@Component
public class SelectorEventPublisher implements AdminDataModelChangedEventPublisher<SelectorDO> {
    // ...
    private final ApplicationEventPublisher publisher;
    @Override
    public void onCreated(final SelectorDO selector) {
        // 发布“选择器创建事件”
        publish(new SelectorCreatedEvent(selector, SessionUtil.visitorName()));
    }
    @Override
    public void publish(final AdminDataModelChangedEvent event) {
    	// 由 Spring 框架发布 AdminDataModelChangedEvent 事件
        publisher.publishEvent(event);
    }
}
AdminDataModelChangedEvent 由 RecordLogDataChangedAdapterListener 监听

现在我才知道的小 tips:可以点击 publisher.publishEvent 旁边的带耳机的小图标,会跳转到监听这个事件的类中,如下图:
在这里插入图片描述

@Component
public class RecordLogDataChangedAdapterListener implements DataChangedListener, ApplicationListener<AdminDataModelChangedEvent> {
    
    private final OperationRecordLogMapper logMapper;
	// ...
    @Override
    // 产生 OperationRecordLog 日志,并插入数据库,标记 event 已消费。
    public void onApplicationEvent(final AdminDataModelChangedEvent event) {
        // 判断 event 是否已消费
        if (event.isConsumed()) {
            return;
        }
        final OperationRecordLog log = new OperationRecordLog();
        log.setColor(event.getType().getColor());
        log.setContext(event.buildContext());
        log.setOperationTime(event.getDate());
        log.setOperationType(event.getType().getTypeName());
        log.setOperator(event.getOperator());
        logMapper.insert(log);
        event.consumed();
    }
}

一张图总结

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1446441.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java的常见api以及异常情况-2

目录 1、equals方法源码解读 2、replace替换方法 3、split分割方法 4、indexOf方法 5、常见的api 1、equals方法源码解读 public class API_test04 {public static void main(String[] args) {String str1 "rx";String str2 "rx";System.out.prin…

模态、模式和真实发生

模态和模式均是用来描述某一对象或系统可能出现的特性、状态或行为&#xff0c;它们既包括逻辑上的抽象可能性&#xff0c;也涵盖现实中具体的现象和事件结构。模态更多地关联于逻辑可能性和必然性&#xff0c;而模式则侧重于现象的重复性和规律性&#xff0c;两者都可以反映真…

【Java程序设计】【C00269】基于Springboot的漫画网站(有论文)

基于Springboot的漫画网站&#xff08;有论文&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的漫画网站 本系统分为系统功能模块、管理员功能模块、以及用户功能模块。 系统功能模块&#xff1a;在系统首页可以查看首页、漫画投稿、…

websocket具体实践

websocket具体实践 参考&#xff1a; 如何使用websocket WebSocket客户端连接不上和掉线的问题以及解决方案 继6月份对websocket一顿了解之后&#xff0c;我们的项目也要上websocket了&#xff0c;虽然这部分不是我做&#xff0c;但是借此机会&#xff0c;我也想要尝试一下&am…

[C/C++] -- CMake使用

CMake&#xff08;Cross-platform Make&#xff09;是一个开源的跨平台构建工具&#xff0c;用于自动生成用于不同操作系统和编译器的构建脚本。它可以简化项目的构建过程&#xff0c;使得开发人员能够更方便地管理代码、依赖项和构建设置。 CMake 使用一个名为 CMakeLists.tx…

【Java程序设计】【C00266】基于Springboot的超市进存销管理系统(有论文)

【Java程序设计】【C00266】基于Springboot的超市进存销管理系统&#xff08;有论文&#xff09; 项目简介项目获取开发环境项目技术运行截图 项目简介 这是一个基于Springboot的超市进销存系统 本系统分为登录注册模块、管理员功能模块以及员工功能模块。 登录注册模块&#…

Solidworks:平面工程图练习

把草图变成工程图&#xff0c;遇到第一个问题是线宽需要用鼠标选中后再设置线宽和颜色。我觉得应该有一个自动设置现款的功能&#xff0c;不知道有没有&#xff0c;我找了半天也没找到。 另一个问题是&#xff0c;作业代号字体上下颠倒了&#xff0c;不知道这是啥意思。 第三个…

鸿蒙开发理论之页面和自定义组件生命周期

1、自定义组件和页面的关系 页面&#xff1a;即应用的UI页面。可以由一个或者多个自定义组件组成&#xff0c;Entry装饰的自定义组件为页面的入口组件&#xff0c;即页面的根节点&#xff0c;一个页面有且仅能有一个Entry。只有被Entry装饰的组件才可以调用页面的生命周期。自…

从零开始学howtoheap:理解fastbins的​unsorted bin攻击

how2heap是由shellphish团队制作的堆利用教程&#xff0c;介绍了多种堆利用技术&#xff0c;后续系列实验我们就通过这个教程来学习。环境可参见从零开始配置pwn环境&#xff1a;从零开始配置pwn环境&#xff1a;从零开始配置pwn环境&#xff1a;优化pwn虚拟机配置支持libc等指…

开源版发卡小程序源码,云盘发卡微信小程序源码带PC端

一款发卡小程序。带PC端 系统微信小程序前端采用nuiapp 后端采用think PHP6 PC前端采用vue开发 使用HBuilderX工具打开&#xff0c;运行到微信小程序工具&#xff0c;系统会自动打包微信小程序代码 修改文件common/request/request.js 改成你的后端网址 微信小程序端完全…

【原创 附源码】Flutter安卓及iOS海外登录--Google登录最详细流程

最近接触了几个海外登录的平台&#xff0c;踩了很多坑&#xff0c;也总结了很多东西&#xff0c;决定记录下来给路过的兄弟坐个参考&#xff0c;也留着以后留着回顾。更新时间为2024年2月8日&#xff0c;后续集成方式可能会有变动&#xff0c;所以目前的集成流程仅供参考&#…

vscode预览github上的markdown效果

需要安装的插件 Github Markdown Preview Markdown Checkboxes Markdown Emoji Markdown footnotes Markdown Preview Github Styling Markdown Preview Mermaid Support Markdown yaml Preamble 操作步骤 ①ctrlshiftv会弹出预览页面 ②点击Split Up ③把这个拖过去…

Linux第49步_移植ST公司的linux内核第1步_获取linux源码

已知ST公司的linux源码路径&#xff1a; /home/zgq/linux/atk-mp1/stm32mp1-openstlinux-5.4-dunfell-mp1-20-06-24/sources/arm-ostl-linux-gnueabi/linux-stm32mp-5.4.31-r0 1、创建“my_linux”目录 打开第1个终端 输入“ls回车” 输入“cd linux/回车”&#xff0c;切换…

svg基础(九)滤镜-feMorphology(形态学)

feMorphology:形态学滤镜 用来侵蚀或扩张输入的图像。它在增肥或瘦身效果方面特别有用。适合用来创建轮廓和边界。 1 用法 <feMorphology operator"" radius""/>2 属性 inoperator -dilate膨胀,erode侵蚀radius- 3 示例 <svg width"50…

【易学】周易入门 ③ ( 玄学五术 - 山医命相卜 | 天命无常 唯有德者居之 | 预测学模型 | 五行学说 | 五行相生 | 五行相克 )

文章目录 一、玄学五术 - 山医命相卜二、天命无常 唯有德者居之三、预测学模型四、五行学说1、五行相生2、五行相克 一、玄学五术 - 山医命相卜 玄学五术 : 山 : 修行 " 肉体 " 和 " 精神 " , 以寻求 身心超脱 ; 肉体修行 - 拳法 : 太极拳 , 五禽戏 , 易筋…

Git分支和迭代流程

Git分支 feature分支&#xff1a;功能分支 dev分支&#xff1a;开发分支 test分支&#xff1a;测试分支 master分支&#xff1a;生产环境分支 hotfix分支&#xff1a;bug修复分支。从master拉取&#xff0c;修复并测试完成merge回master和dev。 某些团队可能还会有 reale…

【技巧】Allegro实用技巧之模块复用

需求分析&#xff1a;使用Allegro软件进行PCB Layout设计时&#xff0c;当电路图中有很多路相同的模块&#xff0c;使用模块复用的的操作方法&#xff0c;可以显著提高工作效率&#xff0c;同时也可以使PCB布局在整体上显得美观。下面来讲述这个方法。 具体方法及说明&#xf…

linux优化空间完全卸载mysql——centos7.9

文章目录 ⭐前言⭐linux命令使用&#x1f496; 基础命令&#x1f496; 内存优化&#x1f496; 完全删除mysql ⭐结束 ⭐前言 大家好&#xff0c;我是yma16&#xff0c;本文分享 linux优化空间&完全卸载mysql——centos7.9。 linux内存分配 在Linux中&#xff0c;内存分配是…

AcWing 802. 区间和 离散化

文章目录 题目链接题目描述解题思路代码实现总结 题目链接 链接: AcWing 802. 区间和 题目描述 解题思路 离散化是一种常用的技巧&#xff0c;它能够将原始的连续数值转换为一组离散的值&#xff0c;从而简化问题的处理。在这段代码中&#xff0c;离散化的过程主要分为三个步…

分布式文件系统 SpringBoot+FastDFS+Vue.js

分布式文件系统 SpringBootFastDFSVue.js 一、分布式文件系统1.1.文件系统1.2.什么是分布式文件系统1.3.分布式文件系统的出现1.3.主流的分布式文件系统1.4.分布式文件服务提供商1.4.1.阿里OSS1.4.2.七牛云存储1.4.3.百度云存储 二、fastDFS2.1.fastDSF介绍2.2.为什么要使用fas…