supervisor-eventlistener

news2024/9/28 0:17:28

了解supervisor-eventlistener

本文主要介绍 supervisor Event 的功能。

supervisor 作为一个进程管理工具,在 3.0 版本之后,新增了 Event 的高级特性, 主要用于做(进程启动、退出、失败等)事件告警服务。
Event 特性是将监听的服务(listener)注册到supervisord中,当supervisord监听到相应事件时,将事件信息推送给监听对应事件的listener。

事件类型

Event 可以设置 27 种事件类型,可以分为如下几类:

1. 监控进程状态转移事件;
2. 监控进程状态日志变更事件;
3. 进程组中进程添加删除事件;
4. supervisord 进程本身日志变更事件;
5. supervisord 进程本身状态变更的事件;
6. 定时触发事件。

事件可以被单独监听,也可以一个listener 监听多种事件。

配置说明

对于一个listener,与正常program的区别是,新增了events 参数,用于标识要监听的事件。

[eventlistener:theeventlistenername]
events=PROCESS_STATE,TICK_60 
buffer_size=10 ; 事件池子大小(输入流大小)

事件类型配置多个,用逗号分割。上述配置的是子进程状态的变更,以及定时60s通知间隔60s
事件通知缓冲区大小,可以自定义配置,上述配置了10个事件消息的缓冲。

Listener 的实现

与supervisord 的交互
由于supervisord 是 listener的父进程,所以交互方式采用最简单的 标准输入输出的方式交互。listener 通过标准输入获取事件,通过标准输出通知supervisord listener的事件处理结果,以及当前supervisord的状态

listener 的状态

listener 有三种状态:ACKNOWLEDGED、READY、BUSY.

  • ACKNOWLEDGED: listener 未就绪的状态。(发送READY之前的状态)
  • READY: 等待事件触发的状态。(发送READY 消息后,未收到消息的状态)
  • BUSY: 事件处理中的状态。(即输出 OK, FAIL 之前处理Event消息时的状态)
    supervisor_listener_status.jpg

消息协议

消息包括supervisord 通知给listener 的事件消息和 listener 通知给supervisord 的状态变更消息。

listener 的状态变更消息, READY

状态OK的 “READYn” 消息
处理成功 “RESULT 2nOK” 消息
处理失败 “RESULT 4nFAIL” 消息
supervisord 广播的事件消息, 事件消息分为 header 和 payload 两部分。 header 中采用kv的方式发送,header 中包含了 payload 的长度。

例如官网提供的header 的例子:

ver:3.0 server:supervisor serial:21 pool:listener poolserial:10 eventname:PROCESS_COMMUNICATION_STDOUT len:54

header 含义:

  • serial 为事件的序列号
  • pool 表示listener 的进程池名称(listener支持启动多个)
  • poolserial 表示listener的进程池序列号
  • eventname 事件名称
  • len body 的长度
  • Listener 的基本流程
  • listener 的处理流程如下:
  1. 发送ready消息,等待事件发生。
  2. 收到事件后,处理事件
  3. 事件处理完成后,发送 result 消息, 从第一步开始循环
    进程状态转移举例
    我们以进程状态转移作为例子,做简单介绍。

首先,使用 golang 实现listener
版本1

package main

import (
    "bufio"
    "os"
    "strconv"
    "strings"
)

const RESP_OK = "RESULT 2\nOK"
const RESP_FAIL = "RESULT 4\nFAIL"

func main() {
    stdin := bufio.NewReader(os.Stdin)
    stdout := bufio.NewWriter(os.Stdout)
    stderr := bufio.NewWriter(os.Stderr)

    for {    
        // 发送后等待接收event
        _, _ = stdout.WriteString("READY\n")
        _ = stdout.Flush()
        // 接收header
        line, _, _ := stdin.ReadLine()          
        stderr.WriteString("read" + string(line))
        stderr.Flush()

        header, payloadSize := praseHeader(line)

        // 接收payload
        payload := make([]byte, payloadSize)
        stdin.Read(payload)   
        stderr.WriteString("read : " + string(payload))
        stderr.Flush()

        result := alarm(header, payload)

        if result {   // 发送处理结果
            stdout.WriteString(RESP_OK)
        } else {
            stdout.WriteString(RESP_FAIL)
        }
        stdout.Flush()
    }
}

func praseHeader(data []byte) (header map[string]string, 
        payloadSize int) {
    pairs := strings.Split(string(data), " ")
    header = make(map[string]string, len(pairs))

    for _, pair := range pairs {
        token := strings.Split(pair, ":")
        header[token[0]] = token[1]
    }

    payloadSize, _ = strconv.Atoi(header["len"])
    return header, payloadSize
}

// 这里设置报警即可
func alarm(header map[string]string, payload []byte) bool {
    // send mail
    return true
}

版本2

package main

import (
	"bufio"
	"fmt"
	"github.com/urfave/cli/v2"
	"os"
	"runtime/debug"
	"strings"
	"time"
)

const (
	RespOk = "RESULT 2\nOK"

	logPath = "./supervisor-event.log"
)

const RESP_FAIL = "RESULT 4FAIL"

func main() {
	//b := []byte("processname:config_back_8001 groupname:config_back_8001 from_state:STOPPED tries:0ver:3.0 server:supervisor serial:1897 pool:mylistenter poolserial:15 eventname:PROCESS_STATE_RUNNING len:85----------")
	//parseInfo(b)
	//os.Exit(3)
	// 默认并发数
	app := &cli.App{
		Name:  "services-notice",
		Usage: "services-notice on supervisor",
		Flags: []cli.Flag{
			&cli.StringFlag{
				Name:     "log_path",
				Aliases:  []string{"l"},
				Usage:    "`log_path` to write log default " + logPath,
				Required: false,
			},
		},
		Action: func(c *cli.Context) error {
			path := logPath
			return RevMsg(path)
		},
	}
	_ = app.Run(os.Args)
}

// RevMsg 处理消息
func RevMsg(logPath string) (err error) {
	var (
		f  *os.File
		ti = time.Now().Format("2006-01-02 15:04:05 --->")
	)

	stdin := bufio.NewReader(os.Stdin)
	stdout := bufio.NewWriter(os.Stdout)

	if f, err = os.Create(logPath); nil != err {
		return
	}

	defer func() {
		if er := recover(); er != nil {
			_, _ = f.Write(debug.Stack())
		}
	}()

	for {
		var line []byte

		// 发送后等待接收event
		_, err = stdout.WriteString("READY\n")
		if err != nil {
			_, _ = f.Write([]byte(ti + "stdout.WriteString(\"READY\\n\")" + err.Error()))
			goto END
		}
		_ = stdout.Flush()

		//接受数据
		if line, _, err = stdin.ReadLine(); err != nil {
			_, _ = f.Write([]byte(ti + "stdin.ReadLine()\n" + err.Error()))
			goto END
		}

		if strings.Contains(string(line), "TICK") {
			//write log
			_, err = f.Write([]byte(ti))
			if len(line) > 15 {
				line = line[len(line)-13:]
			}
			if _, err = f.Write(append(line, '\n')); err != nil {
				_, _ = f.Write([]byte(ti + "f.Write(line)\n" + err.Error()))
				goto END

			}
			goto END

		}
		//write log
		_, err = f.Write([]byte(time.Now().Format("2006-01-02 15:04:05 --->")))
		if _, err = f.Write(append(line, '\n')); err != nil {
			_, _ = f.Write([]byte(ti + "f.Write(line)\n" + err.Error() + "\n"))
			goto END

		}

		//tidy info
		if err = parseInfo(line); nil != err {
			_, _ = f.Write([]byte(ti + "parseInfo(line) " + err.Error() + "\n"))
			goto END
		}

	END:
		if _, err = stdout.WriteString(RespOk); nil != err {
			_, _ = f.Write([]byte(ti + "stdout.WriteString(RESP_OK)" + err.Error()))
			_, _ = stdout.WriteString(RespOk)

		}
		_ = stdout.Flush()

	}
}

// 解析消息
func parseInfo(data []byte) (err error) {
	var (
		d       []string
		mapInfo = map[string]string{}
	)
	d = strings.Split(string(data), " ")
	for _, v := range d {
		dp := strings.Split(v, ":")
		if len(dp) < 2 {
			continue
		}
		mapInfo[dp[0]] = dp[1]
	}
	if _, ok := mapInfo["eventname"]; !ok {
		return
	}
	return alarm(mapInfo)
}

// 报警
func alarm(mapInfo map[string]string) (err error) {
	var (
		token string
		b     = fmt.Sprintf(`
	//;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
	//;事件-%s
	//;服务-%s
	//;事件-%s
	//;状态-%s
	//;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;;
	//`, time.Now().Format("2006-0-02 15:04:05"), mapInfo["processname"], mapInfo["eventname"], mapInfo["from_state"])
	)
	//获取token
	if token, err = getAccessToken(); nil != err {
		return
	}

	//发送消息
	return pushMsg(token, b)
}

// 获取token
func getAccessToken() (accessToken string, err error) {
	return
}

// push msg
func pushMsg(token, msg string) (err error) {

	return
}

这里,报警处理未填写。

其次,在supervisor 中添加配置,监听服务:

[program:sleep]
process_name=%(program_name)s
command=/usr/bin/sleep 100
events=EVENT
autostart=true
autorestart=true

[program:sleep2]
process_name=sleep2-2
command=/usr/bin/sleep 1000
events=EVENT
autostart=true
autorestart=true

配置测试服务 用于测试

[eventlistener:listener]
command=/root/listener ;要执行的脚本
process_name=test
events=PROCESS_STATE,TICK_5 ;坚挺的事件类型
stdout_logfile=/var/log/tmp/listener_test_stdout.log 
stderr_logfile=/var/log/tmp/listener_test_stderr.log
user=root

这里监听了服务的处理状态,以及每5s的心跳消息。
[eventlistener:listener]的listenter 这个名字随便写,是监听子进程的名字
process_name=test 可写 可不写

最后,启动listener。

supervisorct start listener

从stderr的日志中可以看到,简单的TICK_5 的消息(调整了格式):

header : ver:3.0 server:supervisor serial:256 pool:listener_test poolserial:173 eventname:TICK_5 len:15read 
payload: when:1586258030
fastcgi 进程状态变更的消息:

header : ver:3.0 server:supervisor serial:291 pool:listener_test poolserial:208 eventname:PROCESS_STATE_EXITED len:87
payload: processname:fastcgi_test groupname:fastcgi_test from_state:RUNNING expected:0 pid:19119

header :ver:3.0 server:supervisor serial:293 pool:listener_test poolserial:210 eventname:PROCESS_STATE_STARTING len:73
payload: processname:fastcgi_test groupname:fastcgi_test from_state:EXITED tries:0

参考文章

https://segmentfault.com/a/1190000022298049/

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/688009.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Axure教程—中继器筛查与排序

当工作中需要进行数据筛查排序操作时&#xff0c;我们可以如何使用Axure来进行相关操作呢&#xff1f;本篇文章里&#xff0c;作者利用中继器为我们展示了数据筛查与排序&#xff0c;让我们一起来看一下。 预览效果 预览地址&#xff1a;https://6q6ajh.axshare.com 功能介绍…

【AI模型部署】基于gradio和python的网页交互界面(web-ui)——简易使用方法

使用gradio&#xff0c;只需在原有的代码中增加几行,快速部署机器学习模型&#xff0c;就能自动化生成交互式web页面&#xff0c;并支持多种输入输出格式&#xff0c;比如图像分类中的图>>标签&#xff0c;超分辨率中的图>>图等。 同时还支持生成能外部网络访问的链…

最小二乘法的原理及实现

1.最小二乘法的原理及实现 笔记来源于《白话机器学习的数学》 1.1 最小二乘法的原理 预测一个变量 x x x与一个变量 y y y的关系 例如&#xff1a;广告费 x x x与点击量 y y y 用直线拟合数据 1.2 最小二乘法的实现 广告费x和点击量y&#xff0c;找到一条直线表达式&#x…

基于matlab多运动目标跟踪监测算法实现(附源码)

一、前言 此示例演示如何对来自固定摄像机的视频中的移动对象执行自动检测和基于运动的跟踪。 二、介绍 移动物体检测和基于运动的跟踪是许多计算机视觉应用的重要组成部分&#xff0c;包括活动识别、交通监控和汽车安全。基于运动的对象跟踪问题可以分为两部分&#xff1a; 检…

【KitBash3D Cargo插件】向UE中直接导入免费模型

步骤 1. 进入KitBash3D官网&#xff0c;点击右上角按钮来下载Cargo 2. 下载好后是个压缩包&#xff0c;需要进行解压 3. 解压后运行安装程序 4. 我就安装到默认的安装路径 5. 安装好后打开软件&#xff0c;注册账号&#xff08;如果点击创建账户按钮没反应就去KitBash3D官网注…

VS依赖注入(DI)构造函数自动生成局部私有变量

前言 依赖注入(DI)在开发中既是常见的也是必需的技术。它帮助我们优化了代码结构&#xff0c;使得应用更加灵活、易于扩展&#xff0c;同时也降低了各个模块之间的耦合度&#xff0c;更容易进行单元测试&#xff0c;提高了编码效率和质量。我们经常会先定义局部变量&#xff0…

OpenCL编程指南-6.2程序对象

创建和构建程序 要创建程序对象&#xff0c;可以传入OpenCL C源代码文本&#xff0c;或者利用程序二进制码来创建。由OpenCL C源代码创建程序对象是开发人员创建程序对象的一般做法。OpenCL C程序的源代码放在一个外部文件中&#xff08;例如&#xff0c;就像我们的示例代码中…

【网络知识面试】初识协议栈和套接字及连接阶段的三次握手

接上一篇&#xff1a;【网络面试必问】浏览器如何委托协议栈完成消息的收发 1. 协议栈 一直对操作系统系统的内核协议栈理解的模模糊糊&#xff0c;借着这一篇博客做一下简单梳理。 我觉得最直白的理解&#xff0c;内核协议栈就是操作系统中的一个网络控制软件&#xff0c;就是…

【git】git常用指令(项目一般使用流程示例)

文章目录 创建开发环境clone到本地查看分支创建自己的开发分支切换到开发分支 开发完成上传到仓库判断目前本地仓库的状态新内容提交到暂存区新内容更新到本地仓库新内容推到远端仓库dev1.0并入主分支1.切换到主分支2.合并3.推主分支上远端仓库 回退版本主分支更新了&#xff0…

软件产品登记测试为何如此重要?

软件产品登记测试为何如此重要&#xff1f; 软件产品登记测试报告&#xff0c;是对客户的软件产品进行功能性的检测和验证&#xff0c;确保这些功能都得以实现并能正常运行&#xff0c;可作为国家高新、增值税退税、双软评估、首套台软件的检测证明材料。 软件登记测试是“双软…

three.js中聚光灯及其属性介绍

一、聚光灯及其属性介绍 Three.js中的聚光灯&#xff08;SpotLight&#xff09;是一种用于在场景中创建聚焦光照的光源类型。它有以下属性&#xff1a; color&#xff1a;聚光灯的颜色。 intensity&#xff1a;聚光灯的强度。 distance&#xff1a;聚光灯的有效距离。 angl…

知识管理工具:在信息时代下的组织智慧管理

随着信息时代的到来&#xff0c;企业面临着前所未有的信息爆炸和快速变化的挑战。如何高效地管理和利用这些信息已经成为了企业生存和发展的关键。在这种背景下&#xff0c;知识管理工具应运而生&#xff0c;为企业提供了优秀的解决方案。 知识管理工具的定义与特点 知识管理的…

DAMA数据治理CDGA/CDGP认证考试备考经验分享

一&#xff0c;关于DAMA中国和CDGA/CDGP考试 国际数据管理协会&#xff08;DAMA国际&#xff09;是一个全球性的专业组织&#xff0c;由数据管理和相关的专业人士组成&#xff0c;非营利性机构&#xff0c;厂商中立。协会自1980年成立以来&#xff0c;一直致力于数据管理和数字…

gralylog介绍与安装

介绍 Graylog是一个开源的日志管理和分析平台&#xff0c;用于收集、存储、分析和可视化大量日志数据。它提供了一个集中化的解决方案&#xff0c;可以帮助组织有效地处理分散在各种系统和应用程序中的日志信息。 以下是Graylog的主要特点和功能&#xff1a; 日志收集&#x…

【AI工具】-MockingBird-语音合成语音克隆

简介 MockingBird&#xff1a; 英文翻译&#xff1a;反舌鸟&#xff0c;也可能来自《杀死一只知更鸟》&#xff08;英语&#xff1a;To Kill a Mockingbird&#xff09;&#xff0c;台译“梅冈城故事”&#xff0c;中国大陆译“杀死一只知更鸟”&#xff0c;直译应为“杀死一…

【Python】python进阶篇之数据库操作

数据库操作 pip3安装mysql依赖 pip3 list|grep mysqlpip3 install mysql-connector-python #指定版本 pip3 install mysql-connector-python版本号 #升降版本 pip3 install --upgrade mysql-connector-python版本号原生SQL操作 操作mysql可以使用pymsql或mysql-connector-py…

基于html+css的图展示138

准备项目 项目开发工具 Visual Studio Code 1.44.2 版本: 1.44.2 提交: ff915844119ce9485abfe8aa9076ec76b5300ddd 日期: 2020-04-16T16:36:23.138Z Electron: 7.1.11 Chrome: 78.0.3904.130 Node.js: 12.8.1 V8: 7.8.279.23-electron.0 OS: Windows_NT x64 10.0.19044 项目…

第三章 决策树

文章目录 第三章 决策树3.1基本流程3.2划分选择3.2.1信息增益3.2.2增益率3.2.3基尼指数 3.3剪枝处理3.3.1预剪枝3.3.2后剪枝 3.4连续与缺失值3.4.1连续值处理3.4.2缺失值处理 3.5多变量决策树3.7实验 第三章 决策树 3.1基本流程 决策过程&#xff1a; 基本算法&#xff1a; …

灵雀云获Gartner® 首份《DevOps平台魔力象限报告》“荣誉提及”

随着平台工程理念的崛起&#xff0c;企业使用的独立的DevOps工具链逐渐向更先进、更便捷的DevOps平台演进。Gartner发布了首份DevOps平台魔力象限报告&#xff08;Gartner Magic Quadrant for DevOps Platforms&#xff09;。在这个备受关注的报告中&#xff0c;中国云原生厂商…

大势智慧软硬件技术答疑第五期

1.控制点误差表达到多少就可以&#xff1f; 答&#xff1a;水平和高程误差在0.01左右就可以&#xff0c;图示精度是满足的。 2.三维影像有颜色&#xff0c;为什么生成的是二维影像是黑色的&#xff1f; 答&#xff1a;使用dasviewer的工具-输出正射图再试试。 3.最新模方对ps版…