阿里MAXCOMPUTE数据专辑信息读取并同步数据表
在阿里云大数据体系中,我们可以使用数据地图的数据专辑,对数据的类别等进行一个管理
那么管理后的数据,我们想要落表进行相关的数据分析,如何做呢?
查看阿里云官方文档可以知道,我们可以通过阿里云OpenAPI取得专辑和对应的数据表信息,之后将结果落入MaxCompute中
Code
"""
@author:Biglucky
@date:2024-07-26
请求专辑信息并且写入到ODPS中
参数:
1、一组阿里云账号和需要访问的endpoint
ALIBABA_CLOUD_ACCESS_KEY_ID :key信息
ALIBABA_CLOUD_ACCESS_KEY_SECRET :secret信息
ALIBABA_CLOUD_ENDPOINT :阿里云开放API endpoint
ODPS_ENDPOINT :Maxcompute的endpoint
2、一个ODPS表,用于存储album信息
TABLE_PROJECT :MAXCOMPUTE的空间名称
TABLE_NAME :MAXCOMPUTE的表名称
创建好的table 包含列为:
{ album_id string
,album_name string 专辑名称
,entity_type string 类型
,entity_name string 表名称
,project_name string 项目名称
,add_album_time string 数据表添加到转机时间
}
3、安装好相关的包
STEPS:
1、读取阿里云开放API的album信息
2、读取album下的存放在DataFrame对象信息
3、将数据入到ODPS中
"""
import sys
from alibabacloud_tea_openapi.client import Client as OpenApiClient
from alibabacloud_tea_openapi import models as open_api_models
from alibabacloud_tea_util import models as util_models
from alibabacloud_openapi_util.client import Client as OpenApiUtilClient
import pandas as pd
from odps import ODPS
from odps.df import DataFrame
# 配置信息:海外公共组账号
ALIBABA_CLOUD_ACCESS_KEY_ID = "你的KEY"
ALIBABA_CLOUD_ACCESS_KEY_SECRET ="你的SECRET"
ALIBABA_CLOUD_ENDPOINT = "开放API的endpoint" # https://next.api.aliyun.com/product/dataworks-public 进行查询
# OUTPUT TABLE
TABLE_NAME = "你的存储Table"
TABLE_PROJECT = "你的空间名称"
ODPS_ENDPOINT = "MaxCompute endpoint信息" #http://service.ap-southeast-1.maxcompute.aliyun.com/api
def album_list(client):
"""
功能:传入一个阿里client,读取album信息,并且用df格式化返回
client : OpenApiClient
return df: DataFrame
"""
#配置接口param参数
params = open_api_models.Params(
# API Name,
action='ListMetaCollections',
# API Version,
version='2020-05-18',
# Protocol,
protocol='HTTPS',
# HTTP Method,
method='POST',
auth_type='AK',
style='RPC',
# API PATH,
pathname=f'/',
# Request body content format,
req_body_type='json',
# Response body content format,
body_type='json'
)
queries = {}
queries['CollectionType'] = 'ALBUM' #请求类型是数据专辑
queries['PageSize']= '100'
runtime = util_models.RuntimeOptions()
request = open_api_models.OpenApiRequest(
query=OpenApiUtilClient.query(queries)
)
result = client.call_api(params, request, runtime)
df = pd.DataFrame.from_records( result["body"]["Data"]["CollectionList"]) #将专辑id整合成DataFrame之后进行返回
return df
def album_detail (album_id,client):
"""
function:
requst for the table list of the album by album id
request param:
* album_id : the id number of the album
* client : the client of the openAPI
return:
total_list : DataFrame the table list of the album(album id)
"""
params = open_api_models.Params(
# API Name,
action='ListMetaCollectionEntities',
# API Version,
version='2020-05-18',
# Protocol,
protocol='HTTPS',
# HTTP Method,
method='POST',
auth_type='AK',
style='RPC',
# API PATH,
pathname=f'/',
# Request body content format,
req_body_type='json',
# Response body content format,
body_type='json'
)
queries = {}
queries['CollectionQualifiedName'] = album_id #CollectionQualifiedName is the album id
queries['PageSize'] = 50
for i in range(0,300,50):
queries['NextToken'] = i
runtime = util_models.RuntimeOptions()
request = open_api_models.OpenApiRequest(
query=OpenApiUtilClient.query(queries)
)
result = client.call_api(params, request, runtime)
df = pd.DataFrame.from_records( result["body"]["Data"]["EntityList"]) # get the table list of the album(album id)
if i == 0 :
total_list = df
elif (len(df)==0) :
break
else :
total_list = pd.concat([total_list,df],ignore_index = True)
return total_list
def __main__():
#STEP 1 initialize client instance
config = open_api_models.Config(
access_key_id = ALIBABA_CLOUD_ACCESS_KEY_ID
,access_key_secret = ALIBABA_CLOUD_ACCESS_KEY_SECRET
)
config.endpoint = ALIBABA_CLOUD_ENDPOINT
client = OpenApiClient(config)
#STEP 2 get the whole album numbers
df_album = album_list(client)
albums = df_album[["QualifiedName","Name"]]
#STEP 3 requst each album by album id to get the table list and table name
albums_tables = pd.DataFrame()
for i in range(0,len(albums)):
album_id = albums.iloc[i,0]
album_name = albums.iloc[i,1]
album_detail_tables = album_detail(album_id,client)
album_detail_tables["album_id"] = album_id
album_detail_tables["album_name"] = album_name
#concat the whole information
albums_tables = pd.concat([albums_tables,album_detail_tables[["album_id","album_name","EntityContent","QualifiedName"]]],ignore_index=True)
#STEP 4 format the dataframe
albums_tables["entity_type"] = albums_tables["EntityContent"].apply(lambda x: x["entityType"])
albums_tables["entity_name"] = albums_tables["EntityContent"].apply(lambda x: x["name"])
albums_tables["project_name"] = albums_tables["EntityContent"].apply(lambda x: x["projectName"])
albums_tables["add_album_time"] = albums_tables["EntityContent"].apply(lambda x: x["addToCollectionTimestamp"])
albums_tables = albums_tables.drop(columns = ["EntityContent","QualifiedName"])
#STEP 5 insert the data into odps table
o = ODPS(access_id=ALIBABA_CLOUD_ACCESS_KEY_ID
,secret_access_key=ALIBABA_CLOUD_ACCESS_KEY_SECRET
,project = TABLE_PROJECT
,endpoint = ODPS_ENDPOINT
)
odps_df = DataFrame(albums_tables)
pt = 'ds=' + args['YYYY-MM-DD'] # read the dataworks params
odps_df.persist(name=TABLE_NAME,partition=pt,odps=o,create_partition=True)
#run
__main__()
Reference
- 阿里云,ListMetaCollections - 查询集合信息
https://help.aliyun.com/zh/dataworks/developer-reference/api-dataworks-public-2020-05-18-listmetacollections?spm=a2c4g.11186623.0.0.7acc43f9jyudaO
-
阿里云,ListMetaCollectionEntities - 查询集合中的实体
https://help.aliyun.com/zh/dataworks/developer-reference/api-dataworks-public-2020-05-18-listmetacollectionentities?spm=a2c4g.11186623.0.0.663143f9J7Ywoe