Neo4j实现表字段级血缘关系

news2025/1/22 21:35:33

需求背景

需要在前端页面展示当前表字段的所有上下游血缘关系,以进一步做数据诊断治理。大致效果图如下:
在这里插入图片描述首先这里解释什么是表字段血缘关系,SQL 示例:

CREATE TABLE IF NOT EXISTS table_b
AS SELECT order_id, order_status FROM table_a;

如上 DDL 语句中,创建的 table_b 的 order_id 和 order_status 字段来源于 table_a,代表table_a 就是 table_b 的来源表,也叫上游表,table_b 就是 table_a 下游表,另外 table_a.order_id 就是 table_b.order_id 的上游字段,它们之间就存在血缘关系。

INSERT INTO table_c
SELECT a.order_id, b.order_status
FROM table_a a JOIN table_b b ON a.order_id = b.order_id;

如上 DML 语句中,table_c 的 order_id 字段来源于 table_a,而 order_status 来源于 table_b,表示 table_c 和 table_a、table_b 之间也存在血缘关系。

由上也可看出想要存储血缘关系,还需要先解析 sql,这块儿主要使用了开源项目 calcite 的解析器,这篇文章不再展开,本篇主要讲如何存储和如何展示

环境配置

参考另一篇:SpringBoot 配置内嵌式 Neo4j

Node 数据结构定义

因为要展示表的字段之间的血缘关系,所以直接将表字段作为图节点存储,表字段之间的血缘关系就用图节点之间的关系表示,具体 node 定义如下:

public class ColumnVertex {
  // 唯一键
  private String name;

  public ColumnVertex(String catalogName, String databaseName, String tableName, String columnName) {
    this.name = catalogName + "." + databaseName + "." + tableName + "." + columnName;
  }

  public String getCatalogName() {
    return Long.parseLong(name.split("\\.")[0]);
  }

  public String getDatabaseName() {
    return name.split("\\.")[1];
  }

  public String getTableName() {
    return name.split("\\.")[2];
  }

  public String getColumnName() {
    return name.split("\\.")[3];
  }
}

通用 Service 定义

public interface EmbeddedGraphService {
    // 添加图节点以及与上游节点之间的关系
    void addColumnVertex(ColumnVertex currentVertex, ColumnVertex upstreamVertex);
    // 寻找上游节点
    List<ColumnVertex> findUpstreamColumnVertex(ColumnVertex currentVertex);
    // 寻找下游节点
    List<ColumnVertex> findDownstreamColumnVertex(ColumnVertex currentVertex);
}

Service 实现

import javax.annotation.Resource;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Result;
import org.neo4j.graphdb.Transaction;
import org.springframework.stereotype.Service;

@Service
public class EmbeddedGraphServiceImpl implements EmbeddedGraphService {

  @Resource private GraphDatabaseService graphDb;

  @Override
  public void addColumnVertex(ColumnVertex currentVertex, ColumnVertex upstreamVertex) {
    try (Transaction tx = graphDb.beginTx()) {
      tx.execute(
          "MERGE (c:ColumnVertex {name: $currentName}) MERGE (u:ColumnVertex {name: $upstreamName})"
              + " MERGE (u)-[:UPSTREAM]->(c)",
          Map.of("currentName", currentVertex.getName(), "upstreamName", upstreamVertex.getName()));
      tx.commit();
    }
  }

  @Override
  public List<ColumnVertex> findUpstreamColumnVertex(ColumnVertex currentVertex) {
    List<ColumnVertex> result = new ArrayList<>();
    try (Transaction tx = graphDb.beginTx()) {
      Result queryResult =
          tx.execute(
              "MATCH (u:ColumnVertex)-[:UPSTREAM]->(c:ColumnVertex) WHERE c.name = $name RETURN"
                  + " u.name AS name",
              Map.of("name", currentVertex.getName()));
      while (queryResult.hasNext()) {
        Map<String, Object> row = queryResult.next();
        result.add(new ColumnVertex().setName((String) row.get("name")));
      }
      tx.commit();
    }
    return result;
  }

  @Override
  public List<ColumnVertex> findDownstreamColumnVertex(ColumnVertex currentVertex) {
    List<ColumnVertex> result = new ArrayList<>();
    try (Transaction tx = graphDb.beginTx()) {
      Result queryResult =
          tx.execute(
              "MATCH (c:ColumnVertex)-[:UPSTREAM]->(d:ColumnVertex) WHERE c.name = $name RETURN"
                  + " d.name AS name",
              Map.of("name", currentVertex.getName()));
      while (queryResult.hasNext()) {
        Map<String, Object> row = queryResult.next();
        result.add(new ColumnVertex().setName((String) row.get("name")));
      }
      tx.commit();
    }
    return result;
  }
}

遍历图节点

实现逻辑:

  1. restful 接口入参:当前表(catalogName, databaseName, tableName)
  2. 定义返回给前端的数据结构,采用 nodes 和 edges 方式返回,然后前端再根据节点与边关系渲染出完整的血缘关系图;
public class ColumnLineageVO {
  List<ColumnLineageNode> nodes;
  List<ColumnLineageEdge> edges;
}

public class ColumnLineageNode {
  private String databaseName;
  private String tableName;
  private List<String> columnNames;
}

public class ColumnLineageEdge {
  private ColumnLineageEdgePoint source;
  private ColumnLineageEdgePoint target;
}

public class ColumnLineageEdgePoint {
  private String databaseName;
  private String tableName;
  private String columnName;
}
  1. 查询表字段;
  2. 采用递归的方式,利用当前表字段遍历与当前表字段关联的所有上下游图节点;
  3. 将所有节点封装成 List ColumnLineageVO 返回给前端 。
public ColumnLineageVO getColumnLineage(Table table) {
    ColumnLineageVO columnLineageVO = new ColumnLineageVO();
    List<ColumnLineageNode> nodes = new ArrayList<>();
    List<ColumnLineageEdge> edges = new ArrayList<>();
    // Deduplication
    Set<String> visitedNodes = new HashSet<>();
    Set<String> visitedEdges = new HashSet<>();
    Map<String, List<ColumnVertex>> upstreamCache = new HashMap<>();
    Map<String, List<ColumnVertex>> downstreamCache = new HashMap<>();

    ColumnLineageNode currentNode =
        ColumnLineageNode.builder()
            .databaseName(table.getDatabaseName())
            .tableName(table.getTableName())
            .type(TableType.EXTERNAL_TABLE.getDesc())
            .build();
    nodes.add(currentNode);
    visitedNodes.add(currentNode.getDatabaseName() + "." + currentNode.getTableName());

    for (String columnName : table.getColumnNames()) {
      ColumnVertex currentVertex =
          new ColumnVertex(
              table.getScriptId(), table.getDatabaseName(), table.getTableName(), columnName);
      traverseUpstreamColumnVertex(
          currentVertex, nodes, edges, visitedNodes, visitedEdges, upstreamCache);
      traverseDownstreamColumnVertex(
          currentVertex, nodes, edges, visitedNodes, visitedEdges, downstreamCache);
    }

    columnLineageVO.setNodes(nodes);
    columnLineageVO.setEdges(edges);
    return columnLineageVO;
  }

private void traverseUpstreamColumnVertex(
      ColumnVertex currentVertex,
      List<ColumnLineageNode> nodes,
      List<ColumnLineageEdge> edges,
      Set<String> visitedNodes,
      Set<String> visitedEdges,
      Map<String, List<ColumnVertex>> cache) {
    List<ColumnVertex> upstreamVertices;
    if (cache.containsKey(currentVertex.getName())) {
      upstreamVertices = cache.get(currentVertex.getName());
    } else {
      upstreamVertices = embeddedGraphService.findUpstreamColumnVertex(currentVertex);
      cache.put(currentVertex.getName(), upstreamVertices);
    }
    for (ColumnVertex upstreamVertex : upstreamVertices) {
      String nodeKey = upstreamVertex.getDatabaseName() + "." + upstreamVertex.getTableName();
      if (!visitedNodes.contains(nodeKey)) {
        ColumnLineageNode upstreamNode =
            ColumnLineageNode.builder()
                .databaseName(upstreamVertex.getDatabaseName())
                .tableName(upstreamVertex.getTableName())
                .type(TableType.EXTERNAL_TABLE.getDesc())
                .build();
        nodes.add(upstreamNode);
        visitedNodes.add(nodeKey);
      }
      String edgeKey =
          upstreamVertex.getDatabaseName()
              + upstreamVertex.getTableName()
              + upstreamVertex.getColumnName()
              + currentVertex.getDatabaseName()
              + currentVertex.getTableName()
              + currentVertex.getColumnName();
      if (!visitedEdges.contains(edgeKey)) {
        ColumnLineageEdge edge = createEdge(upstreamVertex, currentVertex);
        edges.add(edge);
        visitedEdges.add(edgeKey);
      }
      traverseUpstreamColumnVertex(upstreamVertex, nodes, edges, visitedNodes, visitedEdges, cache);
    }
  }
  
private void traverseDownstreamColumnVertex(
      ColumnVertex currentVertex,
      List<ColumnLineageNode> nodes,
      List<ColumnLineageEdge> edges,
      Set<String> visitedNodes,
      Set<String> visitedEdges,
      Map<String, List<ColumnVertex>> cache) {
    List<ColumnVertex> downstreamVertices;
    if (cache.containsKey(currentVertex.getName())) {
      downstreamVertices = cache.get(currentVertex.getName());
    } else {
      downstreamVertices = embeddedGraphService.findDownstreamColumnVertex(currentVertex);
      cache.put(currentVertex.getName(), downstreamVertices);
    }
    for (ColumnVertex downstreamVertex : downstreamVertices) {
      String nodeKey = downstreamVertex.getDatabaseName() + "." + downstreamVertex.getTableName();
      if (!visitedNodes.contains(nodeKey)) {
        ColumnLineageNode downstreamNode =
            ColumnLineageNode.builder()
                .databaseName(downstreamVertex.getDatabaseName())
                .tableName(downstreamVertex.getTableName())
                .type(TableType.EXTERNAL_TABLE.getDesc())
                .build();
        nodes.add(downstreamNode);
        visitedNodes.add(nodeKey);
      }
      String edgeKey =
          currentVertex.getDatabaseName()
              + currentVertex.getTableName()
              + currentVertex.getColumnName()
              + downstreamVertex.getDatabaseName()
              + downstreamVertex.getTableName()
              + downstreamVertex.getColumnName();
      if (!visitedEdges.contains(edgeKey)) {
        ColumnLineageEdge edge = createEdge(currentVertex, downstreamVertex);
        edges.add(edge);
        visitedEdges.add(edgeKey);
      }
      traverseDownstreamColumnVertex(
          downstreamVertex, nodes, edges, visitedNodes, visitedEdges, cache);
    }
  }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/913708.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

分布式数据库架构:高可用、高性能的数据存储

在现代信息时代&#xff0c;数据是企业发展的核心。为了支持海量数据的存储、高并发访问以及保证数据的可靠性&#xff0c;分布式数据库架构应运而生。分布式数据库架构是一种将数据存储在多个物理节点上&#xff0c;并通过一系列复杂的协调和管理机制来提供高可用性和高性能的…

云农场种植:互联网+智慧牧场,为农业注入新的活力和创新

随着科技的不断发展&#xff0c;数字化农业正逐渐成为现代农业的趋势。传统农业面临着土地资源有限、劳动力不足等问题&#xff0c;而云农场种植模式通过数字化技术的运用&#xff0c;互联网养殖着重于“绿色、特色产品和智慧生态”&#xff0c;通过建立“线上养殖线下托养线上…

配置NTP时间服务器

1.配置ntp时间服务器&#xff0c;确保客户端主机能和服务主机同步时间 ​ 客户端主机 同步成功 2.配置ssh免密登陆&#xff0c;能够通过客户端主机通过redhat用户和服务端主机基于公钥验证方式进行远程连接

【严重】Coremail 远程命令执行漏洞

漏洞描述 Coremail是广东盈世计算机科技有限公司推出的一款大型企业邮件系统。 在 Coremail XT5/XT6 版本中&#xff0c;邮件处理功能存在溢出风险&#xff0c;攻击者构造恶意邮件&#xff0c;向任意邮箱地址发送该恶意邮件&#xff0c;当服务器处理邮件时&#xff0c;会触发…

凯迪正大—直流电阻测试仪

一、产品概述 武汉凯迪正大直流电阻测量仪是变压器制造中半成品、成品出厂试验、安装、交接试验及电力部门预防性试验的必测项目&#xff0c;能有效发现变压器线圈的选材、焊接、连接部位松动、缺股、断线等制造缺陷和运行后存在的隐患。 为了满足变压器直流电阻测量的需要&a…

浏览器原生的 画中画 特性

Chrome 116 作为Google浏览器的最新稳定版本已正式发布。Chrome 浏览器支持视频画中画&#xff08;HTMLVideoElement&#xff09;已有一段时间&#xff0c;而 Chrome 116 则新增了文档画中画模式。这种"文档画中画"模式提供了一个始终在顶部的窗口&#xff0c;可以填…

sql server 、mysql CTE 公用表表达式

sql server 详细 mysql CTE CTE 是一个命名的临时结果集&#xff0c;作用范围是当前语句。CTE可以理解成一个可以复用的子查询&#xff0c;当然跟子查询还是有点区别的&#xff0c;CTE可以引用其他CTE&#xff0c;但子查询不能引用其它子查询。所以&#xff0c;开发中建议…

新高-新低指数(NH-NL)指标公式,判断多空力度

在《以交易为生》这本书中&#xff0c;作者埃尔德根据其经验&#xff0c;认为新高-新低指数(NH-NL)是股市的最佳领先指标。在任意一天中&#xff0c;创一年新高的股票是强势股&#xff0c;而创一年新低的股票是弱势股。新高-新低指数通过比较强势股和弱势股的数量来跟踪市场领导…

【Jenkins】持续集成部署学习

【Jenkins】持续集成部署学习 【一】安装部署【1】Jenkins所处位置【2】Docker安装Gitlab&#xff08;1&#xff09;首先准备一台空的虚拟机服务器&#xff08;2&#xff09;安装服务器所需的依赖&#xff08;3&#xff09;Docker的安装&#xff08;4&#xff09;阿里云镜像加速…

「UG/NX」Block UI 截面构建器SectionBuilder

✨博客主页何曾参静谧的博客📌文章专栏「UG/NX」BlockUI集合📚全部专栏「UG/NX」NX二次开发「UG/NX」BlockUI集合「VS」Visual Studio「QT」QT5程序设计「C/C+&#

容器插件工具kubectl-images

容器插件工具 简单方便查找镜像源信息&#xff0c;kubectl-images 代码地址&#xff1a;https://github.com/chenjiandongx/kubectl-images 此工具可以快捷执行命令来查看集群内ns&#xff0c;pod&#xff0c;及镜像等信息&#xff1b; 查看帮助项 ~ kubectl images --help …

HarmonyOS开发第一步,熟知开发工具DevEco Studio

俗话说的好&#xff0c;工欲善其事&#xff0c;必先利其器&#xff0c;走进HarmonyOS第一步&#xff0c;开发工具必须先行&#xff0c;当然了&#xff0c;关于开发工具的使用&#xff0c;官网和其他的博客也有很多的讲解&#xff0c;但是并没有按照常用的功能进行概述&#xff…

ELK中Logstash的基本配置和用法

文章目录 Logstash的条件判断Logstash的输入插件Stdin输入文件内容输入filter过滤器 Logstash的输出插件测试收集日志启动kibana在kibana中配置索引数据 在 《Elasticsearch搜索引擎系统入门》中简单描述了Logstah的安装&#xff0c;本篇文章将较为详细的讲解Logstash的基本配置…

Docker搭建个人网盘、私有仓库

1、使用mysql:5.6和 owncloud 镜像&#xff0c;构建一个个人网盘 [rootlocalhost ~]# docker pull mysql:5.6 [rootlocalhost ~]# docker pull owncloud [rootlocalhost ~]# docker run -itd --name mysql --env MYSQL_ROOT_PASSWORD123456 mysql:5.6 [rootlocalhost ~]# doc…

汽车企业数据泄露频发,其中特斯拉数据泄露影响达7.5万人

据美国有线电视新闻网&#xff08;CNN Business&#xff09;8月19日报道&#xff0c;特斯拉此前发生的大规模数据泄露事件&#xff0c;泄露了超过7.5万人的个人信息&#xff0c;这是“内部不法行为”的结果。 特斯拉在发给员工的通知中表示&#xff0c;被泄露的“特斯拉文件”包…

CMS数据库搭建

前置条件&#xff1a;在虚拟机中安装phpstudy。 1.将cms的压缩包通过远程桌面放到虚拟机&#xff0c;将压缩包解压&#xff0c;将解压后的cms文件夹放到phpstudy安装目录下的www文件夹中&#xff0c;路径如下&#xff08;安装时的路径可能不同&#xff09;&#xff1a; C:\ph…

易基因:MeRIP-seq等揭示ALKBH5介导m6A去甲基化调控皮肤创面再上皮化分子机制|科研进展

大家好&#xff0c;这里是专注表观组学十余年&#xff0c;领跑多组学科研服务的易基因。 哺乳动物的损伤皮肤屏障完整性恢复通过创面愈合基本机制实现&#xff0c;这是一个包括凝血、炎症、再上皮化&#xff08;re-epithelialization&#xff09;、肉芽组织形成和疤痕重塑的多…

MSTP多生成树协议(第二课)

MSTP负载均衡 实验 需求 1&#xff09;PC1属于 vlan 10 &#xff0c;IP地址为 192.168.10.1/24&#xff0c; 网关为 192.168.10.2542&#xff09;PC2属于 vlan 20 &#xff0c;IP地址为 192.168.20.1/24&#xff0c; 网关为 192.168.20.254**3&#xff09;确保PC1与PC2互通4…

pytorch中的register_buffer

今天在一个模型的init中遇到了self.register_buffer(‘running_mean’, torch.zeros(num_features)) register_buffer(self, name, tensor)是一个PyTorch中的方法&#xff0c;它的作用是向模块&#xff08;module&#xff09;中添加一个持久的缓冲区&#xff08;buffer&#xf…

【优选算法】—— 字符串匹配算法

在本期的字符串匹配算法中&#xff0c;我将给大家带来常见的两种经典的示例&#xff1a; 1、暴力匹配&#xff08;BF&#xff09;算法 2、KMP算法 目录 &#xff08;一&#xff09;暴力匹配&#xff08;BF&#xff09;算法 1、思想 2、演示 3、代码展示 &#xff08;二&…