使用golang实现日志收集系统的logagent

news2025/1/23 4:45:18

整体架构

参考 七米老师的日志收集项目
在这里插入图片描述
主要用go实现logagent的部分,logagent的作用主要是实时监控日志追加的变化,并将变化发送到kafka中。
之前我们已经实现了 用go连接kafka并向其中发送数据,也实现了使用tail库监控日志追加操作。
我们把这两部分结合起来实现监控日志追加并发送到kafka。

使用github.com/go-ini/ini配置参数

// 读取配置参数
	cfg, err:=ini.Load("config/config.ini")
	if err!=nil {
		logrus.Error((" load config error"))
		return
	}
[kafka]
address = 127.0.0.1:9092
chan_size = 1000

[collect]
logfile_path= D:/learn/go/log-collector-lmh/log_agent/config_version/log_file/xx.log

配置参数主要包括,kafka的启动端口,存储的数据大小限制,日志文件的路径。

初始化kafka

kafka.go

package kafka

import (
	"github.com/Shopify/sarama"
	"github.com/sirupsen/logrus"
)

var (
	Client sarama.SyncProducer
	MsgChan chan *sarama.ProducerMessage //占用的字节数少,传递的指针
)

func InitKafka(kafkaAddr string, chanSize int64) (err error){
	config:=sarama.NewConfig()
	// 生产者配置
	config.Producer.RequiredAcks=sarama.WaitForAll
	config.Producer.Partitioner=sarama.NewRandomPartitioner
	config.Producer.Return.Successes=true
	// 连接kafka
	Client,err=sarama.NewSyncProducer([]string{kafkaAddr}, config)
	if err!=nil {
		logrus.Error("producer closed", err)
		return
	}
	// 从管道中读取日志并发送到kafka
	MsgChan = make(chan *sarama.ProducerMessage, chanSize)
	go sendMsg()
	return
}

func sendMsg(){
	for {
		select {
			case msg := <- MsgChan:
				pid, offset, err := Client.SendMessage(msg)
				if err != nil {
					logrus.Warning("send msg failed, err:", err)
					return
				}
				logrus.Infof("send msg to kafka success. pid:%v offset:%v", pid, offset)
		}
	}
}

这里实现了连接kafka,并使用协程不断地读取MsgChan,读取到数据后向kafka发送,这里MsgChan通道的数据由tail监控到的日志变化写入。
main.go中调用

// 初始化kafka
	kafkaAddr:=cfg.Section("kafka").Key("address").String()
	chanSize:=cfg.Section("kafka").Key("chan_size").MustInt64(0)

	err=kafka.InitKafka(kafkaAddr, chanSize)
	if err!=nil {
		logrus.Error("kafka init failed")
	}
	logrus.Info("Kafka init success")

初始化tailf,并将日志数据写入ChanMsg

tailF.go

package tailF
import (
	"github.com/hpcloud/tail"
	"fmt"
)
var (
	TailObj *tail.Tail
)
func InitTail(filename string) (err error) {
	config := tail.Config{
		ReOpen: true,
		Follow: true,
		Location: &tail.SeekInfo{Offset: 0, Whence: 2},
		MustExist: false,
		Poll: true,
	}

	// 打开文件开始读取数据
	TailObj, err =  tail.TailFile(filename, config)
	if err != nil {
		fmt.Printf("create tail %s failed, err:%v\n", filename, err)
		return
	}
	return
}

main.go中对应

// 初始化tailf
	fileName:=cfg.Section("collect").Key("logfile_path").String()
	err=tailF.InitTail(fileName)
	if err!=nil {
		logrus.Error(" tailf init failed")
	}
	logrus.Info("Init tail success")
	// 把读取的日志发往kafka
	err=run()
	if err!=nil {
		logrus.Error(" run error%s", err)
		return
	}
	logrus.Info("run success")

main.go中实现的run函数,读取tailF的数据,并写入ChanMsg

func run () (err error){
	for {
		line,ok:=<-tailF.TailObj.Lines
		if !ok {
			logrus.Warn("tail file %s close reopen\n", tailF.TailObj.Filename)
			// 读取出错等一秒
			time.Sleep(time.Second)
			continue
		}
		// 使用通道将传输日志改为异步
		// 读取的日志封装为ProducerMessage
		msg:=&sarama.ProducerMessage{}
		msg.Topic="web_log"
		msg.Value=sarama.StringEncoder(line.Text)
		// 放到channel中
		kafka.MsgChan<-msg
	}
}

完整main.go

package main

import (
	"config_version/kafka"
	"config_version/tailF"
	"time"
	"github.com/Shopify/sarama"
	"github.com/go-ini/ini"
	"github.com/sirupsen/logrus"
)

func main() {
	// 读取配置参数
	cfg, err:=ini.Load("config/config.ini")
	if err!=nil {
		logrus.Error((" load config error"))
		return
	}
	// 初始化kafka
	kafkaAddr:=cfg.Section("kafka").Key("address").String()
	chanSize:=cfg.Section("kafka").Key("chan_size").MustInt64(0)

	err=kafka.InitKafka(kafkaAddr, chanSize)
	if err!=nil {
		logrus.Error("kafka init failed")
	}
	logrus.Info("Kafka init success")
	// 初始化tailf
	fileName:=cfg.Section("collect").Key("logfile_path").String()
	err=tailF.InitTail(fileName)
	if err!=nil {
		logrus.Error(" tailf init failed")
	}
	logrus.Info("Init tail success")
	// 把读取的日志发往kafka
	err=run()
	if err!=nil {
		logrus.Error(" run error%s", err)
		return
	}
	logrus.Info("run success")

}

func run () (err error){
	for {
		line,ok:=<-tailF.TailObj.Lines
		if !ok {
			logrus.Warn("tail file %s close reopen\n", tailF.TailObj.Filename)
			// 读取出错等一秒
			time.Sleep(time.Second)
			continue
		}
		// 使用通道将传输日志改为异步
		// 读取的日志封装为ProducerMessage
		msg:=&sarama.ProducerMessage{}
		msg.Topic="web_log"
		msg.Value=sarama.StringEncoder(line.Text)
		// 放到channel中
		kafka.MsgChan<-msg
	}
}

至此, 我们实现了简化版的日志收集系统的logagent功能,目前日志的路径还需要手动写入配置文件中,修改的话还需重启项目,之后可以使用ETCD实现日志路径的自动配置。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/418200.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Java基础】反射详述简单模拟SpringMVC

&#x1f6a9; 本文已收录至专栏&#xff1a;JAVA基础 &#x1f44d;希望能对你有所帮助 一.概述 反射是指对于任何一个Class类&#xff0c;在运行的时候都可以直接得到这个类全部成分&#xff0c;使得我们可以动态操作Java代码&#xff0c;同时反射也破坏了Java的封装性。 例…

如何清理电脑缓存,分享4个简单方法!

案例&#xff1a;如何清理电脑缓存 【朋友们&#xff01;我感觉我电脑的内存已经严重不足了&#xff0c;想清理下电脑的缓存释放空间&#xff0c;却不知如何清理&#xff0c;大家有什么好的方法吗&#xff1f;】 经常使用电脑的朋友可能都会发现&#xff0c;好像我们没下载什…

平台和编译器决定 char 是 signed char 或者 unsigned char

平台和编译器决定 char 是 signed char 或者 unsigned charThe C and C standards allows the character type char to be signed or unsigned, depending on the platform and compiler. Most systems, including x86 GNU/Linux and Microsoft Windows, use signed char, but …

NetCore3.1或Net6.0项目升级到Net7.0

其实与我之前发布的步骤基本一致&#xff0c;升级到net6.0之后&#xff0c;在升级net7.0基本没有可修改的代码&#xff0c;只是升级一些nuget包而已&#xff0c;NetCore3.1升级到Net6.0&#xff0c;可参考此文章&#xff1a;NetCore3.1项目升级到Net6.0_csdn_aspnet的博客-CSDN…

Rust China Conf 2023 筹备启动:议题征集开始

大会介绍Rust China Conf 2023 由 Rust 中文社区发起主办、知名企业和开源组织联合协办&#xff0c;是年度国内规模最大并唯一的 Rust 线下大型会议&#xff0c;深受 Rust 中文社区开发者与相关企业的喜爱与推崇。本次大会为线下会议&#xff0c;将于6月17日-18日在上海举办&am…

Java虚拟机总结

前言 Java是目前用户最多、使用范围最广的软件开发技术之一。Java的技术体系主要由支撑Java程序运行的虚拟机、提供各开发领域接口支持的Java APl、,Java编程语言及许多第三方Java框架&#xff08;如Spring、Struts等&#xff09;构成。在国内&#xff0c;有关Java APl、Java语…

【蓝桥杯嵌入式】第十四届蓝桥杯嵌入式省赛[第一场]程序设计题以及详细题解

文章目录原题展示原题分析原题题解LED相关LCD相关按键相关ADC相关定时器相关PWM输入捕获小结文章福利原题展示 原题分析 今年的第一场比赛绝对np,官方将串口直接省掉了&#xff0c;将其替换成很多小功能&#xff0c;如&#xff1a;切换计时、频率均匀变化、锁机制等等&#xff…

Angular 全屏后选择器 (nz-select) 下拉选项框失效【开发笔记】

问题&#xff1a;Angular 全屏后选择器 (nz-select) 下拉选择无法使用 如图&#xff1a; 相应解决方法的文章&#xff1a;https://medium.com/shahar.kazaz/adding-fullscreen-support-to-ng-zorro-a38140da676 三种解决方法&#xff1a; ① FullscreenOverlyContainer&#…

SAP中用CS20批量修改BOM应用问题处理实例

在应用中可能会遇到这样的情况&#xff0c;用户通过某个工艺或技术上的改进&#xff0c;节约了某个原料的用量&#xff0c;而这个原料可能应用在一批成品上。如果成品数量太大&#xff0c;就需要做批量的变更了。 CS20这个事务应该就是用于做BOM批量处理的&#xff0c;笔者之前…

C++基础回顾(上)

C基础回顾&#xff08;上&#xff09; 目录C基础回顾&#xff08;上&#xff09;前言关键字和标识符运算符数据类型函数类前言 C之前学过一点&#xff0c;但是很长时间都没用过&#xff0c;翻出了书从头看了一遍&#xff0c;简短地做了笔记&#xff0c;以便自己之后查看和学习…

5、存储引擎

1、查看存储引擎 查看mysql提供什么存储引擎&#xff1a; show engines;2、设置系统默认的存储引擎 查看默认的存储引擎&#xff1a; show variables like %storage_engine%; #或 SELECT default_storage_engine;修改默认的存储引擎 如果在创建表的语句中没有显式指定表的存…

教你精通Java语法之第十二章、递归

目录 一、递归 1.1递归的概念 1.1.1定义 1.1.2原理 1.1.3思路 1.2单路递归 1.2.1阶乘 1.2.2正向输出数字 1.2.3反向输出字符串 1.3多路递归 1.3.1斐波那契数列 1.3.2兔子问题 1.3.3青蛙爬楼梯 1.4汉诺塔问题 1.5猴子吃桃问题 1.6老鼠走迷宫问题 二、递归的时…

从视频中截取gif怎么弄?三步简单完成视频转gif制作

电影、电视剧等短视频充斥着我们的生活&#xff0c;很多小伙伴会将这些视频中的有趣画面提取出来做成Gif动画表情包。那么&#xff0c;怎么才能从视频中提取gif动画呢&#xff1f; 一、使用什么工具才能从视频中提取gif呢&#xff1f; 通过使用GIF中文网这款专业的视频转gif&…

RabbitMQ (工作队列:Work Queues)

本章目录&#xff1a; 什么是Work Queues模拟场景&#xff0c;使用Work Queues官网文档&#xff1a;RabbitMQ tutorial - Work Queues — RabbitMQ 一、何为Work Queues 我们先看下它的结构图 显然&#xff0c;它与入门案例相比只是多了几个消费者。 以下是官方文档说明 In …

【目标检测】目标检测遇上知识图谱:Object detection meets knowledge graphs论文解读与复现

前言 常规的目标检测往往是根据图像的特征来捕捉出目标信息&#xff0c;那么是否有办法加入一些先验信息来提升目标检测的精准度&#xff1f; 一种可行的思路是在目标检测的输出加入目标之间的关联信息&#xff0c;从而对目标进行干涉。 2017年8月&#xff0c;新加波管理大学…

Vue——插槽

目录 插槽内容与出口​ 渲染作用域​ 默认内容​ 具名插槽​ 动态插槽名​ 作用域插槽​ 具名作用域插槽​ 高级列表组件示例​ 无渲染组件​ 插槽内容与出口​ 在之前的章节中&#xff0c;我们已经了解到组件能够接收任意类型的 JavaScript 值作为 props&#xff0c;…

微信小程序 | 基于ChatGPT实现电影推荐小程序

文章目录** 效果预览 **1、根据电影明星推荐2、根据兴趣标签推荐3、根据电影名推荐一、需求背景二、项目原理及架构2.1 实现原理&#xff08;1&#xff09;根据用户的兴趣标签&#xff08;2&#xff09;根据关联类似主题的题材&#xff08;3&#xff09;根据特定的电影明星2.2 …

IK集成ElasticSearch,IK分词器的下载及使用

IK集成ElasticSearch&#xff0c;IK分词器的下载及使用 下载ElasticSearch 8.7.0网址&#xff1a;Download Elasticsearch | Elastic 历史版本地址&#xff1a;Past Releases of Elastic Stack Software | Elastic 解压ElasticSearch 什么是IK分词器 分词∶即把一段中文或…

IO流基础

目录 1.FileOutPutStream字节输入流 1.1FileOutPutStream使用 1.1.1创建对象 FileOutPutStream fos new FileOutPutStream("路径或者File对象")&#xff1b; 1.1.2.写数据 调用write方法&#xff0c;参数是int类型&#xff0c;但传入文件中是asci…

【LeetCode: 剑指 Offer II 112. 最长递增路径 | 递归 | DFS | 深度优先遍历 | 记忆化缓存表】

&#x1f34e;作者简介&#xff1a;硕风和炜&#xff0c;CSDN-Java领域新星创作者&#x1f3c6;&#xff0c;保研|国家奖学金|高中学习JAVA|大学完善JAVA开发技术栈|面试刷题|面经八股文|经验分享|好用的网站工具分享&#x1f48e;&#x1f48e;&#x1f48e; &#x1f34e;座右…