RestAPI调用
功能:实现Restful风格的API调用
步骤一:配置RestAPI数据源,在url中填Restful风格的url,若是需要账号密码登录等可以切换“验证方法”
步骤二:在离线同步中创建离线同步任务,数据来源选择配置的Restful数据源
其他接口调用
功能:实现非Restful风格的接口调用
使用节点:PyODPS,其中PyODSP2的python版本为2.7,PyODSP3的python版本为3.7
实现方法:导入所需的第三方python包,采用python编码实现
第三方包导入:
公有云:直接pip安装所需包即可
专有云:导入第三方包方法如下:
步骤一:
PyPI · The Python Package Index等平台下载对应版本的python包,将文件后缀改为.zip
步骤二:上传资源,将第三方包上传至dataworks(注意:所有上传文件均要打包上传,直接上传文件会乱码导致无法使用)
步骤三:在PyODPS中引用资源,然后开发接口
##@resource_reference{"test.zip"}
import sys
import os
root_path =os.path.dirname(os.path.abspath('test.zip'))
os.system("unzip test.zip-d" + root_path)
sys.path.append(root_path)
import pyrfc
遇到问题:若是需要第三方包较多、包需要安装以及需要SDK等工具时,建议使用虚拟机配置环境后打包上传
推荐方法:在本地linux虚拟机(不用在意linux版本),安装Anaconda后配置对应的虚拟环境,将虚拟环境的python包整体打包(路径一般为:~/anaconda/envs/env_name/lib/pyyhon3.7/site-packages)
注意只打包site-packages内的文件,另外如果需要用到其他sdk等工具,可以将文件(X.so)直接放入即可
实例:中台实现SAP RFC接口调用
Python环境已经导入系统:包名为pack_for_sap_rfc.zip
##@resource_reference{"pack_for_sap_rfc.zip"}
import sys
import os
import pandas as pd
from odps import ODPS
from odps.df import DataFrame
# 运行环境导入
root_path =os.path.dirname(os.path.abspath('pack_for_sap_rfc.zip'))
os.system("unzippack_for_sap_rfc.zip -d" + root_path)
sys.path.append(root_path)
import pyrfc
# SAP服务器IP
ASHOST = 'x.x.x.x'
# 客户端号
CLIENT = '800'
# 系统数字
SYSNR = '03'
# 用户名
USER = 'xxx
# 密码
PASSWD = 'xxx'
# 创建连接
conn = pyrfc.Connection(ashost=ASHOST, sysnr=SYSNR, client=CLIENT, user=USER, passwd=PASSWD)
# 调用接口,其中'test'为RFC接口名,其他为传入参数
result =conn.call('test',
I_DATUM_FROM='',
I_DATUM_TO='')
# 调用时间,传入时间参数
ds = args['ds']
# 获取返回结果中的表
row_lists = [pd.Series(row) for row in result["ET_CONTRACT"]]
# 获取列名
column = result["ET_CONTRACT"][0].keys()
# 根据列名创建Dataframe
df = pd.DataFrame(row_lists, columns=column)
#print(df.head(10))
# 删除指定分区 方式一
# t =o.get_table("table_name")
#t.delete_partition('ds='+ds, if_exists=True)
# 删除指定分区 方式二
o.execute_sql("altertable table_name drop if exists partition(ds ='{}')".format(ds))
# 往指定分区插入数据
o.write_table("table_name", df.values.tolist(), partition='ds={}'.format(ds),create_partition=True)
注意:提交周期任务时添加时间参数