Python中日志异步发送到远程服务器

news2024/11/24 17:39:16

背景

在Python中使用日志最常用的方式就是在控制台和文件中输出日志了,logging模块也很好的提供的相应 的类,使用起来也非常方便,但是有时我们可能会有一些需求,如还需要将日志发送到远端,或者直接写入数 据库,这种需求该如何实现呢?

StreamHandler和FileHandler

# -*- coding: utf-8 -*-
"""
-------------------------------------------------
 File Name:   loger
 Description :
 Author :    yangyanxing
 date:     2020/9/23
-------------------------------------------------
"""
import logging
import sys
import os
# 初始化logger
logger = logging.getLogger("yyx")
logger.setLevel(logging.DEBUG)
# 设置日志格式
fmt = logging.Formatter('[%(asctime)s] [%(levelname)s] %(message)s', '%Y-%m-%d
%H:%M:%S')
# 添加cmd handler
cmd_handler = logging.StreamHandler(sys.stdout)
cmd_handler.setLevel(logging.DEBUG)
cmd_handler.setFormatter(fmt)
# 添加文件的handler
logpath = os.path.join(os.getcwd(), 'debug.log')
file_handler = logging.FileHandler(logpath)
file_handler.setLevel(logging.DEBUG)
file_handler.setFormatter(fmt)
# 将cmd和file handler添加到logger中
logger.addHandler(cmd_handler)
logger.addHandler(file_handler)
logger.debug("今天天气不错")

先初始化一个logger, 并且设置它的日志级别是DEBUG,然后添初始化了 cmd_handler和 file_handler,最后将它们添加到logger中, 运行脚本,会在cmd中打印出

[2020-09-23 10:45:56] [DEBUG] 今天天气不错

添加HTTPHandler

# 添加一个httphandler
import logging.handlers
http_handler = logging.handlers.HTTPHandler(r"127.0.0.1:1987", '/api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天气不错")

结果在服务端我们收到了很多信息

{
'name': [b 'yyx'],
'msg': [b
'\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
'args': [b '()'],
'levelname': [b 'DEBUG'],
'levelno': [b '10'],
'pathname': [b 'I:/workplace/yangyanxing/test/loger.py'],
'filename': [b 'loger.py'],
'module': [b 'loger'],
'exc_info': [b 'None'],
'exc_text': [b 'None'],
'stack_info': [b 'None'],
'lineno': [b '41'],
'funcName': [b '<module>'],
'created': [b '1600831054.8881223'],
'msecs': [b '888.1223201751709'],
'relativeCreated': [b '22.99976348876953'],
'thread': [b '14876'],
'threadName': [b 'MainThread'],
'processName': [b 'MainProcess'],
'process': [b '8648'],
'message': [b
'\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99'],
'asctime': [b '2020-09-23 11:17:34']
}

可以说是信息非常之多,但是却并不是我们想要的样子,我们只是想要类似于

[2020-09-23 10:45:56][DEBUG] 今天天气不错

logging.handlers.HTTPHandler 只是简单的将日志所有信息发送给服务端,至于服务端要怎么组织内 容是由服务端来完成. 所以我们可以有两种方法,一种是改服务端代码,根据传过来的日志信息重新组织一 下日志内容, 第二种是我们重新写一个类,让它在发送的时候将重新格式化日志内容发送到服务端。

我们采用第二种方法,因为这种方法比较灵活, 服务端只是用于记录,发送什么内容应该是由客户端来决定。

我们需要重新定义一个类,我们可以参考 logging.handlers.HTTPHandler 这个类,重新写一个httpHandler类

class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  def emit(self, record):
    '''
   重写emit方法,这里主要是为了把初始化时的baseParam添加进来
   :param record:
   :return:
   '''
    msg = self.format(record)
    if self.method == "GET":
      if (self.url.find("?") >= 0):
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
msg}))
      requests.get(url, timeout=1)
    else:
      headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Content-length": str(len(msg))
     }
      requests.post(self.url, data={'log': msg}, headers=headers,
timeout=1)

这行代码表示,将会根据日志对象设置的格式返回对应的内容。

{'log': [b'[2020-09-23 11:39:45] [DEBUG]
\xe4\xbb\x8a\xe5\xa4\xa9\xe5\xa4\xa9\xe6\xb0\x94\xe4\xb8\x8d\xe9\x94\x99']}

将bytes类型转一下就得到了

[2020-09-23 11:43:50] [DEBUG] 今天天气不错

异步的发送远程日志

async def post(self):
  print(self.getParam('log'))
  await asyncio.sleep(5)
  self.write({"msg": 'ok'})

此时我们再打印上面的日志

logger.debug("今天天气不错")
logger.debug("是风和日丽的")

得到的输出为

[2020-09-23 11:47:33] [DEBUG] 今天天气不错
[2020-09-23 11:47:38] [DEBUG] 是风和日丽的

那么现在问题来了,原本只是一个记录日志,现在却成了拖累整个脚本的累赘,所以我们需要异步的来 处理远程写日志。

1

使用多线程处理

def emit(self, record):
  msg = self.format(record)
  if self.method == "GET":
    if (self.url.find("?") >= 0):
      sep = '&'
    else:
      sep = '?'
    url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
    t = threading.Thread(target=requests.get, args=(url,))
    t.start()
  else:
    headers = {
      "Content-type": "application/x-www-form-urlencoded",
      "Content-length": str(len(msg))
   }
    t = threading.Thread(target=requests.post, args=(self.url,), kwargs=
{"data":{'log': msg},

2

使用线程池处理

python 的 concurrent.futures 中有ThreadPoolExecutor, ProcessPoolExecutor类,是线程池和进程池, 就是在初始化的时候先定义几个线程,之后让这些线程来处理相应的函数,这样不用每次都需要新创建线程

exector = ThreadPoolExecutor(max_workers=1) # 初始化一个线程池,只有一个线程
exector.submit(fn, args, kwargs) # 将函数submit到线程池中

exector = ThreadPoolExecutor(max_workers=1)
def emit(self, record):
  msg = self.format(record)
  timeout = aiohttp.ClientTimeout(total=6)
  if self.method == "GET":
    if (self.url.find("?") >= 0):
      sep = '&'
    else:
      sep = '?'
    url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log": msg}))
    exector.submit(requests.get, url, timeout=6)
  else:
    headers = {
      "Content-type": "application/x-www-form-urlencoded",
      "Content-length": str(len(msg))
   }
    exector.submit(requests.post, self.url, data={'log': msg},
headers=headers, timeout=6)

使用异步aiohttp库来发送请求

class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  async def emit(self, record):
    msg = self.format(record)
    timeout = aiohttp.ClientTimeout(total=6)
    if self.method == "GET":
      if (self.url.find("?") >= 0):
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
msg}))
      async with aiohttp.ClientSession(timeout=timeout) as session:
      async with session.get(self.url) as resp:
          print(await resp.text())
      else:
        headers = {
        "Content-type": "application/x-www-form-urlencoded",
        "Content-length": str(len(msg))
     }
      async with aiohttp.ClientSession(timeout=timeout, headers=headers)
as session:
      async with session.post(self.url, data={'log': msg}) as resp:
          print(await resp.text())

这时代码执行崩溃了

C:\Python37\lib\logging\__init__.py:894: RuntimeWarning: coroutine
'CustomHandler.emit' was never awaited
self.emit(record)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback

究其原因是由于emit方法中使用 async with session.post 函数,它需要在一个使用async 修饰的函数 里执行,所以修改emit函数,使用async来修饰,这里emit函数变成了异步的函数, 返回的是一个 coroutine 对象,要想执行coroutine对象,需要使用await, 但是脚本里却没有在哪里调用 await emit() ,所以崩溃信息 中显示 coroutine 'CustomHandler.emit' was never awaited。

async def main():
  await logger.debug("今天天气不错")
  await logger.debug("是风和日丽的")
loop = asyncio.get_event_loop()
loop.run_until_complete(main())

执行依然报错

raise TypeError('An asyncio.Future, a coroutine or an awaitable is '

这似乎就没有办法了,想要使用异步库来发送,但是却没有可以调用await的地方。

import asyncio
async def test(n):
 while n > 0:
   await asyncio.sleep(1)
   print("test {}".format(n))
   n -= 1
 return n

async def test2(n):
 while n >0:
   await asyncio.sleep(1)
   print("test2 {}".format(n))
   n -= 1
def stoploop(task):
 print("执行结束, task n is {}".format(task.result()))
 loop.stop()
loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task2 = loop.create_task(test2(3))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
loop.run_forever()

注意看上面的代码,我们并没有在某处使用await来执行协程,而是通过将协程注册到某个事件循环对象上, 然后调用该循环的 run_forever() 函数,从而使该循环上的协程对象得以正常的执行。

test 5
test2 3
test 4
test2 2
test 3
test2 1
test 2
test 1
执行结束, task n is 0

可以看到,使用事件循环对象创建的task,在该循环执行run_forever() 以后就可以执行了如果不执行 loop.run_forever() 函数,则注册在它上面的协程也不会执行

loop = asyncio.get_event_loop()
task = loop.create_task(test(5))
task.add_done_callback(stoploop)
task2 = loop.create_task(test2(3))
time.sleep(5)
# loop.run_forever()

loop = asyncio.get_event_loop()
class CustomHandler(logging.Handler):
  def __init__(self, host, uri, method="POST"):
    logging.Handler.__init__(self)
    self.url = "%s/%s" % (host, uri)
    method = method.upper()
    if method not in ["GET", "POST"]:
      raise ValueError("method must be GET or POST")
    self.method = method
  # 使用aiohttp封装发送数据函数
  async def submit(self, data):
    timeout = aiohttp.ClientTimeout(total=6)
    if self.method == "GET":
      if self.url.find("?") >= 0:
        sep = '&'
      else:
        sep = '?'
      url = self.url + "%c%s" % (sep, urllib.parse.urlencode({"log":
data}))
      async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get(url) as resp:
          print(await resp.text())
    else:
      headers = {
        "Content-type": "application/x-www-form-urlencoded",
     }
      async with aiohttp.ClientSession(timeout=timeout, headers=headers)
as session:
        async with session.post(self.url, data={'log': data}) as resp:
          print(await resp.text())
    return True
  def emit(self, record):
    msg = self.format(record)
    loop.create_task(self.submit(msg))
# 添加一个httphandler
http_handler = CustomHandler(r"http://127.0.0.1:1987", 'api/log/get')
http_handler.setLevel(logging.DEBUG)
http_handler.setFormatter(fmt)
logger.addHandler(http_handler)
logger.debug("今天天气不错")
logger.debug("是风和日丽的")
loop.run_forever()

但这种方式有一点要注意,loop.run_forever() 将会一直阻塞,所以需要有个地方调用 loop.stop() 方法. 可以注册到某个task的回调中。

最后感谢每一个认真阅读我文章的人,礼尚往来总是要有的,虽然不是什么很值钱的东西,如果你用得到的话可以直接拿走:

这些资料,对于【软件测试】的朋友来说应该是最全面最完整的备战仓库,这个仓库也陪伴上万个测试工程师们走过最艰难的路程,希望也能帮助到你!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1178345.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java泛型的深入 泛型还可以在很多地方进行定义 泛型类 泛型方法 泛型接口 泛型的继承和通配符 泛型类练习

文章目录 泛型的深入泛型还可以在很多地方进行定义泛型类泛型方法泛型接口 泛型的继承和通配符泛型类练习总结 泛型的深入 public static void main(String[] args) {//在没有泛型的时候怎么存储数据ArrayList listnew ArrayList();list.add(1);list.add("abc");//遍…

C语言 用字符串比较函数cmp来做一个门禁:账号密码是否匹配 (干货满满)

#include<stdio.h> #include<string.h> void fun04() {for (int i 0; i < 3; i){char *str01 "hello";char uname[100] ;printf("请输入账号");scanf("%s",uname);char *str02 "123456";char pword[100];printf(&qu…

数字化转型:云表低代码开发助力制造业腾飞

数字化转型已成为制造业不可避免的趋势。为了应对市场快速变化、提高运营效率以及降低成本&#xff0c;制造业企业积极追求更加智能化、敏捷的生产方式。在这个转型过程中&#xff0c;低代码技术作为一种强大的工具&#xff0c;正逐渐崭露头角&#xff0c;有望加速制造业的数字…

QGC 中添加海康威视摄像头记录(Qt For Android 使用 JNI 进行JAVA 与 C++ 的通讯)

文章目录 1. 配置海康威视 SDK 下载库文件移植工程文件添加动态库&#xff08;.so&#xff09;Android xml 配置添加 java 文件 2. JavaQGCActivity.javaHkwsManager.java 3. C头文件添加&#xff1a;C 中调用 Java 静态函数&#xff08;hcnNetSDKInit&#xff09;JNI 传入规则…

【电路笔记】-串联RLC电路分析

串联RLC电路分析 文章目录 串联RLC电路分析1、概述2、瞬态响应3、AC响应4、RCL和CLR配置5、结论 电阻器 、电感器 (L) 和电容器 © 是电子器件中的三个基本无源元件。 它们的属性和行为已在交流电阻、交流电感和交流电容文章中详细介绍。 在本文中&#xff0c;我们将重点讨…

二蛋赠书七期:《云原生数据中台:架构、方法论与实践》

前言 大家好&#xff01;我是二蛋&#xff0c;一个热爱技术、乐于分享的工程师。在过去的几年里&#xff0c;我一直通过各种渠道与大家分享技术知识和经验。我深知&#xff0c;每一位技术人员都对自己的技能提升和职业发展有着热切的期待。因此&#xff0c;我非常感激大家一直…

基于北方苍鹰算法的无人机航迹规划-附代码

基于北方苍鹰算法的无人机航迹规划 文章目录 基于北方苍鹰算法的无人机航迹规划1.北方苍鹰搜索算法2.无人机飞行环境建模3.无人机航迹规划建模4.实验结果4.1地图创建4.2 航迹规划 5.参考文献6.Matlab代码 摘要&#xff1a;本文主要介绍利用北方苍鹰算法来优化无人机航迹规划。 …

Qt QTableView排序

1.简介 在开发过程中&#xff0c;我们需要通过点击表头来对QTableView或QTreeView等一系列高级视图进行排序操作&#xff0c;以下是进行排序的步骤。 步骤&#xff1a; 首先创建了一个QStandardItemModel对象或者继承QAbstractTableModel类作为数据模型&#xff0c;并设置了…

如何获取HuggingFace的Access Token;如何获取HuggingFace的API Key

Access Token通过编程方式向 HuggingFace 验证您的身份&#xff0c;允许应用程序执行由授予的权限范围&#xff08;读取、写入或管理&#xff09;指定的特定操作。您可以通过以下步骤获取&#xff1a; 1.首先&#xff0c;你需要注册一个 Hugging Face 账号。如果你已经有了账号…

Android修行手册 - 实现POI上万行的大数据量Excel读写操作,解决内存溢出

点击跳转>Unity3D特效百例点击跳转>案例项目实战源码点击跳转>游戏脚本-辅助自动化点击跳转>Android控件全解手册点击跳转>Scratch编程案例点击跳转>软考全系列 &#x1f449;关于作者 专注于Android/Unity和各种游戏开发技巧&#xff0c;以及各种资源分享&…

web前端JS基础------制作一个获取验证码

1&#xff0c;需要一个定时器&#xff0c;和一个button&#xff0c;通过点击事件启动获取验证码 2&#xff0c;参考代码如下 <!DOCTYPE html> <html><head><meta charset"utf-8"><title></title></head><body><…

Linux中for循环

for do done 复习知识点&#xff1a;cut命令&#xff0c;id命令&#xff0c;finger命令&#xff0c;for循环 程序如上&#xff0c;-d 接分隔符&#xff0c;-f后的数字表示分隔后的列 从结果可以看出&#xff0c;系统上没有finger这个命令&#xff0c;后面会学到yum安装命令&a…

上手SQL语句调优必须了解的内容——Explain

在做性能测试时&#xff0c;资深的性能测试工程师&#xff0c;都会帮助研发同学优化sql语句&#xff0c;听起来很高深&#xff0c;但是具体操作是比较容易的&#xff0c;使用expain命令就可以了&#xff01;本文我会用最简单有效的方式带大家掌握expain的使用方法&#xff01; …

JVM虚拟机:垃圾回收器之Serial(年轻代)

本文重点 本文将介绍年轻代的Serial回收器,它最主要的特征就是串行化的回收器。 运行方式 Serial是一个单线程的收集器,在进行垃圾收集的时候,必须暂停其它所有的工作线程(java程序找一个安全点safe point然后才停止执行,进行等待)直到垃圾回收结束,下的运行状态图如…

迅为iTOP-i.MX8M开发板使用 make 工具

make 工具是编译辅助工具&#xff0c;用来解决使用命令编译工程非常繁琐的问题。 调用这个命令工具&#xff1a;我们在 windows 上编程使用 ide &#xff0c;我们有图形界面&#xff0c;有相应的按钮&#xff0c;比如说 build 或者 run 来编译。其实 make 这个编译辅助工具使…

Dajngo学习笔记(3)

电话号码管理 查看功能 class PrettyNum(models.Model):mobilemodels.CharField(verbose_name"电话号",max_length11)pricemodels.IntegerField(verbose_name"价格")level_choice((1,"一级"),(2,"二级"),(3,"三级"))level…

RISC-V处理器设计(四)—— Verilog 代码设计

一、前言 从6月底刚开始接触 risc-v 架构&#xff0c;到现在完成了一个 risc-v cpu 的设计&#xff0c;并且成功移植了 rt-thread nano 到本 cpu 上运行&#xff0c;中间经过了 4个多月的时间&#xff0c;遇到了数不清的问题&#xff0c;也想过放弃&#xff0c;但好在最后还是…

美国阿贡国家实验室发布快速自动扫描套件 FAST,助力显微技术「快速阅读」成为可能

「我高兴地在北京市的天安门广场上看红色的国旗升起」 快速阅读一下这个句子&#xff0c;大家可能会发现&#xff0c;只需「我在天安门广场看升旗」几个字&#xff0c;就能概述我们需要的信息&#xff0c;也就是说&#xff0c;无需逐字逐句地阅读&#xff0c;抓住重点即可破译…

【flutter no devices】

1.在环境变量增加 ANDROID_HOME 值为&#xff1a;C:\Users\Administrator\AppData\Local\Android\Sdk &#xff08;Android sdk 位置) 2 环境变量的path里面增加2个值&#xff1a; %ANDROID_HOME%\platform-tools %ANDROID_HOME%\tools 3 打开cmd&#xff0c;或者在Android st…

嵌入式Linux和stm32区别? 之间有什么关系吗?

嵌入式Linux和stm32区别? 之间有什么关系吗&#xff1f; 主要体现在以下几个方面&#xff1a; 1.硬件资源不同 单片机一般是芯片内部集成flash、ram&#xff0c;ARM一般是CPU&#xff0c;配合外部的flash、ram、sd卡存储器使用。最近很多小伙伴找我&#xff0c;说想要一些嵌…