说明
如果说微服务化(API接口、Web页面、Docker镜像)是架构方面的基准,那么数据模型就是逻辑处理方面的基准
内容
以下是一个样例:
import re
def extract_utf8_chars(input_string = None):
# 定义一个正则表达式,用于匹配所有的UTF-8字符
utf8_pattern = re.compile(r'[\u0000-\U0010FFFF]')
# 使用findall方法找到所有匹配的字符
utf8_chars = utf8_pattern.findall(input_string)
return ''.join(utf8_chars)
def toDBC(some_char):
tem_str_ord = ord(some_char)
res = None
if tem_str_ord >65280 and tem_str_ord < 65375:
res =tem_str_ord - 65248
# 12288全角空格,160  空格
if tem_str_ord in [12288,160]:
res = 32
res_var_ord = res or tem_str_ord
return chr(res_var_ord)
def tranform_half_widh(some_str = None):
res_list = []
return ''.join([toDBC(x) for x in some_str])
# 强分割
import re
def split_sentences_with_punctuation(text):
# 定义句子分隔符
punctuation = r'([。?!?!\n])'
# 根据句子分隔符进行分割,并保留分隔符
parts = re.split(punctuation, text)
# 将分隔符与句子重新组合
sentences = []
for i in range(0, len(parts), 2):
sentence = parts[i].strip()
if i + 1 < len(parts):
sentence += parts[i + 1]
sentences.append(sentence)
return sentences
import re
def ensure_period(sentence= None):
"""
如果句子不是以句号、问号、感叹号或者感叹问号结尾,则在结尾添加一个句号。
参数:
sentence (str): 待检查的句子。
返回:
str: 添加句号后的句子。
"""
# 使用正则表达式匹配句子末尾的标点符号
if not re.search(r'[。?!?!]$', sentence):
sentence += '。'
return sentence
# 强分割断句,确保末尾的强分隔符。
from typing import List, Optional
from pydantic import BaseModel,FieldValidationInfo, field_validator
class Item(BaseModel):
doc_id:str
content:str
# 验证器:确保 content 以强分隔符结尾
@field_validator('content',mode='before')
def ensure_utf8_and_halfwidth(cls, v):
v = tranform_half_widh(extract_utf8_chars(v)) # 转换为半角字符
return v
# 给到一个document,将之分割为句子
class DocumentSplit(BaseModel):
input_data_listofdict: List[Item] = [{'doc_id':'1', 'content':'这是第一篇文章。'}, {'doc_id':'2', 'content':'这是第二篇文章。'}]
使用propery
from pydantic import BaseModel
class User(BaseModel):
first_name: str
last_name: str
@property
def full_name(self):
return f"{self.first_name} {self.last_name}"
@property
def full_name2(self):
return self.full_name.upper()
def dict(self):
data = {}
data['first_name'] = self.first_name
data['last_name'] = self.last_name
data['full_name'] = self.full_name
data['full_name2'] = self.full_name2
return data
# property 属性是计算属性,需要时生成,不占空间,同时也不会随默认的dict方法输出
# 创建一个User对象
user = User(first_name='John', last_name='Doe')
# 使用dict()方法输出,包含full_name和full_name2
user_dict = user.dict()
print(user_dict) # 输出: {'first_name': 'John', 'last_name': 'Doe', 'full_name': 'John Doe', 'full_name2': 'JOHN DOE'}
以上大致展示了pydantic的一个使用。
1 数据模型
以前一直有类似pydantic的想法,构造一个中间对象来进行数据的校验和传递。做了一些实验,但是都不太满意。随着大模型的兴盛,我才注意到这个工具。大模型之所以要用这个,大约是因为数据传递的复杂性很容易出错,尤其是langchain类型的任务。
总体上来说,数据模型在大型的、长期的、复杂的数据处理任务中有非常重要的作用。一个显而易见的问题是,我们没办法在有限的资源下,约束自己,约束他人,约束数据。
例如,由于我们记忆力、精力的天然局限,我们甚至不记得自己1个月前些的代码为什么要这么做。-- 难以约束自己
我们在项目协同中,无法告知其他人,需要输出什么样的数据。 – 难以约束他人
数据问题通常会以一种情理之中、意料之外的情况发生。 – 难以约束数据
所以,如果为每一个重要的数据节点都设置了数据模型,我们就可以很大程度上避免这些问题。
从整个数据处理来看,如果从图的角度分析,那么会有点(数据节点)和边(处理过程)。
数据节点是一些非常重要的数据形态持久化。
在一些特殊的里程碑事件上,我们需要确保数据是可观察的。例如,数据的创建阶段,我们需要看到数据,确保这个初始阶段的结果是可靠的。在过程中,可能又分为几个重要阶段,例如模式识别分类。最后,我们准备将数据交付到产品或者客户的数据库里。
在这些里程碑阶段我们都非常清楚业务上的要求和期待是什么,而且需要让其他相关的人也可以看得到(以便分工或者交付)。
数据节点是复杂过程的接力棒。
在大型数据处理过程中我们无法对每一个过程中的数据进行数据存储和展示:
- 1 我们没有足够的时间把事情做到如此的粒度
- 2 存储每一个过程带来的数据存储成本太高了
- 3 很多时候只是程序访问这些数据节点,人并不关注
但是,在开发、运行和调试的过程中,我们需要数据模型。
- 1 开发时,我们需要向其他人,甚至是未来的自己说明这个步骤的输入和输出是什么
- 2 运行时,确保我们的输出是可靠的,不符合数据模型的数据根本进不来。如果出错了,大的衔接步骤很快可以发现。
- 3 调试时,顺着数据模型向下走,容易找到问题点。
2 任务对象
最近做了不少的实践,以下我觉得比较适合作为ending。
在处理大规模、复杂和长期的任务时,必然要进行分布式计算。在调度之前,如果我们能做好planning,那么就会高效很多。
可靠性与效率就像 Precision和Recall一样,是一对对冲子。
以下是我的对象,以前有做过一些任务系统设计的实验,最终没有数据模型,也就没有固化下来。下面的模型设计是为了确保NoSQL和SQL数据库都可以存储的(In Case我也不知道最终后面用哪种)。所以,也可以看到数据模型是抽象的,独立与数据库类型的设计,是松耦合设计。
from typing import List, Optional,Dict
from pydantic import BaseModel
# 标准任务 这个任务定义是块级别的 - 表 - rabbitmq 分发
class AndyBlockTask(BaseModel):
# 命名部分
name : str
tier1 : str
tier2 : str
ord_id : int # 按顺序编号
# UCS部分
shard : int = 0
part : int = 0
block : int = 0
brick : int
# uri: 资源访问路径:主要是为了各种数据库兼容,最佳是字典。可以把这个作为一个短链,让worker执行时再去拿。
uri : str
# 基本设定
timeout : int = 600
max_retries : int = 3
# 任务执行部分
is_claimed : int = 0
is_done : int = 0
is_error : int = 0
# ---- 执行时信息
last_claimed_time : Optional[str]
done_time : Optional[str]
done_duration: Optional[int]
# 任务统计
timeout_cnt : int = 0
execute_cnt : int = 0 # 被认领次数,用于限定再次分发
worker_info : Optional[str] # 执行者信息
worker_comments : Optional[str] # worker返回的信息,主要是与异常相关的
worker_get_data_duration: Optional[int] # 获取数据的时间
worker_process_data_duation: Optional[int] # 处理数据的时间
# 主键
@property
def pid(self):
return '.'.join([self.tier1, self.tier2, self.name, str(self.ord_id)])
# ucs名称-在块状表中也是pk, 在明细数据表中是索引
@property
def brick_name(self):
return '.'.join([str(self.shard),str(self.part),str(self.block), str(self.brick)])
def dict(self):
data_dict = {}
# 主键/索引
data_dict['pid'] = self.pid
data_dict['brick_name'] = self.brick_name
# 命名
data_dict['name'] = self.name
data_dict['tier1'] = self.tier1
data_dict['tier2'] = self.tier2
data_dict['ord_id'] = self.ord_id
# UCS部分
data_dict['shard'] = self.shard
data_dict['part'] = self.part
data_dict['block'] = self.block
data_dict['brick'] = self.brick
# uri: 任务数据源
data_dict['uri'] = self.uri
# 基本设定
data_dict['timeout'] = self.timeout
data_dict['max_retries'] = self.max_retries
# 任务执行部分
data_dict['is_claimed'] = self.is_claimed
data_dict['is_done'] = self.is_done
data_dict['is_error'] = self.is_error
# 任务执行时信息
data_dict['last_claimed_time'] = self.last_claimed_time
data_dict['done_time'] = self.done_time
data_dict['done_duration'] = self.done_duration
data_dict['worker_info'] = self.worker_info
data_dict['worker_comments'] = self.worker_comments
data_dict['worker_get_data_duration'] = self.worker_get_data_duration
data_dict['worker_process_data_duation'] = self.worker_process_data_duation
# 任务执行统计
data_dict['timeout_cnt'] = self.timeout_cnt
data_dict['execute_cnt'] = self.execute_cnt # 被认领次数,用于限定再次分发
在使用的时候,可以先将任务进行规划后写入某个数据库中。然后调度程序将这些任务发到消息队列(RabbitMQ)中,然后由对应的worker进行响应。
在一段时间之后,调度程序会根据执行结果决定是否重发。