飞书API 2-7:如何将 MySQL 数据库的查询结果写入多维表(下)

news2025/4/8 19:45:30

一、引入

上一篇,解决了数据持续插入更新的问题。在一些场景下,如果数据量较大,需要跑多个任务调用接口插入,但是逐个跑任务又太久,又该怎么提高执行速度呢?
没错!就是多线程。

本文就来探讨下怎么使用多线程来完成多任务的更新操作。

二、测试使用多线程

本次使用 Python 的一个标准库concurrent,无序额外安装,直接调用即可。

2.1 任务正常执行

为了方便测试观察效果,这里使用上一篇最后代码中的insert_records()函数进行改装。参考如下:

import time,random

def insert_records(access_token,app_token,table_id,request_body,task_id):
    print('开始插入……',task_id)
    time.sleep(2)
    print('完成调用……',task_id)
    print(f"成功插入第 {task_id} 任务的数据。关联函数:insert_records。")
    return task_id

新增一个函数,用于多线程调用插入函数,创建 3 个 worker 跑任务,使用wait()方法等待任务都提交并执行结束,然后使用as_completed()方法获取执行结果,并打印:

from concurrent.futures import ThreadPoolExecutor, as_completed
import concurrent.futures

def multi_threading_task(access_token,app_token,table_id,ls_datas, exe_func):
    print('\n【多线程】开始将数据更新到飞书多维表...')
    with ThreadPoolExecutor(max_workers=3) as executor:
        print('任务数:%s' % len(ls_datas))
        all_task = {executor.submit(exe_func, access_token,app_token,table_id,ls_datas[task_id], task_id): task_id for task_id in range(len(ls_datas))}
        concurrent.futures.wait(all_task)

        for future in as_completed(all_task):
            task_id = future.result()
            print('返回结果:%s' % task_id)

使用以下代码调用上面 2 个函数。

access_token = ''
app_token = ''
table_id = ''
ls_datas = [i for i in range(5)]
exe_func = insert_records
multi_threading_task(access_token,app_token,table_id,ls_datas, exe_func)

执行结果如下:
image.png

2.2 异常监控处理

在实际生产过程可能会出现任务异常,一般会有两种异常:

  • 调用接口失败:如access_token 无效等;
  • 调用接口成功,但是更新数据失败:如请求体错误等。

针对这两种情况,需要对执行的的任务进行监控,一旦发生异常需要对异常进行处理。前者可通过try…except…捕获错误,后者则通过返回的错误码(“code”)判断。
注意:第一种情况会抛出错误中断执行,第二类仅给错误信息,仍可继续执行。

在代码结构设计上,insert_records()函数依旧完成数据插入功能即可,而在多线程函数multi_threading_task()需要加上上面两种异常情况的处理,并且另外新增一个函数,用于处理失败的任务重跑。

完善insert_records()函数,模拟加入上面涉及的两种错误:

  • 第一类错误:获取当前秒数,等于30时抛出错误;
  • 第二类错误:给你一个列表,code 从列表中随机取值,当 code == 1 打印错误,需要将 code 返回,以便后续判断是否执行成功。
import time,random

def insert_records(access_token,app_token,table_id,request_body,task_id):
    print('开始插入……',task_id)
    time.sleep(2)
    # 模拟第一类错误,获取当前秒数,等于30时抛出错误
    seconds = int(time.time()) % 60
    if seconds == 30:
        print(seconds)
        raise '触发第一类错误:调用失败'
    else:
        print('完成调用……',task_id)
    # 模拟第二类错误:当 code == 1 打印错误
    code = random.choice([0,0,0,0,0,0,0,0,0,1])
    if code == 0:
        print(f"成功插入第 {task_id} 任务的数据。关联函数:insert_records。")
    else:
        print('触发第二类错误:更新失败',task_id)
    return task_id, code

迭代多线程函数,使用try…except…捕获第一类错误,通过返回的错误码判断,获取第二类错误,并将这两类错误的任务 ID 和对应的请求体数据记录下来,然后调用失败重跑函数。

from concurrent.futures import ThreadPoolExecutor, as_completed
import concurrent.futures

def multi_threading_task(access_token,app_token,table_id,ls_datas, exe_func):
    print('\n【多线程】开始将数据更新到飞书多维表...')
    failed_tasks = {}		# 用于记录失败的任务,结构{task_id:ls_datas[task_id]}
    with ThreadPoolExecutor(max_workers=3) as executor:
        print('任务数:%s' % len(ls_datas))
        
        all_task = {executor.submit(exe_func, access_token,app_token,table_id,ls_datas[task_id], task_id): task_id for task_id in range(len(ls_datas))}
        concurrent.futures.wait(all_task)

        for future in as_completed(all_task):
            task_id = future.result()
            print('返回结果:%s' % task_id)
            try:
                task_id, code = future.result()
                #飞书返回错误码,不报错,需要加一层判断。
                if code == 0:
                    print(f"任务 {task_id} 成功")
                elif res_code != 0:
                    failed_tasks[task_id] = ls_datas[task_id]
                    print(f'任务 {task_id} 失败,飞书返回的code:{code}。')
            except Exception as e:
                task_id = all_task[future]
                print(f"调用失败!!!任务 {task_id} 调用接口失败: {e}")
                failed_tasks[task_id] = ls_datas[task_id]
    redo_failed_tasks(access_token,app_token,table_id,failed_tasks, exe_func)
    
def rerun_failed_tasks(access_token,app_token,table_id,failed_tasks, exe_func, rerun_num=3):
    pass

事实上,失败重跑函数和multi_threading_task()的处理逻辑差不多,直接使用它来进行完善即可,不必重写一个函数,参考代码如下。

  • ls_datas是所有请求体数据,根据前面的处理逻辑,返回的结构是列表,每个元素都是一个请求体,视为一个任务。后续任务失败返回的结构是字典,键为任务 ID,值为请求体,保证任务 ID 的编号和原始请求体的编号一致。提交任务时,根据类型遍历取出每一个任务提交;
  • 为了给重跑任务不同的提示,新增两个分支判断,结合上一点,重跑的任务的标识为ls_datas为字典时;
  • 在最后加了一个重跑次数的设置,默认重跑 3 次。
from concurrent.futures import ThreadPoolExecutor, as_completed
import concurrent.futures

redo_cnt = 0

def multi_threading_task(access_token,app_token,table_id,ls_datas, exe_func,redo_num=3):
    global redo_cnt
    print('\n【多线程】开始将数据更新到飞书多维表...')
    print('---------------------------------------------------------')
    failed_tasks = {}		# 用于记录失败的任务,结构{task_id:ls_datas[task_id]}
    with ThreadPoolExecutor(max_workers=3) as executor:
        print('任务数:%s' % len(ls_datas))
        all_task = ''
        if isinstance(ls_datas,list):
            all_task = {executor.submit(exe_func, access_token,app_token,table_id,ls_datas[task_id], task_id): task_id for task_id in range(len(ls_datas))}
        elif isinstance(ls_datas,dict):
            all_task = {executor.submit(exe_func, access_token,app_token,table_id,ls_datas[task_id], task_id): task_id for task_id in ls_datas}
        concurrent.futures.wait(all_task)

        for future in as_completed(all_task):
            try:
                task_id, code = future.result()
                #飞书返回错误码,不报错,需要加一层判断。
                if isinstance(ls_datas,dict) and code == 0:
                    print(f"重跑任务 {task_id} 成功")
                elif isinstance(ls_datas,dict) and code != 0:
                    failed_tasks[task_id] = ls_datas[task_id]
                    print(f'重跑任务:{task_id} 依旧失败,飞书返回的code:{code}。')
                elif code == 0:
                    print(f"任务 {task_id} 成功")
                elif code != 0:
                    failed_tasks[task_id] = ls_datas[task_id]
                    print(f'任务 {task_id} 失败,飞书返回的code:{code}。')
            except Exception as e:
                task_id = all_task[future]
                print(f"调用失败!!!任务 {task_id} 调用接口失败: {e}")
                failed_tasks[task_id] = ls_datas[task_id]
    if failed_tasks:
        redo_cnt += 1
        if redo_cnt == redo_num + 1:
            key = ','.join([str(task_id) for task_id in failed_tasks.keys()])
            print('重跑次数超过3次,以下任务重跑三次依旧报错:',key)
            raise "重跑三次依旧报错"
        multi_threading_task(access_token,app_token,table_id,failed_tasks, exe_func)
        

2.3 测试代码小结和执行结果

将 2.2 代码整合,为了方便观察错误的发生,修改了触发两类错误的条件,使得发生的概率更大。

from concurrent.futures import ThreadPoolExecutor, as_completed
import concurrent.futures
import time,random

def insert_records(access_token,app_token,table_id,request_body,task_id):
    print('开始插入……',task_id)
    time.sleep(2)
    # 模拟第一类错误,获取当前秒数,等于30时抛出错误
    seconds = int(time.time()) % 60
    # if seconds == 30:
    if seconds in(28,29,30,31,32,33):
        print(seconds)
        raise '触发第一类错误:调用失败'
    else:
        print('完成调用……',task_id)
    # 模拟第二类错误:当 code == 1 打印错误
    code = random.choice([0,0,0,1,1,1,1,1,1,1])
    if code == 0:
        print(f"成功插入第 {task_id} 任务的数据。关联函数:insert_records。")
    else:
        print('触发第二类错误:更新失败',task_id)
    return task_id, code

def multi_threading_task(access_token,app_token,table_id,ls_datas, exe_func,redo_num=3):
    global redo_cnt
    print('\n【多线程】开始将数据更新到飞书多维表...')
    print('---------------------------------------------------------')
    failed_tasks = {}		# 用于记录失败的任务,结构{task_id:ls_datas[task_id]}
    with ThreadPoolExecutor(max_workers=3) as executor:
        print('任务数:%s' % len(ls_datas))
        all_task = ''
        if isinstance(ls_datas,list):
            all_task = {executor.submit(exe_func, access_token,app_token,table_id,ls_datas[task_id], task_id): task_id for task_id in range(len(ls_datas))}
        elif isinstance(ls_datas,dict):
            all_task = {executor.submit(exe_func, access_token,app_token,table_id,ls_datas[task_id], task_id): task_id for task_id in ls_datas}
        concurrent.futures.wait(all_task)

        for future in as_completed(all_task):
            try:
                task_id, code = future.result()
                #飞书返回错误码,不报错,需要加一层判断。
                if isinstance(ls_datas,dict) and code == 0:
                    print(f"重跑任务 {task_id} 成功")
                elif isinstance(ls_datas,dict) and code != 0:
                    failed_tasks[task_id] = ls_datas[task_id]
                    print(f'重跑任务:{task_id} 依旧失败,飞书返回的code:{code}。')
                elif code == 0:
                    print(f"任务 {task_id} 成功")
                elif code != 0:
                    failed_tasks[task_id] = ls_datas[task_id]
                    print(f'任务 {task_id} 失败,飞书返回的code:{code}。')
            except Exception as e:
                task_id = all_task[future]
                print(f"调用失败!!!任务 {task_id} 调用接口失败: {e}")
                failed_tasks[task_id] = ls_datas[task_id]
    if failed_tasks:
        redo_cnt += 1
        if redo_cnt == redo_num + 1:
            key = ','.join([str(task_id) for task_id in failed_tasks.keys()])
            print('重跑次数超过3次,以下任务重跑三次依旧报错:',key)
            raise "重跑三次依旧报错"
        multi_threading_task(access_token,app_token,table_id,failed_tasks, exe_func)


redo_num = 3
redo_cnt = 0
access_token = ''
app_token = ''
table_id = ''
ls_datas = [i for i in range(5)]
exe_func = insert_records
multi_threading_task(access_token,app_token,table_id,ls_datas, exe_func,redo_num)

上面代码某一次执行的完整执行记录参考如下:

  • 一开始所有任务都触发了第一类错误;
  • 第一次重跑:任务 0 触发了第二类错误,任务 1、2、4 触发了第一类错误,只有任务 3 重跑成功;
  • 第二次重跑:任务 0、1、2 触发了第二类错误,只有任务 4 重跑成功;
  • 第三次重跑:任务 0、1、2 触发了第二类错误;
  • 第四次重跑:保错,超过三次。
【多线程】开始将数据更新到飞书多维表...
---------------------------------------------------------
任务数:5
开始插入…… 0
开始插入…… 1
开始插入…… 2
29
29
29
开始插入…… 3
开始插入…… 4
31
31
调用失败!!!任务 1 调用接口失败: exceptions must derive from BaseException
调用失败!!!任务 4 调用接口失败: exceptions must derive from BaseException
调用失败!!!任务 2 调用接口失败: exceptions must derive from BaseException
调用失败!!!任务 0 调用接口失败: exceptions must derive from BaseException
调用失败!!!任务 3 调用接口失败: exceptions must derive from BaseException

【多线程】开始将数据更新到飞书多维表...
---------------------------------------------------------
任务数:5
开始插入…… 1
开始插入…… 4
开始插入…… 2
33
33
开始插入…… 0
开始插入…… 3
33
完成调用…… 0
完成调用…… 3
触发第二类错误:更新失败 0
成功插入第 3 任务的数据。关联函数:insert_records。
重跑任务:0 依旧失败,飞书返回的code:1。
调用失败!!!任务 1 调用接口失败: exceptions must derive from BaseException
调用失败!!!任务 4 调用接口失败: exceptions must derive from BaseException
重跑任务 3 成功
调用失败!!!任务 2 调用接口失败: exceptions must derive from BaseException

【多线程】开始将数据更新到飞书多维表...
---------------------------------------------------------
任务数:4
开始插入…… 0
开始插入…… 1
开始插入…… 4
完成调用…… 0
完成调用…… 4
成功插入第 4 任务的数据。关联函数:insert_records。
完成调用…… 1
开始插入…… 2
触发第二类错误:更新失败 1
触发第二类错误:更新失败 0
完成调用…… 2
触发第二类错误:更新失败 2
重跑任务:2 依旧失败,飞书返回的code:1。
重跑任务:1 依旧失败,飞书返回的code:1。
重跑任务:0 依旧失败,飞书返回的code:1。
重跑任务 4 成功

【多线程】开始将数据更新到飞书多维表...
---------------------------------------------------------
任务数:3
开始插入…… 2
开始插入…… 1
开始插入…… 0
完成调用…… 1
完成调用…… 2
触发第二类错误:更新失败 1
触发第二类错误:更新失败 2
完成调用…… 0
触发第二类错误:更新失败 0
重跑任务:2 依旧失败,飞书返回的code:1。
重跑任务:0 依旧失败,飞书返回的code:1。
重跑任务:1 依旧失败,飞书返回的code:1。
重跑次数超过3次,以下任务重跑三次依旧报错: 2,0,1
Traceback (most recent call last):
  File "g:\git\gitee\my_work\Python\feishu\test\test_mul_threading_task_2.0.py", line 72, in <module>
    multi_threading_task(access_token,app_token,table_id,ls_datas, exe_func,redo_num)
  File "g:\git\gitee\my_work\Python\feishu\test\test_mul_threading_task_2.0.py", line 62, in multi_threading_task
    multi_threading_task(access_token,app_token,table_id,failed_tasks, exe_func)
  File "g:\git\gitee\my_work\Python\feishu\test\test_mul_threading_task_2.0.py", line 62, in multi_threading_task
    multi_threading_task(access_token,app_token,table_id,failed_tasks, exe_func)
  File "g:\git\gitee\my_work\Python\feishu\test\test_mul_threading_task_2.0.py", line 62, in multi_threading_task
    multi_threading_task(access_token,app_token,table_id,failed_tasks, exe_func)
  File "g:\git\gitee\my_work\Python\feishu\test\test_mul_threading_task_2.0.py", line 61, in multi_threading_task
    raise "重跑三次依旧报错"
TypeError: exceptions must derive from BaseException

三、飞书多维表执行多线程任务

将上面标题二的代码逻辑迁移到飞书多维表的更新上,实现多线程跑任务更新飞书多维表。
具体的改动包含三个地方:

  • 三个变更函数:insert_records()update_records()delete_records()
  • 新增多线程函数multi_threading_task()
  • 最后调用的时候,直接调用多线程函数。

第一处改动:新增参数task_id;数据变更失败时,不要直接抛出错误,改为打印异常信息;返回任务 ID 和状态码。

第二处改动:直接新增多线程函数即可。

第三次改动:main()函数中倒数4行中,遍历调用变更函数(insert_records()update_records()delete_records())的代码修改为调用multi_threading_task(),直接将请求体列表传递即可。另外一个注意点是统计重跑次数的变量redo_cnt,或在main()中定义,然后使用global redo_cnt声明为全局变量,或在函数外直接定义一个全局的redo_cnt变量。
注:以下代码仅展示改动部分,不能直接执行。

from concurrent.futures import ThreadPoolExecutor, as_completed
import concurrent.futures


# ------------------------------------------------------------------------------------------------------------------------
# 第一处改动
def insert_records(access_token,app_token,table_id,request_body,task_id):
    url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records/batch_create"
    
    payload =  json.dumps(request_body).replace(': NaN',': null')
    
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {access_token}'
    }

    response = requests.request("POST", url, headers=headers, data=payload)
    code = response.json()['code']
    if code == 0:
        len_record = len(request_body["records"])
        print(f"成功插入 {len_record} 数据。关联函数:insert_records。")
    else:
        msg = response.json().get("msg")
        print(f"插入数据失败,失败信息:{msg}。关联函数:insert_records。")
    return task_id, code

def update_records(access_token,app_token,table_id,request_body,task_id):
    url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records/batch_update"
    payload =  json.dumps(request_body).replace(': NaN',': null')

    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {access_token}'
    }

    response = requests.request("POST", url, headers=headers, data=payload)
    code = response.json()['code']
    if code == 0:
        len_record = len(request_body["records"])
        print(f"成功更新 {len_record} 条数据。关联函数:update_records。")
    else:
        msg = response.json().get("msg")
        print(f"更新数据失败,失败信息:{msg}。关联函数:update_records。")
    return task_id, code

def delete_records(access_token,app_token,table_id,request_body,task_id):
    url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records/batch_delete"
    payload =  json.dumps(request_body)

    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {access_token}'
    }

    response = requests.request("POST", url, headers=headers, data=payload)
    code = response.json()['code']
    if code == 0:
        len_record = len(request_body["records"])
        print(f"成功删除 {len_record} 数据。关联函数:delete_records。")
    else:
        msg = response.json().get("msg")
        print(f"更新数据失败,失败信息:{msg}。关联函数:delete_records。")
    return task_id, code

# ------------------------------------------------------------------------------------------------------------------------
# 第二处改动
def multi_threading_task(access_token,app_token,table_id,ls_datas, exe_func,redo_num=3):
    global redo_cnt
    print('\n【多线程】开始将数据更新到飞书多维表...')
    print('---------------------------------------------------------')
    failed_tasks = {}		# 用于记录失败的任务,结构{task_id:ls_datas[task_id]}
    with ThreadPoolExecutor(max_workers=3) as executor:
        print('任务数:%s' % len(ls_datas))
        all_task = ''
        if isinstance(ls_datas,list):
            all_task = {executor.submit(exe_func, access_token,app_token,table_id,ls_datas[task_id], task_id): task_id for task_id in range(len(ls_datas))}
        elif isinstance(ls_datas,dict):
            all_task = {executor.submit(exe_func, access_token,app_token,table_id,ls_datas[task_id], task_id): task_id for task_id in ls_datas}
        concurrent.futures.wait(all_task)

        for future in as_completed(all_task):
            try:
                task_id, code = future.result()
                #飞书返回错误码,不报错,需要加一层判断。
                if isinstance(ls_datas,dict) and code == 0:
                    print(f"重跑任务 {task_id} 成功")
                elif isinstance(ls_datas,dict) and code != 0:
                    failed_tasks[task_id] = ls_datas[task_id]
                    print(f'重跑任务:{task_id} 依旧失败,飞书返回的code:{code}。')
                elif code == 0:
                    print(f"任务 {task_id} 成功")
                elif code != 0:
                    failed_tasks[task_id] = ls_datas[task_id]
                    print(f'任务 {task_id} 失败,飞书返回的code:{code}。')
            except Exception as e:
                task_id = all_task[future]
                print(f"调用失败!!!任务 {task_id} 调用接口失败: {e}")
                failed_tasks[task_id] = ls_datas[task_id]
    if failed_tasks:
        redo_cnt += 1
        if redo_cnt == redo_num + 1:
            key = ','.join([str(task_id) for task_id in failed_tasks.keys()])
            print('重跑次数超过3次,以下任务重跑三次依旧报错:',key)
            raise "重跑三次依旧报错"
        multi_threading_task(access_token,app_token,table_id,failed_tasks, exe_func)

# ------------------------------------------------------------------------------------------------------------------------
# 第三处改动
def main(bitable_url, connect_info, sql, fields_type, foreign_key, data_type=1, page_size=500):
    """前面部分的调用不变,注释掉倒数4行,新增以下未注释代码"""
    # if ls_cre:[insert_records(access_token,app_token,table_id,cre_body) for cre_body in ls_cre]
    # if ls_ups:[update_records(access_token,app_token,table_id,ups_body) for ups_body in ls_ups]
    # if ls_del:[delete_records(access_token,app_token,table_id,del_body) for del_body in ls_del]
    # print('更新完成。关联函数:main')
    global redo_cnt
    redo_cnt = 0
    redo_num = 3
    if ls_cre:multi_threading_task(access_token,app_token,table_id,ls_cre,insert_records,redo_num)
    if ls_ups:multi_threading_task(access_token,app_token,table_id,ls_ups,update_records,redo_num)
    if ls_del:multi_threading_task(access_token,app_token,table_id,ls_del,delete_records,redo_num)
    print('更新完成。关联函数:main')

将改动的内容放到完整代码中,然后在多维表上面对数据进行调整以观察执行结果。
image.png

最终执行结果如下,本次执行未触发异常,如果你想测试一下异常情况,可以增加更多数据,然后跑多一些任务来观察。另外,飞书的 API 还是比较稳定,出错概率比较低。
image.png

注:完整代码我放百度网盘了,需要的同步可以后台回复【飞书27】领取。

四、小结

本文探讨了怎么通过多线程调用飞书 API,从而提高数据更新到飞书多维表的速度。主要通过模拟飞书任务进行测试多线程的功能,跑通之后再迁移到飞书多维表的更新代码上,完成迭代。

经过上中下三篇,终于将如何将 MySQL 数据库的查询结果写入多维表梳理完成。小结一下:

  • 上篇:只做插入功能,适合单次数据更新;
  • 中篇:新增更新和删除功能,基本满足了通用的业务场景;
  • 下篇:新增多线程功能,主要提高执行速度。

五、附录:完整代码

import requests
import json
import datetime
import pandas as pd
from sqlalchemy import create_engine, text
from urllib.parse import urlparse, parse_qs
from concurrent.futures import ThreadPoolExecutor, as_completed
import concurrent.futures

# 读取飞书多维表数据:参考《飞书API(7):MySQL 入库通用版本》
def get_table_params(bitable_url):
    # bitable_url = "https://feishu.cn/base/aaaaaaaa?table=tblccc&view=vewddd"
    parsed_url = urlparse(bitable_url)              #解析url:(ParseResult(scheme='https', netloc='feishu.cn', path='/base/aaaaaaaa', params='', query='table=tblccc&view=vewddd', fragment='')
    query_params = parse_qs(parsed_url.query)       #解析url参数:{'table': ['tblccc'], 'view': ['vewddd']}
    app_token = parsed_url.path.split('/')[-1]
    table_id = query_params.get('table', [None])[0]
    view_id = query_params.get('view', [None])[0]
    print(f'成功解析链接,app_token:{app_token},table_id:{table_id},view_id:{view_id}。关联方法:get_table_params。')
    return app_token, table_id, view_id

def get_tenant_access_token(app_id, app_secret):
    url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
    payload = json.dumps({
        "app_id": app_id,
        "app_secret": app_secret
    })
    headers = {'Content-Type': 'application/json'}
    response = requests.request("POST", url, headers=headers, data=payload)
    tenant_access_token = response.json()['tenant_access_token']
    print(f'成功获取tenant_access_token:{tenant_access_token}。关联函数:get_table_params。')
    return tenant_access_token

def get_bitable_datas(tenant_access_token, app_token, table_id, view_id, page_token='', page_size=20):
    url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records/search?page_size={page_size}&page_token={page_token}&user_id_type=user_id"
    payload = json.dumps({"view_id": view_id})
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {tenant_access_token}'
    }
    response = requests.request("POST", url, headers=headers, data=payload)
    print(f'成功获取page_token为【{page_token}】的数据。关联函数:get_bitable_datas。')
    return response.json()


def get_all_bitable_datas(tenant_access_token, app_token, table_id, view_id, page_token='', page_size=20):
    has_more = True
    feishu_datas = []
    while has_more:
        response = get_bitable_datas(tenant_access_token, app_token, table_id, view_id, page_token, page_size)
        if response['code'] == 0:
            page_token = response['data'].get('page_token')
            has_more = response['data'].get('has_more')
            # print(response['data'].get('items'))
            # print('\n--------------------------------------------------------------------\n')
            feishu_datas.extend(response['data'].get('items'))
        else:
            raise Exception(response['msg'])
    print(f'成功获取飞书多维表所有数据,返回 feishu_datas。关联函数:get_all_bitable_datas。')
    return feishu_datas

# ------------------------------------------------------------------------------------------------------------------------
# 读取数据库数据,参考《飞书API 2-5:如何将 MySQL 数据库的查询结果写入多维表(上)》

def get_datas(sql, connect_info, fields_type):
    # connect_info = 'mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'\
    # .format("your_user_name", "your_password", "127.0.0.1", "3306","my_datas")
    engine = create_engine(connect_info)
    result = pd.read_sql(sql, engine)
    result = result.astype(fields_type,errors='ignore')
    # result.info()
    return result

# def format_to_reqbody(df,change_size=500):
#     df_dict = df.to_dict(orient='records')
#     datas_field = [{'fields' : data} for data in df_dict]

#     df_ls = []
#     for i in range(0,len(datas_field), change_size):
#         data_records = {"records": datas_field[i:i+change_size]}
#         # data_to_table = json.dumps(data_records).replace(': NaN',': null')
#         df_ls.append(data_records)
    
#     print('数据格式化并切割:切割数据集为 %s 份。关联函数:format_to_reqbody' % (len(df_ls)))
#     return df_ls


# ------------------------------------------------------------------------------------------------------------------------------

# 新增格式化、分类与分组
def format_db_datas(df_db, foreign_key):
    """格式化数据库数据:整理为 API 所需格式,并将 外键 提取出来"""
    # 处理为结构 [{foreign_key: <外键值>,'fields' : <所有明细数据>]
    datas_ls = df_db.to_dict(orient='records')
    datas_field = [{foreign_key: data[foreign_key],'fields' : data} for data in datas_ls]
    # 转为DataFrame,以便下 Python SQL 处理
    df_db = pd.DataFrame(datas_field)
    print(f'成功提取【数据库】的关键字段:fields, {foreign_key}。方法:format_db_datas')
    return df_db

def format_feishu_datas(feishu_datas, foreign_key, data_type=1):
    """格式化飞书数据:提取关键字段:外键 和 record_id"""
    if feishu_datas == []:
        #如果飞书多维表没有数据,返回无值的DataFrame
        df_tb = pd.DataFrame(columns=['record_id', foreign_key])
    else:
        df_tb = pd.DataFrame(feishu_datas)
        
        if data_type == 1:
            df_tb[foreign_key] = df_tb['fields'].map(lambda x: x.get(foreign_key)[0]['text'] if x.get(foreign_key) else None)
        elif data_type  in (2, 3, 13, 5):  #数字、单选、手机号、日期
            df_tb[foreign_key] = df_tb['fields'].map(lambda x: x.get(foreign_key) if x.get(foreign_key) else None)
        else:
            raise '暂不支持改类型!'

        # 实际生产过程,可能新增行行为。
        df_tb_ok = df_tb[~df_tb[foreign_key].isna()]
        df_tb_ok = df_tb_ok[['record_id', foreign_key]]
    invalid_num = df_tb[df_tb[foreign_key].isna()].shape[0]
    print(f'成功提取【飞书表单】的关键字段:record_id, {foreign_key}。发现无效数据 {invalid_num} 条。方法:format_feishu_datas')
    return df_tb_ok

def classify_datas(df_from, df_join, on=None, left_on=None, right_on=None):
    """将两个[{},{}]或 DF 结构的数据分类。ls_from 为数据库数据,ls_join 为多维表数据"""    
    if on is not None:
        df_cre = df_from.merge(df_join, how='left' , on=on).query('record_id != record_id')[['fields']] # 等同于'record_id.isnull()',因为 NaN != NaN
        df_ups = df_from.merge(df_join, how='inner', on=on)[['record_id', 'fields']]
        df_del = df_from.merge(df_join, how='right', on=on).query('fields != fields')[['record_id']] # 等同于'fields.isnull()',因为 NaN != NaN
    else:
        df_cre = df_from.merge(df_join, how='left' , left_on=left_on, right_on=right_on).query('record_id != record_id')[['fields']] # 等同于'record_id.isnull()',因为 NaN != NaN
        df_ups = df_from.merge(df_join, how='inner', left_on=left_on, right_on=right_on)
        df_del = df_from.merge(df_join, how='right', left_on=left_on, right_on=right_on).query('fields != fields')[['record_id']] # 等同于'fields.isnull()',因为 NaN != NaN

    print('数据分类:新增数据集(df_cre)的数据量为 %s, 更新数据集(df_ups)的数据量为 %s, 删除数据集(df_ups)的数据量为 %s。方法:classify_datas' % (df_cre.shape[0], df_ups.shape[0], df_del.shape[0]))
    return df_cre, df_ups, df_del

def cut_datas(df,mode='dict',page_size=500):
    """
    按指定尺寸切分数据,并转为API要求格式
    切割数据:接口数据上限500
    mode: 模式,默认是字典格式 dict(插入/更新结构),即 DF.to_dict(orient='records')。还支持 list(删除结构),即 Series.to_list()
    """
    df_ls = []
    for i in range(0,df.shape[0], page_size):
        if mode=='dict':
            data_records = df.iloc[i:i+page_size].to_dict(orient='records')
        elif mode=='list':
            data_records = df.iloc[i:i+page_size].record_id.to_list()
        #插入和更新都使用该结构
        # data_to_table = json.dumps({"records": data_records}).replace('NaN','null')
        # data_to_table = json.dumps({"records": data_records}).replace(': NaN',': null')  # 使用re也可以,不过大材小用了。
        data_to_table = {"records": data_records}
        # _data_to_table = json.dumps({"records": data_records})
        # data_to_table = re.sub(r': NaN', ': null', _data_to_table)
        df_ls.append(data_to_table)
    print('数据切割:切割数据集为 %s 份。方法:datas_processing.cut_dataset' % (len(df_ls)))
    return df_ls

# -----------------------------------------------------------------------------------------------------------------------
# 第一处改动
def insert_records(access_token,app_token,table_id,request_body,task_id):
    url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records/batch_create"
    
    payload =  json.dumps(request_body).replace(': NaN',': null')
    
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {access_token}'
    }

    response = requests.request("POST", url, headers=headers, data=payload)
    code = response.json()['code']
    if code == 0:
        len_record = len(request_body["records"])
        print(f"成功插入 {len_record} 数据。关联函数:insert_records。")
    else:
        msg = response.json().get("msg")
        print(f"插入数据失败,失败信息:{msg}。关联函数:insert_records。")
    return task_id, code

def update_records(access_token,app_token,table_id,request_body,task_id):
    url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records/batch_update"
    payload =  json.dumps(request_body).replace(': NaN',': null')

    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {access_token}'
    }

    response = requests.request("POST", url, headers=headers, data=payload)
    code = response.json()['code']
    if code == 0:
        len_record = len(request_body["records"])
        print(f"成功更新 {len_record} 条数据。关联函数:update_records。")
    else:
        msg = response.json().get("msg")
        print(f"更新数据失败,失败信息:{msg}。关联函数:update_records。")
    return task_id, code

def delete_records(access_token,app_token,table_id,request_body,task_id):
    url = f"https://open.feishu.cn/open-apis/bitable/v1/apps/{app_token}/tables/{table_id}/records/batch_delete"
    payload =  json.dumps(request_body)

    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {access_token}'
    }

    response = requests.request("POST", url, headers=headers, data=payload)
    code = response.json()['code']
    if code == 0:
        len_record = len(request_body["records"])
        print(f"成功删除 {len_record} 数据。关联函数:delete_records。")
    else:
        msg = response.json().get("msg")
        print(f"更新数据失败,失败信息:{msg}。关联函数:delete_records。")
    return task_id, code

# ------------------------------------------------------------------------------------------------------------------------
# 第二处改动
def multi_threading_task(access_token,app_token,table_id,ls_datas, exe_func,redo_num=3):
    global redo_cnt
    print('\n【多线程】开始将数据更新到飞书多维表...')
    print('---------------------------------------------------------')
    failed_tasks = {}		# 用于记录失败的任务,结构{task_id:ls_datas[task_id]}
    with ThreadPoolExecutor(max_workers=3) as executor:
        print('任务数:%s' % len(ls_datas))
        all_task = ''
        if isinstance(ls_datas,list):
            all_task = {executor.submit(exe_func, access_token,app_token,table_id,ls_datas[task_id], task_id): task_id for task_id in range(len(ls_datas))}
        elif isinstance(ls_datas,dict):
            all_task = {executor.submit(exe_func, access_token,app_token,table_id,ls_datas[task_id], task_id): task_id for task_id in ls_datas}
        concurrent.futures.wait(all_task)

        for future in as_completed(all_task):
            try:
                task_id, code = future.result()
                #飞书返回错误码,不报错,需要加一层判断。
                if isinstance(ls_datas,dict) and code == 0:
                    print(f"重跑任务 {task_id} 成功")
                elif isinstance(ls_datas,dict) and code != 0:
                    failed_tasks[task_id] = ls_datas[task_id]
                    print(f'重跑任务:{task_id} 依旧失败,飞书返回的code:{code}。')
                elif code == 0:
                    print(f"任务 {task_id} 成功")
                elif code != 0:
                    failed_tasks[task_id] = ls_datas[task_id]
                    print(f'任务 {task_id} 失败,飞书返回的code:{code}。')
            except Exception as e:
                task_id = all_task[future]
                print(f"调用失败!!!任务 {task_id} 调用接口失败: {e}")
                failed_tasks[task_id] = ls_datas[task_id]
    if failed_tasks:
        redo_cnt += 1
        if redo_cnt == redo_num + 1:
            key = ','.join([str(task_id) for task_id in failed_tasks.keys()])
            print('重跑次数超过3次,以下任务重跑三次依旧报错:',key)
            raise "重跑三次依旧报错"
        multi_threading_task(access_token,app_token,table_id,failed_tasks, exe_func)

def get_tenant_access_token(app_id, app_secret):
    url = "https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal"
    payload = json.dumps({
        "app_id": app_id,
        "app_secret": app_secret
    })
    headers = {'Content-Type': 'application/json'}
    response = requests.request("POST", url, headers=headers, data=payload)
    tenant_access_token = response.json()['tenant_access_token']
    print(f'成功获取tenant_access_token:{tenant_access_token}。关联函数:get_table_params。')
    return tenant_access_token


def main(bitable_url, connect_info, sql, fields_type, foreign_key, data_type=1, page_size=500):
    # 基本配置
    app_token, table_id, view_id = get_table_params(bitable_url)
    app_id = 'your_app_id'
    app_secret = 'your_app_secret'
    access_token = get_tenant_access_token(app_id, app_secret)    
    
    # 读取数据库数据并格式化
    df_db = get_datas(sql, connect_info, fields_type)
    df_from = format_db_datas(df_db, foreign_key)
    # 读取多维表数据并格式化
    feishu_datas = get_all_bitable_datas(access_token, app_token, table_id, view_id, page_size=page_size)
    df_join = format_feishu_datas(feishu_datas, foreign_key, data_type)
    # 数据分类
    df_cre, df_ups, df_del = classify_datas(df_from, df_join, on=foreign_key)

    ls_cre = cut_datas(df_cre,mode='dict',page_size=page_size)
    ls_ups = cut_datas(df_ups,mode='dict',page_size=page_size)
    ls_del = cut_datas(df_del,mode='list',page_size=page_size)
    
    global redo_cnt
    redo_cnt = 0
    redo_num = 3
    if ls_cre:multi_threading_task(access_token,app_token,table_id,ls_cre,insert_records,redo_num)
    if ls_ups:multi_threading_task(access_token,app_token,table_id,ls_ups,update_records,redo_num)
    if ls_del:multi_threading_task(access_token,app_token,table_id,ls_del,delete_records,redo_num)
    print('更新完成。关联函数:main')

if __name__ == '__main__':
    bitable_url = 'https://vl933ry4wy.feishu.cn/base/EPYFbi4ThahvLUsJ9nUchaXQnLh?table=tblnE4CHrysoKYNO&view=vewH9qJSRL'
    connect_info = 'mysql+pymysql://{}:{}@{}:{}/{}?charset=utf8'\
                .format("root", "123456", "127.0.0.1", "3306","my_datas")
    sql = '''
    select uo.user_id                               as "用户ID"
      ,u.nickname                                   as "昵称"
      ,(case u.sex when 0 then '' when 1 then '' else '未知' end) as "性别"
      ,u.mobile                                     as "手机号"
      ,u.city                                       as "城市"
      ,uo.id                                        as "订单号"
      ,uo.paid_time*1000                            as "下单时间"
      ,uo.amount/100                                as "下单金额"
    from my_datas.user_orders uo
    join my_datas.users u on u.id=uo.user_id
    where uo.production_id=10;
    '''
    fields_type = {"用户ID": int, "昵称": str, "性别": str, "手机号": str, "城市": str, "订单号": int, "下单时间": 'int64', "下单金额": float}
    foreign_key ='订单号'
    data_type = 2
    page_size = 2
    main(bitable_url, connect_info, sql, fields_type, foreign_key, data_type, page_size)

:::info
飞书API 2-7 大纲

  • 引入:问题:如果数据量比较大,如何提高速度?
  • 使用多线程
    • 多线程设置:先不加ID
    • 异常监控处理:加ID,以便识别处理
  • 小结
    :::

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1977770.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Nuxt】服务端渲染 SSR

SSR 概述 服务器端渲染全称是&#xff1a;Server Side Render&#xff0c;在服务器端渲染页面&#xff0c;并将渲染好HTML返回给浏览器呈现。 SSR应用的页面是在服务端渲染的&#xff0c;用户每请求一个SSR页面都会先在服务端进行渲染&#xff0c;然后将渲染好的页面&#xf…

STM32 | ADC+RS485(第十天)

点击上方"蓝字"关注我们 01、ADC概述 ADC, Analog-to-Digital Converter的缩写,指模/数转换器或者模拟/数字转换器。是指将连续变量的模拟信号转换为离散的数字信号的器件。真实世界的模拟信号.例如温度、压力、声音或者图像等,需要转换成更容易储存、处理和发射的…

【公考新手教程】公考新手小白备考规划

公考 公考相关考试国考省考 行测常识判断言语理解与表达数量关系判断推理&#xff08;重中之重&#xff09;图推定义判断类比逻辑判断 资料分析&#xff08;重中之重&#xff09; 申论&#xff08;很重要&#xff0c;提升困难&#xff09;公基推荐考公软件粉笔华图在线bilibili…

零基础开始学习鸿蒙开发-文章推荐栏获取接口数据并展示

目录 1.新建文章列表布局页面&#xff0c;通过静态数据&#xff0c;编写好布局页面。 1.1 通过行ArticleCard布局构建单个文章展示的item项 1.2 使用了ObjectLink装饰器&#xff08;尽管这不是ArkUI标准API的一部分&#xff0c;特定框架或自定义的扩展&#xff09;&#xff0c…

在自定义数据集上训练现有的Detectron2模型

这段内容介绍了将使用一个气球分割数据集&#xff08;仅包含一个类别&#xff1a;气球&#xff09;来训练一个气球分割模型。训练过程将以一个预训练在COCO数据集上的模型为基础&#xff0c;这些模型可以在Detectron2的模型库中获取。需要注意的是&#xff0c;COCO数据集本身并…

神仙公司名单(苏州)

神仙公司&#xff08;苏州&#xff09; 前几天虽辟谣了 苏州佳能 N12/2N12 的裁员赔偿&#xff0c;但也同时指出了苏州是外企偏多&#xff0c;就业环境和生活节奏平衡的好城市。 但提到苏州公司&#xff0c;大家第一印象是微软、思科 和 Zoom 此类以 WLB 闻名的明星企业。 确实…

阿里「轨迹可控版Sora」,告别「抽卡」,让视频生成更符合物理规律

目前&#xff0c;扩散模型能够生成多样化且高质量的图像或视频。此前&#xff0c;视频扩散模型采用 U-Net 架构 &#xff0c;主要侧重于合成有限时长&#xff08;通常约为两秒&#xff09;的视频&#xff0c;并且分辨率和纵横比受到固定限制。 Sora 的出现打破了这一限制&#…

【教学类-73-01】20240804镂空瓶子01

背景需求&#xff1a; 瓶子里的春天呀&#xff01; - 小红书 (xiaohongshu.com)https://www.xiaohongshu.com/explore/63ef87f8000000000703acae?app_platformandroid&ignoreEngagetrue&app_version8.47.0&share_from_user_hiddentrue&xsec_sourceapp_share&…

谷粒商城实战笔记-vagrant避坑指南

文章目录 一&#xff0c;虚拟机磁盘空间不足问题原因解决方案 二&#xff0c;虚拟机导致C盘空间不足 一&#xff0c;虚拟机磁盘空间不足 使用vagrant管理虚拟机的过程中遇到了一个问题&#xff0c;虚拟机安装完成后&#xff0c;很快磁盘dev/sda1就满了&#xff0c;40G的空间&a…

链表篇: 04-寻找两个链表的第一个公共结点

解题思路&#xff1a; 方案一&#xff1a;长度统计法 从题目中可以看到&#xff0c;两个链表有长度差&#xff0c;这里可以先让长度比较长的链表先把长度差走完&#xff0c;这里假设为 pHead1, 先让 pHead1 把长度差走完&#xff0c;之后让两个链表同时往后进行遍历&#xff…

01:【stm32】软件安装及stm32的简要介绍

软件安装及stm32的简要介绍 1、软件安装1.1、安装Keil5 MDK软件1.2、安装DFP1.3、安装ARMCC编译器1.4、安装ST-Link驱动1.5、程序下载 2、stm32的介绍 1、软件安装 1.1、安装Keil5 MDK软件 ①先在D盘新建一个名为Keil5的文件夹。然后在Keil5文件夹里面新建2个文件夹&#xff0…

Vue从入门到精通全网最强保姆级教程

Vue是什么&#xff1f;为什么要学习他 Vue是什么&#xff1f; Vue是前端优秀框架&#xff0c; 是一套用于构建用户界面的渐进式框架 为什么要学习Vue 1 Vue是目前前端最火的框架之一 2 Vue是目前企业技术栈中要求的知识点 3 Vue可以提升开发体验 4 Vue学习难度较低 5 ..…

使用GCC编译Notepad++的插件

Notepad的本体1是支持使用MSVC和GCC编译的2&#xff0c;但是Notepad插件的官方文档3里却只给出了MSVC的编译指南4。 网上也没有找到相关的讨论&#xff0c;所以我尝试在 Windows 上使用 MinGW&#xff0c;基于 GCC-8.1.0 的 posix-sjlj 线程版本5&#xff0c;研究一下怎么编译…

【Kubernetes】Deployment 的创建和使用(实战)

Deployment 的创建和使用 创建 deployment-demo.yaml 文件&#xff0c;并在其中输入以下内容&#xff1a; apiVersion: apps/v1 kind: Deployment metadata:name: deployment-demolabels:app: nginx spec:replicas: 3selector:matchLabels:app: nginxtemplate:metadata:labels…

Go语言加Vue3零基础入门全栈班15 gin+gorm+vue3用户管理系统实战录播课 2024年08月04日 课程笔记

预览 登录页面&#xff1a; 首页&#xff1a; 用户列表&#xff1a; 新增用户&#xff1a; 删除用户&#xff1a; 暗黑模式&#xff1a; 概述 如果您没有Golang的基础&#xff0c;应该学习如下前置课程。 01 Golang零基础入门课_20240726_149元02 Golang面向对象…

3.11.样式迁移

样式迁移 ​ 使用卷积神经网络&#xff0c;自动的将一个图像中的风格应用在另一图像之上&#xff0c;即样式迁移(style transfer) ​ 为了完成这一过程&#xff0c;我们需要两张输入图像&#xff1a;一张是内容图像&#xff0c;一张是风格图像&#xff0c;随后使用神经网络修…

【Nuxt】内置组件和全局样式使用

内置组件 Nuxt3框架也提供一些内置的组件&#xff0c;常用的如下&#xff1a; SEO组件&#xff1a;Html、Body、Head、Title、Meta、Style、Link、NoScript、BaseNuxtWelcome:欢迎页面组件&#xff0c;该组件是nuxt/ui的部分NuxtLayout:是Nuxt自带的页面布局组件NuxtPage:是N…

《Windows API每日一练》24.1 WinSock简介

本节将逐一介绍WinSock的主要特性和组件&#xff0c;套接字、WinSock动态库的使用。 本节必须掌握的知识点&#xff1a; Windows Socket接口简介 Windows Socket接口的使用 第178练&#xff1a;网络时间校验 24.1.1 Windows Socket接口简介 ■以下是WinSock的主要特性和组件…

Nginx代理(反向代理详解)

概述 正向代理&#xff1a; 正向代理通常用于客户端需要访问外部网络资源&#xff0c;但出于安全或策略考虑&#xff0c;客户端无法直接访问这些资源。正向代理服务器位于客户端和目标服务器之间&#xff0c;客户端通过代理服务器发送请求&#xff0c;代理服务器再将请求转发…

云原生应用程序简介

云原生应用程序简介 提示 该内容摘自电子书《为 Azure 构建云原生 .NET 应用程序》&#xff0c;可在**.NET Docs**上获取&#xff0c;也可以免费下载 PDF 并离线阅读。 另一天&#xff0c;在办公室研究“下一件大事”。 你的手机响了。这是友好的招聘人员打来的&#xff0c;他每…