openmetadata自定义连接器开发教程

news2025/1/15 19:40:15

openmetadata自定义连接器开发教程

一、开发通用自定义连接器教程

官网教程链接:

1.https://docs.open-metadata.org/v1.3.x/connectors/custom-connectors

2.https://github.com/open-metadata/openmetadata-demo/tree/main/custom-connector

(一)创建服务类型自定义连接器类

参考文档:https://docs.open-metadata.org/v1.3.x/sdk/python/build-connector/source#for-consumers-of-openmetadata-ingestion-to-define-custom-connectors-in-their-own-package-with-same-namespace

1.创建自定义连接器

示例:my_csv_connector.py

"""
自定义Database Service 从 CSV 文件中提取元数据
"""
import csv
import traceback

from pydantic import BaseModel, ValidationError, validator
from pathlib import Path
from typing import Iterable, Optional, List, Dict, Any

from metadata.ingestion.api.common import Entity
from metadata.ingestion.api.models import Either
from metadata.generated.schema.entity.services.ingestionPipelines.status import StackTraceError
from metadata.ingestion.api.steps import Source, InvalidSourceException
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.connections.database.customDatabaseConnection import (
    CustomDatabaseConnection,
)
from metadata.generated.schema.entity.data.database import Database
from metadata.generated.schema.entity.data.databaseSchema import DatabaseSchema
from metadata.generated.schema.api.data.createDatabaseSchema import (
    CreateDatabaseSchemaRequest,
)
from metadata.generated.schema.api.data.createDatabase import CreateDatabaseRequest
from metadata.generated.schema.entity.services.databaseService import (
    DatabaseService,
)
from metadata.generated.schema.entity.data.table import (
    Column,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class InvalidCsvConnectorException(Exception):
    """
    Sample data is not valid to be ingested
    """


class CsvModel(BaseModel):
    name: str
    column_names: List[str]
    column_types: List[str]

    @validator("column_names", "column_types", pre=True)
    def str_to_list(cls, value):
        """
        Suppose that the internal split is in ;
        """
        return value.split(";")


class CsvConnector(Source):
    """
    Custom connector to ingest Database metadata.

    We'll suppose that we can read metadata from a CSV
    with a custom database name from a business_unit connection option.
    """

    # 内置方法
    def __init__(self, config: WorkflowSource, metadata: OpenMetadata):
        self.config = config
        self.metadata = metadata
        # 获取配置信息
        self.service_connection = config.serviceConnection.__root__.config

        self.source_directory: str = (
            # 获取CSV文件路径
            self.service_connection.connectionOptions.__root__.get("source_directory")
        )
        if not self.source_directory:
            raise InvalidCsvConnectorException(
                "未获取到source_directory配置信息"
            )

        self.business_unit: str = (
            # 获取自定义的数据库名称
            self.service_connection.connectionOptions.__root__.get("business_unit")
        )
        if not self.business_unit:
            raise InvalidCsvConnectorException(
                "未获取到business_unit配置信息"
            )

        self.data: Optional[List[CsvModel]] = None

        super().__init__()

    # 内置函数
    @classmethod
    def create(
            cls, config_dict: dict, metadata_config: OpenMetadataConnection
    ) -> "CsvConnector":
        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
        connection: CustomDatabaseConnection = config.serviceConnection.__root__.config
        if not isinstance(connection, CustomDatabaseConnection):
            raise InvalidSourceException(
                f"Expected CustomDatabaseConnection, but got {connection}"
            )
        return cls(config, metadata_config)

    # 静态方法:按行读取
    @staticmethod
    def read_row_safe(row: Dict[str, Any]):
        try:
            return CsvModel.parse_obj(row)
        except ValidationError:
            logger.warning(f"Error parsing row {row}. Skipping it.")

    # 预处理:读取文件及数据
    def prepare(self):
        # Validate that the file exists
        source_data = Path(self.source_directory)
        if not source_data.exists():
            raise InvalidCsvConnectorException("Source Data path does not exist")

        try:
            with open(source_data, "r", encoding="utf-8") as file:
                reader = csv.DictReader(file)
                # 读取数据
                self.data = [self.read_row_safe(row) for row in reader]
        except Exception as exc:
            logger.error("Unknown error reading the source file")
            raise exc

    def yield_create_request_database_service(self):
        yield Either(
            # 串讲元数据读取服务
            right=self.metadata.get_create_service_from_source(
                entity=DatabaseService, config=self.config
            )
        )

    # 业务原数据库名处理方法
    def yield_business_unit_db(self):
        # 选择我们刚刚创建的服务(如果不是UI)
        # 获取提取服务对象
        service_entity: DatabaseService = self.metadata.get_by_name(
            entity=DatabaseService, fqn=self.config.serviceName
        )
        yield Either(
            right=CreateDatabaseRequest(
                name=self.business_unit,
                service=service_entity.fullyQualifiedName,
            )
        )

    # chems处理方法
    def yield_default_schema(self):
        # Pick up the service we just created (if not UI)
        database_entity: Database = self.metadata.get_by_name(
            entity=Database, fqn=f"{self.config.serviceName}.{self.business_unit}"
        )

        yield Either(
            right=CreateDatabaseSchemaRequest(
                name="default",
                database=database_entity.fullyQualifiedName,
            )
        )

    # 业务元数据处理方法
    def yield_data(self):
        """
        Iterate over the data list to create tables
        """
        database_schema: DatabaseSchema = self.metadata.get_by_name(
            entity=DatabaseSchema,
            fqn=f"{self.config.serviceName}.{self.business_unit}.default",
        )
        # 异常处理
        # 假设我们有一个要跟踪的故障
        # try:
        #     1/0
        # except Exception:
        #     yield Either(
        #         left=StackTraceError(
        #             name="My Error",
        #             error="Demoing one error",
        #             stackTrace=traceback.format_exc(),
        #         )
        #     )
        # 解析csv元数据信息(获取列名和类型)
        for row in self.data:
            yield Either(
                right=CreateTableRequest(
                    name=row.name,
                    databaseSchema=database_schema.fullyQualifiedName,
                    columns=[
                        Column(
                            name=model_col[0],
                            dataType=model_col[1],
                        )
                        for model_col in zip(row.column_names, row.column_types)
                    ],
                )
            )

    # 迭代器:元数据迭代返回
    def _iter(self) -> Iterable[Entity]:
        # 数据库元数据信息存储
        yield from self.yield_create_request_database_service()
        # 业务源数据库
        yield from self.yield_business_unit_db()
        # 业务schema
        yield from self.yield_default_schema()
        # 业务数据
        yield from self.yield_data()

    # 测试数据库连接
    def test_connection(self) -> None:
        pass

    # 连接关闭
    def close(self):
        pass

(二)将自定义连接器方法打包编译进ingestion镜像

项目目录:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

Dockerfile:

FROM openmetadata/ingestion:1.3.1

# Let's use the same workdir as the ingestion image
WORKDIR ingestion
USER airflow

# Install our custom connector
COPY connector connector
COPY setup.py .
COPY sample.csv .
#COPY person_info.proto .
RUN pip install --no-deps .

编译服务镜像

docker build -t om-ingestion:build -f Dockerfile .

(三)部署新版ingestion服务()

docker-compose up -d

docker-compose-ingestion.yml

version: "3.9"
volumes:
  ingestion-volume-dag-airflow:
  ingestion-volume-dags:
  ingestion-volume-tmp:
  es-data:
services:  
  ingestion:
    container_name: om_ingestion
    image: om-ingestion:build
    environment:
      AIRFLOW__API__AUTH_BACKENDS: "airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session"
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__OPENMETADATA_AIRFLOW_APIS__DAG_GENERATED_CONFIGS: "/opt/airflow/dag_generated_configs"
      DB_SCHEME: ${AIRFLOW_DB_SCHEME:-postgresql+psycopg2}
      DB_HOST: ${AIRFLOW_DB_HOST:-host.docker.internal}
      DB_PORT: ${AIRFLOW_DB_PORT:-5432}
      AIRFLOW_DB: ${AIRFLOW_DB:-airflow_db}
      DB_USER: ${AIRFLOW_DB_USER:-airflow_user}
      DB_PASSWORD: ${AIRFLOW_DB_PASSWORD:-airflow_pass}

      # extra connection-string properties for the database
      # EXAMPLE
      # require SSL (only for Postgres)
      # properties: "?sslmode=require"
      DB_PROPERTIES: ${AIRFLOW_DB_PROPERTIES:-}
      # To test the lineage backend
      # AIRFLOW__LINEAGE__BACKEND: airflow_provider_openmetadata.lineage.backend.OpenMetadataLineageBackend
      # AIRFLOW__LINEAGE__AIRFLOW_SERVICE_NAME: local_airflow
      AIRFLOW__LINEAGE__OPENMETADATA_API_ENDPOINT: http://host.docker.internal:8585/api
      AIRFLOW__LINEAGE__JWT_TOKEN: eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJhbGciOiJSUzI1NiIsInR5cCI6IkpXVCJ9.eyJpc3MiOiJvcGVuLW1ldGFkYXRhLm9yZyIsInN1YiI6ImluZ2VzdGlvbi1ib3QiLCJlbWFpbCI6ImluZ2VzdGlvbi1ib3RAb3Blbm1ldGFkYXRhLm9yZyIsImlzQm90Ijp0cnVlLCJ0b2tlblR5cGUiOiJCT1QiLCJpYXQiOjE3MDk3MDkyNDMsImV4cCI6bnVsbH0.U7XIYZjJAmJ-p3WTy4rTGGSzUxZeNpjOsHzrWRz7n-zAl-GZvznZWMKX5nSX_KwRHAK3UYuO1UX2-ZbeZxdpzhyumycNFyWzwMs8G6iEGoaM6doGhqCgHileco8wcAoaTXKHTnwa80ddWHt4dqZmikP7cIhLg9etKAepQNQibefewHbaLOoCrFyo9BqFeZzNaVBo1rogNtslWaDO6Wnk_rx0jxRLTy57Thq7R7YS_nZd-JVfYf72BEFHJ_WDZym4k-dusV0PWGzMPYIXq3s1KbpPBt_tUSz4cUrXbLuI5-ZsOWIvUhsLeHJDU-35-RymylhMrQ92kZjsy7v2nl6apQ
    entrypoint: /bin/bash
    command:
      - "/opt/airflow/ingestion_dependency.sh"
    expose:
      - 8080
    ports:
      - "8080:8080"
    networks:
      - app_net_ingestion
    volumes:
      - ingestion-volume-dag-airflow:/opt/airflow/dag_generated_configs
      - ingestion-volume-dags:/opt/airflow/dags
      - ingestion-volume-tmp:/tmp

networks:
  app_net_ingestion:
    ipam:
      driver: default
      config:
        - subnet: "172.16.240.0/24"

(四)根据服务类型选择对应类型的custom服务创建采集器测试

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

点击保存添加元数据提取器:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

二、开发内置连接器教程(Streamsets)

官网教程链接:https://docs.open-metadata.org/v1.3.x/developers/contribute/developing-a-new-connector

(一)定义连接器class类json模版(streamSetsConnection.json)

目录openmetadata-spec/src/main/resources/json/schema/entity/services/connections/pipeline/streamSetsConnection.json

{
  "$id": "https://open-metadata.org/schema/entity/services/connections/pipeline/streamSetsConnection.json",
  "$schema": "http://json-schema.org/draft-07/schema#",
  "title": "StreamSetsConnection",
  "description": "StreamSets Metadata Pipeline Connection Config",
  "type": "object",
  "javaType": "org.openmetadata.schema.services.connections.pipeline.StreamSetsConnection",
  "definitions": {
    "StreamSetsType": {
      "description": "Service type.",
      "type": "string",
      "enum": ["StreamSets"],
      "default": "StreamSets"
    },
    "basicAuthentication": {
      "title": "Username Authentication",
      "description": "Login username",
      "type":"object",
      "properties": {
        "username": {
          "title": "Username",
          "description": "StreamSets user to authenticate to the API.",
          "type": "string"
        }
      },
      "additionalProperties": false
    }
  },
  "properties": {
    "type": {
      "title": "Service Type",
      "description": "Service Type",
      "$ref": "#/definitions/StreamSetsType",
      "default": "StreamSets"
    },
    "hostPort": {
      "expose": true,
      "title": "Host And Port",
      "description": "Pipeline Service Management/UI URI.",
      "type": "string",
      "format": "uri"
    },
    "streamSetsConfig": {
      "title": "StreamSets Credentials Configuration",
      "description": "We support username authentication",
      "oneOf": [
        {
          "$ref": "#/definitions/basicAuthentication"
        }
      ]
    },
    "supportsMetadataExtraction": {
      "title": "Supports Metadata Extraction",
      "$ref": "../connectionBasicType.json#/definitions/supportsMetadataExtraction"
    }
  },
  "additionalProperties": false,
  "required": ["hostPort", "streamSetsConfig"]
}

(二)开发采集器源码:

目录:ingestion/src/metadata/ingestion/source/pipeline/streamsets/*

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

1.streamsets连接客户端(client.py)

import logging
import traceback
from typing import Any, Iterable, Optional

import requests
from requests import HTTPError
from requests.auth import HTTPBasicAuth

# 设置日志记录器
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

REQUESTS_TIMEOUT = 60 * 5


def clean_uri(uri: str) -> str:
    """清理URI,确保它以HTTP或HTTPS开头"""
    if not uri.startswith(("http://", "https://")):
        return "http://" + uri
    return uri


class StreamSetsClient:
    """
    在StreamSets Data Collector REST API之上的包装器
    """

    def __init__(
            self,
            host_port: str,
            username: Optional[str] = None,
            password: Optional[str] = None,
            verify: bool = False,
    ):
        self.api_endpoint = clean_uri(host_port) + "/rest"
        self.username = username
        self.password = password
        self.verify = verify
        self.headers = {"Content-Type": "application/json"}

    def get(self, path: str) -> Optional[Any]:
        """
        GET方法包装器
        """
        try:
            res = requests.get(
                f"{self.api_endpoint}/{path}",
                verify=self.verify,
                headers=self.headers,
                timeout=REQUESTS_TIMEOUT,
                auth=HTTPBasicAuth(self.username, self.password),
            )
            res.raise_for_status()
            return res.json()
        except HTTPError as err:
            logger.warning(f"Connection error calling the StreamSets API - {err}")
            raise err

        except ValueError as err:
            logger.warning(f"Cannot pick up the JSON from API response - {err}")
            raise err

        except Exception as err:
            logger.warning(f"Unknown error calling StreamSets API - {err}")
            raise err

    def list_pipelines(self) -> Iterable[dict]:
        """
        List all pipelines
        """
        try:
            return self.get("v1/pipelines")
        except Exception as err:
            logger.error(traceback.format_exc())
            raise err

    def get_pipeline_details(self, pipeline_id: str) -> dict:
        """
        Get a specific pipeline by ID
        """
        return self.get(f"v1/pipeline/{pipeline_id}?rev=0&get=pipeline")

    def test_list_pipeline_detail(self) -> Iterable[dict]:
        """
        Test API access for listing pipelines
        """
        return self.list_pipelines()

2.连接器和测试连接器(connection.py)

"""
源连接处理程序
"""
from typing import Optional

from metadata.generated.schema.entity.automations.workflow import (
    Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.pipeline.streamSetsConnection import (
    BasicAuthentication,
    StreamSetsConnection,
)
from metadata.ingestion.connections.test_connections import test_connection_steps
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.streamsets.client import StreamSetsClient


def get_connection(connection: StreamSetsConnection) -> StreamSetsClient:
    """
    Create connection
    """
    if isinstance(connection.streamSetsConfig, BasicAuthentication):
        return StreamSetsClient(
            host_port=connection.hostPort,
            username=connection.streamSetsConfig.username,
            password="95bd7977208bc935cac3656f4a9eea3a",
            verify=False,
        )


def test_connection(
        metadata: OpenMetadata,
        client: StreamSetsClient,
        service_connection: StreamSetsConnection,
        automation_workflow: Optional[AutomationWorkflow] = None,
) -> None:
    """
    元数据工作流或自动化工作流期间
    测试连接。这可以作为一部分执行
    """

    def custom_executor():
        list(client.list_pipelines())

    test_fn = {"GetPipelines": custom_executor}

    test_connection_steps(
        metadata=metadata,
        test_fn=test_fn,
        service_type=service_connection.type.value,
        automation_workflow=automation_workflow,
    )

3.元数据提取器(metadata.py)

"""
提取StreamSets 源的元数据 
"""
import traceback
from typing import Iterable, List, Optional, Any

from metadata.generated.schema.entity.services.ingestionPipelines.status import StackTraceError
from pydantic import BaseModel, ValidationError

from metadata.generated.schema.api.data.createPipeline import CreatePipelineRequest
from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.pipeline import Task
from metadata.generated.schema.entity.services.connections.pipeline.streamSetsConnection import (
    StreamSetsConnection,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    Source as WorkflowSource,
)
from metadata.ingestion.api.models import Either
from metadata.ingestion.api.steps import InvalidSourceException
from metadata.ingestion.models.pipeline_status import OMetaPipelineStatus
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.ingestion.source.pipeline.pipeline_service import PipelineServiceSource
from metadata.utils.helpers import clean_uri
from metadata.utils.logger import ingestion_logger

logger = ingestion_logger()


class StagesDetails(BaseModel):
    instanceName: str
    label:str
    stageType: str
    stageName: str
    description: str
    inputLanes: List[str]
    outputLanes: List[str]
    downstream_task_names: set[str] = set()

class StreamSetsPipelineDetails(BaseModel):
    """
    Defines the necessary StreamSets information
    """
    uuid: str
    pipelineId: str
    title: str
    name: str
    created: int
    creator: str
    description: str


class StreamsetsSource(PipelineServiceSource):
    """
    执行必要的方法,从 Airflow 的元数据数据库中提取管道元数据
    """

    @classmethod
    def create(cls, config_dict: dict, metadata: OpenMetadata):
        logger.info("create..........")
        config: WorkflowSource = WorkflowSource.parse_obj(config_dict)
        logger.info(f"WorkflowSource: {config}")
        connection: StreamSetsConnection = config.serviceConnection.__root__.config
        logger.info(f"StreamSetsConnection: {connection}")
        if not isinstance(connection, StreamSetsConnection):
            raise InvalidSourceException(
                f"Expected StreamSetsConnection, but got {connection}"
            )
        return cls(config, metadata)

    def yield_pipeline(
            self, pipeline_details: StreamSetsPipelineDetails
    ) -> Iterable[Either[CreatePipelineRequest]]:
        logger.info("yield_pipeline.......")
        try:
            connection_url = None
            if self.service_connection.hostPort:
                connection_url = (
                    f"{clean_uri(self.service_connection.hostPort)}/rest/v1/pipelines"
                )
            logger.info(f"pipeline_details:{pipeline_details}")
            logger.info(f"connection_url:{connection_url}")
            pipeline_request = CreatePipelineRequest(
                name=pipeline_details.name,
                displayName=pipeline_details.title,
                sourceUrl=f"{clean_uri(self.service_connection.hostPort)}/collector/pipeline/{pipeline_details.pipelineId}",
                tasks=self._get_tasks_from_details(pipeline_details),
                service=self.context.pipeline_service,
            )
            yield Either(right=pipeline_request)
            self.register_record(pipeline_request=pipeline_request)
        except TypeError as err:
            self.context.task_names = set()
            yield Either(
                left=StackTraceError(
                    name=pipeline_details.dag_id,
                    error=(
                        f"Error building DAG information from {pipeline_details}. There might be Airflow version"
                        f" incompatibilities - {err}"
                    ),
                    stackTrace=traceback.format_exc(),
                )
            )
        except ValidationError as err:
            self.context.task_names = set()
            yield Either(
                left=StackTraceError(
                    name=pipeline_details.dag_id,
                    error=f"Error building pydantic model for {pipeline_details} - {err}",
                    stackTrace=traceback.format_exc(),
                )
            )

        except Exception as err:
            self.context.task_names = set()
            yield Either(
                left=StackTraceError(
                    name=pipeline_details.dag_id,
                    error=f"Wild error ingesting pipeline {pipeline_details} - {err}",
                    stackTrace=traceback.format_exc(),
                )
            )

    # 获取解析管道详情
    def _get_tasks_from_details(
            self, pipeline_details: StreamSetsPipelineDetails
    ) -> Optional[List[Task]]:
        logger.info("_get_tasks_from_details.......")
        logger.info(f"StreamSetsPipelineDetails:{pipeline_details}")
        try:
            stages = self.get_stages_by_pipline(pipeline_details)
            return [
                Task(
                    name=stage.instanceName,
                    displayName=stage.label,
                    sourceUrl=f"{clean_uri(self.service_connection.hostPort)}/collector/pipeline/{pipeline_details.pipelineId}",
                    taskType=stage.stageType,
                    description=stage.description,
                    downstreamTasks=list(stage.downstream_task_names)
                    if stage.downstream_task_names
                    else [],
                )
                for stage in stages
            ]
        except Exception as err:
            logger.debug(traceback.format_exc())
            logger.warning(
                f"Wild error encountered when trying to get tasks from Pipeline Details {pipeline_details} - {err}."
            )
        return None

    def yield_pipeline_lineage_details(
            self, pipeline_details: StreamSetsPipelineDetails
    ) -> Iterable[Either[AddLineageRequest]]:
        logger.info("yield_pipeline_lineage_details..........")

        """
        将连接转换为管道实体
        :param pipeline_details: 来自  StreamSets的pipeline_details对象
        return:使用任务创建管道请求
        """
        pass

    def get_pipelines_list(
            self
    ) -> Optional[List[StreamSetsPipelineDetails]]:
        logger.info("get_pipelines_list..........")
        """Get List of all pipelines"""
        if self.connection.list_pipelines() is not None:
            for list_pipeline in self.connection.list_pipelines():
                logger.info(f"pipeline:{list_pipeline}")
                try:
                    yield StreamSetsPipelineDetails(
                        uuid=list_pipeline.get("uuid"),
                        pipelineId=list_pipeline.get("pipelineId"),
                        title=list_pipeline.get("title"),
                        name=list_pipeline.get("name"),
                        created=list_pipeline.get("created"),
                        creator=list_pipeline.get("creator"),
                        description=list_pipeline.get("description"),
                    )
                except (ValueError, KeyError, ValidationError) as err:
                    logger.debug(traceback.format_exc())
                    logger.warning(
                        f"Cannot create StreamSetsPipelineDetails from {list_pipeline} - {err}"
                    )
                except Exception as err:
                    logger.debug(traceback.format_exc())
                    logger.warning(
                        f"Wild error encountered when trying to get pipelines from Process Group {list_pipeline} - {err}."
                    )
        else:
            return None

    # 获取上下关联关系
    def get_stages_lane(
            self, stages: Optional[List[StagesDetails]]
    ) -> {}:
        logger.info("get_stages_lane......")
        input_lane_to_stage_map = {}
        for stage in stages:
            logger.info(f"stage_info:{stage}")
            for input_lane in stage.get("inputLanes", []):
                try:
                    if input_lane_to_stage_map.get(input_lane) is None:
                        input_lane_to_stage_map[input_lane] = set()
                        input_lane_to_stage_map[input_lane].add(stage.get("instanceName"))
                    else:
                        input_lane_to_stage_map[input_lane].add(stage.get("instanceName"))
                except Exception as err:
                    logger.debug(traceback.format_exc())
                    logger.warning(
                        f"Wild error encountered when trying to get stages from Pipeline Details {stages} - {err}.")
        logger.info(f"input_lane_to_stage_map:{input_lane_to_stage_map}")
        return input_lane_to_stage_map

    def get_stages_by_pipline(
            self, pipeline_details: StreamSetsPipelineDetails
    ) -> Optional[List[StagesDetails]]:
        logger.info("get_stages_by_pipline")
        pipeline_detail = self.connection.get_pipeline_details(pipeline_details.pipelineId)
        stages = []
        if pipeline_detail.get("stages"):
            stages = pipeline_detail.get("stages")
        input_lane_to_stage_map = self.get_stages_lane(stages)
        for stage in stages:
            logger.info(f"stage:{stage}")
            try:
                input_lanes = stage.get("inputLanes", [])
                output_lanes = stage.get("outputLanes", [])
                downstream_stage_names = set()
                for output_lane in stage.get("outputLanes", []):
                    if output_lane in input_lane_to_stage_map.keys():
                        for down_stage in input_lane_to_stage_map.get(output_lane, []):
                            downstream_stage_names.add(down_stage)
                yield StagesDetails(
                    instanceName=stage.get("instanceName"),
                    label=stage["uiInfo"].get("label"),
                    stageType=stage["uiInfo"].get("stageType"),
                    stageName=stage.get("stageName"),
                    description=stage["uiInfo"].get("description"),
                    inputLanes=input_lanes,
                    outputLanes=output_lanes,
                    downstream_task_names=downstream_stage_names
                )
            except (ValueError, KeyError, ValidationError) as err:
                logger.debug(traceback.format_exc())
                logger.warning(
                    f"Cannot create StagesDetails from {stage} - {err}"
                )
            except Exception as err:
                logger.debug(traceback.format_exc())
                logger.warning(
                    f"Wild error encountered when trying to get pipelines from Process Group {stage} - {err}."
                )

    def get_pipeline_name(
            self, pipeline_details: StreamSetsPipelineDetails
    ) -> str:
        return pipeline_details.name

    def yield_pipeline_status(
            self, pipeline_details: StreamSetsPipelineDetails
    ) -> Iterable[Either[OMetaPipelineStatus]]:
        pass

(三)修改前端ui源码,添加连接器对象

目录:openmetadata-ui/src/main/resources/ui/src/utils/PipelineServiceUtils.ts

/*
 *  Copyright 2022 Collate.
 *  Licensed under the Apache License, Version 2.0 (the "License");
 *  you may not use this file except in compliance with the License.
 *  You may obtain a copy of the License at
 *  http://www.apache.org/licenses/LICENSE-2.0
 *  Unless required by applicable law or agreed to in writing, software
 *  distributed under the License is distributed on an "AS IS" BASIS,
 *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *  See the License for the specific language governing permissions and
 *  limitations under the License.
 */

import { cloneDeep } from 'lodash';
import { COMMON_UI_SCHEMA } from '../constants/Services.constant';
import { PipelineServiceType } from '../generated/entity/services/pipelineService';
import airbyteConnection from '../jsons/connectionSchemas/connections/pipeline/airbyteConnection.json';
import airflowConnection from '../jsons/connectionSchemas/connections/pipeline/airflowConnection.json';
import customPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/customPipelineConnection.json';
import dagsterConnection from '../jsons/connectionSchemas/connections/pipeline/dagsterConnection.json';
import databricksPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/databricksPipelineConnection.json';
import domoPipelineConnection from '../jsons/connectionSchemas/connections/pipeline/domoPipelineConnection.json';
import fivetranConnection from '../jsons/connectionSchemas/connections/pipeline/fivetranConnection.json';
import gluePipelineConnection from '../jsons/connectionSchemas/connections/pipeline/gluePipelineConnection.json';
import nifiConnection from '../jsons/connectionSchemas/connections/pipeline/nifiConnection.json';
import splineConnection from '../jsons/connectionSchemas/connections/pipeline/splineConnection.json';
import streamSetsConnection from '../jsons/connectionSchemas/connections/pipeline/streamSetsConnection.json';

export const getPipelineConfig = (type: PipelineServiceType) => {
  let schema = {};
  const uiSchema = { ...COMMON_UI_SCHEMA };
  switch (type) {
    case PipelineServiceType.Airbyte: {
      schema = airbyteConnection;

      break;
    }

    case PipelineServiceType.Airflow: {
      schema = airflowConnection;

      break;
    }
    case PipelineServiceType.GluePipeline: {
      schema = gluePipelineConnection;

      break;
    }
    case PipelineServiceType.Fivetran: {
      schema = fivetranConnection;

      break;
    }
    case PipelineServiceType.Dagster: {
      schema = dagsterConnection;

      break;
    }
    case PipelineServiceType.Nifi: {
      schema = nifiConnection;

      break;
    }
    case PipelineServiceType.StreamSets: {
      schema = streamSetsConnection;

      break;
    }
    case PipelineServiceType.DomoPipeline: {
      schema = domoPipelineConnection;

      break;
    }
    case PipelineServiceType.CustomPipeline: {
      schema = customPipelineConnection;

      break;
    }
    case PipelineServiceType.DatabricksPipeline: {
      schema = databricksPipelineConnection;

      break;
    }
    case PipelineServiceType.Spline: {
      schema = splineConnection;

      break;
    }

    default:
      break;
  }

  return cloneDeep({ schema, uiSchema });
};

(四)前端ui源码,添加MD说明文档

路径:openmetadata-ui/src/main/resources/ui/public/locales/en-US/Pipeline/StreamSets.md

# StreamSets
在本节中,我们将提供使用 StreamSets 连接器的指南和参考。

## 要求
系统 支持 StreamSets 连接器的 1 种连接类型:
- **基本认证**:使用用户名对 StreamSets 进行登陆。

您可以在 [docs](https://docs.open-metadata.org/connectors/pipeline/StreamSets) 中找到有关 StreamSets 连接器的详细信息。

## 连接详细信息
$$section
### Host and Port $(id="hostPort")
管道服务管理 URI。这应指定为格式为"scheme://hostname:port"的 URI 字符串。例如,“http://localhost:8443”、“http://host.docker.internal:8443”。
$$

$$section
### StreamSets Config $(id="StreamSetsConfig")
OpenMetadata 支持基本身份验证(用户名/密码身份验证。有关详细信息,请参阅要求部分。
$$

$$section
### Username $(id="username")
用于连接到 StreamSets 的用户名。此用户应该能够向 StreamSets API 发送请求并访问“资源”终结点。
$$

(五)创建 Java ClassConverter(可选)

(六)构建dockefile重新构建镜像

server服务Dockerfile

# Build stage
FROM alpine:3.19 AS build

COPY openmetadata-dist/target/openmetadata-*.tar.gz /
#COPY docker/openmetadata-start.sh /

RUN mkdir -p /opt/openmetadata && \
    tar zxvf openmetadata-*.tar.gz -C /opt/openmetadata --strip-components 1 && \
    rm openmetadata-*.tar.gz

# Final stage
FROM alpine:3.19

EXPOSE 8585

RUN adduser -D openmetadata && \
    apk update && \
    apk upgrade && \
    apk add --update --no-cache bash openjdk17-jre tzdata
ENV TZ=Asia/Shanghai

COPY --chown=openmetadata:openmetadata --from=build /opt/openmetadata /opt/openmetadata
COPY --chmod=755 docker/openmetadata-start.sh /

USER openmetadata

WORKDIR /opt/openmetadata
ENTRYPOINT [ "/bin/bash" ]
CMD ["/openmetadata-start.sh"]

ingestion服务Dockerfile

路径:ingestion/Dockerfile

FROM apache/airflow:2.7.3-python3.10

#FROM docker-compose-ingestion-ingestion:latest
USER root
RUN curl -sS https://packages.microsoft.com/keys/microsoft.asc | apt-key add -
RUN curl -sS https://packages.microsoft.com/config/debian/11/prod.list > /etc/apt/sources.list.d/mssql-release.list
# Install Dependencies (listed in alphabetical order)
ENV DEBIAN_FRONTEND=noninteractive
RUN apt-get -qq update \
    && apt-get -qq install -y \
    tzdata \
    alien \
    build-essential \
    default-libmysqlclient-dev \
    freetds-bin \
    freetds-dev \
    gcc \
    gnupg \
    libaio1 \
    libevent-dev \
    libffi-dev \
    libpq-dev \
    librdkafka-dev \
    libsasl2-dev \
    libsasl2-2 \
    libsasl2-modules \
    libsasl2-modules-gssapi-mit \
    libssl-dev \
    libxml2 \
    libkrb5-dev \
    openjdk-11-jre \
    openssl \
    postgresql \
    postgresql-contrib \
    tdsodbc \
    unixodbc \
    unixodbc-dev \
    unzip \
    vim \
    git \
    wget --no-install-recommends \
    # Accept MSSQL ODBC License
    && ACCEPT_EULA=Y apt-get install -y msodbcsql18 \
    && rm -rf /var/lib/apt/lists/*

RUN if [[ $(uname -m) == "arm64" || $(uname -m) == "aarch64" ]]; \
 then \
 wget -q https://download.oracle.com/otn_software/linux/instantclient/191000/instantclient-basic-linux.arm64-19.10.0.0.0dbru.zip -O /oracle-instantclient.zip && \
 unzip -qq -d /instantclient -j /oracle-instantclient.zip && rm -f /oracle-instantclient.zip; \
 else \
 wget -q https://download.oracle.com/otn_software/linux/instantclient/1917000/instantclient-basic-linux.x64-19.17.0.0.0dbru.zip -O /oracle-instantclient.zip && \
 unzip -qq -d /instantclient -j /oracle-instantclient.zip && rm -f /oracle-instantclient.zip; \
 fi

ENV LD_LIBRARY_PATH=/instantclient

# Security patches for base image
# monitor no fixed version for
#    https://security.snyk.io/vuln/SNYK-DEBIAN11-LIBTASN16-3061097
#    https://security.snyk.io/vuln/SNYK-DEBIAN11-MARIADB105-2940589
#    https://security.snyk.io/vuln/SNYK-DEBIAN11-BIND9-3027852
#    https://security.snyk.io/vuln/SNYK-DEBIAN11-EXPAT-3023031 we are already installed the latest
RUN echo "deb http://deb.debian.org/debian bullseye-backports main" > /etc/apt/sources.list.d/backports.list
RUN apt-get -qq update \
    && apt-get -qq install -t bullseye-backports -y \
    curl \
    libpcre2-8-0 \
    postgresql-common \
    expat \
    bind9

# Required for Starting Ingestion Container in Docker Compose
# Provide Execute Permissions to shell script
COPY --chown=airflow:0 --chmod=775 ingestion/ingestion_dependency.sh /opt/airflow
# Required for Ingesting Sample Data
COPY --chown=airflow:0 --chmod=775 ingestion /home/airflow/ingestion

COPY --chown=airflow:0 --chmod=775 openmetadata-airflow-apis /home/airflow/openmetadata-airflow-apis


# Required for Airflow DAGs of Sample Data
#COPY --chown=airflow:0 ingestion/examples/airflow/dags /opt/airflow/dags

USER airflow
ARG AIRFLOW_CONSTRAINTS_LOCATION="https://raw.githubusercontent.com/apache/airflow/constraints-2.7.3/constraints-3.10.txt"
ENV TZ=Asia/Shanghai

# Disable pip cache dir
# https://pip.pypa.io/en/stable/topics/caching/#avoiding-caching
ENV PIP_NO_CACHE_DIR=1
# Make pip silent
ENV PIP_QUIET=1

RUN pip install --upgrade pip

WORKDIR /home/airflow/openmetadata-airflow-apis
RUN pip install "."

WORKDIR /home/airflow/ingestion


# 提供要安装的引入依赖项的参数。默认为全部提供要安装的引入依赖项的参数。默认为全部
ARG INGESTION_DEPENDENCY="all"
RUN pip install ".[${INGESTION_DEPENDENCY}]"

# Temporary workaround for https://github.com/open-metadata/OpenMetadata/issues/9593
RUN echo "Image built for $(uname -m)"
RUN if [[ $(uname -m) != "aarch64" ]]; \
 then \
 pip install "ibm-db-sa~=0.4"; \
 fi

# bump python-daemon for https://github.com/apache/airflow/pull/29916
RUN pip install "python-daemon>=3.0.0"
# remove all airflow providers except for docker and cncf kubernetes
RUN pip freeze | grep "apache-airflow-providers" | grep --invert-match -E "docker|http|cncf" | xargs pip uninstall -y
# Uninstalling psycopg2-binary and installing psycopg2 instead 
# because the psycopg2-binary generates a architecture specific error 
# while authenticating connection with the airflow, psycopg2 solves this error
RUN pip uninstall psycopg2-binary -y
RUN pip install psycopg2 mysqlclient==2.1.1
# Make required folders for openmetadata-airflow-apis
RUN mkdir -p /opt/airflow/dag_generated_configs

EXPOSE 8080
# This is required as it's responsible to create airflow.cfg file
RUN airflow db init && rm -f /opt/airflow/airflow.db

(七)构建服务镜像

根目录下执行构建server:

docker build -t om-server:build -f docker/development/Dockerfile .

根目录下执行构建ingestion:

docker build -t om-ingestion:build -f ingestion/Dockerfile .

(八)部署新版服务

docker-compose -f docker/development/docker-compose-postgres.yml up -d

(九)访问服务,创建streamsets元数据采集

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1981650.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【ARM】SMMU系统虚拟化(3)_ VMSAv8-64 address translation stages

讲解颗粒度granule size如何影响地址转换的过程: 对于每个颗粒度来说: 输入的地址范围如何影响起始的lookup levels。对于stage2 转换来说,给链接的转换页表造成的可能的影响。TTBR 地址和indexing对于起始的lookup 1.以4KB的translation g…

初始JVM ! ! ! 相信我,中国人不骗中国人,这一篇也就入门了!!!

目录 1.JVM到底是什么? 2.JVM的三大核心是什么? 1. 解释和运行 2. 内存管理 3. 即时编译(JIT) 3.常见的Jvm虚拟机有哪些? 1.3.1 Java虚拟机规范 1.3.2 Java虚拟机规范 4. JVM的组成 4.1、类的生命周期 4.1…

Android手机端远程控制ESP32引脚电平实例

一、背景介绍 ESP32是一款高度集成的低功耗系统级芯片,它结合了双核处理器、无线通信、低功耗特性和丰富的外设,适用于各种物联网(IoT)应用,如果与Android结合,在手机端远程控制ESP32引脚高低电平输出&…

武汉流星汇聚:依托亚马逊平台助力初创企业出海,共创国际品牌辉煌

在全球跨境电商的浩瀚蓝海中,亚马逊如同一座灯塔,以其庞大的市场规模、强大的品牌影响力和成熟的运营体系,引领着无数企业扬帆出海,探索未知的市场机遇。对于中国卖家而言,亚马逊不仅是通往全球消费者的桥梁&#xff0…

一文详解:企业优化仓库管理——WMS系统

如有你有一间小仓库,几位工人埋头于一摞摞的纸质单据中,手工记录着每一次货物的进出与变动。目前看来,这样的时间和效率,作为管理者的你还可以接受。 但是,令你头疼的是,随着企业规模的扩大,你…

springboot信息安全技术在投票网站-计算机毕业设计源码12740

目录 1 绪论 1.1 选题背景与意义 1.2国内外研究现状 1.3论文结构与章节安排 2系统分析 2.1 可行性分析 2.2 系统流程分析 2.2.1系统开发流程 2.2.2 用户登录流程 2.2.3 系统操作流程 2.2.4 添加信息流程 2.2.5 修改信息流程 2.2.6 删除信息流程 2.3 系统功能分析 …

ai自动配音工具

AI拟音大师,给你的无声视频添加生动而且同步的音效 😝文件夹是一种基于文本的视频到音频生成框架,可以生成高质量的音频,在语义上相关,并与输入视频时间同步。 下载地址:https://pan.quark.cn/s/5a2be1cc5551

微服务概述及如何搭建微服务

1. 微服务架构—SpringCloud 1.1 单体应用架构 将项目所有模块【功能】打成jar包或者war包,然后部署一个进程 优点: 部署简单:由于是完整的结构体,可以直接部署在一个服务器上即可技术单一:项目不需要复杂的技术栈&am…

windows启动nacos时报Caused by: java.lang.UnsatisfiedLinkErro错误

Caused by: java.lang.UnsatisfiedLinkError: C:\Users\Administrator\AppData\Local\Temp\2\librocksdbjni6009210463092880400.dll: Can’t find dependent libraries 因为电脑没有vc,或vc版本问题,下载对应的vc安装就可以了 VC“2015-2022”运行库官…

基础复习(反射、注解、动态代理)

反射 反射,指的是加载类的字节码到内存,并以编程的方法解刨出类中的各个成分(成员变量、方法、构造器等)。 1.获取类的字节码 (3种方式) public class Test1Class{public static void main(String[] arg…

全球社区的建立:Facebook在跨文化交流中的角色

在全球化日益加深的今天,跨文化交流成为了人们日常生活中的重要部分。Facebook作为全球最大的社交网络平台之一,正在发挥着越来越重要的作用。通过其广泛的用户基础和丰富的功能,Facebook不仅连接了来自不同国家和地区的人们,也在…

数据迁移数亿小文件该如何做到?

在数字化转型的浪潮中,企业在数据管理上遇到了诸多挑战,尤其是面对海量小文件的数据迁移问题。这类迁移任务不仅复杂,而且对效率、数据一致性和完整性的要求极高。 处理数亿小文件的数据迁移并非易事。当文件数量庞大,尤其是文件大…

吴恩达老师机器学习-ex8

data1 导入库,读取数据并进行可视化 因为这次的数据是mat文件,需要使用scipy库中的loadmat进行读取数据。 通过对数据类型的分析,发现是字典类型,查看该字典的键,可以发现又X等关键字。 import numpy as np import…

matplotlib库学习之绘图透明度设置(精炼准确)

matplotlib库学习之透明颜色设置 一、简介 在数据可视化中,透明度设置可以使图表更具层次感,特别是在多层叠加图表时。matplotlib库提供了多种方法来设置图表各个部分的透明度,包括图形、文本、图例、坐标轴等部分。 二、为什么要设置成透明…

某PM2项目管理系统 ExcelIn.aspx 文件上传漏洞复现

FOFA:body="PM2项目管理系统BS版增强工具.zip" 访问漏洞url抓包 上传压缩包 请求包 POST /FlowChartDefine/ExcelIn.aspx HTTP/1.1 Host: Accept-Encoding: gzip, deflate Content-Type: multipart/form-data; boundary=---------------------------335518608136…

WPF学习(2)-UniformGrid控件(均分布局)+StackPanel控件(栈式布局)

UniformGrid控件(均分布局) UniformGrid和Grid有些相似,只不过UniformGrid的每个单元格面积都是相等的,不管是横向的单元格,或是纵向的单元格,它们会平分整个UniformGrid。 UniformGrid控件提供了3个属性…

自动化测试使用jenkins做CICD持续集成(docker)

1.什么是(CI/CD) 1.1持续集成 定义:频繁地(一天多次)将代码集成到主干。将软件个人研发的部分向软件整体部分 交付,频繁进行集成以便更快地发现其中的错误。 每完成一点更新,就集成到主干&…

儿童活动床栏亚马逊temu美国ASTM F2085测试报告16 CFR 1224标准CPC认证办理

亚马逊temu美国站儿童活动床栏ASTM F2085-19测试报告活动床栏16 CFR 1224标准CPC认证 什么是便携式床栏杆? 便携式床栏杆是一种旨在安装在成人床上以防止儿童跌落的装置。便携式床栏适用于在没有帮助的情况下方便儿童(通常为 2 至 5 岁)上下…

编程深水区之并发①:什么是并发编程

并发编程是一种让程序能够执行多个任务的编程技术,多个任务的执行时间有重合,如交替执行、同时执行等。相对于传统的从上到下依次同步执行代码,我们也称并发编程为异步编程。目前,常见的并发模型主要有两种,一是多线程…

MySQL --- 内置函数介绍

目录 一、日期函数 二、字符串函数 三、数学函数 四、 其他函数 一、日期函数 current_date()当前日期current_time()当前时间current_timestamp()当前时间戳date(datetime)返回datetime的日期部分date_add(date,interval d_value_type) 在date中添加时间/日期…