背景
需求将MongoDB数据入仓MaxCompute
环境说明
MongoDB
100+个Collections:orders_1、orders_2、…、orders_100
前期准备
1、MongoDB数据源配置
需要先保证DW和MongoDB网络是能够联通的,需要现在集成任务中配置MongoDB的数据源信息。
具体可以查看我的另外一篇:https://blog.csdn.net/qq_16018407/article/details/142991582
2、赋值节点
选择赋值节点,赋值节点新增后打开,可以看到有Python、shell、ODPS SQL
Python 读取最后一次Print字符串,Shell读取最后一次echo输出的字符串,如”orders_1,order_2“ 就按照”,“逗号被拆分成2个元素用于后续循环
ODPS SQL 则是每一行是遍历的一个元素
每一次循环都会传入遍历的元素,如python :
print "orders_1,orders_2";
则会当做[“orders_1”,“orders_2”]数组进行遍历,每次一个元素会传入到遍历的循环中执行
实操界面:
print "orders_1,orders_2";
赋值节点会自动出现一个outputs给后面的节点读取
3、循环任务
新增完毕后进入到循环内部,会看到一个start 和end节点,这个时候我们再选择一个离线同步任务,将流程串起来
点开离线集成任务,切换到离线集成任务的脚本模式,赋值节点的collectionName会以”${dag.foreach.current}“ 参数传入到循环内部的流程中。
在集成任务脚本中,将对应的collectionName替换为 ${dag.foreach.current} 即可
{
"transform": false,
"type": "job",
"version": "2.0",
"steps": [
{
"stepType": "mongodb",
"parameter": {
"objectIdOutputType": "json",
"useSplitVector": false,
"datasource": "你的mongodb数据源名称",
"envType": 1,
"cursorTimeoutInMs": "3600000",
"column": [
{
"name": "col_combine",
"type": "combine"
}
],
"tableComment": "This kind of datasource dosen't support get table comment. This is a comment produced by di.",
"batchSize": "1000",
"collectionName": "${dag.foreach.current}"
},
"name": "Reader",
"category": "reader"
},
{
"stepType": "odps",
"parameter": {
"partition": "col=${dag.foreach.current}",
"truncate": true,
"datasource": "你输出数据表的MaxCompute空间名称",
"envType": 1,
"isSupportThreeModel": false,
"tunnelQuota": "default",
"column": [
"你的ODPS表的字段,因为我这里是想要将所有数据放在一个字段,所以这里就只预留了一个字段"
],
"emptyAsNull": false,
"tableComment": "",
"table": "你的ODPS表",
"consistencyCommit": false
},
"name": "Writer",
"category": "writer"
},
{
"copies": 1,
"parameter": {
"nodes": [],
"edges": [],
"groups": [],
"version": "2.0"
},
"name": "Processor",
"category": "processor"
}
],
"setting": {
"errorLimit": {
"record": "0"
},
"locale": "zh_CN",
"speed": {
"throttle": false,
"concurrent": 1
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
整个循环流程,点击右侧打开配置进行相关调度配置,最下方需要配置节点上下文 loopDataArray这个参数是读取外部的赋值节点,是必须配置的参数
日志
循环节点无法在dataworks的开发界面直接运营进行测试,只能发布以后在运维中心进行查看
最终效果
后期拓展
这里因为业务需求所以没有循环的参数是通过python print写死输出的
优雅一些的方式就是通过数据表维护,就可以动态读取数据表的内容,然后作为循环参数传入了
相关文档
for-each节点由哪些组成,应用逻辑是什么_大数据开发治理平台 DataWorks(DataWorks)-阿里云帮助中心. (2021, August 18). Aliyun.com. https://help.aliyun.com/zh/dataworks/user-guide/logic-of-for-each-nodes?spm=a2c4g.11186623.4.5.20a4d43aNd6b0E&scm=20140722.H_299261._.ID_299261-OR_rec-V_1#section-50c-r2v-mhd
赋值节点的操作步骤_大数据开发治理平台 DataWorks(DataWorks)-阿里云帮助中心. (2019, September 10). Aliyun.com. https://help.aliyun.com/zh/dataworks/user-guide/configure-an-assignment-node?spm=a2c4g.11186623.0.0.2947b24b0wmXD7#task-2485378
for-each节点由哪些组成,应用逻辑是什么_大数据开发治理平台 DataWorks(DataWorks)-阿里云帮助中心. (2021, August 18). Aliyun.com. https://help.aliyun.com/zh/dataworks/user-guide/logic-of-for-each-nodes?spm=a2c4g.11186623.0.0.45634a14sGs7jS