python 使用 watchdog 实现类似 Linux 中 tail -f 的功能

news2025/1/9 16:21:03

一、代码实现

import logging
import os
import threading
import time

from watchdog.events import FileSystemEventHandler
from watchdog.observers import Observer

logger = logging.getLogger(__name__)


class LogWatcher(FileSystemEventHandler):
    def __init__(self, log_file, on_modified_callback=None):
        """"
        初始化 LogWatcher 类的实例。

        参数:
        - log_file:日志文件的路径
        - on_modified_callback:可选的回调函数,在文件修改时调用

        属性:
        - log_file:日志文件的路径
        - file_object:日志文件对象
        - on_modified_callback:文件修改回调函数
        - last_line:最后一行文本
        - observer:观察者对象
        - match_string:需要匹配的字符串
        - stop_watching:停止监视的标志
        """
        self.log_file = log_file
        self.file_object = open(log_file, 'rb')
        self.on_modified_callback = on_modified_callback
        self.last_line = self.get_last_line()  # 初始化时获取最后一行文本
        self.observer = Observer()
        self.observer.schedule(self, ".", recursive=False)

        self.match_string = None
        self.stop_watching = False

    def start(self):
        """
        启动观察者对象,开始监视文件变化。
        """
        self.observer.start()

    def stop(self):
        """
        停止观察者对象,结束监视文件变化。
        """
        self.observer.stop()
        self.observer.join()
        self.file_object.close()

    def get_last_line(self):
        """
        获取日志文件的最后一行文本。它通过将文件指针移动到文件末尾,然后逐个字符向前搜索,直到找到换行符为止。

        返回值:
        - 最后一行文本,如果文件为空则返回None
        """

        # 将文件指针移动到文件末尾
        self.file_object.seek(0, os.SEEK_END)

        # 获取当前文件指针的位置(此时指针在最后一行的末尾)
        position = self.file_object.tell()

        try:
            # 尝试向前移动两个字节
            new_position = max(position - 2, 0)
            self.file_object.seek(new_position, os.SEEK_SET)
        except OSError as e:
            # 如果发生错误,可能是文件太小,返回None
            return None

        # 逐个字符向前搜索,确保文件指针最终停在当前行的第一个字符处
        while True:
            # read(1)读取的是指针位置的下一个字符,每次调用read(1)都会读取一个字符,并将指针向后移动一个字符的位置。
            char = self.file_object.read(1).decode('utf-8', errors='ignore')
            if char == '\n':
                break
            if new_position == 0:
                # 如果已经到达文件开头,跳出循环
                break

            # 尝试向前移动一个字节位置,确保不越界到文件开头
            new_position = max(new_position - 1, 0)
            # 将文件指针移动到新的位置
            self.file_object.seek(new_position, os.SEEK_SET)

        # last_line = self.file_object.readline().decode('utf-8', errors='ignore').strip()
        last_line = self.file_object.read(position - new_position).decode('utf-8', errors='ignore').strip()

        # 输出调试信息
        logger.debug(f'Reading line: {last_line}')
        return last_line

    def on_modified(self, event):
        """
        on_modified方法是FileSystemEventHandler的回调方法,当日志文件发生变化时,都会调用这个方法。
        参数:
        - event:文件变化事件对象
        """
        # 注意,这里一个要用绝对路径比较,不能直接使用 event.src_path == self.log_file,
        # event.src_path == self.log_file 的值为false
        # if event.src_path == self.log_file:
        if os.path.abspath(event.src_path) == os.path.abspath(self.log_file):
            # 在文件发生变化时,实时获取最后一行文本
            self.last_line = self.get_last_line()

        # 用户可在外部传入一个回调方法,在文本发生变化时执行该事件
        if self.on_modified_callback:
            self.on_modified_callback()

        # 调用基类的同名方法,以便执行基类的默认行为
        super(LogWatcher, self).on_modified(event)

    def tail_last_line_and_match(self, match_string=None, max_match_seconds=10):
        """
        实时监控日志文件的变化,并实时获取最后一行文本。如果匹配到指定的字符串,停止监视。

        参数:
        - match_string:需要匹配的字符串
        """
        self.match_string = match_string
        self.start()

        end_time = time.time() + max_match_seconds

        try:
            while not self.stop_watching and time.time() <= end_time:

                if self.match_string and self.match_string in self.last_line:
                    self.stop_watching = True

        except KeyboardInterrupt:
            pass

        self.stop_watching = True  # 停止监视循环


def write_logs(log_file):
    """在新线程中写入日志"""
    for i in range(10):
        with open(log_file, 'a') as file:
            file.write(f'New log entry {i}\n')
        time.sleep(1)  # 每秒写入一次日志


if __name__ == '__main__':
    import logging

    logging.basicConfig(level=logging.DEBUG)

    log_file = 'demo.log'

    # 创建日志文件并写入示例日志
    with open(log_file, 'w') as file:
        file.write('This is the first line of the log.\n')
        file.write('This is the second line of the log.\n')

    log_watcher = LogWatcher(log_file)

    # 启动新线程写入日志
    write_thread = threading.Thread(target=write_logs, args=(log_file,))
    write_thread.start()

    # 启动实时监控日志文件变化,并打印最后一行文本,直到匹配到指定字符串或超时才停止监视
    log_watcher.tail_last_line_and_match(match_string='New log entry 9', max_match_seconds=20)

    # 等待写入线程结束
    write_thread.join()

三、Demo验证

运行代码,控制台的输出结果:

DEBUG:__main__:Reading line: This is the second line of the log.
DEBUG:__main__:Reading line: New log entry 0
DEBUG:__main__:Reading line: New log entry 1
DEBUG:__main__:Reading line: New log entry 2
DEBUG:__main__:Reading line: New log entry 3
DEBUG:__main__:Reading line: New log entry 4
DEBUG:__main__:Reading line: New log entry 5
DEBUG:__main__:Reading line: New log entry 6
DEBUG:__main__:Reading line: New log entry 7
DEBUG:__main__:Reading line: New log entry 8
DEBUG:__main__:Reading line: New log entry 9

Process finished with exit code 0

欢迎技术交流:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1295423.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

105.长度最小的子数组(力扣)|滑动窗口

代码演示 class Solution { public:int minSubArrayLen(int target, vector<int>& nums) {int result INT_MAX; // 用于存储最小子数组的长度int sum 0; // 滑动窗口的长度int i 0; // 滑动窗口的起始位置int sumlength 0; // 当前子数…

Python语言基础知识(二)

文章目录 1、条件表达式2、分支结构—常见的分支结构2.1、分支结构—单分支选择结构2.2、分支结构—双分支选择结构2.3、分支结构—多分支选择结构2.4、分支结构—选择结构的嵌套 3、循环结构3.1、循环结构— for循环与while循环 1、条件表达式 在选择和循环结构中&#xff0c…

git更换远程地址

1、命令修改 git remote -v git remote set-url origin http://xxx.git git remote -v 2、工具修改

常见的性能测试缺陷有哪些?你都遇到过吗

前言 性能测试&#xff0c;是结合被测系统应用架构、业务场景和实现细节、逻辑&#xff0c;对软件响应时间、处理速率、容错能力等进行分析测试&#xff0c;找到系统的性能瓶颈&#xff0c;并确认问题得到解决的过程。 由于工作需要&#xff0c;对性能测试缺陷分类进行了整理…

PyQt6 QCalendarWidget日历控件

​锋哥原创的PyQt6视频教程&#xff1a; 2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~共计39条视频&#xff0c;包括&#xff1a;2024版 PyQt6 Python桌面开发 视频教程(无废话…

python基于轻量级GhostNet模型开发构建23种常见中草药图像识别系统

轻量级识别模型在我们前面的博文中已经有过很多实践了&#xff0c;感兴趣的话可以自行移步阅读&#xff1a; 《移动端轻量级模型开发谁更胜一筹&#xff0c;efficientnet、mobilenetv2、mobilenetv3、ghostnet、mnasnet、shufflenetv2驾驶危险行为识别模型对比开发测试》 《基…

一篇文章带你快速入门 Nuxt.js 服务端渲染

1. Nuxt.js 概述 1.1 我们一起做过的SPA SPA&#xff08;single page web application&#xff09;单页 Web 应用&#xff0c;Web 不再是一张张页面&#xff0c;而是一个整体的应用&#xff0c;一个由路由系统、数据系统、页面&#xff08;组件&#xff09;系统等等&#xff0…

【AI】以大厂PaaS为例,看人工智能技术方案服务能力的方向(2/2)

目录 三、解决方案 3.1 人脸身份验证 3.2 图像审核&#xff08;暴恐、色情等&#xff09; 3.3 人脸会场签到 3.4 机器人视觉 3.5 视频审核 3.6 电商图文详情生成 3.7 智能客服 接上回&#xff1a; 【AI】以大厂PaaS为例&#xff0c;看人工智能技术方案服务能力的方向&…

【mysql】事物与隔离级别

事物 事务(Transaction)是并发控制的基本单位。所谓的事务呢&#xff0c;它是一个操作序列&#xff0c;这些操作要么都执行&#xff0c;要么都不执行&#xff0c;它是一个不可分割的工作单位。 事务具有四大特性&#xff0c;通常称为ACID特性&#xff1a; 原子性&#xff08…

数据可视化免费化:趋势背后的动因

在数字化浪潮的推动下&#xff0c;数据可视化已成为解读和利用数据的关键工具。作为一个需要经常接触各种数据可视化软件的设计师&#xff0c;我发现数据可视化工具的免费化进程正在加速。为何越来越多的数据可视化工具选择走向免费之路&#xff1f;让我们一起探讨其中的原因。…

经典神经网络——ResNet模型论文详解及代码复现

论文地址&#xff1a;Deep Residual Learning for Image Recognition (thecvf.com) PyTorch官方代码实现&#xff1a;vision/torchvision/models/resnet.py at main pytorch/vision (github.com) B站讲解&#xff1a; 【精读AI论文】ResNet深度残差网络_哔哩哔哩_bilibili …

有关实时3D渲染:定义、工作原理和应用方向

随着新兴技术——3D渲染的发展&#xff0c;交互应用的质量有了极大的提高。用实时三维渲染软件创建的沉浸式数字体验&#xff0c;几乎与现实没有区别了。随着技术的逐步改进&#xff0c;在价格较低的个人工作站上渲染3D图像变得更加容易&#xff0c;设计师的投入也逐渐变少。 什…

一文搞懂什么是Hadoop

Hadoop概念 什么是Hadoop Hadoop是一个由Apache基金会所开发的用于解决海量数据的存储及分析计算问题的分布式系统基础架构。 广义上来说&#xff0c;Hadoop通常指一个跟广泛的概念——Hadoop生态圈。 以下是hadoop生态圈中的技术&#xff1a; Hadoop优势 hadoop组成 HDFS…

单例模式---饿汉式、懒汉式

一、什么是单例模式 单例模式&#xff0c;指的是一个类中的对象只能有一个&#xff0c;它在内存中只会创建一次对象的设计模式。 二、饿汉式 public class SingleTon {// 私有的构造方法private SingleTon() {};// 1. 饿汉式private static SingleTon instance new SingleTon…

【Java 基础】26 枚举

文章目录 1. 什么是枚举2. 定义3. 使用1&#xff09;常量2&#xff09;遍历3&#xff09;switch 4. 属性和方法1&#xff09;属性2&#xff09;方法 5. 实现原理6. 使用场景总结 1. 什么是枚举 枚举是列出某些有穷序列集的所有成员的程序&#xff0c;或者是一种特定类型对象的计…

第17章:随堂复习与企业真题(反射机制)

第17章&#xff1a;随堂复习与企业真题&#xff08;反射机制&#xff09; 一、随堂复习 1. 反射的概述&#xff08;熟悉&#xff09; Java给我们提供了一套API&#xff0c;使用这套API我们可以在运行时动态的获取指定对象所属的类&#xff0c;创建运行时类的对象&#xff0c;…

12.08

1.头文件 #ifndef WIDGET_H #define WIDGET_H#include <QWidget> #include <QDebug>QT_BEGIN_NAMESPACE namespace Ui { class Widget; } QT_END_NAMESPACEclass Widget : public QWidget {Q_OBJECTpublic:Widget(QWidget *parent nullptr);~Widget(); signals:v…

One-to-Few Label Assignment for End-to-End Dense Detection阅读笔记

One-to-Few Label Assignment for End-to-End Dense Detection阅读笔记 Abstract 一对一&#xff08;o2o&#xff09;标签分配对基于变换器的端到端检测起着关键作用&#xff0c;最近已经被引入到全卷积检测器中&#xff0c;用于端到端密集检测。然而&#xff0c;o2o可能因为…

Aligning Large Multi-Modal Model with Robust Instruction Tuning

Abstract 尽管多模态任务取得了有希望的进展&#xff0c;但当前的大型多模态模型&#xff08;LMM&#xff09;很容易产生与相关图像和人类指令 不一致的描述的幻觉。 LRV-指令。我们通过引入第一个大型且多样化的视觉指令调整数据集来解决这个问题&#xff0c;该数据集名为大…

DOS 批处理 (一)

DOS 批处理 1. 批处理是什么&#xff1f;2. DOS和MS-DOS3. 各种操作系统shell的区别Shell 介绍图形用户界面&#xff08;GUI&#xff09;shell命令行界面&#xff08;CLI&#xff09;的 shell命令区别 1. 批处理是什么&#xff1f; 批处理(Batch)&#xff0c;也称为批处理脚本…