Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】

news2025/1/19 15:21:28

Go实现LogAgent:海量日志收集系统【下篇】

0 前置文章

Go实现LogAgent:海量日志收集系统【上篇——LogAgent实现】

前面的章节我们已经完成了日志收集(LogAgent),接下来我们需要将日志写入到kafka中,然后将数据落地到Elasticsearch中。

项目架构图:
在这里插入图片描述
项目逻辑图:
在这里插入图片描述

1 docker搭建Elasticsearcsh、Kibana

如果没有docker环境的,可以在本机安装docker desktop

# 1 创建一个docker网络
docker network create es-net
# 查看本机网络
docker network ls
# 删除一个网络
docker network rm es-net

# 2 拉取es、kibana镜像
docker pull elasticsearch:7.17.4
docker pull kibana:7.17.4

# 3 创建es容器并挂在数据卷
mkdir -p /Users/xxx/docker-home/es-data/_data
mkdir -p /Users/xxx/docker-home/es-plugins
mkdir -p /Users/xxx/docker-home/es-config
mkdir -p /Users/xxx/docker-home/kibana-config

touch elasticsearch.yml
touch kibana.yml

1.需要保证要挂载的目录有读写权限,包括要挂载的配置文件。如果没有则用chmod 777命令
2.如果要挂载配置文件,则需要提前把配置文件内容写好,不能为空,否则可能会影响es和kibana运行。
3.如果只挂载到配置文件目录,不准备配置文件,会导致创建容器后没有配置文件。报错

elasticsearch.yml:

cluster.name: "docker-cluster"
network.host: 0.0.0.0

kibana.yml:

server.host: "0.0.0.0"
server.shutdownTimeout: "5s"
elasticsearch.hosts: [ "http://elasticsearch:9200" ]
monitoring.ui.container.elasticsearch.enabled: true

启动es:

docker run -d \
 --name es7.17.4 -p 9200:9200 -p 9300:9300 \
 -e "discovery.type=single-node" -e ES_JAVA_OPTS="-Xms64m -Xmx128m" \
 -v /Users/xxx/docker-home/es-data/_data:/usr/share/elasticsearch/data \
 -v  /Users/xxx/docker-home/es-plugins:/usr/share/elasticsearch/plugins \
 -v  /Users/xxx/docker-home/es-config/elasticsearch.yml:/usr/share/elasticsearch/config/elasticsearch.yml \
 --privileged \
 --network es-net \
  elasticsearch:7.17.4

启动Kibana:

docker run -d \
--name kibana17 \
--network=es-net \
-p 5601:5601 \
-e ELASTICSEARCH_HOSTS=http://es7.17.4:9200 \ 
kibana:7.17.4

-e ELASTICSEARCH_HOSTS=http://es7.17.4:9200 \ 其中,es7.17.4的名称为上面es容器的名称

结果:
在这里插入图片描述

2 golang操作es

执行下面代码在es中添加索引,然后到kibana页面创建索引

package main

import (
	"context"
	"fmt"
	"github.com/olivere/elastic/v7"
)

type Tweet struct {
	User    string
	Message string
}

func main() {
	client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL("http://localhost:9200/"))
	if err != nil {
		fmt.Println("connect es error", err)
		return
	}

	fmt.Println("conn es succ")

	tweet := Tweet{User: "haohan", Message: "This is a test"}
	_, err = client.Index().
		Index("twitter").
		Id("1").
		BodyJson(tweet).
		Do(context.Background())
	if err != nil {
		// Handle error
		panic(err)
		return
	}

	fmt.Println("insert succ")
}
# 执行上面的go代码执行,控制台输出如下表明插入成功
conn es succ
insert succ

然后我们手动到kibana中添加对应的index即可搜索出对应数据

在这里插入图片描述
在这里插入图片描述

3 开发LogTransfer:从kafka中读取数据并写入es

在前面的开发中,我们已经将日志写入到了kafka。接下来我们要做的就是从kafka中消费数据,然后写入到es中。LogTransfer做的就是这个工作。

3.1 项目结构

├─config
│      logTransfer.conf
│
├─es
│      elasticsearch.go
│   
├─logs
│      my.log
│
└─main
		kafka.go
        config.go
        log.go
        main.go

在这里插入图片描述

3.2 项目代码

①LogTransfer/main/main.go

package main

import (
	"github.com/astaxie/beego/logs"
)

func main() {
	// 初始化配置
	err := InitConfig("ini", "/Users/xxx/GolandProjects/LogCollect/LogTransfer/config/log_transfer.conf")
	if err != nil {
		panic(err)
		return
	}
	logs.Debug("初始化配置成功")

	//初始化日志模块
	err = initLogger(logConfig.LogPath, logConfig.LogLevel)
	if err != nil {
		panic(err)
		return
	}
	logs.Debug("初始化日志模块成功")

	// 初始化Kafka
	err = InitKafka(logConfig.KafkaAddr, logConfig.KafkaTopic)
	if err != nil {
		logs.Error("初始化Kafka失败, err:", err)
		return
	}
	logs.Debug("初始化Kafka成功")
}

②LogTransfer/main/log.go

package main

import (
	"encoding/json"
	"fmt"
	"github.com/astaxie/beego/logs"
)

func convertLogLevel(level string) int {

	switch level {
	case "debug":
		return logs.LevelDebug
	case "warn":
		return logs.LevelWarn
	case "info":
		return logs.LevelInfo
	case "trace":
		return logs.LevelTrace
	}
	return logs.LevelDebug
}

func initLogger(logPath string, logLevel string) (err error) {

	config := make(map[string]interface{})
	config["filename"] = logPath
	config["level"] = convertLogLevel(logLevel)
	configStr, err := json.Marshal(config)
	if err != nil {
		fmt.Println("初始化日志, 序列化失败:", err)
		return
	}
	_ = logs.SetLogger(logs.AdapterFile, string(configStr))

	return
}

③LogTransfer/main/kafka.go

package main

import (
	"github.com/IBM/sarama"
	"github.com/astaxie/beego/logs"
	"strings"
)

type KafkaClient struct {
	client sarama.Consumer
	addr   string
	topic  string
}

var (
	kafkaClient *KafkaClient
)

func InitKafka(addr string, topic string) (err error) {

	kafkaClient = &KafkaClient{}
	consumer, err := sarama.NewConsumer(strings.Split(addr, ","), nil)
	if err != nil {
		logs.Error("启动Kafka消费者错误: %s", err)
		return nil
	}
	kafkaClient.client = consumer
	kafkaClient.addr = addr
	kafkaClient.topic = topic
	return
}

④LogTransfer/main/config.go

package main

import (
	"fmt"
	"github.com/astaxie/beego/config"
)

type LogConfig struct {
	KafkaAddr  string
	KafkaTopic string
	EsAddr     string
	LogPath    string
	LogLevel   string
}

var (
	logConfig *LogConfig
)

func InitConfig(confType string, filename string) (err error) {
	conf, err := config.NewConfig(confType, filename)
	if err != nil {
		fmt.Printf("初始化配置文件出错:%v\n", err)
		return
	}
	// 导入配置信息
	logConfig = &LogConfig{}
	// 日志级别
	logConfig.LogLevel = conf.String("logs::log_level")
	if len(logConfig.LogLevel) == 0 {
		logConfig.LogLevel = "debug"
	}
	// 日志输出路径
	logConfig.LogPath = conf.String("logs::log_path")
	if len(logConfig.LogPath) == 0 {
		logConfig.LogPath = "/Users/xxx/GolandProjects/LogCollect/LogTransfer/logs/log_transfer.log"
	}

	// Kafka
	logConfig.KafkaAddr = conf.String("kafka::server_addr")
	if len(logConfig.KafkaAddr) == 0 {
		err = fmt.Errorf("初识化Kafka addr失败")
		return
	}
	logConfig.KafkaTopic = conf.String("kafka::topic")
	if len(logConfig.KafkaAddr) == 0 {
		err = fmt.Errorf("初识化Kafka topic失败")
		return
	}

	// Es
	logConfig.EsAddr = conf.String("elasticsearch::addr")
	if len(logConfig.EsAddr) == 0 {
		err = fmt.Errorf("初识化Es addr失败")
		return
	}
	return
}

④LogTransfer/config/log_transfer.conf

[logs]
log_level = debug
log_path = "/Users/xxx/GolandProjects/LogCollect/LogTransfer/logs/log_transfer.log"

[kafka]
server_addr = localhost:9092
topic = nginx_log

[elasticsearch]
addr = http://localhost:9200/

⑤LogTransfer/es/es.go

package main

import (
	"context"
	"fmt"
	"github.com/olivere/elastic/v7"
)

type Tweet struct {
	User    string
	Message string
}

func main() {
	client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL("http://localhost:9200/"))
	if err != nil {
		fmt.Println("connect es error", err)
		return
	}

	fmt.Println("conn es succ")

	tweet := Tweet{User: "haohan", Message: "This is a test"}
	_, err = client.Index().
		Index("twitter").
		Id("1").
		BodyJson(tweet).
		Do(context.Background())
	if err != nil {
		// Handle error
		panic(err)
		return
	}

	fmt.Println("insert succ")
}

结果

LogTransfer的运行日志在LogTransfer/logs/log_transfer.log中

logs/log_transfer.log:

2023/09/02 19:55:29.037 [D]  初始化日志模块成功
2023/09/02 19:55:29.074 [D]  初始化Kafka成功

在这里插入图片描述

4 完成LogTransfer:将日志入库到es并通过kibana展示

前面我们将LogTransfer的配置初始化成功了,下面我们将从Kafka中消费数据,然后将日志入库到es,最后通过kibana展示。

在这里插入图片描述

4.1 将日志保存到es

在LogTransfer/main/main.go中添加初始化InitEs函数

①main.go中添加InitEs函数

LogTransfer/main/main.go:

package main

import (
	"github.com/astaxie/beego/logs"
	"logtransfer.com/es"
)

func main() {
	// 初始化配置
	err := InitConfig("ini", "/Users/xxx/GolandProjects/LogCollect/LogTransfer/config/log_transfer.conf")
	if err != nil {
		panic(err)
		return
	}
	logs.Debug("初始化配置成功")

	//初始化日志模块
	err = initLogger(logConfig.LogPath, logConfig.LogLevel)
	if err != nil {
		panic(err)
		return
	}
	logs.Debug("初始化日志模块成功")

	// 初始化Kafka
	err = InitKafka(logConfig.KafkaAddr, logConfig.KafkaTopic)
	if err != nil {
		logs.Error("初始化Kafka失败, err:", err)
		return
	}
	logs.Debug("初始化Kafka成功")
	// 初始化Es
	err = es.InitEs(logConfig.EsAddr)
	if err != nil {
		logs.Error("初始化Elasticsearch失败, err:", err)
		return
	}
	logs.Debug("初始化Es成功")

}

运行LogTransfer下的main.go可以发现log_transfer.log中输出的日志信息
在这里插入图片描述

②LogTransfer/es/es.go

package es

import (
	"fmt"
	"github.com/olivere/elastic/v7"
)

type Tweet struct {
	User    string
	Message string
}

var (
	esClient *elastic.Client
)

func InitEs(addr string) (err error) {
	client, err := elastic.NewClient(elastic.SetSniff(false), elastic.SetURL(addr))
	if err != nil {
		fmt.Println("connect es error", err)
		return nil
	}
	esClient = client
	return
}

运行LogTransfer/main下的main函数

  • 可以从logs/log_transfer.log中看到打印初始化es、kafka等成功

③添加run.go:消费kafka中的数据

在main函数中添加run函数, 用于运行kafka消费数据到Es

package main

import (
	"github.com/Shopify/sarama"
	"github.com/astaxie/beego/logs"
)

func run() (err error) {

	partitionList, err := kafkaClient.Client.Partitions(kafkaClient.Topic)

	if err != nil {
		logs.Error("Failed to get the list of partitions: ", err)
		return
	}
	for partition := range partitionList {
		pc, errRet := kafkaClient.Client.ConsumePartition(kafkaClient.Topic, int32(partition), sarama.OffsetNewest)
		if errRet != nil {
			err = errRet
			logs.Error("Failed to start consumer for partition %d: %s\n", partition, err)
			return
		}
		defer pc.AsyncClose()
		kafkaClient.wg.Add(1)
		go func(pc sarama.PartitionConsumer) {
			for msg := range pc.Messages() {
				logs.Debug("Partition:%d, Offset:%d, Key:%s, Value:%s", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
				err = es.SendToES(kafkaClient.topic, msg.Value)
				if err != nil {
					logs.Warn("send to es failed, err:%v", err)
				}
			}
			kafkaClient.wg.Done()
		}(pc)
	}

	kafkaClient.wg.Wait()

	return
}

④main.go中添加SendToES函数

package main

import (
	"github.com/astaxie/beego/logs"
	"logtransfer.com/es"
)

func main() {
	// 初始化配置
	err := InitConfig("ini", "/Users/xxx/GolandProjects/LogCollect/LogTransfer/config/log_transfer.conf")
	if err != nil {
		panic(err)
		return
	}
	logs.Debug("初始化配置成功")

	//初始化日志模块
	err = initLogger(logConfig.LogPath, logConfig.LogLevel)
	if err != nil {
		panic(err)
		return
	}
	logs.Debug("初始化日志模块成功")

	// 初始化Kafka
	err = InitKafka(logConfig.KafkaAddr, logConfig.KafkaTopic)
	if err != nil {
		logs.Error("初始化Kafka失败, err:", err)
		return
	}
	logs.Debug("初始化Kafka成功")
	// 初始化Es
	err = es.InitEs(logConfig.EsAddr)
	if err != nil {
		logs.Error("初始化Elasticsearch失败, err:", err)
		return
	}
	logs.Debug("初始化Es成功")
	// 运行
	err = run()
	if err != nil {
		logs.Error("运行错误, err:", err)
		return
	}
	select {}
}

5 联调

5.1 运行LogAgent:采集数据并存储到kafka

# 用于向docker中的etcd写入对应key
docker exec etcd1 etcdctl put /backend/logagent/config/192.168.0.103 "[{\"logpath\":\"/Users/xxx/GolandProjects/LogCollect/LogAgent/mysql_log.log\",\"topic\":\"mysql_log\"},{\"logpath\":\"/Users/xxx/GolandProjects/LogCollect/LogAgent/nginx_log.log\",\"topic\":\"nginx_log\"}]"

通过上面的命令,用于向etcd中写入对应key,etcd的watcher监视到后会对应更新配置

在这里插入图片描述

查看LogAgent的运行日志:
在这里插入图片描述

5.2 运行LogTransfer:消费kafka数据并存到es

选中LogTransfer下main文件夹下的所有go文件,鼠标右击运行,查看控制台输出

在这里插入图片描述
查看LogTransfer的运行日志:
在这里插入图片描述

5.3 在kibana创建index并查看

Management - Stack Management - Kibana - Index Patterns ,根据kafka中的topic创建对应的索引。以nginx_log为例:

在这里插入图片描述
回到overview,根据nginx_log这个index搜索信息:
在这里插入图片描述

可以看到成功读取到日志信息,至此该项目已开发完成

参考文章:https://blog.csdn.net/qq_43442524/article/details/105072952

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/966906.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

后端SpringBoot+前端Vue前后端分离的项目(一)

前言:后端使用SpringBoot框架,前端使用Vue框架,做一个前后端分离的小项目,需求:实现一个表格,具备新增、删除、修改的功能。 一、数据库表的设计 设计了一个merchandise表,id是编号&#xff0c…

基于Matlab利用IRM和RRTstar实现无人机路径规划(附上源码+数据+说明+报告+PPT)

无人机路径规划是无人机应用领域中的关键问题之一。本文提出了一种基于IRM(Informed RRTstar Method)和RRTstar(Rapidly-exploring Random Tree star)算法的无人机路径规划方法,并使用Matlab进行实现。该方法通过结合I…

【LeetCode-中等题】994. 腐烂的橘子

文章目录 题目方法一:bfs层序遍历 题目 该题值推荐用bfs,因为是一层一层的感染,而不是一条线走到底的那种,所以深度优先搜索不适合 方法一:bfs层序遍历 广度优先搜索,就是从起点出发,每次都尝…

无涯教程-JavaScript - VARP函数

VARP函数取代了Excel 2010中的VAR.P函数。 描述 该函数根据整个总体计算方差。 语法 VARP (number1,[number2],...)争论 Argument描述Required/OptionalNumber1The first number argument corresponding to a population.RequiredNumber2...Number arguments 2 to 255 cor…

Go实现LogCollect:海量日志收集系统【上篇——LogAgent实现】

Go实现LogCollect:海量日志收集系统【上篇——LogAgent实现】 下篇:Go实现LogCollect:海量日志收集系统【下篇——开发LogTransfer】 项目架构图: 0 项目背景与方案选择 背景 当公司发展的越来越大,业务越来越复杂…

Web of Science批量导出

目录 如何用Web of Science检索学术信息问题批量导出 Web of Science检索结果 如何用Web of Science检索学术信息 进入 Web of Science 检索页面: https://www.webofscience.com/wos/woscc/basic-search 根据需求填写过滤条件,点击 search 进入搜索详…

Stable Diffusion---Ai绘画-下载-入门-进阶(笔记整理)

前言 注:本文偏向于整理,都是跟着大佬们学的。 推荐两个b站up主,学完他们俩的东西基本就玩转SD为底的ai绘画: 秋葉aaaki,Nenly同学 1.首先SD主流的就是秋叶佬的Webui了,直接压缩包下载即可,下…

TCP三次握手四次挥手总结

目录 一、两种传输模式: 二、数据方向: 三、端口的作用: 四、端口类型: 五、三次握手: 六、四次断开 常见面试题 TCP(Transfer control protocol)传输控制协议 一、两种传输模式&#x…

计算机毕业设计 校园二手交易平台 Vue+SpringBoot+MySQL

作者主页:Designer 小郑 作者简介:Java全栈软件工程师一枚,来自浙江宁波,负责开发管理公司OA项目,专注软件前后端开发、系统定制、远程技术指导。CSDN学院、蓝桥云课认证讲师,全栈领域优质创作者。 项目内容…

PYTHON知识点学习-空语句和条件语句

空语句: 虽然条件满足时啥也不做,但是由于python对于语法格式,尤其是缩进和代码块的要求较高,所以如果啥都不写(只写注释)是不符合语法要求的. 报错原因:if后面需要带缩进块------>所以可以使用一个空语句pass进行占位 pass--空语句,无实际意义,啥都不想做时利用它进行占位 …

使用多线程导入大量数据,多线程事物控制

本文主要讲述通过Spring Boot MyBatis做大数据量数据插入的案例和结果 不分批次直接梭哈 MyBatis直接一次性批量插入30万条,代码如下: Test public void testBatchInsertUser() throws IOException {InputStream resourceAsStream Resources.getResou…

Windows配置SonarQube代码审查工具详细步骤(附带IDEA SonarLint插件使用)

文章目录 环境说明以及准备一. SonarQube的下载与安装二. 添加SonarQube项目三. 使用Maven命令上传代码到SonarQube四. IDEA安装SonarLint插件 环境说明以及准备 本篇博客使用的SonarQube版本为9.8,注意JDK 1.8已经不能支持 NameVersionDownLoad LinkSonarQube9.8…

Redis缓存和持久化

目录 Redis缓存 什么是缓存 缓存更新策略​编辑 业务场景 缓存穿透 常见的解决方案 缓存雪崩 解决方案 缓存击穿 解决方案 Redis持久化 RDB持久化 执行时机 RDB方式bgsave的基本流程 AOF持久化 RDB和AOF的对比​编辑 Redis主从 数据同步原理 总结 Redis缓存 …

数学建模:灰色预测模型

🔆 文章首发于我的个人博客:欢迎大佬们来逛逛 数学建模:灰色预测模型 文章目录 数学建模:灰色预测模型灰色预测算法步骤代码实现 灰色预测 三个基本方法: 累加数列:计算一阶累加生成数列 x ( 1 ) ( k ) …

Python安装与Pycharm配置

Python与Pycharm安装 用了一年的Python最近被一个问题难倒了,pip安装一直不能用,报错说被另一个程序使用。被逼到只能重新安装python了,正好记录一下这个过程,写这篇笔记。(突然想到可能是配Arcgis的python接口&#…

锐捷盒式交换机S5760C版本U盘升级

1.确认设备当前版本信息 2.将升级文件包放置U盘文件夹中, U盘名称123 , 文件夹名称A 3.查看到升级包后,进行U盘升级 #upgrade usb0:/A/S5760X_RGOS12.5(4)B0702P4_install.bin 4.升级成功后 reload交换机 5.等交换机重启完毕,再次…

零基础学Python:元组(Tuple)详细教程

前言 嗨喽,大家好呀~这里是爱看美女的茜茜呐 Python的元组与列表类似, 不同之处在于元组的元素不能修改, 元组使用小括号,列表使用方括号, 元组创建很简单,只需要在括号中添加元素,并使用逗号隔开即可 👇 👇 👇 更…

CXL.mem M2S Message 释义

🔥点击查看精选 CXL 系列文章🔥 🔥点击进入【芯片设计验证】社区,查看更多精彩内容🔥 📢 声明: 🥭 作者主页:【MangoPapa的CSDN主页】。⚠️ 本文首发于CSDN&#xff0c…

ubuntu22.04搭建verilator仿真环境

概述 操作系统为 Ubuntu(22.04.2 LTS),本次安装verilator开源verilog仿真工具,进行RTL功能仿真。下面构建版本为5.008的verilator仿真环境。先看一下我系统的版本: 安装流程 安装依赖 sudo apt-get install git perl python3 make autoc…

UDP/TCP协议报头详细分析

文章目录 ————————预备知识————————数据段netstatpidof—————UDP协议报头即相关概念分析—————UDP协议端格式UDP 特点全双工send / rec 函数的本质UDP的缓冲区基于UDP的应用层协议—————TCP协议报头即相关概念分析—————TCP格式及解析32位序号…