go语言并发实战——日志收集系统(六) 编写日志收集系统客户端

news2025/1/12 3:52:01

上节回顾

在上一篇文章中我们介绍了编写客户端的四个步骤,分别是:

  • 读取配置文件,寻找日志路径
  • 初始化服务
  • 根据日志路径l来收集日志
  • 将收集到的日志发送Kafka中
    关于上述的内容博主画了一个思维导图(有点丑,大家勉强看看,以前没画过):
    在这里插入图片描述
    对了,为了画这个思维导图昨天博主找了好久思维导图的软件,最后发现了Vscode上面有一个非常不错的插件:drawio,样子大概是这样的:
    在这里插入图片描述
    大家如果没有合适的思维导图绘制根据,可以试试这个。好了,话不多说,开始今天的内容。

读取配置信息,获取日志信息

前言

这里读取日志信息我们选择的是go-ini这一第三方包,具体的使用方法在我前面的博文这种有所介绍,大家不了解的话可以参考:
go语言并发实战——日志收集系统(五) 基于go-ini包读取日志收集服务的配置文件

需求分析

这里配置文件中我们主要要知道两个消息,一个Kafka的配置信息,一个是日志文件的路径,配置文件应该是这样的:

[kafka]
address=127.0.0.1:9092
topic=web.log
chan_size=100000

[collect]
logfile_path:G:\goproject\-goroutine-\log-agent\log\log1

而为了方便我们利用反射来读取配置文件,我们来创建几个结构体来存储我们读到的配置信息:

  • Kafka结构体
type Kafkaddress struct {
	Addr        []string `ini:"address"`
	Topic       string   `ini:"topic"`
	MessageSize int64    `ini:"chan_size"`
}
  • tail结构体
type LogFilePath struct {
	Path string `ini:"logfile_path"`
}
  • 总的结构体

type Config struct {
	Kafakaddress Kafkaddress `ini:"kafka"`
	LogFilePath  LogFilePath `ini:"collect"`
}

然后读取配置信息放入结构体中:

	//读取配置文件,获取配置信息
	filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"
	ConfigObj := new(Config)
	err := ini.MapTo(ConfigObj, filename)
	if err != nil {
		logrus.Error("%s Load failed,err:", filename, err)
	}

这样我们就获得我们所需要的配置消息了

初始化服务

前言

这里我们初始服务主要是初始化Kafka以及tail包,利用它们读取日志信息并将其发送Kafka中,具体介绍可以参考前面的几篇文章:
go语言并发实战——日志收集系统(三) 利用sarama包连接KafKa实现消息的生产与消费
go语言并发实战——日志收集系统(四) 利用tail包实现对日志文件的实时监控

Kafka的初始化

	//初始化Kafka
	err = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)
	if err != nil {
		logrus.Error("InitKafka failed, err:%v", err)
		return
	}
	logrus.Infof("InitKafka success")

tail的初始化

func InitTail(filename string) (err error) {
	config := tail.Config{
		Follow:    true,
		ReOpen:    true,
		MustExist: true,
		Poll:      true,
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
	}
	TailObj, err = tail.TailFile(filename, config)
	if err != nil {
		logrus.Error("tail create tailObj for path:%s,err:%v", filename, err)
		return
	}
	return
}

根据路径来读取日志

需求分析

一般我们常见的想法会是我们先将日志消息读取出来然后发送给Kafka但是这样的串行操作无疑会大大增加程序的运行时间,所以这里我们选择将读到的日志信息打包发送到管道中,然后再看起一个协程来发送数据,这样实现了读取与发送的一步操作,可以有效降低程序的运行时间,而上面出现的MessageSiz也就是我们设置的管道大小

func run(config *Config) (err error) {
	for {
		line, ok := <-tailFile.TailObj.Lines
		if !ok {
			logrus.Error("read from tail failed,err:", err)
			time.Sleep(2 * time.Second)
			continue
		}
		msg := &sarama.ProducerMessage{}
		msg.Topic = config.Kafakaddress.Topic
		msg.Value = sarama.StringEncoder(line.Text)
		Kafka.MsgChan <- msg
	}
}

发送消息到KafKa

func SendMsg() {
	for {
		select {
		case msg := <-MsgChan:
			pid, offset, err := client.SendMessage(msg)
			if err != nil {
				logrus.Error("send msg to kafka failed,err:%v", err)
				return
			}
			logrus.Info("send msg to kafka success,pid:%d,offset:%d", pid, offset)
		}
	}
}

完整代码

  • main.go
package main

import (
	"github.com/Shopify/sarama"
	"github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
	"github.com/go-ini/ini"
	"log-agent/Kafka"
	"log-agent/tailFile"
	"time"
)

type Config struct {
	Kafakaddress Kafkaddress `ini:"kafka"`
	LogFilePath  LogFilePath `ini:"collect"`
}

type Kafkaddress struct {
	Addr        []string `ini:"address"`
	Topic       string   `ini:"topic"`
	MessageSize int64    `ini:"chan_size"`
}

type LogFilePath struct {
	Path string `ini:"logfile_path"`
}

func run(config *Config) (err error) {
	for {
		line, ok := <-tailFile.TailObj.Lines
		if !ok {
			logrus.Error("read from tail failed,err:", err)
			time.Sleep(2 * time.Second)
			continue
		}
		msg := &sarama.ProducerMessage{}
		msg.Topic = config.Kafakaddress.Topic
		msg.Value = sarama.StringEncoder(line.Text)
		Kafka.MsgChan <- msg
	}
}

func main() {
	//读取配置文件,获取配置信息
	filename := "G:\\goproject\\-goroutine-\\log-agent\\conf\\config.ini"
	ConfigObj := new(Config)
	err := ini.MapTo(ConfigObj, filename)
	if err != nil {
		logrus.Error("%s Load failed,err:", filename, err)
	}

	//初始化Kafka
	err = Kafka.InitKafka(ConfigObj.Kafakaddress.Addr, ConfigObj.Kafakaddress.MessageSize)
	if err != nil {
		logrus.Error("InitKafka failed, err:%v", err)
		return
	}
	logrus.Infof("InitKafka success")

	//初始化tail
	err = tailFile.InitTail(ConfigObj.LogFilePath.Path)
	if err != nil {
		logrus.Error("InitTail failed, err:%v", err)
		return
	}
	logrus.Infof("InitTail success")

	//利用sarama报发送消息到Kafka中
	err = run(ConfigObj)
}

  • Kafka.go
package Kafka

import (
	"github.com/Shopify/sarama"
	"github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
)

var (
	client  sarama.SyncProducer
	MsgChan chan *sarama.ProducerMessage
)

func InitKafka(address []string, Chan_size int64) (err error) {
	//初始化MsgChan
	MsgChan = make(chan *sarama.ProducerMessage, Chan_size)
	//初始化config
	config := sarama.NewConfig()
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.Producer.Partitioner = sarama.NewRandomPartitioner
	config.Producer.Return.Successes = true

	//连接Kafka
	client, err = sarama.NewSyncProducer(address, config)
	if err != nil {
		logrus.Error("kafka connect error,err:%v", err)
		return
	}
	go SendMsg()
	return
}

func SendMsg() {
	for {
		select {
		case msg := <-MsgChan:
			pid, offset, err := client.SendMessage(msg)
			if err != nil {
				logrus.Error("send msg to kafka failed,err:%v", err)
				return
			}
			logrus.Info("send msg to kafka success,pid:%d,offset:%d", pid, offset)
		}
	}
}

  • tailFile.go
package tailFile

import (
	"github.com/Shopify/toxiproxy/Godeps/_workspace/src/github.com/Sirupsen/logrus"
	"github.com/hpcloud/tail"
)

var TailObj *tail.Tail

func InitTail(filename string) (err error) {
	config := tail.Config{
		Follow:    true,
		ReOpen:    true,
		MustExist: true,
		Poll:      true,
		Location:  &tail.SeekInfo{Offset: 0, Whence: 2},
	}
	TailObj, err = tail.TailFile(filename, config)
	if err != nil {
		logrus.Error("tail create tailObj for path:%s,err:%v", filename, err)
		return
	}
	return
}

运行结果

在运行前打开ZooKeeper与Kafka,然后对日志文件进行操作,会出现:
在这里插入图片描述
出现

2024/04/22 20:26:34 Seeked G:\goproject\-goroutine-\log-agent\log\log1 - &{Offset:0 Whence:2}
INFO[0013] send msg to kafka success,pid:%d,offset:%d0 3 
INFO[0013] send msg to kafka success,pid:%d,offset:%d0 4

就代表运行成功了。

结语

今天的有关内容就到此为止啦,有问题的话欢迎在评论区评论,大家可以集思广益,如果你觉得博主的内容对你有帮助,欢迎三连一下和订阅专栏
如果博主文章里面有什么错误页欢迎斧正(毕竟博主页只是个小蒟蒻鸡),下篇文章我们要进入etcd的有关学习了,好了,大家下篇文章见!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1616419.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

iOS - 多线程-GCD

文章目录 iOS - 多线程-GCD1. 常见多线程方案2. GCD2.1 GCD的常见函数GCD中有2个用来执行任务的函数 2.2 GCD的队列2.2.1 GCD的队列可以分为2大类型 2.3 容易混淆的术语2.4.1 有4个术语比较容易混淆&#xff1a;同步、异步、并发、串行 2.4 各种队列的执行效果 3. 死锁3.1 死锁…

鸿蒙开发模拟器的坑, No Devices

问题 我已经安装了模拟器&#xff0c;并且模拟器已经运行了 在Device Manager页面开启模拟器 No Devices 但是这里没有模拟器的选项 解决 添加环境变量 下面步骤 1、清除用户数据 2、 关闭Device Manager 3、 关闭ide 重启ide、开启模拟器 看到有模拟器的选项了

【数据结构|C语言版】算法效率和复杂度分析

前言1. 算法效率2. 大O的渐进表示法3. 时间复杂度3.1 时间复杂度概念3.2 时间复杂度计算举例 4. 空间复杂度4.1 空间复杂度的概念4.2 空间复杂度计算举例 5. 常见复杂度对比结语 ↓ 个人主页&#xff1a;C_GUIQU 个人专栏&#xff1a;【数据结构&#xff08;C语言版&#xff09…

光伏仿真设计需要用到的工具有哪些?

随着全球能源结构的转型和可持续发展战略的深入实施&#xff0c;光伏发电作为一种清洁、可再生的能源形式&#xff0c;正日益受到广泛关注和应用。在光伏系统的设计和优化过程中&#xff0c;光伏仿真设计工具发挥着至关重要的作用。那么&#xff0c;光伏仿真设计需要用到的工具…

Opencv_3_图像对象的创建与赋值

ColorInvert.h 如下&#xff1a; #include <opencv.hpp> using namespace std; #include <opencv.hpp> using namespace cv; using namespace std; class ColorInvert{ public : void mat_creation(); }; ColorInvert.cpp 文件如下&#xff1a; #include &q…

【oceanbase】安装ocp,ocp部署oceanbase

https://www.oceanbase.com/docs/common-ocp-1000000000584989 资源 iphostnamecpumem组件192.168.0.71obnode-000-071816oceanbase-ce192.168.0.72obnode-000-072816oceanbase-ce192.168.0.73obnode-000-073816oceanbase-ce192.168.0.74obproxy-000-07424obproxy-ce192.168.0…

springcloud Ribbon的详解

1、Ribbon是什么 Ribbon是Netflix发布的开源项目&#xff0c;Spring Cloud Ribbon是基于Netflix Ribbon实现的一套客户端负载均衡的框架。 2、Ribbon能干什么 LB负载均衡(Load Balance)是什么&#xff1f;简单的说就是将用户的请求平摊的分配到多个服务上&#xff0c;从而达…

Redis入门到通关之数据结构解析-ZipList

文章目录 ☃️概述☃️ZipListEntry☃️Encoding编码☃️ZipList的连锁更新问题☃️总结 欢迎来到 请回答1024 的博客 &#x1f353;&#x1f353;&#x1f353;欢迎来到 请回答1024的博客 关于博主&#xff1a; 我是 请回答1024&#xff0c;一个追求数学与计算的边界、时间与…

STM32H750外设ADC之模拟窗口看门狗

目录 概述 1 相关寄存器 2 功能描述 3 AWDx 标志和中断 4 模拟看门狗 1 4.1 模拟看门狗 1 说明 4.2 模拟看门狗通道选择 4.3 阀值选择 5 模拟看门狗 2和3 6 ADCx_AWDy_OUT 信号输出生成 6.1 功能介绍 6.2 输出信号案例 7 模拟看门狗 1、 2、 3 比较 概述 本文主…

深度学习从入门到精通—Transformer

1.绪论介绍 1.1 传统的RNN网络 传统的RNN&#xff08;递归神经网络&#xff09;主要存在以下几个问题&#xff1a; 梯度消失和梯度爆炸&#xff1a;这是RNN最主要的问题。由于序列的长距离依赖&#xff0c;当错误通过层传播时&#xff0c;梯度可以变得非常小&#xff08;消失…

MATLAB中左边的大括号最后一行为什么会留很大的空白——解决

看了一些帖子说改字体&#xff0c;但是并没有什么用&#xff0c;在此给出亲测有效的方法&#xff1a;改变矩阵的行间距 先说一下问题 上图中留有大块空白 **解决办法&#xff1a;**光标放在矩阵上 格式——矩阵——更改矩阵&#xff0c;在矩阵设置中选中“行高相等”&#xff…

基于Springboot的学生毕业离校系统

基于SpringbootVue的学生毕业离校系统的设计与实现 开发语言&#xff1a;Java数据库&#xff1a;MySQL技术&#xff1a;SpringbootMybatis工具&#xff1a;IDEA、Maven、Navicat 系统展示 用户登录 网站首页 离校流程 网站公告 留言反馈 后台登录 学生管理 教师管理 离校流程…

【UI】element-ui的el-dialog的遮罩层在模态框的前面bug

最近在写element ui 的时候使用dialog组件&#xff0c;偶然出现了这种情况 原因&#xff1a; 是因为遮罩层插入进了body标签下&#xff0c;z-index高于当前父元素。 解决&#xff1a;在el-dialog标签里加上:modal-append-to-body"false"就可以了。 饿了么官网文档&a…

Redis - Redisson tryLock 函数参数分析

这里有三个参数&#xff1a; waitTime&#xff1a;等待时间leaseTime&#xff1a;超时施放时间TimeUnit&#xff1a;时间单位 等待时间 如果 ABC… 多个线程去抢夺一把锁&#xff0c;A 成功了&#xff0c;如果设置的是 -1&#xff0c;那么 BCD... 就不等待&#xff0c;直接返…

虚拟化+Docker基本管理

一、虚拟化简介 1、云端 华为云、谷歌云、腾讯云、阿里云、亚马逊、百度云、移动云、天翼云、西部数码云等 1.国内云 华为云、阿里云、腾讯云、天翼云(私有云) 2.国外云 谷歌云、亚马逊 2、云计算的服务模式是分层的 IaaS&#xff1a;Infrastructure&#xff08;基础设…

蓝桥杯第17169题——兽之泪II

问题描述 在蓝桥王国&#xff0c;流传着一个古老的传说&#xff1a;在怪兽谷&#xff0c;有一笔由神圣骑士留下的宝藏。 小蓝是一位年轻而勇敢的冒险家&#xff0c;他决定去寻找宝藏。根据远古卷轴的提示&#xff0c;如果要找到宝藏&#xff0c;那么需要集齐 n 滴兽之泪&#…

【YOLOv8改进[注意力]】YOLOv8添加DAT(Vision Transformer with Deformable Attention)助力涨点

目录 一 DAT 二 YOLOv8添加DAT助力涨点 1 总体修改 2 配置文件 3 训练 其他 一 DAT 官方论文地址&#xff1a;https://openaccess.thecvf.com/content/CVPR2022/papers/Xia_Vision_Transformer_With_Deformable_Attention_CVPR_2022_paper.pdf Transformers最近在各种视…

js连接抖音打印组件实现打印

js连接抖音打印组件实现打印小票 安装抖音打印组件 抖音打印组件文档&#xff1a; https://bytedance.larkoffice.com/docs/doccn2vbOOdd3KWrCd6Z93nIlvg 跟着文档案例一步步配基本上没问题&#xff0c; 打印的时候需要设置下打印机名称 export class DouyinPrint {construct…

完美运营版商城/拼团/团购/秒杀/积分/砍价/实物商品/虚拟商品等全功能商城

源码下载地址&#xff1a;完美运营版商城.zip 后台可以自由拖曳修改前端UI页面 还支持虚拟商品自动发货等功能 挺不错的一套源码 前端UNIAPP 后端PHP 一键部署版本

【计算机组成原理】浮点运算方法和浮点运算器

浮点加法、减法运算 浮点数加减法的步骤结合题目分析步骤 浮点数加减法的步骤 ① 0 操作数检查 ② 比较阶码大小&#xff0c;完成对阶 ③ 尾数进行加减法运算 ④ 结果规格化 ⑤ 舍入处理 ⑥ 判断结果是否溢出 结合题目分析步骤 例&#xff1a;设 x 2010 0.11011011&#x…