说明
随着整个算网开发逐渐深入,各个组件、微服务的数量、深度在不断增加。由于算网是个人项目,我一直按照MVP(Minimum Viable Product )的原则在推进。由于最初的时候对架构、算法和业务的理解并没有那么深刻,所以MVP的内容还是在不断变化(增加)的。比较幸运的是,中间走的弯路比较少,整体方向上一直没有大偏差,应该可以在预期的时间内达到目标。
从工具和使用的角度来看,我一边在算网的概念下构造工具,一边在尝试使用这些工具提高我的效率/能力上限。过去,由于已知的、明确要实现的功能就很多,所以一直是不断急速前进,常见的一种情况是:工具开发好了,测试也好用,但是就扔在一边,继续开发别的新工具去了。
现在的情况是:完成一个MVP所需要新开发的功能几乎没有了,反而是将已有的工具用起来比较重要。
今年做的比较重要的改变是使用streamlit、gradio将微服务前后端一体化。这种方式最初是大语言模型广泛采用的,的确非常方便。对算网而言,大量的已开发组件和功能,将通过这种方式进行文档、测试和使用的一体化。
总之,现在的重点是实现(Realization)。
内容
1 GlobalFunc更新
1.1 程序
目前仍然采用vscode代码开发的方式,创建一个个新的py文件。
1.2 推送更新
步骤如下:
- 1 切到项目路径下
cd .../m4git/GlobalFunc
- 2 导入操作对象
from op044_GlobalFunc_01BaseOpt import gfbase
# 1 查看当前分枝
gfbase._get_current_branch()
# 2 扫描所有文件的信息
scan_dict = gfbase._get_scan_files()
# 3 提交git项目
gfbase._simple_commit_git()
# 4 刷新一个包的初始化文件
gfbase._generate_init_py('Base')
gfbase._generate_init_py('Parse')
gfbase._generate_init_py('TFIDF')
# 5 批量存储函数
some_pack_list = [x for x in scan_dict.keys() if x.startswith('Base.') or x.startswith('Parse.') or x.startswith('TFIDF.')]
for some_file in some_pack_list:
gfbase._save_a_file_rom(some_file)
2 GFGo Lite 更新
GFGo Lite 有所修改,文件在项目文件夹gfgo_lite_24090 下面
2.1 文件
切入gfgo_lite_build容器,里面已经有了一些依赖文件
- 1
init_funcs
里面存放了数据库连接相关的函数 - 2 按模块执行加载,然后初始化
以下是从Base模块中加载一些函数的示例
from init_funcs import create_file,generate_init_py,RedisOrMongo, Naive
import os
import json
# 声明空间
# 在容器中启动
redis_cfg = Naive()
redis_cfg.redis_agent_host = 'http://172.17.0.1:24021/'
redis_cfg.redis_connection_hash = None
# 模块Base
if True:
# 声明当前要创建的程序文件夹:默认为funcs
target_folder = 'GlobalFunc'
tier1 = 'sp_GlobalFunc'
var_space_name = 'Base'
# 分支,一般默认为master
branch_name = 'master'
tier2 = '_'.join([var_space_name, branch_name])
the_space_name = '.'.join([tier1,tier2])
target_folder = target_folder + '/' + var_space_name
os.makedirs(target_folder, exist_ok=True)
rom = RedisOrMongo(the_space_name, redis_cfg.dict(),backend='mongo', mongo_servername='m7.24065')
# 这个一般根据需要,或者代码中得来 --- 需要的列表项
func_list = [ 'from_pickle','to_pickle','is_file_exists',
'gen_time_axis','ATimer2','get_time_str1','cols2s',
'create_folder_if_notexist','flat_dict','flatten_list'
]
# func_list = [ 'from_pickle','to_pickle','pose_a_file']
for some_name in func_list:
# 获取 meta,data : data就是代码字符
the_data = rom.getx(some_name)
filename = the_data['meta']['name']
filedata = the_data['data']
create_file(target_folder, filename, filedata)
# 生成初始化文件
generate_init_py(target_folder)
在导入新的包时,需要手动修改GlobalFunc下的__init__.py
(与Base包和Parse包平级)。
from . import Base
from . import Parse
2.2 服务
在 /workspace
下直接启动服务即可 python3 server.py
。
3 应用
3.1 通用函数 UCS
UCS是一个规范,为了支持这个规范,必须依赖一些特定(同时也是固定)的函数。因为函数的通用性,所以这些函数放在了最外层,每个函数都占据了一个api接口。
3.1.1 block (int)
整型block分割三件套
import requests as req
some_dict = {}
some_dict['rec_id'] = 111111
res = req.post('http://127.0.0.1:8000/get_brick_name/', json = some_dict).json()
'0.0.0.11'
some_dict = {}
some_dict['brick_name'] = '0.0.0.11'
res = req.post('http://127.0.0.1:8000/get_brick_bounds/', json = some_dict).json()
[110000.0, 120000.0]
some_dict = {}
some_dict['start_brick_name'] = '0.0.0.9'
some_dict['end_brick_name'] = '0.0.0.12'
res = req.post('http://127.0.0.1:8000/get_brick_list/', json = some_dict).json()
['0.0.0.9', '0.0.0.10', '0.0.0.11'
3.1.2 time block
操作如下:时间支持字符和数值(时间戳)两种模式。
import requests as req
# char
some_dict = {}
some_dict['dt_str_or_ts'] = '2024-01-31 11:11:11'
res = req.post('http://127.0.0.1:8000/get_time_brick_name/', json = some_dict).json()
'2024.01.31.11'
# num
some_dict = {}
some_dict['dt_str_or_ts'] = 1706670671
res = req.post('http://127.0.0.1:8000/get_time_brick_name/', json = some_dict).json()
'2024.01.31.11'
# char
some_dict = {}
some_dict['brick_name'] = '2024.01.31.11'
some_dict['char_or_num'] = 'char'
res = req.post('http://127.0.0.1:8000/get_time_brick_bounds/', json = some_dict).json()
'''
In [13]: res
Out[13]: ['2024-01-31 11:00:00', '2024-01-31 12:00:00']
'''
# num
some_dict = {}
some_dict['brick_name'] = '2024.01.31.11'
some_dict['char_or_num'] = 'num'
res = req.post('http://127.0.0.1:8000/get_time_brick_bounds/', json = some_dict).json()
'''
In [15]: res
Out[15]: [1706670000, 1706673600]
'''
some_dict = {}
some_dict['start_brick_name'] = '2024.01.31.11'
some_dict['end_brick_name'] = '2024.02.02.11'
res = req.post('http://127.0.0.1:8000/get_time_brick_list/', json = some_dict).json()
'''
In [11]: res
Out[11]:
['2024.01.31.11',
'2024.01.31.12',
'2024.01.31.13',
'2024.01.31.14',
...
'''
3.2 功能函数
3.2.1 Base包的函数调用
以下是两个Base包函数的测试
import requests as req
# 测试1:调用Base包的函数
kwargs = {'ts':None,
'bias_hours':8}
pack_func = 'Base.get_time_str1'
some_dict = {}
some_dict['kwargs'] = kwargs
some_dict['pack_func'] = pack_func
res = req.post('http://127.0.0.1:8000/gfgo/', json = some_dict).json()
'2024-05-05 11:04:35'
# 测试2:列表扁平化
kwargs = {'nested_list':[[1,2],[3],[4,5]]}
pack_func = 'Base.flatten_list'
some_dict = {}
some_dict['kwargs'] = kwargs
some_dict['pack_func'] = pack_func
res = req.post('http://127.0.0.1:8000/gfgo/', json = some_dict).json()
[1, 2, 3, 4, 5]
有两点需要注意:
- 1 函数规范为全部关键字参数输入(主要是为了方便调用)
- 2 接口直接返回处理信息(而不是包上状态和消息)
3.3.3 Parse包的函数调用
x = "This is a sample text."
word_list = ["sample", "test", "string"]
kwargs = {'x':x, 'word_list':word_list}
pack_func = 'Parse.judge_existence'
some_dict = {}
some_dict['kwargs'] = kwargs
some_dict['pack_func'] = pack_func
res = req.post('http://127.0.0.1:8000/gfgo/', json = some_dict).json()
True
3.4 服务迭代
推送新的变化
docker push myregistry.domain.com:24052/worker.andy.gfgo_lite_24090:v101
启动服务
docker run -d \
--restart=always \
--name=gfgo_lite_24090 \
-v /etc/localtime:/etc/localtime \
-v /etc/timezone:/etc/timezone\
-v /etc/hostname:/etc/hostname\
-e "LANG=C.UTF-8" \
-w /workspace\
-p 24090:8000\
myregistry.domain.com:24052/worker.andy.gfgo_lite_24090:v101 \
sh -c "python3 server.py"
公网调用
In [3]:
...: some_dict = {}
...: some_dict['brick_name'] = '2024.01.31.11'
...: some_dict['char_or_num'] = 'num'
...: res = req.post('http://WAN_IP:24090/get_time_brick_bounds/', json = some_dict).json()
In [4]: res
Out[4]: [1706670000, 1706673600]
4 总结与展望
RuleSet As A Func
将复杂的规则(判定)作为一个函数调用。
Series Apply
每个函数都要支持列表(多个元素)的并行处理。
踩过的一个小坑:GlobalFunc使用了一个公网机的Redis做ROM,而GFGoLite使用m7本地的redis,导致了逻辑上看起来更新了,但是实际未更新。