建模杂谈系列256 规则函数化改造

news2025/1/23 8:06:45

说明

之前尝试用FastAPI来构造规则,碰到的问题是由于请求量过大(TPS > 1000), 从而导致微服务端口资源耗尽。所以现在的point是:

  • 1 如何使用函数来替代微服务(同时要保留使用微服务的优点)
  • 2 进一步抽象并规范规则的执行
  • 3 等效合并规则的方法

内容

0 机制讨论

过去在使用tornado作为后端服务的时候,是没有碰到端口耗尽的问题的,也许是tornado本身采取的是长连接,更适合大批量数据请求的后端任务。
FastAPI更适合做短、平的IO类异步需求,不可以用于级联,TPS大约400-1200的样子。
这次的业务场景是实体匹配,我们需要从原文中提取出实体,然后完成匹配。

数据样本:

ent_list = ['基金', '美芯晟', '高新兴', '骏成科技', '证券时报', '深市主板', '创业板', '沪市', '科创板', '计算机', 
'机械设备', '共有3', '潍柴动力', '乐心医疗', '嘉曼服饰', '敏芯股份', '渝开发', '长虹美菱', '德联集团', '数据宝', 
'中航西飞', '顺络电子', '基金家数', '华利集团', '杰瑞股份', '邦彦技术', '兴瑞科技', '深天马', '漫步', 
'金力永磁', '太阳能', '普蕊斯', '德方纳米', '华锐精密', '伊之密', '西子洁能', '陕西华达', '浙江鼎力', '诺瓦星云', '远光']

original_text_half_width = '''昨日基金共对31家公司进行调研,扎堆调研美芯晟、高新兴、骏成科技等。证券时报•数据宝统计,6月5日共40家公司被机构调研,按调研机构类型看,基金参与31家公司的调研活动,其中,10家以上基金扎堆调研公司共6家。美芯晟最受关注,参与调研的基金达27家;高新兴、骏成科技等分别获18家、15家基金集体调研。基金参与调研的公司中,按所属板块统计,深市主板公司有13家,创业板公司有13家,沪市主板公司有1家,科创板公司有4家。所属行业来看,基金调研的公司共涉及13个行业,所属电子行业最多,有7家公司上榜;计算机、机械设备等紧随其后,分别有4家、4家公司上榜。从基金调研公司的A股总市值统计,总市值在500亿元以上的共有3家,其中总市值超千亿元的有潍柴动力等,总市值不足100亿元的有17家,分别是乐心医疗、嘉曼服饰、敏芯股份等。市场表现上,基金调研股中,近5日上涨的有10只,涨幅居前的有敏芯股份、高新兴、骏成科技等,涨幅为21.46%、19.43%、13.83%;下跌的有21只,跌幅居前的有渝开发、长虹美菱、德联集团等,跌幅为12.31%、9.92%、9.22%。数据宝统计,基金参与调研股中,近5日资金净流入的有12只,中航西飞近5日净流入资金1.53亿元,主力资金净流入最多;净流入资金较多的还有高新兴、顺络电子等,净流入资金分别为8217.28万元、3140.67万元。(数据宝)6月5日基金调研公司一览代码简称基金家数最新收盘价(元)近5日涨跌幅(%)行业688458美芯晟27 35.85 -4.48电子300098高新兴18 3.75 19.43计算机301106骏成科技15 32.42 13.83电子002138顺络电子14 25.27 7.76电子300979华利集团14 67.90 1.19纺织服饰300803指南针13 42.92 -0.60计算机002353杰瑞股份9 34.33 -2.05机械设备301276嘉曼服饰8 22.71 -0.74纺织服饰688132邦彦技术8 18.55 5.64国防军工688286敏芯股份6 44.60 21.46电子000338潍柴动力6 15.74 -2.24汽车002937兴瑞科技5 20.93 1.45电子000514渝开发4 3.49 -12.31房地产000050深天马A4 7.31 -2.40电子002351漫步者3 12.73 -0.08电子300748金力永磁2 14.06 -2.77有色金属000591太阳能2 5.05 -6.31公用事业301257普蕊斯2 40.05 -4.53医药生物000768中航西飞2 25.23 4.69国防军工300769德方纳米2 33.88 -2.98电力设备300562乐心医疗1 8.61 -6.62医药生物688059华锐精密1 54.53 -6.79机械设备002666德联集团1 3.94 -9.22基础化工300415伊之密1 22.34 1.79机械设备002534西子洁能1 10.99 -4.18电力设备301517陕西华达1 61.60 2.56国防军工301362民爆光电1 34.00 -7.34家用电器603338浙江鼎力1 61.86 -4.93机械设备301589诺瓦星云1 219.68 -8.31计算机000521长虹美菱1 8.90 -9.92家用电器002063远光软件1 5.70 -1.55计算机注:本文系新闻报道,不构成投资建议,股市有风险,投资需谨慎。'''

在逻辑上,我们会按照实际情况设计分级,在程序上,我们要有一个合并的逻辑。这种逻辑要简单,不要offend逻辑。

1 现有的服务

采用“WaterFall”的方法逐步批量的处理并分流数据。

一条规则是如此

# reject
@app.post("/r000/")
async def r000(justent:JustEnt):
    the_ent = justent.some_ent
    the_result = RuleResult()
    try:
        if judge_existence(the_ent, word_list=r0_exe_clude_list):
            the_result.status = 'reject'
        else:
            the_result.status = 'pass'
        return the_result.dict()
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))

在发起调用时,采用异步方式,每次根据请求的目标先品出参数,然后将渠道的结果进行解析。

import time 
def waterfall_api_mode(last_fall, rule_name ,reject_list = None, get_list = None, mappling_list = None, raw = None , base_url = None):
    next_fall = []
    last_ent_list = last_fall 
    pure_rule_url = rule_name + '/'

    if len(last_ent_list):
        rule_url = base_url + pure_rule_url
        # api mode
        tick1 = time.time()
        task_list = []
        for ent in last_ent_list:
            tem_dict = {}
            tem_dict['task_id'] =  ent 
            tem_dict['url'] = rule_url
            if raw is None :
                tem_dict['json_params'] = {'some_ent':ent}
            else:
                tem_dict['json_params'] = {'some_ent':ent,'raw':raw}
            task_list.append(tem_dict)
        rule_res = asyncio.run(json_player(task_list, concurrent = 10))
        # 解析结果,保留pass
        for tem_res in rule_res:
            for k,v in tem_res.items():
                # print(k,v)
                if v['status'] == 'pass':
                    next_fall.append(k)
                elif v['status'] == 'get':
                    if get_list is not None :
                        get_list.append(v['data'])
                    if mappling_list is not None :
                        mappling_list.append({'ent':k,'mapping_ent': v['data']})
                elif v['status'] == 'reject':
                    if reject_list is not None :
                        reject_list.append(k)

        tick2 = time.time()
        print('takes %.2f ' %(tick2-tick1))
    return next_fall

在批量调用规则时,采用几乎一样的形式即可,这是非常简洁的地方。

    #  ============= fall of short 
    # r100_1
    next_fall_short = waterfall_api_mode(next_fall_short, 'r100_1',base_url = base_url)
    # r100
    next_fall_short = waterfall_api_mode(next_fall_short, 'r100', get_list = mr.short_result,mappling_list=mr.mapping_list ,base_url = base_url)
    # r102
    next_fall_short = waterfall_api_mode(next_fall_short, 'r102', get_list = mr.short_result,mappling_list=mr.mapping_list ,base_url = base_url)
    # r102_1
    next_fall_short = waterfall_api_mode(next_fall_short, 'r102_1', get_list = mr.short_result,mappling_list=mr.mapping_list ,base_url = base_url)
    # r103
    next_fall_short = waterfall_api_mode(next_fall_short, 'r103', raw = original_text_half_width,base_url = base_url)
    # r104
    next_fall_short = waterfall_api_mode(next_fall_short, 'r104',base_url = base_url)
    # r105
    next_fall_short = waterfall_api_mode(next_fall_short, 'r105',base_url = base_url)
    # r106
    next_fall_short = waterfall_api_mode(next_fall_short, 'r106',base_url = base_url)
    # r107
    next_fall_short = waterfall_api_mode(next_fall_short, 'r107',base_url = base_url)
    # r200
    next_fall_short = waterfall_api_mode(next_fall_short, 'r200',base_url = base_url)
    # r201
    next_fall_short = waterfall_api_mode(next_fall_short, 'r201',base_url = base_url)
    # r202
    next_fall_short = waterfall_api_mode(next_fall_short, 'r202',base_url = base_url)
    # r203
    next_fall_short = waterfall_api_mode(next_fall_short, 'r203',base_url = base_url)
    # r299
    next_fall_short = waterfall_api_mode(next_fall_short, 'r299', get_list = mr.short_result,mappling_list=mr.mapping_list,base_url = base_url )

觉得还不错,需要保持的地方:

  • 1 数据规范。使用pydantic,这个可以继续保持
  • 2 waterfall_api_mode ,可以作为waterfall_func_mode, 且这次可以规定输出为4部分:get、pass、reject、error
  • 3 执行时,每条规则除了顺序之外,应该还有层次,实现BFS。规则分为若干模式,例如 is_in , is_not_in, 在每个层之间的同类规则可以合并。

2 设计与改进

诶,我突然想到了我的APIFunc。
在这里插入图片描述
总体上说,这个框架还是比较强大的,但是非常僵化,所以最终没有走向实际应用。所以我觉得完全可以进行拆解,重构。当然,里面有一部分问题的解决还是蛮厉害的,反正这一会我想不出来。

有几块内容是需要添加上的:

  • 1 logging对象:灵活的进行记录,后续会和logstash结合在一起(ELK)
  • 2 错误输出:遇到错误时发送到kafka
  • 3 shortuuid: 每次处理会生成一个shortuuid用于追溯,代表一次会话之内的

修改的部分:

  • 1 原来有很多数据的校验部分,现在可以用pydantic来控制
  • 2 BFS替代逐个的链式
  • 3 没有列式方法,全部是行式方法
  • 4 g变量:会存储额外的字典,不必完全按照df格式

优化的部分:

  • 1 修饰器方法,支持按依赖定义规则。例如 on depends of [rule1,rule2], def new rule。

保留的部分:

  • 1 reinit_data 重新初始化数据

2.1 原型部分

2.1.1 Logging
import logging
from logging.handlers import RotatingFileHandler

def get_logger(name, lpath='/var/log/', module='default.default'):
    logger = logging.getLogger(name)
    
    # 防止重复添加 handler
    if not logger.handlers:
        fpath = lpath + name + '.log'
        handler = RotatingFileHandler(fpath, maxBytes=100 * 1024 * 1024, backupCount=10)

        # 设置日志格式为 [时间] - [日志级别] - [模块名称] - 消息
        formatter = logging.Formatter(
            '[%(asctime)s] - [%(levelname)s] - [{}] - %(message)s'.format(module),
            datefmt='%Y-%m-%d %H:%M:%S'
        )
        handler.setFormatter(formatter)

        logger.addHandler(handler)
        logger.setLevel(logging.INFO)

    return logger


logger = get_logger('example')

# 记录不同级别的日志
logger.info('[part.a]系统启动完成')
logger.warning('[[part.b]磁盘空间不足,剩余空间小于10%')
logger.error('无法连接数据库,请检查网络设置')
logger.debug('这是调试信息,不会显示在日志中')
logger.critical('系统崩溃,立即采取措施')
2.1.2 BFS

先回忆一下过去的成果,当时的结论是:使用networkx作为核心的图计算工具,而neo4只是后端的存储backup。可以认为是pandas和mysql的关系。

在使用的时候,可以为一个项目设置一个独立的名称,这个独立的名称也就是节点的label,或者可以认为是节点的“表”。在需要的时候,可以整个读取(neo4j disk),在内存中处理(networkx memory)。

这段代码定义了一个很小的图,

import networkx as nx
import matplotlib.pyplot as plt

# =======================>  图的定义
# Create a directed graph
G = nx.DiGraph()

def hello():
    print('This is Node Running ...')

G.add_node(1)
G.nodes[1]['name'] = 'MinuteData'
G.nodes[1]['run'] =  hello

G.add_node(2)
G.nodes[2]['name'] = 'CaptialDataDaily'
G.nodes[2]['run'] =  hello

G.add_node(3)
G.nodes[3]['name'] = 'MergeData'
G.nodes[3]['run'] = hello

G.add_edge(1,3)
G.add_edge(2,3)

G.add_node(4)
G.nodes[4]['name'] = 'FeatureHorizonal'
G.nodes[4]['run'] = hello

G.add_edge(3,4)

G.add_node(5)
G.nodes[5]['name'] = 'ImbalanceSample'
G.nodes[5]['run'] = hello

G.add_edge(4,5)

G.add_node(6)
G.nodes[6]['name'] = 'FeatureVertical'
G.nodes[6]['run'] = hello

G.add_edge(5,6)


# =======================>  图的绘画
# 获取节点标签属性
node_labels = nx.get_node_attributes(G, "name")
# pos = nx.shell_layout(G)
pos = nx.spring_layout(G)
nx.draw(G, pos, with_labels=False,  node_size=1000, font_size=12, font_color='black', arrows=True)
# 绘制节点标签
_ = nx.draw_networkx_labels(G, pos, labels=node_labels)

在这里插入图片描述
这里,可以看到节点的依赖关系可以很清楚的展示出来。

然后稍微跳一下

# 输入一个nx图,给出BFS层级字典
def BFS(some_G,max_depth = 100):
    layer_dict = {}

    # 初始化节点
    init_node_list = [node for node, in_degree in some_G.in_degree() if in_degree == 0]
    layer_dict[0] = init_node_list    

    # 节点的入度字典
    in_degree_dict = dict(some_G.in_degree())
    all_nodes = set(some_G.nodes)
    travel_nodes = set(init_node_list)

    # 迭代节点
    for i in range(1,max_depth):
        last_layer_nodes = layer_dict[i-1]
        layer_dict[i] = []
        for last_node in last_layer_nodes:
            out_nodes = list(some_G.successors(last_node))
            if len(out_nodes):
                for out_node in out_nodes:
                    out_node_degree = in_degree_dict[out_node]
                    out_node_degree1 = out_node_degree-1
                    if out_node_degree1 == 0:
                        layer_dict[i].append(out_node)
                        travel_nodes.add(out_node)
                    else:
                        in_degree_dict[out_node] = out_node_degree1
        gap_set = all_nodes - travel_nodes
        if len(gap_set) ==0:
            break
    return layer_dict

BFS(G)
{0: [1, 2], 1: [3], 2: [4], 3: [5], 4: [6]}

给到一个定义好的图,通过BFS可以很快把层级梳理出来。

所以,将原来的修饰器改一改,将节点的依赖关系在启动修饰器的时候解释。函数可以在修饰器下临时定义,也可以引用已经编辑好的。现在已经具备了使用形式化参数(如slice_list_batch)来调用函数了,既有本地的包,也有微服务。

2.1.3 会话

我们将程序的每一次执行视为一次会话。

将程序的每一次执行视为一次会话是一种有用的抽象,可以帮助我们追踪、分析和管理程序的行为。每次执行都可以被认为是一个独立的会话,这些会话可以包括一系列输入、处理和输出。以下是将程序执行视为会话时的一些要点:

1. 会话的定义

  • 每次程序的执行周期(从启动到结束)被视为一个独立的会话。
  • 会话的范围可以根据程序的复杂度定义,可能包括启动、执行逻辑、处理数据、生成结果,并最终结束。

2. 会话数据

  • 输入数据:用户输入或外部数据源提供的信息。
  • 上下文信息:会话中的环境或系统状态(如用户信息、配置设置、会话 ID)。
  • 日志记录:在每个会话中生成的日志信息,帮助监控、调试和跟踪程序执行的过程。
  • 输出数据:会话完成后生成的结果或操作。

3. 会话标识

  • 每个会话可以使用唯一的标识符(例如 UUID、时间戳)来区分和追踪。
  • 日志和监控系统可以根据这个标识符来收集会话信息。

4. 会话的生命周期

  • 开始:程序启动或用户发起的操作。
  • 执行:程序的核心逻辑运行,处理输入并生成中间或最终结果。
  • 结束:程序完成执行或用户操作结束。程序可以写入日志、清理资源或返回结果。

5. 会话状态

  • 成功:程序按预期完成所有操作。
  • 失败:程序执行中出现错误或异常。
  • 中断:程序由于外部原因或用户取消而中途停止。

6. 会话管理

  • 可以通过记录每次会话的执行时间、状态、输入和输出数据,来分析系统的性能和稳定性。
  • 会话管理有助于调试(当出现问题时可以回溯某一具体会话)、分析(汇总和统计会话数据)以及优化程序。

7. 会话存储

  • 将会话数据存储到数据库、日志文件或分布式系统中,以便后续分析或复盘。

通过这种“会话”概念,能够更好地组织和管理程序的执行过程,尤其在需要跟踪状态、并发操作、或者执行历史时非常有用。

两个需要增加的点(以前没这么实施)

  • 1 生成uuid,用于生成会话的唯一ID
import uuid

def get_uuid(version=4, name=None, namespace=None):
    """
    生成 UUID。
    
    参数:
    - version: UUID 版本 (1, 3, 4, 5)
    - name: 当使用 UUID3 或 UUID5 时,需要提供的名称
    - namespace: 当使用 UUID3 或 UUID5 时,需要提供的命名空间 (uuid.NAMESPACE_DNS, uuid.NAMESPACE_URL 等)

    返回:
    - 生成的 UUID 字符串
    """
    if version == 1:
        # 基于时间生成 UUID
        return uuid.uuid1()
    elif version == 3:
        if name is None or namespace is None:
            raise ValueError("UUID3 需要提供 name 和 namespace 参数")
        # 基于 MD5 哈希的命名空间 UUID
        return uuid.uuid3(namespace, name)
    elif version == 4:
        # 生成随机的 UUID
        return uuid.uuid4()
    elif version == 5:
        if name is None or namespace is None:
            raise ValueError("UUID5 需要提供 name 和 namespace 参数")
        # 基于 SHA-1 哈希的命名空间 UUID
        return uuid.uuid5(namespace, name)
    else:
        raise ValueError("不支持的 UUID 版本。版本应为 1, 3, 4 或 5")
  • 2 会话数据存储

在高性能的场景下,里面增加的每一个操作可能都会导致系统的不稳定。但是,如果是必要的操作,那么也不能省。

我问了下大模型,自己也想了想,觉得还是用kafka比较合适。

python操作kafka一般使用confluent-kafka,在有些环境下安装会有点问题。例如,我在ubuntu18.04上安装时,爆了一些底层错误,类似C之类的依赖;在20.04上安装就没有问题。但总归要考虑这种环境问题差异会比较麻烦,所以我也做了一个kafka_agent,以API的形式提供kafka的访问。缺点是,json序列化的过程要加多一次。

我们来考虑当前场景时,并不是对每一个请求都发送会话数据:

  • 1 正常的执行(INFO):可以考虑按很低的概率抓取会话数据。
  • 2 错误(ERROR): 可以完全抓取,但这个类型的比例应该本身就是极低的。
  • 3 特定的抓取(DEBUG):可以在请求时用特定的字段区分,这类型的会话数据会被抓取。

总之,需要发起数据存储的概率非常低,总体上可能不到1%,所以这些额外的开销应该可以接受。反之,如果因为会话数据的存储影响了处理,说明:

  • 1 程序的水准过低,错误率太高。
  • 2 确实有必要进行并行:一边运行,一边监控。

如果是程序问题,那么就需要不断优化;如果是需要同步进行并行检查,那么就设置缓冲队列,加分布式处理。

使用kafka agent

假设topic为event_collect ,发送一个消息

import requests as req 
from pydantic import BaseModel,field_validator
import pandas as pd 
import json 
import time 
class Producer(BaseModel):
    servers : str 
    raw_msg_list : list 
    is_json : bool = True 
    topic : str 


    @property
    def msg_list(self):
        # change raw - json 
        if self.is_json:
            tick1 = time.time()
            the_list = pd.Series(self.raw_msg_list).apply(json.dumps).to_list()
            print('takes %.2f for json dumps ' %(time.time() - tick1 ))
            return the_list 
        else:
            return self.raw_msg_list

回顾一下kafka的搭建,可以使用docker-compose搭建,但是我还是比较喜欢直接用docker。

首先需要搭建zookeeper。

docker run -d --restart=always --name zookeeper -e \
ZOOKEEPER_CLIENT_PORT=2181 -e \
ZOOKEEPER_TICK_TIME=2000 -p 2181:2181 \
registry.cn-hangzhou.aliyuncs.com/andy08008/zookeeper0718:v100

然后再搭kafka,假设kafka分为内网和外网监听。
创建kafka持久化的路径

mkdir /home/data2T/kafka_data

然后创建

WAN_IP=XXXX
LAN_IP=192.168.0.159
docker run -d --name kafka \
  -p xxxx:xxxx \
  -p 9092:9092 \
  --link zookeeper:zk \
  -e HOST_IP=localhost \
  -e KAFKA_BROKER_ID=1 \
  -e KAFKA_ZOOKEEPER_CONNECT=zk:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=INTERNAL://${LAN_IP}:9092,EXTERNAL://${WAN_IP}:xxxx \
  -e KAFKA_LISTENERS=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:xxxx \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT \
  -e KAFKA_LISTENER_NAME=INTERNAL \
  -e KAFKA_LISTENER_NAME=EXTERNAL \
  -e KAFKA_INTER_BROKER_LISTENER_NAME=INTERNAL \
  -e KAFKA_LOG_DIRS=/data/kafka-logs \
  -v /home/data2T/kafka_data:/data/kafka-logs \
  registry.cn-hangzhou.aliyuncs.com/andy08008/kafka0718:v100

这时候就可以使用kafka_agent进行连接了

生产

import requests as req 
from pydantic import BaseModel,field_validator
import pandas as pd 
import json 
import time 
class Producer(BaseModel):
    servers : str 
    raw_msg_list : list 
    is_json : bool = True 
    topic : str 


    @property
    def msg_list(self):
        # change raw - json 
        if self.is_json:
            tick1 = time.time()
            the_list = pd.Series(self.raw_msg_list).apply(json.dumps).to_list()
            print('takes %.2f for json dumps ' %(time.time() - tick1 ))
            return the_list 
        else:
            return self.raw_msg_list

msg_list = [{'id':i ,'value':'abc' } for i in range(10)]
produces = Producer(servers = WAN_IP,raw_msg_list = msg_list, topic='mytest200' )

import time 
tick1 = time.time()
resp = req.post(f'http://{agent_url}/send_msg/',json = produces.dict()).json()
tick2 = time.time()
print('takes %.2f' %(tick2 - tick1) )

resp 10
# 外网被占用的情况下,耗时比较久
takes 1.44

消费

import requests as req 
from pydantic import BaseModel,field_validator
import pandas as pd 
import json 
import time 

# group.id: 声明不同的group.id 可以重头消费
class InputConsumer(BaseModel):
    servers : str 
    groupid : str = 'default01'
    is_commit: bool = True 
    msg_num : int  = 3 
    topic : str 
    is_json : bool = True 


# 外网
the_consumer = InputConsumer(servers = f'{WAN_IP}', msg_num =10, topic='mytest202')

import time 
tick1 = time.time()
resp = req.post(f'http://{agent_url}/consume_msg/',json = the_consumer.dict()).json()
tick2 = time.time()
print('takes %.2f' %(tick2 - tick1) )

# 内网
lan_the_consumer = InputConsumer(servers = f'{LAN_IP}', msg_num =10, topic='mytest202')
import time 
tick1 = time.time()
resp = req.post(f'http://{agent_url}/consume_msg/',json = lan_the_consumer.dict()).json()
tick2 = time.time()
print('takes %.2f' %(tick2 - tick1) )

我发现在带宽在被占满的情况,从公网拉取的消息结果为空,但是从内网可以拉取到结果。

在这里插入图片描述

原因大致如下:对应方法是保证带宽,或者在消费端进行修改
在这里插入图片描述
当我取消掉模拟耗带宽的操作(rsync大文件),此时无论WAN还是LAN都恢复正常了。
在这里插入图片描述

Q1: 使用代理,性能是否会有影响?

A1: 由于向代理发送,接受消息都要经过json序列化,效率将会大幅下降。80%以上的开销均为序列化开销。

生产者: 10万json+agent 1秒 外/ 0.78 内,仅10万json 0.5秒
消费者: 10万条 2.1秒 |1.95 |1.79

但可以看到,这样的速度仍然可以大规模使用。

Q2: 如果输错了服务器地址会怎样?

A2: 服务将陷入短暂不可用情况。在取消错误的请求后,大约5分钟,代理重连太久后才会自动取消。

结论:保存数据走内网kafka。

其他

Logstash的调试

Logstash 是一个开源的 数据收集引擎,通常用于实时数据处理和日志管理。它可以从多种来源收集数据,将其过滤、解析,并将处理后的数据发送到不同的目标存储系统。Logstash 是 ELK/Elastic Stack(Elasticsearch、Logstash、Kibana)的一部分,通常与 Elasticsearch 和 Kibana 搭配使用来构建一个完整的日志和事件管理系统。

Logstash 的主要功能

  1. 数据收集

    • Logstash 支持从各种数据源收集数据,例如日志文件、数据库、网络、消息队列等。通过插件系统,它能够轻松集成到不同的数据源环境中。
  2. 数据过滤与解析

    • Logstash 可以对收集到的数据进行过滤和解析,例如使用正则表达式提取字段,重新格式化数据,或者对数据进行清洗。
    • Logstash 的过滤器插件支持丰富的处理操作,比如 Grok 解析、JSON、日期处理、去重、聚合等。
  3. 数据输出

    • Logstash 可以将处理后的数据发送到多个目标系统,比如 Elasticsearch(用于搜索和分析)、文件、数据库、消息队列、监控系统等。

Logstash 主要的架构组件

  • Inputs(输入插件):用于指定数据来源,如文件、数据库、消息队列等。常见的输入插件包括 filesyslogkafkahttp 等。

  • Filters(过滤插件):用于处理、解析和转换数据,可以使用 Grok、正则表达式、日期处理等插件来解析复杂的日志格式。

  • Outputs(输出插件):用于定义数据的存储位置,比如发送到 Elasticsearch、存储到文件、发送到消息队列等。

常见使用场景

  1. 日志管理与分析

    • Logstash 经常与 Elasticsearch 和 Kibana 搭配使用来实现集中式日志管理,将来自不同服务的日志集中采集、分析和展示。
  2. 实时数据流处理

    • 它还可以用来处理实时数据流,例如从 Kafka 或 Redis 获取消息,对数据进行实时处理后发送到目标系统。
  3. 系统监控与安全分析

    • 在 DevOps 环境中,Logstash 用于实时监控应用程序、服务器和网络设备的日志,并通过 Kibana 展示给运维人员,实现系统健康监控和安全日志分析。

简单工作流程

  1. 输入:从不同的数据源收集数据(如文件、数据库、API 等)。
  2. 过滤:通过解析、格式化和过滤等操作对数据进行处理。
  3. 输出:将处理后的数据发送到指定目标(如 Elasticsearch、Kafka、文件等)。

示例

下面是一个简单的 Logstash 配置,它从一个日志文件中收集数据,解析后发送到 Elasticsearch:

input {
  file {
    path => "/var/log/example.log"
    start_position => "beginning"
  }
}

filter {
  grok {
    match => { "message" => "%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:level} %{GREEDYDATA:message}" }
  }
  date {
    match => ["timestamp", "ISO8601"]
  }
}

output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "logs-%{+YYYY.MM.dd}"
  }
}

这个配置会将日志文件中的数据解析为 JSON 格式,并按日期创建 Elasticsearch 索引。

Logstash 通过灵活的输入、过滤、输出插件,使它成为处理异构数据的强大工具。

日志

这段日志输出来自 Logstash,显示了一个日志事件的详细信息。下面是每个字段的解释:

9月 15 19:09:07 m7 logstash[23910]: {
9月 15 19:09:07 m7 logstash[23910]:        "@version" => "1",
9月 15 19:09:07 m7 logstash[23910]:             "log" => {
9月 15 19:09:07 m7 logstash[23910]:         "file" => {
9月 15 19:09:07 m7 logstash[23910]:             "path" => "/var/log/example.log"
9月 15 19:09:07 m7 logstash[23910]:         }
9月 15 19:09:07 m7 logstash[23910]:     },
9月 15 19:09:07 m7 logstash[23910]:          "module" => "part.b",
9月 15 19:09:07 m7 logstash[23910]:     "log_message" => "无法连接数据库,请检查网络设置",
9月 15 19:09:07 m7 logstash[23910]:       "timestamp" => "2024-09-15 19:08:15",
9月 15 19:09:07 m7 logstash[23910]:         "message" => "[2024-09-15 19:08:15] - [ERROR] - [part.b] - 无法连接数据库,请检查网络设置",
9月 15 19:09:07 m7 logstash[23910]:           "level" => "ERROR",
9月 15 19:09:07 m7 logstash[23910]:      "@timestamp" => 2024-09-15T11:09:06.865477838Z,
9月 15 19:09:07 m7 logstash[23910]:           "event" => {
9月 15 19:09:07 m7 logstash[23910]:         "original" => "[2024-09-15 19:08:15] - [ERROR] - [part.b] - 无法连接数据库,请检查网络设置"
9月 15 19:09:07 m7 logstash[23910]:     },
9月 15 19:09:07 m7 logstash[23910]:            "host" => {
9月 15 19:09:07 m7 logstash[23910]:         "name" => "m7"
9月 15 19:09:07 m7 logstash[23910]:     }
9月 15 19:09:07 m7 logstash[23910]: }

详细解释:

  1. @version: 指示事件的版本,通常为“1”,表示使用的事件格式的版本。

  2. log: 包含日志文件的信息。

    • file: 具体的日志文件信息。
      • path: 日志文件的完整路径,即日志数据的来源。
  3. module: 动态模块名称,即日志消息中包含的模块标识。这个值是你在日志记录中自定义的,比如在你的代码中你设置为 part.b

  4. log_message: 从原始消息中提取出的主要日志内容(不包含时间戳和日志级别)。

  5. timestamp: 日志事件的时间戳,表示事件发生的实际时间。

  6. message: 日志的原始格式化消息,包含时间戳、日志级别、模块名称和日志内容。

  7. level: 日志的级别,例如“ERROR”、“INFO”等。表示事件的严重性。

  8. @timestamp: Logstash 处理事件的时间戳,通常是 Logstash 解析日志并将其写入 Elasticsearch 的时间。

  9. event: 包含原始日志消息的完整文本,通常用于保持日志的原始格式。

  10. host: 提供了有关 Logstash 运行的主机的信息。

    • name: 主机名,显示 Logstash 实例所在的机器名。

这个日志条目展示了从日志文件中提取的数据,以及 Logstash 对其进行解析和处理后的结构化数据。

要特别注意日志偏移的设置,这个相当于是logstash的断点续传

在实际生产环境中,sincedb_path 选项是用于 Logstash 跟踪文件读取进度的机制,默认情况下它不会设置为 /dev/null。下面解释一下它的常用场景和配置方式:

1. sincedb 是什么?

  • sincedb 文件:用于 Logstash 记录输入插件(如 file)读取文件的当前位置。每次 Logstash 读取文件时,它会更新 sincedb 文件,以便在 Logstash 重启或系统重启时能够从上次停止的地方继续读取,而不是从头开始。
  • 位置:默认情况下,sincedb 文件存储在用户的主目录下,例如:
    • Linux: ~/.sincedb_*
    • Windows: C:\Users\Username\.sincedb_*

每个 sincedb 文件会跟踪一个特定日志文件的 inode 信息及读取进度。

2. 实际场景下 sincedb_path 的使用

  • 正常生产环境

    • 典型配置:你通常会为 sincedb_path 指定一个具体的文件路径,确保 Logstash 在重启时能够继续处理文件。例如:

      sincedb_path => "/var/lib/logstash/sincedb"
      

      在这个例子中,sincedb 文件会存储在 /var/lib/logstash/ 目录下,确保 Logstash 有足够的权限去读取和写入该文件。

  • 文件路径管理:如果你有多个不同的日志文件输入,可以为每个文件输入指定不同的 sincedb_path,以避免冲突。例如:

    input {
      file {
        path => "/var/log/app1.log"
        sincedb_path => "/var/lib/logstash/sincedb_app1"
      }
      file {
        path => "/var/log/app2.log"
        sincedb_path => "/var/lib/logstash/sincedb_app2"
      }
    }
    

3. sincedb_path => "/dev/null" 在实际中的用途

在某些特殊场景下,你可能会临时使用 /dev/null,但不建议在生产环境中使用。

使用 /dev/null 的情况:
  • 调试/开发阶段

    • 当你在开发或调试 Logstash 配置时,你可能希望每次启动 Logstash 时都从头读取日志文件。在这种情况下,你可以临时将 sincedb_path 设置为 /dev/null,这样 Logstash 每次都会忽略之前的进度,从文件的开头开始读取。

      sincedb_path => "/dev/null"
      
  • 短期任务

    • 对于一次性读取文件的任务或临时性的日志分析,你可能不需要记录进度。在这种情况下,使用 /dev/null 也是合理的。
不建议在生产环境中使用的原因:
  • 文件读取进度丢失:如果你将 sincedb_path 设置为 /dev/null,Logstash 无法保存文件读取进度。在生产环境中,如果 Logstash 服务重启或系统出现问题,你将丢失已处理文件的位置信息,Logstash 会从头开始读取整个日志文件,这可能会导致重复处理日志。

4. 总结:sincedb_path 在生产环境的最佳实践

  • 指定合适的路径:在生产环境中,建议明确指定 sincedb_path 到一个持久存储的路径,通常位于 /var/lib/logstash 之类的目录,确保 Logstash 能记录文件读取进度。

    例如:

    sincedb_path => "/var/lib/logstash/sincedb_example"
    
  • 使用 /dev/null 慎重:仅在调试、开发或一次性任务中使用 /dev/null,避免在生产环境中使用,以防日志文件重新读取时产生问题。


整体上,我们写好了程序,当逻辑较为复杂时,或者我们将之作为服务进行长期运行时,容易"失联"。我们并不知道程序/服务出了什么问题,进行定位时需要切到非常细的操作,经常达到让人望而却步的程度。

比较可行的方法是程序将日志追加到文件,然后由其他程序(如logstash)进行读取,解析,转存到es中,供监控和后续分析。

追加到日志是代价比较低,且不会犯错的操作。通过rotate,我们也避免了磁盘满的风险。

日志分为5个级别,我们关注其中四个(忽略Debug):

在这里插入图片描述
例如,Info 可以是类心跳的信息,确保程序正常运行,无论是Idle还是处理数据。给到的FeedBack是在常态运行。另一个点就是,提前准备好可以测试其功能的样本数据,隔一段时间调一次,确保无论是空载还是满载都能得到反馈。

Warning 是一些预警,例如磁盘空间不足、内存空间不足、网络带宽不足等。这些随时可能会导致程序崩溃、挂起。

Error 是一些错误,例如数据库连接中断,部分数据逻辑处理错误。

Critical 是致命性错误,例如显卡出问题了,模型无法载入。

日志文件 /var/log/example.log

[2024-09-15 19:08:15] - [INFO] - [part.a] - 系统启动完成
[2024-09-15 19:08:15] - [WARNING] - [part.b] - 磁盘空间不足,剩余空间小于10%
[2024-09-15 19:08:15] - [ERROR] - [part.b] - 无法连接数据库,请检查网络设置
[2024-09-15 19:08:15] - [CRITICAL] - [part.b] - 系统崩溃,立即采取措施

写入vim /etc/logstash/conf.d/debug_logstash.conf, grok语句解析4个变量:

  • 1 时间戳 timestamp
  • 2 日志等级:level
  • 3 模块名称:module
  • 4 消息主体:log_message

input {
  file {
    path => "/var/log/example.log"
    start_position => "beginning"
    sincedb_path => "/dev/null"
  }
}

filter {
  grok {
    match => {
      "message" => "\[%{TIMESTAMP_ISO8601:timestamp}\] - \[%{LOGLEVEL:level}\] - \[%{DATA:module}\] - %{GREEDYDATA:log_message}"
    }
  }
}


output {
  stdout { codec => rubydebug }
}


校验语句

/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/debug_logstash.conf --config.test_and_exit

重启服务

systemctl restart logstash

观察结果

journalctl -u logstash -f

总体上感觉grok解析还是有点麻烦,尽量简单点好了;倒是JSON解析可能更适合我,但是显然效率会稍微低一点。

在这里插入图片描述

篇幅太长了,再写一篇续吧。

本篇:

  • 1 介绍了问题的由来,现状(api)。
  • 2 完成了设计思路,以及一些对应组件的validate
    • 1 logging : python的logging 和 logstash配合
    • 2 graph: 使用 networkx 来进行BFS计算,规则之间可以按照nx的方式定义依赖
    • 3 uuid: 使用 uuid 来表示会话
    • 4 kafka: 回顾kafka的搭建,使用kafka agent进行数据提交保存

下篇:

  • 1 重构新的规则对象
    • 核心功能:允许灵活的定义规则(graph)
  • 2 将本次的日志、会话(uuid及保存)实现
  • 3 梳理未来规则分类与合并的思路

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2141445.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue2源码解读

vue源码_哔哩哔哩_bilibili 1.Vue源码路径目录解读 Vue2源码的路径目录被设计得非常清晰,每个文件夹都承担着特定的职责和功能。以下是这些主要文件夹(compiler、core、platform、server、sfc、shared)的详细解读: 1. compiler …

LLM大模型部署实战指南:Ollama简化流程,OpenLLM灵活部署,LocalAI本地优化,Dify赋能应用开发

1. Ollama 部署的本地模型(🔺) Ollama 是一个开源框架,专为在本地机器上便捷部署和运行大型语言模型(LLM)而设计。,这是 Ollama 的官网地址:https://ollama.com/ 以下是其主要特点和功能概述: …

VLMEvalKit 评测实践:InternVL2 VS Qwen2VL

一、InternVL2简介 InternVL2是由上海人工智能实验室OpenGVLab发布的一款多模态大模型,其中文名称为“书生万象”。该模型在多学科问答(MMMU)任务上表现出色,成为国内首个在该任务上性能突破60的模型,被誉为开源多模态…

k8s 微服务 ingress-nginx 金丝雀发布

目录 一 什么是微服务 二 微服务的类型 三 ipvs模式 3.1 ipvs模式配置方式 四 微服务类型详解 4.1 clusterip 4.2 ClusterIP中的特殊模式headless 4.3 nodeport 4.4 loadbalancer 4.5 metalLB 4.6 externalname 五 Ingress-nginx 5.1 ingress-nginx功能 5.2 部署…

5. Python之数据类型

Python数据类型有数值型,字符串型,布尔型等等 内置函数type(),可以查看变量的数据类型 。 一、数值类型 整数(没有小数部分,包含正整数,负整数,0,默认为十进制数)&…

C++ 类域+类的对象大小

个人主页:Jason_from_China-CSDN博客 所属栏目:C系统性学习_Jason_from_China的博客-CSDN博客 所属栏目:C知识点的补充_Jason_from_China的博客-CSDN博客 概念概述 类定义了一个新的作用域,类的所有成员都在类的作用域中&#xff…

华为杯数学建模资料大全、入门指导攻略、获奖数据分析、选题建议

这里收集的资料个人认为已经非常全也非常值了,这么多资料收集成本真的不低 数学建模比赛资料部分(需要私聊找我) 华为杯创办以来每一年的比赛题目原题(包括A到F题)华为杯每年每种题目的优秀获奖作品论文 近几年的华…

2022高教社杯全国大学生数学建模竞赛C题 问题一(2) Python代码演示

目录 1.2 结合玻璃的类型,分析文物样品表面有无风化化学成分含量的统计规律数据预处理绘图热力图相关系数图百分比条形图箱线图小提琴图直方图KED图描述性统计分析偏度系数峰度系数其它统计量1.2 结合玻璃的类型,分析文物样品表面有无风化化学成分含量的统计规律 数据预处理 …

回归预测|基于鲸鱼优化随机森林数据的数据回归预测Matlab程序 多特征输入单输出WOA-RF

回归预测|基于鲸鱼优化随机森林数据的数据回归预测Matlab程序 多特征输入单输出WOA-RF 文章目录 一、基本原理鲸鱼优化算法(WOA)随机森林(RF)WOA-RF的结合总结 二、实验结果三、核心代码四、代码获取五、总结 一、基本原理 WOA-R…

服务器断电重启后报XFS文件系统错误 XFS (dm-0)_ Metadata I_O error

一、现象 服务器被意外断电,导致重启机器后报错,系统错误 XFS (dm-0): Metadata I/O error 二、解决方法 2.1 重启服务器,进入单用户模式 服务器系统为: centos7.9 开机按e 定位到ro 然后修改ro为rw(“rw init/sysroot/bin/sh”…

2024-1.2.12-Android-Studio配置

本地博客: https://k1t0111.github.io/ K1T0 最近在做一些app方向的移动技术开发学习,但是由于AS的配置问题,市面上找不到最新的2024版本的AS的相关配置。笔者也是踩了很多坑,因此想写一篇文章记录一下最新的AS 2024 1.2.12的对应java环境的一…

JavaScript 笔记汇总

JavaScript 笔记汇总 引入方式 内部方式 通过 script 标签包裹 JavaScript 代码。 <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>JavaScript 基础 - 引入方式</title> </head> <…

java项目之基于web的人力资源管理系统的设计与实现(源码+文档)

风定落花生&#xff0c;歌声逐流水&#xff0c;大家好我是风歌&#xff0c;混迹在java圈的辛苦码农。今天要和大家聊的是一款基于springboot的基于web的人力资源管理系统的设计与实现。项目源码以及部署相关请联系风歌&#xff0c;文末附上联系信息 。 项目简介&#xff1a; …

2024/9/16 pytorch

一、pytorch两大法宝元素 假设有一个名为pytorch的包 dir()&#xff1a;用于打开包&#xff0c;看里面的内容 help():用于查看具体的内容的用处 二、python文件&#xff0c;python控制台和jupyter的使用对比 三、pytorch读取数据 pytorch读取数据主要涉及到两个类&#xff1…

linux文件系统权限详解

注:目录的执行权限代表是否可以进入。 一、文件权限控制对文件的访问: 可以针对文件所属用户、所属组和其他用户可以设置不同的权限 权限具有优先级。user权限覆盖group权限,后者覆盖other权限。 有三种权限类别:读取、写入和执行 读权限:对文件:可读取文件…

LeetCode[中等] 合并区间

以数组 intervals 表示若干个区间的集合&#xff0c;其中单个区间为 intervals[i] [starti, endi] 。请你合并所有重叠的区间&#xff0c;并返回 一个不重叠的区间数组&#xff0c;该数组需恰好覆盖输入中的所有区间 。 思路 区间排序&#xff1a; 开始位置 ——> 升序排…

Elment-plus组件失效(没有样式)(0916)

在学习Vue3时&#xff0c;使用Element-plus组件库开发登录页面&#xff0c;发现无法显示反馈组件的样式 然后查找相关博客后&#xff0c;发现原来是因为我使用按需导入&#xff0c;然后又在登录页面导入&#xff0c;导致组件样式失效 删除导入语句后&#xff0c;成功显示反馈组…

「C++」类和对象(3)

欢迎大家再次来到海盗猫鸥的博客—— 今天将继续讲解类和对象的后续内容&#xff0c;本篇将讲解类和对象中运算符重载&#xff0c;赋值运算符重载&#xff0c;以及取地址运算符的内容&#xff0c;再结合内容进行Date日期类的实现。 目录 运算符重载 运算符重载 赋值运算符重…

【CMake】使用CMake在Visual Stdudio编译资源文件和多目标编译

一、资源文件的编译 首先&#xff0c;我们的项目结构如下&#xff0c;存在图片和第三方库&#xff1a; 配置主 C M a k e l i s t s CMakelists CMakelists&#xff1a; #需求的最低cmake程序版本 cmake_minimum_required(VERSION 3.12)#本工程的名字 project(OpenGL)#支持的…

[Python数据可视化] Plotly:交互式数据可视化的强大工具

引言&#xff1a; 在数据分析和可视化的世界中&#xff0c;Plotly 是一颗耀眼的明星。它是一个开源的交互式图表库&#xff0c;支持多种编程语言&#xff0c;包括 Python、R 和 JavaScript。Plotly 的强大之处在于它能够创建出既美观又具有高度交互性的图表&#xff0c;使得数据…