# Yingdao RPA: start-task API example
import requests
import json
from time import sleep
# Credentials and identifiers read by the API helper functions in this file.
# The "XXX" values are placeholders.
yingdao_Info = dict(
    accessKeyId="XXX",
    accessKeySecret="XXX",
    scheduleUuid="XXX",
    robotUuid1="XXX",
    robotUuid2="XXX",
)
# 1. Obtain an access token
def get_access_token(timeout=30):
    """Request an access token from the Yingdao token endpoint.

    Sends the ``accessKeyId`` and ``accessKeySecret`` stored in the
    module-level ``yingdao_Info`` dict and extracts the token from the
    JSON reply.

    Args:
        timeout: Seconds ``requests`` waits for the server before giving
            up. Without a timeout the call can block indefinitely.

    Returns:
        The value found at ``["data"]["accessToken"]`` in the response JSON.

    Raises:
        RuntimeError: If the reply has no ``data.accessToken`` entry.
    """
    url = "https://api.yingdao.com/oapi/token/v2/token/create"
    headers = {
        "Content-Type": "application/x-www-form-urlencoded",
    }
    # NOTE(review): ``params=`` puts the credentials in the URL query string
    # rather than in a form body, despite the form Content-Type above --
    # confirm this is what the endpoint expects.
    params = {
        "accessKeyId": yingdao_Info["accessKeyId"],
        "accessKeySecret": yingdao_Info["accessKeySecret"],
    }
    response = requests.post(
        url=url, headers=headers, params=params, timeout=timeout
    )
    payload = response.json()
    try:
        return payload["data"]["accessToken"]
    except (KeyError, TypeError) as err:
        # Surface the server's reply instead of a bare KeyError/TypeError.
        raise RuntimeError(
            f"access token missing from response: {payload!r}"
        ) from err
# print(get_access_token())
# 2. Start the task
def start_task(timeout=30):
    """Start the scheduled task and return the decoded JSON reply.

    Reads the module-level ``access_token`` (which must be assigned before
    this is called) for the ``Authorization`` header, and the schedule and
    robot UUIDs from ``yingdao_Info``. The decoded reply is also printed.

    Args:
        timeout: Seconds ``requests`` waits for the server before giving
            up. Without a timeout the call can block indefinitely.

    Returns:
        The decoded JSON body of the response.
    """
    url = "https://api.yingdao.com/oapi/dispatch/v2/task/start"
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }
    body = {
        "scheduleUuid": yingdao_Info["scheduleUuid"],
        # One entry per robot, each with its own input parameters.
        "scheduleRelaParams": [
            {
                "robotUuid": yingdao_Info["robotUuid1"],
                "params": [{"name": "单号", "value": "123456", "type": "str"}],
            },
            {
                "robotUuid": yingdao_Info["robotUuid2"],
                "params": [{"name": "姓名", "value": "小王", "type": "str"}],
            },
        ],
    }
    response = requests.post(
        url=url, headers=headers, json=body, timeout=timeout
    )
    # Decode once and reuse the result for both the log line and the return.
    result = response.json()
    print(f"影刀返回:\n{result}")
    return result
# 3. Query the task status
def query_task(timeout=30):
    """Query the started task and return the decoded JSON reply.

    Reads the module-level ``taskUuid`` and ``access_token``; both must be
    assigned before this is called.

    Args:
        timeout: Seconds ``requests`` waits for the server before giving
            up. Without a timeout the call can block indefinitely.

    Returns:
        The decoded JSON body of the response.
    """
    url = "https://api.yingdao.com/oapi/dispatch/v2/task/query"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {access_token}",
    }
    body = {"taskUuid": taskUuid}
    # ``json=`` lets requests serialise the body, matching start_task().
    response = requests.post(url, headers=headers, json=body, timeout=timeout)
    return response.json()
# Script driver: fetch a token, start the task, then poll until it ends.
# ``access_token`` and ``taskUuid`` are module-level names because
# start_task() and query_task() read them as globals.
access_token = get_access_token()
taskUuid = start_task()['data']['taskUuid']

while True:
    result_data = query_task()
    status_name = result_data['data']["statusName"]
    # Report straight away; the wait belongs between polls, not before the
    # status is shown.
    print(f"任务状态:{status_name}")
    if status_name == "完成":
        output_parames = result_data['data']["jobDataList"]
        print(f"输出参数:{output_parames}")
        break
    if status_name == "异常":
        break
    # NOTE(review): any other status keeps the loop polling -- confirm
    # whether the API has further terminal statuses that should stop it.
    sleep(5)