Python - 深度学习系列38 重塑实体识别5-预测并行化改造

news2025/1/10 21:00:00

说明

在重塑实体识别4中梳理了数据流,然后我发现pipeline的串行效率太低了,所以做了并行化改造。里面还是有不少坑的,记录一下。

内容

1 pipeline

官方的pipeline看起来的确是比较好用的,主要是实现了比较好的数据预处理。因为在训练/使用过程中都要进行数据的令牌化与反令牌化,有些字符会被特殊处理,例如 '##A’等。
在这里插入图片描述
在使用过程中,我用200条新闻数据进行测试,用pipeline方法花了11分钟处理完毕,期间CUDA的使用率大约为10%。按此估算,即使用多接口并行的方式,那么一分钟最多处理2000条,一天最多处理0.14*2000~30万条数据。这个效率太低了。

2 并行化

最终的结论是不到30秒处理200条,显存只占用2.6G,理论上可以支持3个服务并行(以确保GPU的完全利用)。按最保守的估计,改造后的并行化应该可以提升3倍的效率,稍微激进一点,可以提升10倍的效率。这个之后可以进行测试。

一些主要的点如下

2.1 结果解析

结果可以分为:

  • 1 仅含解析出的实体列表,用逗号连接字符串表示。
  • 2 含实体及其起始位置的表示,这个用于标注反馈、二次增强处理。
  • 3 仅含BIO标签,主要用于和测试数据进行效果比对。

对应的相关函数,看起来有点繁杂,我自己都不太想看第二眼。

from datasets import ClassLabel
# 定义标签列表
label_list = ['B', 'I', 'O']
# 创建 ClassLabel 对象
class_label = ClassLabel(names=label_list)
def convert_entity_label_batch(x):
    x1 = x
    return class_label.int2str(x1)
# 定义函数将整数Tensor转换为字符串 | 反令牌函数,但是用不上;因为predict label列表的长度和 ss_padding相同
def tensor_to_string(tensor, tokenizer = None , skip_special_tokens = True):
    return tokenizer.decode(tensor.tolist(), skip_special_tokens=skip_special_tokens).replace(' ','')

from datasets import ClassLabel
def detokenize(word_piece):
    """
    将 WordPiece 令牌还原为原始句子。
    """
    if word_piece.startswith('##'):
        x = word_piece[2:]
    else:
        x = word_piece
    return x
import re
def extract_bio_positions(bio_string):
    pattern = re.compile(r'B(I+)(O|$)')
    matches = pattern.finditer(bio_string)
    
    results = []
    for match in matches:
        start, end = match.span()
        results.append((start, end - 1))  # end-1 to include the last 'I'
    
    return results

# 0.1ms
def parse_ent_pos_map_batch(some_dict = None):

    word_list = some_dict['token_words']
    label_list = [int(x) for x in list(some_dict['label_list'])]

    min_len = min(len(word_list),len(label_list))
    word_list = word_list[:min_len]
    label_list = label_list[:min_len]
    

    label_list1 =  list(map(convert_entity_label_batch,label_list))
    oriword_list1 = list(map(detokenize,word_list))
    ori_word_str =''.join(oriword_list1)
    # 补到等长
    label_str = ''
    for i in range(len(label_list1)):
        len_of_ori_word = len(oriword_list1[i])
        if len_of_ori_word == 1:
            tem_str = label_list1[i]
        else:
            if label_list1[i] in ['I','O']:
                tem_str = label_list1[i] * len_of_ori_word
            else:
                tem_str = 'B' + 'I' * (len_of_ori_word -1)        
        label_str += tem_str

    pos_list = extract_bio_positions(label_str)
    part_ent_list = [(ori_word_str[x[0]:x[1]] , *x) for x in pos_list]
    return part_ent_list

# =============
def make_BIO_by_len(some_len):
    default_str = 'I' * some_len
    str_list = list(default_str)
    str_list[0] ='B'
    return str_list
def gen_BIO_list2(some_dict):
    the_content = some_dict['clean_data']
    ent_list =  some_dict['ent_tuple_list']

    content_list = list(the_content)
    tag_list = list('O'* len(content_list))
    
    for ent_info in ent_list:
        start = ent_info[1]
        end = ent_info[2]
        label_len = end-start
        tem_bio_list = make_BIO_by_len(label_len)
        tag_list[start:end] = tem_bio_list

    res_dict = {}
    res_dict['x'] = content_list
    res_dict['y'] = tag_list
    return res_dict

def trim_len(some_dict = None):
    padding_BIO = some_dict['padding_BIO']
    ss_len = some_dict['ss_len']
    return padding_BIO[:ss_len]

2.2 批量预测

看起来同样很繁杂,但是不得不细看。首先,数据会按照几个长度 20,50,198分为三部分处理,batch_predict每次仅处理一个批次。在这里,将数据转为定长的令牌长度,然后转入CUDA进行批量预测。结果再按照实体-位置 tuple, 实体列表和BIO三种方式进行解析。

from functools import partial
import transformers 
import torch 
from transformers import AutoModelForMaskedLM, AutoTokenizer,AutoModelForTokenClassification
from functools import partial
# some_batch 是原文经过padding的数据,['ss_hash','ss','ss_len', 'ss_padding'], 其中ss_padding的长度是固定的
# 模型文件和令牌文件都放在model_path之下,model比较大,避免重载;而tokenize会有padding过程,必须重载
# 模型先载入cuda
def batch_predict(some_batch, ss_padding_len = None, model = None, model_path = None):
    # 因为tokenize会在令牌的前后加上分隔令牌,所以+2
    if ss_padding_len is None:
        ss_padding_len = some_batch['ss_padding'].apply(len).max()
    print('ss_padding_len is %s ' % ss_padding_len)
    max_len = ss_padding_len+2
    tokenizer = AutoTokenizer.from_pretrained(model_path)
    tencoder = partial(tokenizer.encode,truncation=True, max_length=max_len, is_split_into_words=True, return_tensors="pt",  padding='max_length')
    some_batch['ss_padding_token'] = some_batch['ss_padding'].apply(list).apply(tencoder)

    # 构成矩阵
    minput = torch.cat(list(some_batch['ss_padding_token'].values))
    # 将数据搬到GPU中处理再返回
    with torch.no_grad():
        input_cuda = minput.to(device)
    
        outputs_cuda = model(input_cuda).logits
        predictions = torch.argmax(outputs_cuda, dim=2)
    
        predictions_list = list(predictions.to('cpu').numpy())
    
    predict_list1 = []
    for predictions in predictions_list:
        tem_pred_tag = [int(x) for x in predictions[1:-1]]
        predict_list1.append(tem_pred_tag)

    some_batch['label_list'] = predict_list1
    _s = cols2s(some_df =some_batch, cols= ['ss_padding','label_list'], cols_key_mapping= ['token_words', 'label_list'])
    _s1 = _s.apply(parse_ent_pos_map_batch)
    some_batch['ent_tuple_list'] = list(_s1)
    some_batch['ent_list'] = some_batch['ent_tuple_list'].apply(lambda x: ','.join([a[0] for a in x ]))

    _s = cols2s(some_batch, cols= ['ss_padding', 'ent_tuple_list'], cols_key_mapping= ['clean_data', 'ent_tuple_list'])
    s1 = _s.apply(gen_BIO_list2)
    ent_tuple_res_df1 = pd.DataFrame(s1.to_list())
    some_batch['padding_BIO'] = list(ent_tuple_res_df1['y'].apply(lambda x: ''.join(x)))
    _s00 = cols2s(some_batch, cols = ['ss_len', 'padding_BIO'], cols_key_mapping=['ss_len', 'padding_BIO'])
    some_batch['BIO'] = list(_s00.apply(trim_len))
    return some_batch    

3 迭代器

在推送数据处理时,可以采用迭代器来控制不同的批次数据

# 迭代器切分
import pandas as pd
class DataFrameBatchIterator:
    def __init__(self, dataframe, batch_size):
        self.dataframe = dataframe
        self.batch_size = batch_size
        # 【我增加的】
        self.fail_batch_list = []
    def __iter__(self):
        num_rows = len(self.dataframe)
        num_batches = (num_rows - 1) // self.batch_size + 1
        for i in range(num_batches):
            start_idx = i * self.batch_size
            end_idx = (i + 1) * self.batch_size
            batch_data = self.dataframe.iloc[start_idx:end_idx]
            yield batch_data
    # 【我增加的】
    def clear_fail(self):
        self.fail_batch_list = []
    # 【我增加的】
    def get_some_batch(self, batch_idx):
        return self.dataframe.iloc[self.batch_size * batch_idx: self.batch_size * (batch_idx + 1)]
    # 【我增加的】记录失败的批次
    def rec_fail_batch_idx(self, batch_idx):
        self.fail_batch_list.append(batch_idx)
# 创建一个示例 DataFrame
data = {'Name': ['John', 'Jane', 'Mike', 'Alice', 'Bob'],
        'Age': [25, 30, 35, 28, 32],
        'City': ['New York', 'Paris', 'London', 'Tokyo', 'Sydney']}
df = pd.DataFrame(data)
# 创建 DataFrame 迭代器
batch_iterator = DataFrameBatchIterator(df, batch_size=2)
import tqdm
# 使用迭代器逐批次处理数据
for i,batch in tqdm.tqdm(enumerate(batch_iterator)):
    try:
        # 在这里可以对当前批次的数据进行相应的操作
        # 例如进行数据清洗、特征处理、模型训练等
        # 示例:打印当前批次的数据
#         raise Exception(e) 
        print(batch)
    except:
        print('>>> %s Fail' % i)
        batch_iterator.rec_fail_batch_idx(i)

以下是实际的调度

# 假设处理长度为1万的句子
# 20 * 2000 ~ 4w
# 50 * 800 ~  4w
# 200 * 200 ~ 4w
import warnings 
warnings.filterwarnings('ignore')
batch_slice_para = {20:2000, 50:800, 200:200}
batch_len_list = sorted(list(batch_slice_para.keys()))
batch_len_list.insert(0,0)

batch_df_list = []
for i in range(len(batch_len_list)):
    if i >0:
        sel = (ss_df['ss_len'] >=batch_len_list[i-1]) & (ss_df['ss_len'] < batch_len_list[i])
        if sel.sum():
            padding_len = batch_len_list[i]
            padding_batch = batch_slice_para[padding_len]
            
            tem_df= ss_df[sel]
            # tem_df['ss_padding'] = tem_df['ss'].apply(lambda x: x.ljust(padding_len,'a'))
            tem_df['ss_padding'] = tem_df['ss']
            tem_df_iterator = DataFrameBatchIterator(tem_df, padding_batch)
            batch_df_list.append(tem_df_iterator)
        else:
            batch_df_list.append(None)

对每个批次执行处理,载入模型

label_list = ['B','I','O']
model_checkpoint = 'model03'
model = AutoModelForTokenClassification.from_pretrained(model_checkpoint, num_labels=len(label_list))

device = 'cuda' if torch.cuda.is_available() else 'cpu'
print('Device: %s' % device)
# 自动切换设备
if model.device.type != device:
    model.to(device)
    print('>>> 检测到模型设备与当前指定不一致,切换 %s' % device )
else:
    print('>>> 模型设备一致,不切换 %s' % device)

分批次预测(主要是确保显存不溢出)

batch_res_list = []
for some_iter in batch_df_list:
    for some_batch in some_iter:
        batch_res = batch_predict(some_batch, model = model, model_path = 'model03')
        batch_res_list.append(batch_res)

结果合并

batch_res_df = pd.concat(batch_res_list, ignore_index= True)
mdf = pd.merge(input_df , batch_res_df[['ss_hash', 'ent_list']],how='left', on ='ss_hash')

在这里插入图片描述

4 总结

一个在理论上证明可以显著提升效率的点在于,模型进行实体识别时先切分了短句,然后按短句进行了去重:相同短句的实体结果一定是相同的。

实验中,200条新闻产生了约5万个短句,去重后只剩下约3.5万。所以即使在这一步也是有提升的。当然,这种方式同样也可以被用于pipeline。

还有就是在处理填充时,并不按照最大长度统一填充。而是按照句子长度的统计特性分为了短、中、长三种方式。从统计上看,约70%的短句长度是在20个字符以内的,真正超过50个字符的短句(中间无分隔符),即使从语法上来看也是比较奇怪的。
这样在填充数据时浪费就比pipeline要小,同样显存可以装下更多的数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1788573.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【全网唯一】触摸精灵iOS版纯离线本地文字识别插件

目的 触摸精灵iOS是一款可以模拟鼠标和键盘操作的自动化工具。它可以帮助用户自动完成一些重复的、繁琐的任务&#xff0c;节省大量人工操作的时间。但触摸精灵的图色功能比较单一&#xff0c;无法识别屏幕上的图像&#xff0c;根据图像的变化自动执行相应的操作。本篇文章主要…

电脑没电关机,wsl和docker又挂了,附解决过程

如题&#xff0c;开了个会没带笔记本电源&#xff0c;点啊弄关机后docker打不开&#xff0c;我以为是docker坏了&#xff0c;结果docker报错&#xff1a; An unexpected error occurred while executing a WSL command. Either shut down WSL down with wsl --shutdown, and/or…

day32--Spring(一)

一、Spring简介 1 Spring课程介绍 问题导入 我们为什么要学习Spring框架&#xff1f; 1.1 为什么要学 Spring技术是JavaEE开发必备技能&#xff0c;企业开发技术选型命中率>90% 专业角度 简化开发&#xff0c;降低企业级开发的复杂性框架整合&#xff0c;高效整合其他技…

概率分布、回归分析、假设检验……用 DolphinDB 函数库快速实现概率统计分析

在金融和物联网等领域&#xff0c;概率统计与分析扮演着至关重要的角色。DolphinDB 作为一款强大的时序数据库&#xff0c;提供了一系列内置的概率统计与分析函数&#xff0c;能够满足用户的各种需求。 金融领域 风险管理&#xff1a;通过概率统计分析&#xff0c;金融机构可…

python数据分析——逻辑回归

参考资料&#xff1a;活用pandas库 逻辑回归 当响应变量为二值响应变量时&#xff0c;经常使用逻辑回归对数据建模。 # 导入pandas库 import pandas as pd # 导入数据集 acspd.read_csv(r"...\data\acs_ny.csv") # 展示数据列 print(acs.columns) # 展示数据集 pri…

进程间通信(27000字超详解)

&#x1f30e;进程间通信 文章目录&#xff1a; 进程间通信 进程间通信简介       进程间通信目的       初识进程间通信       进程间通信的分类 匿名管道通信       认识管道       匿名管道       匿名管道测试       管道的四种…

免费,Scratch蓝桥杯比赛历年真题--第15届蓝桥杯STEMA真题-2024年3月份(含答案解析和代码)

第15届蓝桥杯STEMA真题-2024年3月份 一、单选题 答案&#xff1a;D 解析&#xff1a;y坐标正值表示上&#xff0c;负值表示下&#xff0c;故答案为D。 答案&#xff1a;C 解析&#xff1a;18<25为真&#xff0c;或关系表示一真即为真&#xff0c;故答案为C。 答案&#xff…

2024蓝桥杯初赛决赛pwn题全解

蓝桥杯初赛决赛pwn题解 初赛第一题第二题 决赛getting_startedbabyheap 初赛 第一题 有system函数&#xff0c;并且能在bss上读入字符 而且存在栈溢出&#xff0c;只要过掉check函数即可 check函数中&#xff0c;主要是对system常规获取权限的参数&#xff0c;进行了过滤&…

linux部署运维3——centos7下导入导出mysql数据库的sql文件以及查询数据量最大的表信息

在实际项目开发或者项目运维过程中&#xff0c;数据库的导入导出操作比较频繁&#xff0c;如果可以借助第三方工具那当然算喜事一桩&#xff1b;但是如果不允许外部访问&#xff0c;那么就只能使用数据库自带的命令&#xff0c;也是相当方便的。 一.导入sql文件 1.在linux命令…

Asp.Net Core 实现分片下载的最简单方式

技术群里的朋友遇到了这个问题&#xff0c;起初的原因是他对文件增加了一个属性配置 fileResult.EnableRangeProcessing true;这个属性我从未遇到过&#xff0c;然后&#xff0c;去F1查看这个属性的描述信息也依然少的可怜&#xff0c;只有简单的描述为(获取或设置为 启用范围…

CCIG 2024:大模型技术及其前沿应用论坛深度解析

一、CCIG论坛介绍 中国图象图形大会&#xff08;CCIG 2024&#xff09;是一场备受瞩目的学术盛会&#xff0c;近期在陕西省西安市曲江国际会议中心举行。这次会议以“图聚智生&#xff0c;象合慧成”为主题&#xff0c;由中国图象图形学学会主办&#xff0c;旨在汇聚图像图形领…

一篇文章讲透数据结构之树and二叉树

一.树 1.1树的定义 树是一种非线性的数据结构&#xff0c;它是有n个有限结点组成的一个具有层次关系的集合。把它叫做树是因为它看起来像一棵倒挂的树&#xff0c;也就是说它是根在上&#xff0c;叶在下的。 在树中有一个特殊的结点&#xff0c;称为根结点&#xff0c;根结点…

从0开始制作微信小程序

目录 前言 正文 需要事先准备的 需要事先掌握的 什么是uniapp 平台应用的分类方式 什么是TypeScript 创建项目 项目文件作用 源码地址 尾声 &#x1f52d; Hi,I’m Pleasure1234&#x1f331; I’m currently learning Vue.js,SpringBoot,Computer Security and so on.&#x1…

大数据之CDH对Hdfs做Balance数据均衡/数据平衡/数据倾斜

问题的来源: 由于在hive工具运行sql,出现sql卡顿的情况,去cdh上查看yarn资源的分布情况,发现了整个cdh平台中hdfs和yarn资源分布不均匀,大量的爆红显示: 以下 DataNode 数据目录 位于小于其可用空间 10.0 吉字节 的文件系统中。 /data1/dfs/dn&#xff08;可用&#xff1a;7.2 …

(九)Spring教程——ApplicationContext中Bean的生命周期

1.前言 ApplicationContext中Bean的生命周期和BeanFactory中的生命周期类似&#xff0c;不同的是&#xff0c;如果Bean实现了org.springframework.context.ApplicationContextAware接口&#xff0c;则会增加一个调用该接口方法setApplicationContext()的步骤。 此外&#xff0c…

气膜建筑的施工对周边环境影响大吗?—轻空间

随着城市化进程的加快&#xff0c;建筑行业的快速发展也带来了环境问题。噪音、灰尘和建筑废料等对周边居民生活和生态环境造成了不小的影响。因此&#xff0c;选择一种环保高效的施工方式变得尤为重要。气膜建筑作为一种新兴的建筑形式&#xff0c;其施工过程对周边环境的影响…

python——网络编程

流程图 面向连接的套接字 面向连接的通信提供序列化的、可靠的和不重复的数据交付&#xff0c;而没有记录边界。主要的协议是传输控制协议&#xff08;TCP&#xff09;; TCP套接字&#xff0c;在python中&#xff0c;必须使用SOCK_STREAM作为套接字类型 tcp的特点 面向连接…

工业机器视觉系统如何实现精准检测?

机器视觉系统是指利用机器替代人眼做出各种测量和判断。一种比较复杂的系统。大多数系统监控对象都是运动物体&#xff0c;系统与运动物体的匹配和协调动作尤为重要&#xff0c;所以给系统各部分的动作时间和处理速度带来了严格的要求。在某些应用领域&#xff0c;例如机器人、…

C++高级 - 接口模板

目录 一. 接口 二. 模板 一. 接口 接口通常是通过抽象类或纯虚函数来实现的。 以下是一个使用抽象类来定义接口的示例代码&#xff1a; #include <iostream>class Interface { public:virtual void operation() 0; // 纯虚函数定义接口 };class ConcreteClass : pu…

网络安全||信息加解密技术以及密钥管理技术

一、信息加解密技术 对称加密 对称加密&#xff08;又称为私人密钥加密/共享密钥加密&#xff09;&#xff1a;加密与解密使用同一密钥。特点&#xff1a;加密强度不高&#xff0c;但效率高&#xff1b;密钥分发困难。&#xff08;大量明文为了保证加密效率一般使用对称加密&…