DolphinScheduler应用实战笔记

news2024/9/20 0:53:37

DolphinScheduler应用实战笔记

  • 一、前言
  • 二、DS执行SQL或存储过程
  • 二、DS调用DataX同步数据
  • 三、DS调用HTTP接口
  • 四、DS依赖(DEPENDENT)节点
  • 五、DS SPARK 节点
  • 六、DS Flink 节点
  • 七、DS Flink 节点
  • 八、DS SQL 节点
  • 九、DS Java程序
  • 十、DS Python节点

一、前言

DolphinScheduler(后文简称DS)在项目中的应用实战笔记,包含DS执行SQL或存储过程、DS调用DataX同步数据、DS调用HTTP接口。
首先,在DS网页上创建项目:
在这里插入图片描述
点击项目名称进入项目
在这里插入图片描述
这两个工具栏:工作流定义和任务实例会经常使用

二、DS执行SQL或存储过程

点击操作栏的数据源中心,并创建数据源
在这里插入图片描述
在这里插入图片描述

选好并提交要执行sql的数据源即可,例如配置Hive。

点击操作栏的项目管理,点击项目进入,选择工作流定义,然后创建工作流:
在这里插入图片描述
拖拽出SQL标签,并填写数据源配置
在这里插入图片描述
填好节点名称、数据源配置和SQL:
在这里插入图片描述
在这里插入图片描述
SQL语句也可以是调用存储过程:(部分版本的hive不支持存储过程)

表结构:

CREATE TABLE `dev.test`(
  `id` bigint DEFAULT NULL COMMENT 'ID', 
  `tradedate` timestamp DEFAULT NULL COMMENT '日期', 
  `infosource` string DEFAULT NULL COMMENT '信息来源'
)
COMMENT '测试表'
ROW FORMAT SERDE 
  'org.apache.hadoop.hive.ql.io.orc.OrcSerde' 
WITH SERDEPROPERTIES ( 
  'serialization.format'='1') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
  'hdfs://nameservice1/quark1/user/hive/warehouse/dev.db/test'
TBLPROPERTIES (
  'timelyre.replaced.count.col'='ID', 
  'timelyre.timestamp.col'='tradeDate', 
  'timelyre.tag.cols'='id,InfoSource')

存储过程语句:

CREATE
OR        REPLACE PROCEDURE dev.sp_test(IN_DATA_DATE IN INT)

          AS V_EXE_BEGIN_TIME TIMESTAMP;

--程序开始时间
V_EXE_END_TIME TIMESTAMP;

--程序结束时间
BEGIN --记录程序开始执行时间
V_EXE_BEGIN_TIME:=SYSTIMESTAMP;
EXECUTE IMMEDIATE 'truncate  table dev.ai_exce_rate';

INSERT INTO TABLE dev.test (id, tradedate, infosource) SELECT client_id, last_modify, key FROM dev.ads_acc_stock_trade_detail LIMIT 10;

--记录程序结束时间
V_EXE_END_TIME:=SYSTIMESTAMP;

--打印执行日志
DBMS_OUTPUT.PUT_LINE(
'过程: sp_test ,业务日期:'||IN_DATA_DATE||',执行完成。开始时间:'||V_EXE_BEGIN_TIME||',结束时间:'||V_EXE_END_TIME
);
END

调用存储过程

CALL dev.sp_test(${IN_TRADE_DATE})

SQL不是select语句,就选择非查询,然后点击保存。
在页面再次点击保存:
在这里插入图片描述
在这里插入图片描述
租户即是刚刚创建的项目,全局变量就是整个任务都有效。
在这里插入图片描述
保存后,点击上线按钮,上线任务并执行:
在这里插入图片描述
在这里插入图片描述
运行起来后可以去任务实例中,查询任务执行情况:
在这里插入图片描述

二、DS调用DataX同步数据

在操作栏点击资源中心,并新建datax的json
在这里插入图片描述
json样例:

{
  "job": {
    "setting": {
      "speed": {
        "channel": 3,
        "byte": 1048576
      },
      "errorLimit": {
        "record": 0,
        "percentage": 0.02
      }
    },
    "content": [
      {
        "reader": {
          "name": "hdfsreader",
          "parameter": {
            "path": "/inceptor1/user/hive/warehouse/zx91.db/swscdc/ods_zx91_qt_tradingdaynew_all_day/dc_trade_date=${inTradeDate}",
            "defaultFS": "hdfs://nameservice1",
            "hadoopConfig": {
              "dfs.nameservices": "nameservice1",
              "dfs.ha.namenodes.nameservice1": "nn1,nn2",
              "dfs.namenode.rpc-address.nameservice1.nn1": "${srcNameService1}",
              "dfs.namenode.rpc-address.nameservice1.nn2": "${srcNameService2}",
              "dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
            },
            "fileType": "orc",
            "fieldDelimiter": "\t",
            "column": [
              {
                "index": "0",
                "type": "decimal"
              },
              {
                "index": "1",
                "type": "timestamp"
              },
              {
                "index": "2",
                "type": "decimal"
              },
              {
                "index": "3",
                "type": "decimal"
              },
              {
                "index": "4",
                "type": "decimal"
              },
              {
                "index": "5",
                "type": "decimal"
              },
              {
                "index": "6",
                "type": "decimal"
              },
              {
                "index": "7",
                "type": "decimal"
              },
              {
                "index": "8",
                "type": "timestamp"
              },
              {
                "index": "9",
                "type": "decimal"
              },
              {
                "index": "10",
                "type": "string"
              },
              {
                "index": "11",
                "type": "int"
              }
            ]
          }
        },
        "writer": {
          "name": "hdfswriter",
          "parameter": {
            "defaultFS": "hdfs://nameservice1",
            "hadoopConfig": {
              "dfs.nameservices": "nameservice1",
              "dfs.ha.namenodes.nameservice1": "nn1,nn2",
              "dfs.namenode.rpc-address.nameservice1.nn1": "timelyre02:8020",
              "dfs.namenode.rpc-address.nameservice1.nn2": "timelyre03:8020",
              "dfs.client.failover.proxy.provider.nameservice1": "org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider"
            },
            "fileType": "orc",
            "path": "/quark1/user/hive/warehouse/dev.db/ylf/qt_tradingdaynew/dc_trade_date=${inTradeDate}",
            "fileName": "qt_tradingdaynew",
            "writeMode": "nonConflict",
            "fieldDelimiter": "\t",
            "column": [
              {
                "name": "id",
                "type": "bigint"
              },
              {
                "name": "tradingdate",
                "type": "timestamp"
              },
              {
                "name": "iftradingday",
                "type": "int"
              },
              {
                "name": "secumarket",
                "type": "int"
              },
              {
                "name": "ifweekend",
                "type": "int"
              },
              {
                "name": "ifmonthend",
                "type": "int"
              },
              {
                "name": "ifquarterend",
                "type": "int"
              },
              {
                "name": "ifyearend",
                "type": "int"
              },
              {
                "name": "xgrq",
                "type": "timestamp"
              },
              {
                "name": "jsid",
                "type": "bigint"
              },
              {
                "name": "dc_etl_time",
                "type": "string"
              },
              {
                "name": "dc_trade_date",
                "type": "bigint"
              }
            ]
          }
        }
      }
    ]
  }
}

注意读和写的字段顺序要一样。
再编写shell文件,来加载datax的json文件,datax_push_tdh.sh:

#!/bin/bash
# ********************************************************************************
# * Filename     : datax_push_tdh.sh
# * Author       : dcx
# * Version      : 1.0.0
# * Created Date : 20240827
# * Description  : DataX 推送TDH集群任务执行器
# * History      :
# * <author> <version> <modified Date> <description>
# * dcx 1.0.0 2024-08-27 基于datax_push.sh修改,用于推送时序数据库(timelyre)
# *
# ********************************************************************************

# Module 1: 设置环境变量
export HADOOP_USER_NAME=swscdc
## dolphinscheduler 生产集群 TDH 客户端
source /usr/local/bigdata_client/TDH-oc-Client/init.sh
## dolphinscheduler 开发集群 TDH 客户端
# source /usr/local/bigdata_client/TDH-Client/init.sh

# Module 2: 公共函数
## 定义帮助文档
function ShowUsage() {
    cat <<EOF
USAGE: $0 [OPTIONS]

OPTIONS:
    -h --help           查看帮助文档.
    -d --dc-trade-date  [*]业务日期,格式为YYYYMMDD.
    -j --json-file-name [*]Datax Json 配置文件名.
    --src-name-service  源 HDFS HA NameService,格式为"TDHHOST1:PORT;TDHHOST2:PORT".
    --src-jdbc          源数据库连接信息,格式为jdbc连接串.
    --src-username      源数据库用户名.
    --src-password      源数据库用户密码.
    --tar-name-service  目标 HDFS HA NameService,格式为"TDHHOST1:PORT;TDHHOST2:PORT".
    --tar-jdbc          目标数据库连接信息,格式为jdbc连接串.
    --tar-username      目标数据库用户名.
    --tar-password      目标数据库用户密码.
    --tar-tablename     目标数据库表名(推送TDH集群进行日分区处理).
    --push-days         数据推送周期,格式为"days,0/1",days:天数(正整数>=1与负整数<=-1),0:自然日,1:交易日,即推送n个自然日或交易日的数据. 
    --jvm-buffer        JVM堆参数,即为防止jvm报内存溢出,根据实际情况调整jvm的堆参数,建议将内存设置为4G或者8G.
    --inc-value         非时间戳增量标识字段值.
    --dependence        当前依赖项,格式为"table_name1;table_name2;table_name3".
    --tar-es-url        推送ESURL信息.
    --custom-partition  自定义分区值,即非日分区传入.
    --nonpath-err       HDFS路径不存在时强制成功,该参数表示中止执行并不提示错误,否则默认将置为报错状态.

EOF
}
## 定义日志记录函数
function Logger() {
    echo -e "$(date +%Y/%m/%d\ %H:%M:%S)" : "${0##.*/}" "[${1}]"- "${2}"
}

# Module 3: 获取传入参数与参数值校验
## 使用 getopt 获取参数
OPTINOS=$(getopt --option hd:j: --long help,dc-trade-date:,json-file-name:,src-name-service:,src-jdbc:,src-username:,src-password:,tar-name-service:,tar-jdbc:,tar-username:,tar-password:,tar-tablename:,push-days:,jvm-buffer:,inc-value:,dependence:,tar-es-url:,custom-partition:,nonpath-err -n "$0" -- "$@")
VALID_OPTINOS=$?
[[ "${VALID_OPTINOS}" -ne "0" ]] && {
    echo "Try '$0 -h or --help' for more information."
    exit 10
}
## 使用 set 将 ${OPTINOS} 设置为位置参数
eval set -- "${OPTINOS}"
## 使用 shift 和 while 循环解析位置参数
while true; do
    case "$1" in
    -h | --help)
        ShowUsage
        exit 0
        ;;
    -d | --dc-trade-date)
        Logger "INFO" "业务日期: $2"
        DC_TRADE_DATE=$2
        shift 2
        ;;
    -j | --json-file-name)
        Logger "INFO" "Datax Json 配置文件名: $2"
        JSON_FILE_NAME=$2
        shift 2
        ;;
    --src-name-service)
        Logger "INFO" "源 HDFS HA NameService: $2"
        SRC_NAME_SERVICE=$2
        shift 2
        ;;
    --src-jdbc)
        Logger "INFO" "源数据库连接信息: $2"
        SRC_JDBC=$2
        shift 2
        ;;
    --src-username)
        Logger "INFO" "源数据库用户名: $2"
        SRC_USERNAME=$2
        shift 2
        ;;
    --src-password)
        Logger "INFO" "源数据库用户密码: $2"
        SRC_PASSWORD=$2
        shift 2
        ;;
    --tar-name-service)
        Logger "INFO" "目标 HDFS HA NameService: $2"
        TAR_NAME_SERVICE=$2
        shift 2
        ;;
    --tar-jdbc)
        Logger "INFO" "目标数据库连接信息: $2"
        TAR_JDBC=$2
        shift 2
        ;;
    --tar-username)
        Logger "INFO" "目标数据库用户名: $2"
        TAR_USERNAME=$2
        shift 2
        ;;
    --tar-password)
        Logger "INFO" "目标数据库用户密码: $2"
        TAR_PASSWORD=$2
        shift 2
        ;;
    --tar-tablename)
        Logger "INFO" "目标数据库表名(推送TDH集群进行日分区处理): $2"
        TAR_TABLE_NAME=$2
        shift 2
        ;;
    --push-days)
        Logger "INFO" "数据推送周期: $2"
        PUSH_DAYS=$2
        shift 2
        ;;
    --jvm-buffer)
        Logger "INFO" "JVM堆参数: $2"
        JVM_BUFFER=$2
        shift 2
        ;;
    --inc-value)
        Logger "INFO" "非时间戳增量标识字段值: $2"
        INC_VALUE=$2
        shift 2
        ;;
    --dependence)
        Logger "INFO" "当前依赖项: $2"
        DEPENDENCE=$2
        shift 2
        ;;
    --tar-es-url)
        Logger "INFO" "推送ES的URL信息: $2"
        TAR_ES_URL=$2
        shift 2
        ;;
    --custom-partition)
        Logger "INFO" "自定义分区值: $2"
        CUSTOM_PARTITION=$2
        shift 2
        ;;
    --nonpath-err)
        Logger "INFO" "HDFS路径不存在时强制成功: True"
        NONPATH_ERR="True"
        shift 1
        ;;
    --)
        shift
        break
        ;;
    *)
        Logger "WARNING" "无效选项: $1"
        ShowUsage
        exit 10
        ;;
    esac
done
## 校验参数值
if [[ -z "${DC_TRADE_DATE}" ]]; then
    Logger "ERROR" "[-d | --dc-trade-date] : 业务日期不能为空!"
    ShowUsage
    exit 10
elif ! [[ "$(expr "${DC_TRADE_DATE}" : "[1-3][0-9][0-9][0-9][0-1][0-9][0-3][0-9]")" = "8" ]]; then
    Logger "ERROR" "[-d | --dc-trade-date] : 业务日期格式错误!"
    ShowUsage
    exit 10
elif [[ -z "${JSON_FILE_NAME}" ]]; then
    Logger "ERROR" "[-j | --json-file-name] : DATAX JSON 配置文件名不能为空!"
    ShowUsage
    exit 10
elif [[ -n "${PUSH_DAYS}" && ! "${PUSH_DAYS}" =~ ^(-)?[1-9]+,(0|1)$ ]]; then
    Logger "ERROR" "[--push-days] : 数据推送周期格式错误!"
    ShowUsage
    exit 10
fi

# Module 4: 定义全局变量
# 日志表
LOG_TABLE_NAME="dwd:dwd_dc_etl_exec_logs"
## 记录开始时间
ETL_START_TIME="$(date "+%Y-%m-%d %H:%M:%S")"
## 日志表 任务名称
PROC_NAME=${JSON_FILE_NAME#*.}
## 日志表 任务类型,1:存储过程,2:数据推送,3:数据推送
PROC_TYPE=3
## 日志表 执行表名或者工作流名
TABLE_NAME=${JSON_FILE_NAME#*.}
## 日志表 任务执行结果标志位.1:成功,2:失败
EXEC_FLAG=1
# 是否写日志标志,0-不写入,0-写入
IS_WRITE_LOG=1
# Inceptor 生产集群连接信息
INCEPTOR_JDBC="jdbc:inceptor2://10.151.6.28:10000/dev"
INCEPTOR_USERNAME="swscdc"
INCEPTOR_PASSWORD=$(echo "U3dzYzYwMDM2OQo=" | base64 -d)
# 设置全局DataX执行参数
PARAMS="-DparaReaderJdbc=${SRC_JDBC} -DsrcUsername=${SRC_USERNAME} -DsrcPassword=${SRC_PASSWORD} -DcustomPartition=${CUSTOM_PARTITION} -DtarEsUrl=${TAR_ES_URL} -DparaWriterJdbc=${TAR_JDBC} -DtarUsername=${TAR_USERNAME} -DtarPassword=${TAR_PASSWORD} -DincValue='${INC_VALUE}'"

# Module 5: 业务处理函数
## 获取 json 文件引入资源函数
function GetJsonFile() {
    for file in "$1"/*; do
        if test -f "$file"; then
            if [[ "${file##*/}" == "${JSON_FILE_NAME#*.}.json" ]]; then
                JSON_FILE="${file}"
            fi
        else
            GetJsonFile "${file}"
        fi
    done
}
## 分区处理函数
function PartitionDel() {
    Logger "INFO" "[PartitionDel] - 开始增删分区 ..."
    Logger "INFO" "[PartitionDel] - TABLE : [${TAR_TABLE_NAME}], PARTITION : [${PARAM_TRADE_DATE}]"
    local alter_sql="ALTER TABLE ${TAR_TABLE_NAME} DROP PARTITION (dc_trade_date=${PARAM_TRADE_DATE});ALTER TABLE ${TAR_TABLE_NAME} ADD PARTITION (dc_trade_date=${PARAM_TRADE_DATE});"
    Logger "INFO" "[PartitionDel] - SQL : [${alter_sql}]."
    local part_del_times=1
    while true; do
        beeline -u "${TAR_JDBC}" -n ${TAR_USERNAME} -p ${TAR_PASSWORD} -e "${alter_sql}"
        local beeline_result=$?
        if [[ "${beeline_result}" = "0" ]]; then
            Logger "INFO" "[PartitionDel] - 增删分区完成."
            break
        else
            Logger "INFO" "[PartitionDel] - 增删分区失败,将进行失败重做."
            if [[ ${part_del_times} -ge 10 ]]; then
                Logger "ERROR" "[PartitionDel] - 失败重做次数已超过限制[>=10],退出执行."
                exit 10
            else
                Logger "INFO" "[PartitionDel] - 失败重做次数: [${part_del_times}], 失败重做间隔: [30s]"
                sleep 30s
                part_del_times=$((part_del_times + 1))
            fi
        fi
    done
}
## 调用 datax 执行函数
function PushData() {
    Logger "INFO" "[PushData] - 开始数据推送 ..."
    # 设置DataX执行参数
    local now_params
    # 判断源数据库类型,inceptor或者关系型数据库
    if [[ -n "${SRC_NAME_SERVICE}" ]]; then
        ## 推送前检查HDFS路径是否存在
        PathIsExist
        if [[ ${NOW_STATUS} == "False" ]]; then
            return
        fi
        ## HDFS HA NameService 参数值分割处理
        local src_name_service1 src_name_service2
        src_name_service1=$(echo "${SRC_NAME_SERVICE}" | awk -F ";" '{print $1}')
        src_name_service2=$(echo "${SRC_NAME_SERVICE}" | awk -F ";" '{print $2}')
        now_params="-DinTradeDate=${PARAM_TRADE_DATE} ${PARAMS} -DsrcNameService1=${src_name_service1} -DsrcNameService2=${src_name_service2}"
    else
        now_params="-DinTradeDate=${PARAM_TRADE_DATE} ${PARAMS}"
    fi
    ## 判断目标是否为TDH集群
    if [[ -n "${TAR_NAME_SERVICE}" ]]; then
        ## HDFS HA NameService 参数值分割处理
        local tar_name_service1 tar_name_service2
        tar_name_service1=$(echo "${TAR_NAME_SERVICE}" | awk -F ";" '{print $1}')
        tar_name_service2=$(echo "${TAR_NAME_SERVICE}" | awk -F ";" '{print $2}')
        now_params="-DinTradeDate=${PARAM_TRADE_DATE} ${PARAMS} -DtarNameService1=${tar_name_service1} -DtarNameService2=${tar_name_service2}"
    fi
    # 根据是否设置了JVM_BUFFER参数来决定是否需要修改JVM堆参数
    if [[ -n "${JVM_BUFFER}" ]]; then
        Logger "INFO" "[PushData] - JVM堆参数 : ${JVM_BUFFER}G"
        JVM="-Xms${JVM_BUFFER}G -Xmx${JVM_BUFFER}G"
    else
        # JVM堆参数默认为2G
        JVM="-Xms2G -Xmx2G"
    fi
    # 执行DataX任务
    Logger "INFO" "[PushData] - DataX执行参数: ${now_params}"
    python /usr/local/datax/bin/datax.py --jvm="${JVM}" "${JSON_FILE}" -p"${now_params}"
    local datax_result=$?
    if [[ "${datax_result}" == 0 ]]; then
        Logger "INFO" "[PushData] - 数据推送完成."
    else
        Logger "ERROR" "[PushData] - 数据推送失败."
        exit 10
    fi
}
## 写入 Hyperbase 日志函数
function WriteEtlLogToHyperbase() {
    local key=${JSON_FILE_NAME#*.}-${PARAM_TRADE_DATE}
    local data_date=${PARAM_TRADE_DATE}
    local etl_end_time
    etl_end_time="$(date "+%Y-%m-%d %H:%M:%S")"
    local write_log_times=1
    Logger "INFO" "[WriteEtlLogToHyperbase] - 写入推送完成日志信息 ..."
    Logger "INFO" "[WriteEtlLogToHyperbase] - TABLE : [dwd.dwd_dc_etl_exec_logs]"
    Logger "INFO" "[WriteEtlLogToHyperbase] - Hyperbase写入值: {key : ${key}, proc_name : ${PROC_NAME}, proc_type : ${PROC_TYPE}, table_name : ${TABLE_NAME}, data_date : ${data_date}, etl_start_time : ${data_date}, etl_end_time : ${etl_end_time}, exec_flag : ${EXEC_FLAG}}"
    while true; do
        echo "hput '${LOG_TABLE_NAME}','${key}','f:q1','${PROC_NAME}'
              hput '${LOG_TABLE_NAME}','${key}','f:q2','${PROC_TYPE}'
              hput '${LOG_TABLE_NAME}','${key}','f:q3','${TABLE_NAME}'
              hput '${LOG_TABLE_NAME}','${key}','f:q4','${data_date}'
              hput '${LOG_TABLE_NAME}','${key}','f:q5','${ETL_START_TIME}'
              hput '${LOG_TABLE_NAME}','${key}','f:q6','${etl_end_time}'
              hput '${LOG_TABLE_NAME}','${key}','f:q7','${EXEC_FLAG}'" | hbase shell -n >/dev/null 2>&1
        local write_result=$?
        if [[ "${write_result}" == "0" ]]; then
            Logger "INFO" "[WriteEtlLogToHyperbase] - 日志信息写入完成."
            break
        else
            Logger "INFO" "[WriteEtlLogToHyperbase] - 日志信息写入失败,将进行失败重做."
            if [[ ${write_log_times} -ge 3 ]]; then
                Logger "ERROR" "[WriteEtlLogToHyperbase] - 失败重做次数已超过限制[>=3],退出执行."
                exit 10
            else
                Logger "INFO" "[WriteEtlLogToHyperbase] - 失败重做次数: [${write_log_times}], 失败重做间隔: [10s]"
                sleep 10s
                write_log_times=$((write_log_times + 1))
            fi
        fi
    done
}
## 检查当前依赖项是否完成函数
function CheckSourceByHbaseShell() {
    Logger "INFO" "[CheckSourceByHbaseShell] - 检查当前依赖项是否完成 ..."
    # 设置分隔符为分号
    IFS=';'
    # 将字符串转换为数组
    read -ra arr <<< "${DEPENDENCE}"
    # 将IFS变量重置为默认值
    IFS=$' \t\n'
    # 循环读取数组
    for elem in "${arr[@]}"; do
        local key="${elem}-${PARAM_TRADE_DATE}"
        Logger "INFO" "[CheckSourceByHbaseShell] - 依赖项: ${elem}, key: ${key}"
        while true; do
            local hbase_result check_result
            hbase_result=$(echo "hget '${LOG_TABLE_NAME}','${key}'" | hbase shell -n 2>&1)
            check_result=$(echo "${hbase_result}"| awk '/row\(s\)/{print $0}' | awk -F " " '{print $1}' | sed ':a;N;$!ba;s/\n//g' | sed s/[[:space:]]//g)
           if [[ -n "${check_result}" && "${check_result}" != "0" ]]; then
                Logger "INFO" "[CheckSourceByHbaseShell] - 检查到依赖项已完成."
                break
           else
                Logger "INFO" "[CheckSourceByHbaseShell] - 检查到依赖项未完成,将等待1m后再次检查."
                sleep 1m
           fi
        done
    done
}
## 获取自然日或交易日函数
function GetNatureOrTradeDays() {
    local is_trade_date
    DAYS=$(echo "${PUSH_DAYS}" | awk -F "," '{print $1}')
    is_trade_date=$(echo "${PUSH_DAYS}" | awk -F "," '{print $2}')
    local sub_query_sql
    local days_abs
    if [[ "${is_trade_date}" == "0" ]]; then
        DAYS_FLAG="自然日"
        if [[ "${DAYS}" -gt 0 ]]; then
            sub_query_sql="AND T.DATE >= ${DC_TRADE_DATE} ORDER BY T.DATE LIMIT ${DAYS};"
        else
            days_abs=$((0 - "${DAYS}"))
            sub_query_sql="AND T.DATE <= ${DC_TRADE_DATE} ORDER BY T.DATE DESC LIMIT ${days_abs};"
        fi
    else
        DAYS_FLAG="交易日"
        if [[ "${DAYS}" -gt 0 ]]; then
            sub_query_sql="AND T.IS_TRADE_DATE = 1 AND T.DATE >= ${DC_TRADE_DATE} ORDER BY T.DATE LIMIT ${DAYS};"
        else
            days_abs=$((0 - "${DAYS}"))
            sub_query_sql="AND T.IS_TRADE_DATE = 1 AND T.DATE <= ${DC_TRADE_DATE} ORDER BY T.DATE DESC LIMIT ${days_abs};"
        fi
    fi
    # 初始化空数组
    DAYS_ARR=()
    local query_sql="SELECT T.DATE FROM DIM.DIM_TRADE_DATE_ADD_YEAR T WHERE T.MKT_CODE = '1' ${sub_query_sql}"
    local query_result trade_days
    query_result=$(beeline -u "${INCEPTOR_JDBC}" -n "${INCEPTOR_USERNAME}" -p "${INCEPTOR_PASSWORD}" -e "${query_sql}" --showHeader=false --silent=true)
    trade_days=$(echo "${query_result}" | awk -F "|" '{print $2}' | sed ':a;N;$!ba;s/\n/;/g' | sed s/[[:space:]]//g | grep -oP '\d{8}' | paste -sd ' ' -)
    # 设置分隔符为分号
    IFS=' '
    # 将字符串转换为数组
    read -ra DAYS_ARR <<< "${trade_days}"
    # 将IFS变量重置为默认值
    IFS=$' \t\n'
}
## HDFS路径检查函数
function PathIsExist() {
    Logger "INFO" "[PathIsExist] - 开始检查HDFS路径是否存在 ..."
    local hdfs_path
    hdfs_path=$(sed -n '/.*"path": "\(.*\)",/{s//\1/p;q}' "${JSON_FILE}" | sed "s/\${inTradeDate}/${PARAM_TRADE_DATE}/g" | sed "s/\${customPartition}/${CUSTOM_PARTITION}/g")
    hdfs_path=$(echo -e "${hdfs_path}" | sed -e 's/\n//g' -e 's/\r//g')
    Logger "INFO" "[PathIsExist] - fs.defaultFS path : ${hdfs_path}"
    Logger "INFO" "[PathIsExist] - 路径检查命令 : hadoop fs -test -e ${hdfs_path} >/dev/null 2>&1"
    hadoop fs -test -e "${hdfs_path}" >/dev/null 2>&1
    local test_result=$?
    if [[ "${test_result}" == 0 ]]; then
        Logger "INFO" "[PathIsExist] - 已检查到配置项fs.defaultFS,path存在."
        Logger "INFO" "[PathIsExist] - HDFS路径检查完成."
        NOW_STATUS="True"
    else
        if [[ "${NONPATH_ERR}" == "True" ]]; then
            Logger "INFO" "[PathIsExist] - 已检查到配置项fs.defaultFS,path不存在,则默认本次无任何数据进行推送,中止执行,任务状态将置为成功!"
            Logger "INFO" "[PathIsExist] - HDFS路径检查完成."
            Logger "INFO" "[PushData] - 数据推送完成."
            NOW_STATUS="False"
        else
            Logger "ERROR" "[PathIsExist] - 无法读取路径${hdfs_path}下的所有文件,请确认您的配置项fs.defaultFS,path的值是否正确,是否有读写权限,网络是否已断开!"
            Logger "INFO" "[PathIsExist] - HDFS路径检查完成."
            Logger "ERROR" "[PushData] - 数据推送失败."
            exit 10
        fi
    fi
}
## 构建 main 函数
function Main() {
    GetJsonFile .
    ## 判断 json 文件是否是否存在且是否为空文件
    if [[ ! -s ${JSON_FILE} ]]; then
        Logger "INFO" "[Main] - 配置文件 : ${JSON_FILE_NAME#*.}.json"
        Logger "ERROR" "[Main] - 配置文件不存或者为空文件,请检查是否引入资源或者引入错误资源."
        exit 10
    fi
    ## 判断是否需要推送某个时间段内的数据
    if [[ -z "${PUSH_DAYS}" ]]; then
        ## 设置后续函数的中PARAM_TRADE_DATE值为DC_TRADE_DATE
        PARAM_TRADE_DATE="${DC_TRADE_DATE}"
        # 判断是否需要进行当前依赖项检查
        if [[ -n "${DEPENDENCE}" ]]; then
            CheckSourceByHbaseShell
        fi
        ## 判断是否需要推送TDH集群进行日分区处理
        if [[ -n "${TAR_TABLE_NAME}" ]]; then
            ## 增删分区
            PartitionDel
        fi
        ## 推送数据
        PushData
        # 写入推送成功日志
        if [[ ${IS_WRITE_LOG} -gt 0 ]]; then
            ## 向生产环境日志表写入日志,因为旧集群未启用HBase,所以暂时不写日志.
            WriteEtlLogToHyperbase
        fi
    else
        ## 进行循环推数
        GetNatureOrTradeDays
        Logger "INFO" "[Main] - 推送近${DAYS}个${DAYS_FLAG}内的数据 ..."
        Logger "INFO" "[Main] - ${DAYS_FLAG} : ${DAYS_ARR[*]}"
        for idx in "${!DAYS_ARR[@]}"; do
            PARAM_TRADE_DATE="${DAYS_ARR[idx]}"
            Logger "INFO" "[Main] - [${idx}] - 业务日期 : ${DC_TRADE_DATE}, 数据日期 : ${PARAM_TRADE_DATE}"
            # 判断是否需要进行当前依赖项检查
            if [[ -n "${DEPENDENCE}" ]]; then
                CheckSourceByHbaseShell
            fi
            ## 判断是否需要推送TDH集群进行日分区处理
            if [[ -n "${TAR_TABLE_NAME}" ]]; then
                ## 增删分区
                PartitionDel
            fi
            ## 推送数据
            PushData
            ## 写入推送成功日志
            if [[ ${IS_WRITE_LOG} -gt 0 ]]; then
                ## 向生产环境日志表写入日志,因为旧集群未启用HBase,所以暂时不写日志.
                WriteEtlLogToHyperbase
            fi
        done
    fi
}

# Module 6: 执行 main 函数
Main

这个就仅供参考~
然后就新建工作流定义,选择shell任务节点,需要填好名称、执行脚本和引入json和shell脚本。
在这里插入图片描述
执行脚本:

sh tag_platform/shell/datax_push_tdh.sh -d ${IN_TRADE_DATE} -j ${JSON_FILE_NAME} --src-name-service ${SRC_NAME_SERVICE} --tar-jdbc ${TAR_JDBC} --tar-username ${TAR_USERNAME} --tar-password ${TAR_PASSWORD} --tar-tablename ${TAR_TABLENAME} 

然后上线任务并执行即可。

三、DS调用HTTP接口

选择HTTP任务节点
在这里插入图片描述

参数说明:

节点名称:一个工作流定义中的节点名称是唯一的。

运行标志:标识这个节点是否能正常调度,如果不需要执行,可以打开禁止执行开关。

描述信息:描述该节点的功能。

任务优先级:worker 线程数不足时,根据优先级从高到低依次执行,优先级一样时根据先进先出原则执行。

Worker分组:任务分配给worker组的机器机执行,选择Default,会随机选择一台worker机执行。

失败重试次数:任务失败重新提交的次数,支持下拉和手填。

失败重试间隔:任务失败重新提交任务的时间间隔,支持下拉和手填。

超时告警:勾选超时告警、超时失败,当任务超过"超时时长"后,会发送告警邮件并且任务执行失败.

请求地址:http 请求 URL。

请求类型:支持 GETPOStHEADPUTDELETE。

请求参数:支持 ParameterBodyHeaders。

校验条件:支持默认响应码、自定义响应码、内容包含、内容不包含。

校验内容:当校验条件选择自定义响应码、内容包含、内容不包含时,需填写校验内容。

自定义参数:是 http 局部的用户自定义参数,会替换脚本中以${变量}的内容。

四、DS依赖(DEPENDENT)节点

运行说明:依赖节点,就是依赖检查节点。比如 A 流程依赖昨天的 B 流程执行成功,依赖节点会去检查 B 流程在昨天是否有执行成功的实例。

例如,A 流程为周报任务,B、C 流程为天任务,A 任务需要 B、C 任务在上周的每一天都执行成功,如图示:
在这里插入图片描述
假如,周报 A 同时还需要自身在上周二执行成功:
在这里插入图片描述

五、DS SPARK 节点

执行说明:通过 SPARK 节点,可以直接直接执行 SPARK 程序,对于 spark 节点,worker 会使用 spark-submit 方式提交任务 参数说明:

程序类型:支持 JAVAScalaPython 三种语言

主函数的 class:是 Spark 程序的入口 Main Class 的全路径

主 jar 包:是 Spark 的 jar 包

部署方式:支持 yarn-cluster、yarn-client、和 local 三种模式

Driver:设置 Driver 内核数 及 内存数

Executor:设置 Executor 数量、Executor 内存数、Executor 内核数

命令行参数:是设置 Spark 程序的输入参数,支持自定义参数变量的替换。

其他参数:支持 --jars、--files、--archives、--conf 格式

资源:如果其他参数中引用了资源文件,需要在资源中选择指定

自定义参数:是 MR 局部的用户自定义参数,会替换脚本中以${变量}的内容

注意:JAVAScala 只是用来标识,没有区别,如果是 Python 开发的 Spark 则没有主函数的 class ,其他都是一样

六、DS Flink 节点

参数说明:

程序类型:支持 JAVAScalaPython 三种语言

主函数的 class:是 Flink 程序的入口 Main Class 的全路径

主 jar 包:是 Flink 的 jar 包

部署方式:支持 cluster、local 三种模式

slot 数量:可以设置slot数

taskManage 数量:可以设置 taskManage 数

jobManager 内存数:可以设置 jobManager 内存数

taskManager 内存数:可以设置 taskManager 内存数

命令行参数:是设置Spark程序的输入参数,支持自定义参数变量的替换。

其他参数:支持 --jars、--files、--archives、--conf 格式

资源:如果其他参数中引用了资源文件,需要在资源中选择指定

自定义参数:是 Flink 局部的用户自定义参数,会替换脚本中以${变量}的内容

注意:JAVAScala 只是用来标识,没有区别,如果是 Python 开发的 Flink 则没有主函数的class,其他都是一样

七、DS Flink 节点

例如:

项目管理 -> 工作流 -> 工作流定义 -> 创建工作流
------------------------------------------------------
Task 1:拖拽 SQL 节点到画布,新增一个 SQL 任务
节点名称:Test_sql_hive_01
... ...
数据源:Hive  test_hiveserver2
sql类型:查询   表格:√ 附件:√
主题:Test Hive
收件人:tourist@sohh.cn
sql语句(结尾不要加分号):
    select * from test_table where score=${i}
自定义参数:
    i -> IN -> INTEGER -> 97
前置sql:
    INSERT INTO test_table values(null, 'Dog',97)
后置sql-> 确认添加
Task 2:拖拽 SQL 节点到画布,新增一个 SQL 任务
节点名称:Test_sql_hive_02
... ...
数据源:Hive  test_hiveserver2_ha
sql类型:非查询
sql语句(结尾不要加分号):
    create table test_table2 as select * from test_table
自定义参数:
前置sql:
后置sql-> 确认添加
------------------------------------------------------
串联任务节点 Test_sql_hive_01、 Test_sql_hive_02
------------------------------------------------------
保存 ->
设置 DAG 图名称:Test_sql_hive
选择租户:Default
超时告警:off
设置全局:
------------------------------------------------------
添加 -> 上线 -> 运行

八、DS SQL 节点

参数说明:

数据源:选择对应的数据源

sql 类型:支持查询和非查询两种,查询是 select  类型的查询,是有结果集返回的,可以指定邮件通知为 表格、附件 或 表格与附件 三种模板。非查询是没有结果集返回的,是针对 update、delete、insert 三种类型的操作

主题、收件人、抄送人:邮件相关配置

sql 参数:输入参数格式为 key1=value1;key2=value2…

sql 语句:SQL 语句

UDF 函数:对于 HIVE 类型的数据源,可以引用资源中心中创建的 UDF 函数,其他类型的数据源暂不支持 UDF 函数

自定义参数:SQL 任务类型自定义参数会替换 sql 语句中 ${变量}。而存储过程是通过自定义参数给方法参数设置值,自定义参数类型和数据类型同存储过程任务类型一样。

前置 sql:执行 “sql语句” 前的操作

后置 sql:执行 “sql语句” 后的操作

例如MySQL:

项目管理 -> 工作流 -> 工作流定义 -> 创建工作流
------------------------------------------------------
Task 1:拖拽 SQL 节点到画布,新增一个 SQL 任务
节点名称:Test_sql_mysql_01
... ...
数据源:MYSQL   test01_mysql
sql类型:查询   表格:√ 附件:√
主题:Test MySQL
收件人:tourist@sohh.cn
sql语句:
    select * from test_table where score=${i};
自定义参数:
    i -> IN -> INTEGER -> 97
前置sql:
    INSERT INTO test_table values(null, 'Dog',97)
后置sql-> 确认添加
Task 2:拖拽 SQL 节点到画布,新增一个 SQL 任务
节点名称:Test_sql_mysql_02
... ...
数据源:MYSQL   test01_mysql
sql类型:非查询
sql语句:
    create table test_table2 as select * from test_table;
自定义参数:
前置 sql:
后置 sql-> 确认添加
------------------------------------------------------
串联任务节点 Test_sql_mysql_01、Test_sql_mysql_02
------------------------------------------------------
保存 ->
设置 DAG 图名称:Test_sql_mysql
选择租户:Default
超时告警:off
设置全局:
------------------------------------------------------
添加 -> 上线 -> 运行

九、DS Java程序

Java 程序参数说明:

程序类型:JAVA

主函数的 class:是 MR 程序的入口 Main Class 的全路径

主jar包:是 MR 的 jar 包

命令行参数:是设置 MR 程序的输入参数,支持自定义参数变量的替换

其他参数:支持 –D-files、-libjars、-archives格式

资源:如果其他参数中引用了资源文件,需要在资源中选择指定

自定义参数:是MR局部的用户自定义参数,会替换脚本中以${变量}的内容

例如:

# 将 MR 的示例 jar 包上传到 资源中心;并创建测试文本上传到 HDFS 目录
# CDH 版本 Jar 包位置:/opt/cloudera/parcels/CDH/lib/hadoop-mapreduce/hadoop-mapreduce-examples.jar
项目管理 -> 工作流 -> 工作流定义 -> 创建工作流
------------------------------------------------------
拖拽 MR 节点到画布,新增一个 MR 任务
节点名称:Test_mr_java_01
... ...
程序类型:JAVA
主函数的class:wordcount
主jar包:hadoop-mapreduce-examples.jar
命令行参数:/tmp/test.txt /tmp/output
其他参数:
资源:
自定义参数:
-> 确认添加
------------------------------------------------------
保存 ->
设置DAG图名称:Test_mr_java
选择租户:Default
超时告警:off
设置全局:
------------------------------------------------------
添加 -> 上线 -> 运行(运行MR的权限问题此处不再描述)
------------------------------------------------------
查看结果:
sudo -u hdfs hadoop fs -cat /tmp/output/*

十、DS Python节点

运行说明:使用 python 节点,可以直接执行 python 脚本,对于 python 节点,worker会使用 python ** 方式提交任务。参数说明:脚本:用户开发的 Python 程序 资源:是指脚本中需要调用的资源文件列表 自定义参数:是 Python 局部的用户自定义参数,会替换脚本中以 ${变量} 的内容。
例如:

项目管理 -> 工作流 -> 工作流定义 -> 创建工作流
------------------------------------------------------
拖拽 Python 节点到画布,新增一个 Python 任务
节点名称:Test_python_01
... ...
脚本:
    #!/user/bin/python
    # -*- coding: UTF-8 -*-
    for num in range(0, 10): print 'Round %d ...' % num
资源:
自定义参数:
-> 确认添加
------------------------------------------------------
保存 ->
设置 DAG 图名称:Test_python
选择租户:Default
超时告警:off
设置全局:
------------------------------------------------------
添加 -> 上线 -> 运行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2121204.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java实现在线聊天室

分为客户端和服务器端两个部分。服务器负责处理客户端之间的通信&#xff0c;客户端则提供一个用户界面来发送和接收消息。 技术栈 Java语言SocketSwingUI 要点 一个服务端&#xff0c;多台客户端每个客户端登录时输入用户名消息格式化&#xff1a;服务器接收到消息时&#…

数据资产管理:真能推动数据要素市场发展还是只是空谈?

数据资产管理&#xff1a;真能推动数据要素市场发展还是只是空谈&#xff1f; 前言数据资产管理 前言 数据已成为企业和组织的重要资产&#xff0c;其价值的充分发挥对于推动业务发展和提升竞争力至关重要。数据资产管理作为一种有效的管理手段&#xff0c;正逐渐受到广泛关注…

Open Source, Open Life 第九届中国开源年会论坛征集正式启动

中国开源年会 COSCon 是业界最具影响力的开源盛会之一&#xff0c;由开源社在2015年首次发起&#xff0c;而今年我们将迎来第九届 COSCon&#xff01; 以其独特定位及日益增加的影响力&#xff0c;COSCon 吸引了越来越多的国内外企业、高校、开源组织/社区的大力支持。与一般企…

java基础(1)数据类型,运算符,逻辑控制语句以及基本应用

目录 ​编辑 1.前言 2.正文 2.1数据类型与变量 2.1.1字面常量 2.1.2数据类型 2.1.3变量 2.1.3.1整型 2.1.3.2浮点型 2.1.3.3字符型 2.1.3.4布尔型 2.1.4类型转换与类型提升 2.1.4.1字符串的拼接 2.1.4.2整型转字符串 2.1.4.3字符串转整数 2.2运算符 2.2.1算术运…

小红书笔记数单日突破1.3万,8月全网都被这只猴子刷屏了!

8月20日早上10点&#xff0c;《黑神话&#xff1a;悟空》正式发布后立刻席卷全网&#xff0c;众多玩家在游戏正式发布后火速进入游戏界面&#xff0c;甚至有多家公司宣布放假让员工玩《黑神话悟空》&#xff0c;不论是玩游戏的、还是不玩游戏的&#xff0c;都为之献上巨大的关注…

一个让LLM更具创造力的“超级提示词“

1. “超级提示词” 开源项目简介 Github 上最近开源了一个名为 “超级提示词” (Super Prompt) 的项目&#xff0c;该项目旨在激发大语言模型&#xff08;LLM&#xff09;的创造力和发散思维。通过输入这些提示词&#xff0c;LLM 能够生成更多新颖的想法&#xff0c;对于需要创…

Linux网络:总结协议拓展

1. TCP/IP四层模型总结 2. 网络协议拓展 DNS协议&#xff08;地址解析协议&#xff09; TCP/IP使用IP地址和端口号来确定网络中一台主机的一个程序。 但是这样标定不方便记忆&#xff0c;于是开始引出主机名&#xff08;字符串&#xff09;&#xff0c;使用hosts文件来描述…

Monte Carlo方法解决强化学习问题

本文继续深入探讨蒙特卡罗 (MC)方法。这些方法的特点是能够仅从经验中学习,不需要任何环境模型,这与动态规划(DP)方法形成对比。 这一特性极具吸引力 - 因为在实际应用中,环境模型往往是未知的,或者难以精确建模转移概率。以21点游戏为例:尽管我们完全理解游戏规则,但通过DP方…

智慧教室无纸化同屏方案是否适用RTMP?

智慧教室无纸化方案技术背景 智慧教室无纸化方案是一种基于现代信息技术&#xff0c;旨在通过数字化手段实现教学过程的无纸化、智能化和高效化的解决方案。该方案以学生为中心&#xff0c;强调互动化的数字教学服务&#xff0c;旨在提升教学质量和学习效率&#xff0c;同时减…

SprinBoot+Vue宠物店管理系统的设计与实现

目录 1 项目介绍2 项目截图3 核心代码3.1 Controller3.2 Service3.3 Dao3.4 application.yml3.5 SpringbootApplication3.5 Vue 4 数据库表设计5 文档参考6 计算机毕设选题推荐7 源码获取 1 项目介绍 博主个人介绍&#xff1a;CSDN认证博客专家&#xff0c;CSDN平台Java领域优质…

Comsol 双层薄板夹芯结构声辐射响应

阻尼双层板的声辐射特性可以通过以下原理进行解释&#xff1a; 1. 双层板的振动模态&#xff1a;当双层板受到外力作用时&#xff0c;会发生振动&#xff0c;其振动模态会影响其声辐射特性。在双层板内部&#xff0c;振动模态分为两种类型&#xff1a;弯曲模态和剪切模态。而在…

ADG切换异常

一、问题现象 备库切换主库的过程中&#xff1a; alter database commit to switchover to primary with session shutdown; 有一个进程始终是active状态&#xff0c;导致无法完成切换。 主库已切换成&#xff1a;RECOVERY NEEDED --备用数据库还没有接收到切换请求 SWITC…

(Lane Detection-3)PVALane————OpenLane数据集的SOTA模型

PVALane作为最新的SOTA模型&#xff0c;横空出世&#xff0c;让我们来一起仔细阅读这篇文章 提出问题 上图展示了当前Lane Detection中的主要问题&#xff1a; 受益于3D物体检测的成功&#xff0c;当前的3D车道线检测模型&#xff08;通常通过使用逆透视映射&#xff08;IPM&…

Leetcode 最大子数组和

使用“Kadane’s Algorithm”来解决。 Kadane’s Algorithm 在每个步骤中都保持着一个局部最优解&#xff0c;即以当前元素为结尾的最大子数组和(也就是局部最优解)&#xff0c;并通过比较这些局部最优解和当前的全局最优解来找到最终的全局最优解。 Kadane’s Algorithm的核…

git使用手册

其余内容可以参考洪的学习笔记 一 安装 Linux sudo apt-get install gitWIndows “Git”->“Git Bash” 1.1 配置 分布式版本控制&#xff0c;所有机器自报家门 git config --global user.name "Your Name" git config --global user.email "emailexa…

HashTable哈希表;对象流

HashTable HashMap&HashTable 编号比较HashMapHashTable1线程安全✕✓2keynull✓✕3valuenull✓✕4效率高低 package com.ffyc.map;import javax.print.DocFlavor; import java.util.HashMap; import java.util.Hashtable; import java.util.Map;public class HashTableD…

LLMs技术 | 整合Ollama实现本地LLMs调用

前言 近两年AIGC发展的非常迅速&#xff0c;从刚开始的只有ChatGPT到现在的很百家争鸣。从开始的大参数模型&#xff0c;再到后来的小参数模型&#xff0c;从一开始单一的文本模型到现在的多模态模型等等。随着一起进步的不仅仅是模型的多样化&#xff0c;还有模型的使用方式。…

如何修改tomcat服务器的端口号?

四、如何修改tomcat服务器的端口号&#xff1f; 有3种方式修改tomcat服务器的端口号&#xff1a; &#xff08;1&#xff09;修改 conf/server.xml &#xff08;2&#xff09;集成的本地Tomcat中通过HTTP port修改 &#xff08;3&#xff09;pom.xml文件中通过tomcat插件的co…

EG边缘计算网关连接中移ONENET物联网平台(MQTT协议)

上文&#xff1a;EG边缘计算网关连接阿里云物联网平台&#xff08;MQTT协议&#xff09; 需求概述 本章节主要实现一个流程&#xff1a;EG8200mini采集Modbus RTU数据&#xff0c;通过MQTT协议连接中移ONENET物联网平台 Modbus RTU采集此处不做过多赘述&#xff0c;可参考其…

什么是迟滞?

在描述两个量之间的关系时&#xff0c;我们通常可以说特定的输入值对应于特定的输出值。例如&#xff0c;放大器具有输入电压和输出电压&#xff0c;这些电压与增益相关&#xff08;在现实生活中增益是频率的函数而不是常数&#xff09;。如果我们忽略饱和等非理想因素&#xf…