整体架构
参考 七米老师的日志收集项目
主要用go实现logagent的部分,logagent的作用主要是实时监控日志追加的变化,并将变化发送到kafka中。
之前我们已经实现了 用go连接kafka并向其中发送数据,也实现了使用tail库监控日志追加操作。
我们把这两部分结合起来实现监控日志追加并发送到kafka。
使用github.com/go-ini/ini
配置参数
// 读取配置参数
cfg, err:=ini.Load("config/config.ini")
if err!=nil {
logrus.Error((" load config error"))
return
}
[kafka]
address = 127.0.0.1:9092
chan_size = 1000
[collect]
logfile_path= D:/learn/go/log-collector-lmh/log_agent/config_version/log_file/xx.log
配置参数主要包括,kafka的启动端口,存储的数据大小限制,日志文件的路径。
初始化kafka
kafka.go
package kafka
import (
"github.com/Shopify/sarama"
"github.com/sirupsen/logrus"
)
var (
Client sarama.SyncProducer
MsgChan chan *sarama.ProducerMessage //占用的字节数少,传递的指针
)
func InitKafka(kafkaAddr string, chanSize int64) (err error){
config:=sarama.NewConfig()
// 生产者配置
config.Producer.RequiredAcks=sarama.WaitForAll
config.Producer.Partitioner=sarama.NewRandomPartitioner
config.Producer.Return.Successes=true
// 连接kafka
Client,err=sarama.NewSyncProducer([]string{kafkaAddr}, config)
if err!=nil {
logrus.Error("producer closed", err)
return
}
// 从管道中读取日志并发送到kafka
MsgChan = make(chan *sarama.ProducerMessage, chanSize)
go sendMsg()
return
}
func sendMsg(){
for {
select {
case msg := <- MsgChan:
pid, offset, err := Client.SendMessage(msg)
if err != nil {
logrus.Warning("send msg failed, err:", err)
return
}
logrus.Infof("send msg to kafka success. pid:%v offset:%v", pid, offset)
}
}
}
这里实现了连接kafka,并使用协程不断地读取MsgChan,读取到数据后向kafka发送,这里MsgChan通道的数据由tail监控到的日志变化写入。
main.go中调用
// 初始化kafka
kafkaAddr:=cfg.Section("kafka").Key("address").String()
chanSize:=cfg.Section("kafka").Key("chan_size").MustInt64(0)
err=kafka.InitKafka(kafkaAddr, chanSize)
if err!=nil {
logrus.Error("kafka init failed")
}
logrus.Info("Kafka init success")
初始化tailf,并将日志数据写入ChanMsg
tailF.go
package tailF
import (
"github.com/hpcloud/tail"
"fmt"
)
var (
TailObj *tail.Tail
)
func InitTail(filename string) (err error) {
config := tail.Config{
ReOpen: true,
Follow: true,
Location: &tail.SeekInfo{Offset: 0, Whence: 2},
MustExist: false,
Poll: true,
}
// 打开文件开始读取数据
TailObj, err = tail.TailFile(filename, config)
if err != nil {
fmt.Printf("create tail %s failed, err:%v\n", filename, err)
return
}
return
}
main.go中对应
// 初始化tailf
fileName:=cfg.Section("collect").Key("logfile_path").String()
err=tailF.InitTail(fileName)
if err!=nil {
logrus.Error(" tailf init failed")
}
logrus.Info("Init tail success")
// 把读取的日志发往kafka
err=run()
if err!=nil {
logrus.Error(" run error%s", err)
return
}
logrus.Info("run success")
main.go中实现的run函数,读取tailF的数据,并写入ChanMsg
func run () (err error){
for {
line,ok:=<-tailF.TailObj.Lines
if !ok {
logrus.Warn("tail file %s close reopen\n", tailF.TailObj.Filename)
// 读取出错等一秒
time.Sleep(time.Second)
continue
}
// 使用通道将传输日志改为异步
// 读取的日志封装为ProducerMessage
msg:=&sarama.ProducerMessage{}
msg.Topic="web_log"
msg.Value=sarama.StringEncoder(line.Text)
// 放到channel中
kafka.MsgChan<-msg
}
}
完整main.go
package main
import (
"config_version/kafka"
"config_version/tailF"
"time"
"github.com/Shopify/sarama"
"github.com/go-ini/ini"
"github.com/sirupsen/logrus"
)
func main() {
// 读取配置参数
cfg, err:=ini.Load("config/config.ini")
if err!=nil {
logrus.Error((" load config error"))
return
}
// 初始化kafka
kafkaAddr:=cfg.Section("kafka").Key("address").String()
chanSize:=cfg.Section("kafka").Key("chan_size").MustInt64(0)
err=kafka.InitKafka(kafkaAddr, chanSize)
if err!=nil {
logrus.Error("kafka init failed")
}
logrus.Info("Kafka init success")
// 初始化tailf
fileName:=cfg.Section("collect").Key("logfile_path").String()
err=tailF.InitTail(fileName)
if err!=nil {
logrus.Error(" tailf init failed")
}
logrus.Info("Init tail success")
// 把读取的日志发往kafka
err=run()
if err!=nil {
logrus.Error(" run error%s", err)
return
}
logrus.Info("run success")
}
func run () (err error){
for {
line,ok:=<-tailF.TailObj.Lines
if !ok {
logrus.Warn("tail file %s close reopen\n", tailF.TailObj.Filename)
// 读取出错等一秒
time.Sleep(time.Second)
continue
}
// 使用通道将传输日志改为异步
// 读取的日志封装为ProducerMessage
msg:=&sarama.ProducerMessage{}
msg.Topic="web_log"
msg.Value=sarama.StringEncoder(line.Text)
// 放到channel中
kafka.MsgChan<-msg
}
}
至此, 我们实现了简化版的日志收集系统的logagent功能,目前日志的路径还需要手动写入配置文件中,修改的话还需重启项目,之后可以使用ETCD实现日志路径的自动配置。