背景:
接上一篇 flask_apscheduler实现定时推送飞书消息,当检查出的异常结果比较多的时候,群里会有很多推送消息,一条条检查工作量会比较大,且容易出现遗漏。
现在需要将定时任务执行的结果记录到文件,最好是飞书的云文件中,通过分享云文档的方式分享给相应的人员。
功能:
飞书群机器人没有文件上传的功能,满足这个功能需要使用飞书应用机器人。创建飞书应用后,需要完成机器人配置,以及上传文件的权限申请。
待使用的接口功能:
1. 实现文件上传,参考文档。通过该接口实现将定时任务执行结果保存上传至飞书云文档。
2. 更新云文档权限设置,参考文档。修改上传至云文档的文件权限,使组织内成员可阅读。
实现:
-
实现效果:
-
功能代码:
# -*- coding:UTF-8 -*-
"""
@ProjectName  : HotelGo2DelonixPmx
@FileName     : webhook
@Description  : Feishu message push
@Time         : 2023/9/17 13:36
@Author       : Qredsun
"""
import json
import logging
import os
from urllib.parse import urlsplit

import requests

# NOTE(review): the original snippet called `logger` without defining or
# importing it. A stdlib logger is used here; swap in the project's own logger
# if one exists.
logger = logging.getLogger(__name__)


class FeishuApplication:
    """Client for the Feishu self-built-application endpoints used by the scheduled task.

    Covers: obtaining a tenant access token, looking up a user's open_id,
    sending a text message, and managing cloud-drive files (list, create
    folder, upload, delete, change sharing permissions).

    Every HTTP call raises ``requests.HTTPError`` on a non-2xx status
    (``raise_for_status``). A JSON body whose ``code`` field is non-zero is
    treated as an API-level failure: it is logged and reported through the
    method's return value.
    """

    TENANT_ACCESS_TOKEN_URL = 'https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal'
    GET_USER_ID_URL = 'https://open.feishu.cn/open-apis/contact/v3/users/batch_get_id'
    IM_MESSAGES_URL = 'https://open.feishu.cn/open-apis/im/v1/messages'
    FILES_UPLOAD_URL = 'https://open.feishu.cn/open-apis/drive/v1/files/upload_all'
    DRIVE_FILES_URL = 'https://open.feishu.cn/open-apis/drive/v1/files'
    # The literal path segment "token" is a placeholder for the real file token.
    FILE_PERMISSION = 'https://open.feishu.cn/open-apis/drive/v2/permissions/token/public'
    CREATE_FOLDER = 'https://open.feishu.cn/open-apis/drive/v1/files/create_folder'

    # Seconds to wait on any single HTTP call; without a timeout a stalled
    # connection would block the scheduler job forever.
    REQUEST_TIMEOUT = 30

    def __init__(self, app_id, app_secret):
        """Store the credentials and fetch the first tenant access token.

        :param app_id: application id of the Feishu self-built app.
        :param app_secret: application secret of the Feishu self-built app.
        :raises requests.RequestException: if the token request fails at HTTP level.
        """
        self.app_id = app_id
        self.app_secret = app_secret
        # Initialise every attribute before the first network call so that no
        # later method can hit an AttributeError after a failed request.
        self._tenant_access_token = None
        self.headers = {'Content-Type': 'application/json'}
        self.open_id = None
        self.chat_id = None
        self.files = []
        self.folder_token = None
        self._url_prefix = None
        self._file_url_prefix = None
        self.get_tenant_access_token()

    def _call(self, method, url, *, headers=None, **kwargs):
        """Send one request with the shared timeout and raise on HTTP errors.

        ``headers`` defaults to ``self.headers``; pass an explicit dict to
        override it (e.g. for multipart uploads).
        """
        response = requests.request(
            method,
            url,
            headers=self.headers if headers is None else headers,
            timeout=self.REQUEST_TIMEOUT,
            **kwargs,
        )
        response.raise_for_status()
        return response

    @staticmethod
    def _failed(result):
        """Return True when the response body carries a non-zero ``code``."""
        return bool(result.get("code"))

    def _set_url_prefix(self, url):
        """Derive ``scheme://host/`` from *url* and cache it plus the ``file/`` prefix."""
        parts = urlsplit(url)
        self._url_prefix = f'{parts.scheme}://{parts.netloc}/'
        logger.debug(f'更新应用地址前缀:{self._url_prefix}')
        self._file_url_prefix = self._url_prefix + 'file/'
        logger.debug(f'更新云文件前缀:{self._file_url_prefix}')

    def get_tenant_access_token(self):
        """Fetch a tenant access token and rebuild ``self.headers`` with it.

        :return: the token string on success, ``False`` on an API-level failure.
        """
        data = {
            "app_id": self.app_id,
            "app_secret": self.app_secret,
        }
        # Explicit empty headers: do not send a stale Authorization header.
        response = self._call("POST", self.TENANT_ACCESS_TOKEN_URL, headers={}, json=data)
        res_data = response.json()
        token = res_data.get('tenant_access_token')
        # The original tested `if res_data:`, which is always true for a JSON
        # object, so a failed call raised KeyError instead of reaching the log.
        if self._failed(res_data) or not token:
            logger.error(f'自建应用获取token失败:{response.text}')
            return False
        self._tenant_access_token = token
        self.headers = {
            'Content-Type': 'application/json',
            'Authorization': f'Bearer {self._tenant_access_token}',
        }
        logger.debug('自建应用更新token成功')
        return self._tenant_access_token

    def get_user_open_id(self, user_info):
        """Look up a single user's open_id by e-mail address or mobile number.

        :param user_info: a string containing ``@`` is sent as an e-mail; any
            other alphanumeric string is sent as a mobile number.
        :return: the open_id (also stored on ``self.open_id``), or ``None``
            when the input is unusable or the lookup fails.
        """
        params = {"user_id_type": "open_id"}
        payload = {"emails": [], "mobiles": []}
        if '@' in user_info:
            payload["emails"].append(user_info)
        elif user_info.isalnum():
            payload["mobiles"].append(user_info)
        else:
            # The original fell through with `response` unbound here.
            logger.error(f'获取用户{user_info} open_id 失败:既不是邮箱也不是手机号')
            return None

        response = self._call("POST", self.GET_USER_ID_URL, params=params, json=payload)
        res_data = response.json()
        user_list = (res_data.get('data') or {}).get('user_list') or []
        open_id = user_list[0].get('user_id') if user_list else None
        if self._failed(res_data) or not open_id:
            logger.error(f'获取用户{user_info} open_id 失败:{response.text}')
            return None
        self.open_id = open_id
        return self.open_id

    def send_single_message(self, msg="single chat msg", open_id=''):
        """Send a plain-text message to one user.

        :param msg: message text.
        :param open_id: open_id of the recipient.
        :return: ``True`` on success, ``False`` on an API-level failure,
            ``None`` when *open_id* is empty.
        """
        if not open_id:
            logger.error('缺少对话用户 open_id ')
            return None

        params = {"receive_id_type": "open_id"}
        req = {
            "receive_id": open_id,
            "msg_type": "text",
            # The original passes the content as a JSON-encoded string, not a nested object.
            "content": json.dumps({"text": msg}),
        }
        response = self._call("POST", self.IM_MESSAGES_URL, params=params, json=req)
        res_data = response.json()
        if self._failed(res_data):
            logger.error(f'给用户 {open_id} 发送消息失败:{response.text}')
            return False
        # The original stored the returned chat id in `self.open_id`, clobbering
        # the user's open_id; keep it in its own attribute instead.
        self.chat_id = (res_data.get('data') or {}).get('chat_id')
        return True

    def remove_file_or_folder(self, file_token, file_type='file'):
        """Delete a drive file or folder.

        :param file_token: token of the item to delete.
        :param file_type: value sent as the ``type`` query parameter.
        :return: ``True`` on success, ``False`` on an API-level failure.
        """
        url = f'{self.DRIVE_FILES_URL}/{file_token}'
        response = self._call("DELETE", url, params={'type': file_type})
        if self._failed(response.json()):
            logger.error(f'移除文件失败:{response.text}')
            return False
        logger.debug(f'移除文件成功:{response.text}')
        return True

    def update_permissions(self, folder_token='', file_type='file'):
        """Update the public sharing settings of a drive item.

        :param folder_token: token of the file or folder to update.
        :param file_type: value sent as the ``type`` query parameter.
        :return: ``True`` on success, ``False`` on an API-level failure.
        """
        # Replace only the placeholder path segment, never another "token" substring.
        url = self.FILE_PERMISSION.replace('/token/', f'/{folder_token}/', 1)
        # NOTE(review): "tenant_editable" grants edit rights through the share
        # link; confirm this is intended if members should only be able to read.
        payload = {
            "comment_entity": "anyone_can_view",
            "copy_entity": "anyone_can_view",
            "external_access_entity": "open",
            "link_share_entity": "tenant_editable",
            "manage_collaborator_entity": "collaborator_can_view",
            "security_entity": "anyone_can_view",
            "share_entity": "anyone",
        }
        response = self._call("PATCH", url, json=payload, params={'type': file_type})
        if self._failed(response.json()):
            logger.error(f'更新文件权限失败:{response.text}')
            return False
        logger.debug(f'更新文件权限成功:{response.text}')
        return True

    def upload_file(self, file_path="../data/result/23_09_25_订房检查任务.xlsx",
                    parent_node='ErVlfbxP8lqZ1sdMIWkc11TQn8g'):
        """Upload a local file into a drive folder.

        :param file_path: path of the local file to upload.
        :param parent_node: token of the destination folder.
        :return: the new file token on success, ``False`` on an API-level
            failure, ``None`` when *file_path* is not an existing file.
        """
        if not os.path.isfile(file_path):
            logger.error(f'{file_path} 文件路径没有指定特定文件')
            return None

        file_name = os.path.basename(file_path)
        payload = {
            'file_name': file_name,
            'parent_type': 'explorer',
            'parent_node': parent_node,
            'size': f'{os.path.getsize(file_path)}',
        }
        # Only Authorization is sent: requests must generate the multipart
        # Content-Type (with boundary) itself.
        headers = {'Authorization': self.headers.get('Authorization', '')}
        # `with` closes the handle; the original leaked it.
        with open(os.path.abspath(file_path), 'rb') as stream:
            files = [
                ('file', (file_name, stream,
                          'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')),
            ]
            resp = self._call("POST", self.FILES_UPLOAD_URL, headers=headers,
                              data=payload, files=files)
        result = resp.json()
        if self._failed(result):
            logger.error(f'文件上传失败:{resp.text}')
            return False
        file_token = result['data']['file_token']
        logger.debug(f'文件上传成功:{resp.text}')
        return file_token

    def expoler(self, direction='DESC', order_by='EditedTime'):
        """List the drive files and refresh the cached URL prefix.

        :param direction: value sent as the ``direction`` query parameter.
        :param order_by: value sent as the ``order_by`` query parameter.
        :return: the list of file entries (also stored on ``self.files``), or
            ``None`` on an API-level failure.
        """
        params = {
            'direction': direction,
            'order_by': order_by,
        }
        resp = self._call("GET", self.DRIVE_FILES_URL, params=params)
        result = resp.json()
        if self._failed(result):
            logger.error(f'获取云空间列表失败:{resp.text}')
            return None
        self.files = result['data']['files']
        self.update_url_prefix()
        logger.debug(f'获取云空间列表成功: {self.files}')
        return self.files

    def create_folder(self, folder_name="", folder_token=""):
        """Create a drive folder and cache the URL prefix from its URL.

        :param folder_name: name of the new folder.
        :param folder_token: token of the parent folder (empty string by default).
        :return: the new folder's token (also stored on ``self.folder_token``),
            or ``None`` on an API-level failure.
        """
        payload = {
            "folder_token": folder_token,
            "name": folder_name,
        }
        resp = self._call("POST", self.CREATE_FOLDER, json=payload)
        result = resp.json()
        if self._failed(result):
            logger.error(f'新建文件夹失败:{resp.text}')
            return None
        self.folder_token = result['data']['token']
        logger.debug(f'新建文件夹成功: {self.folder_token}')
        self._set_url_prefix(result['data']['url'])
        return self.folder_token

    def update_url_prefix(self):
        """Refresh the cached URL prefix from the first folder in ``self.files``.

        :return: the cached ``scheme://host/`` prefix; unchanged when the
            listing contains no folder.
        """
        for obj in self.files:
            if obj['type'] == 'folder':
                self._set_url_prefix(obj['url'])
                break
        return self._url_prefix


def upload_schedule_result(upload_file, app_id, app_secret,
                           stale_file_token='O3MgbgYKgo7NgtxUNc4cqkQZnWe'):
    """Upload a scheduled-task result file to Feishu and return its share URL.

    The file goes into the ``schedule_demo`` folder, which is created when it
    does not exist yet. After the upload the file's sharing permissions are
    updated.

    :param upload_file: path of the local result file.
    :param app_id: application id of the Feishu self-built app.
    :param app_secret: application secret of the Feishu self-built app.
    :param stale_file_token: token of a previously uploaded file to delete
        before uploading; pass ``None`` or ``''`` to skip the deletion. The
        default is the token that was hard-coded in the original snippet.
    :return: the share URL, or ``''`` when any step fails.
    """
    robot = FeishuApplication(app_id, app_secret)
    default_folder = 'schedule_demo'

    # expoler() returns None on failure; the original then crashed on `robot.files`.
    files = robot.expoler() or []
    parent_node = next(
        (entry['token'] for entry in files
         if entry.get('type') == 'folder' and entry.get('name') == default_folder),
        '',
    )
    if not parent_node:
        # The original only created the folder when the drive was completely
        # empty; create it whenever it is missing.
        parent_node = robot.create_folder(default_folder) or ''
    if not parent_node:
        logger.debug('上传结果至飞书失败')
        return ''

    # NOTE(review): the default token looks like debugging residue tied to the
    # author's own drive; callers should pass the token they actually want removed.
    if stale_file_token:
        robot.remove_file_or_folder(stale_file_token)

    upload_file_token = robot.upload_file(file_path=upload_file, parent_node=parent_node)
    if not upload_file_token:
        logger.debug('上传结果至飞书失败')
        return ''

    if not robot.update_permissions(upload_file_token):
        # The original left `file_url` unbound on this path and raised
        # UnboundLocalError; update_permissions has already logged the error.
        return ''

    file_url = f'{robot._file_url_prefix}{upload_file_token}'
    logger.debug(f'待分享的文件url: {file_url}')
    return file_url


if __name__ == '__main__':
    upload_file = "../data/result/23_09_24_订房检查任务.xlsx"
    app_id = 'XXXX'
    app_secret = 'XXX'
    upload_schedule_result(upload_file, app_id, app_secret)