go-es模块统计日志中接口被刷数和ip访问来源
以下是使用go的web框架gin作为后端,展示的统计页面
背景
上面的数据来自elk日志统计。因为elk通过kibana进行展示,但是kibana有一定学习成本且不太能满足定制化的需求,所以考虑用编程的方式对数据进行处理 首先是接口统计,kibana的页面只会在 字段uri 的 top500 进行百分比统计,展示前5条数据,统计不够充分 其次是网关日志,ip来源的采集字段是通过x_forward_for,这记录了各级的代理来源ip。并不能直接对用户的ip进行数据聚合的统计
举例,这里面 “223.104.195.51,192.168.29.135” ,这种数据我需要拿到223.104.195.51,因为这才是用户的ip。所以需要进行编程的处理
环境
https://www.elastic.co/downloads/past-releases/elasticsearch-7-9-3
go 1.17 ,gin 1.6.3,go-elasticsearch 7.9.0
# go1.17下载地址
https://go.dev/dl/
# 模块下载
go env -w GOPROXY=https://goproxy.cn,direct
go mod init go-ops # 本项目的go mod 名字
go get github.com/elastic/go-elasticsearch/v7@v7.9.0
go get github.com/gin-gonic/gin@v1.6.3
# layui下载
http://layui.dotnetcms.cn/res/static/download/layui/layui-v2.6.8.zip?v=1
# layui框架代码
http://layui.dotnetcms.cn/web/demo/admin.html
# layui数据表格
http://layui.dotnetcms.cn/web/demo/table.html
# echarts下载(需魔法)
https://cdn.jsdelivr.net/npm/echarts@5.4.3/dist/echarts.min.js
# echarts直方图
https://echarts.apache.org/handbook/zh/get-started/
# echarts 饼图
https://echarts.apache.org/handbook/zh/how-to/chart-types/pie/basic-pie/
# gin静态文件服务(导入js、css、图片用的)
https://learnku.com/docs/gin-gonic/1.7/examples-serving-static-files/11402
# gin模板引擎(前后端不分离,后端数据渲染前端)
https://learnku.com/docs/gin-gonic/1.7/examples-html-rendering/11363
# gin绑定Uri(动态获取二级路由)
https://learnku.com/docs/gin-gonic/1.7/examples-bind-uri/11391
go-elasticsearch 模块
顾名思义此模块作用是充当es的客户端往es索引中读取数据,其原理和kibana上的dev tools一样,都是对es的restful api调用
# 以下是go-elasticsearch 的增删查改文档
https://www.elastic.co/guide/en/elasticsearch/client/go-api/current/getting-started-go.html
eql
实现统计数据分析的核心就是eql(es查询语言),通过go-elasticsearch模块进行eql的发送,再接收es返回的回复体 以下是使用go-es发送eql后,es的回复体的struct源码 可以看到type Response struct 中,我们想要的json数据在Body中,但是注意Body的类型为 io.ReadCloser , 因此是需要用go的io模块进行获取json数据
这边解决读取问题的代码如下。该函数接收es响应体,并返回未序列化的byte切片
func getResponseBody ( result * esapi. Response, context * gin. Context) [ ] byte {
var bodyBytes [ ] byte
bodyBytes, err := io. ReadAll ( result. Body)
if err != nil {
panic ( err)
}
return bodyBytes
}
es客户端建立连接
https://www.elastic.co/guide/en/elasticsearch/client/go-api/current/connecting.html
这边给的是https连接 + 用户名密码认证 + 不受信任证书的解决方法
package esinit
import (
"github.com/elastic/go-elasticsearch/v7"
"io/ioutil"
"log"
)
var EsClient * elasticsearch. Client
func init ( ) {
EsClient = newEsClient ( )
}
func newEsClient ( ) * elasticsearch. Client {
cert, certErr := ioutil. ReadFile ( "esinit/es.crt" )
if certErr != nil {
log. Println ( certErr)
}
EsClient, error := elasticsearch. NewClient ( elasticsearch. Config{
Username: "你的用户名" ,
Password: "你的密码" ,
Addresses: [ ] string {
"https://es-cluster1:9200" ,
"https://es-cluster2:9200" ,
"https://es-cluster3:9200" ,
} ,
CACert: cert,
} )
if error != nil {
panic ( error )
}
return EsClient
}
实现统计的三段eql
这里eql使用 fmt.Sprintf() 方法进行参数传递 第一段eql是统计PV
这里注意的是,我们在东八区,所以统计pv从16:00开始。这里根据@timestamp字段进行“aggs”聚合统计
func getPvResponse ( startYear int , startMonth int , startDay int , endYear, endMonth int , endDay int ) * esapi. Response {
query := fmt. Sprintf ( `
{
"query": {
"bool": {
"must": [
{
"range": {
"@timestamp": {
"gte": "%d-%02d-%02dT16:00:00",
"lte": "%d-%02d-%02dT16:00:00"
}
}
}
]
}
},
"aggs": {
"log_count": {
"value_count": {
"field": "@timestamp"
}
}
}
}
` , startYear, startMonth, startDay, endYear, endMonth, endDay)
result, _ := esinit. EsClient. Search (
esinit. EsClient. Search. WithIndex ( "k8s-istio-ingress*" ) ,
esinit. EsClient. Search. WithBody ( strings. NewReader ( query) ) ,
)
return result
}
第二段是对微服务(java)的接口(uri字段)的聚合统计
这里用的 sortUri 是gin的绑定uri功能(动态获取二级路由的名字) 这里返回前1小时10000条es文档的uri字段数据
func getSortResponse ( context * gin. Context) * esapi. Response {
if err := context. ShouldBindUri ( & sortUri) ; err != nil {
context. JSON ( 400 , gin. H{ "msg" : err} )
}
query := fmt. Sprintf ( `
{
"_source": ["uri"],
"query": {
"bool": {
"filter": [
{
"range": {
"@timestamp": {
"gte": "now-1d/d",
"lte": "now/d"
}
}
}
]
}
},
"size": 10000
}
` )
result, _ := esinit. EsClient. Search (
esinit. EsClient. Search. WithIndex ( sortUri. Name+ "*" ) ,
esinit. EsClient. Search. WithBody ( strings. NewReader ( query) ) ,
)
return result
}
第三段是对istio前一小时的ip请求统计,返回2000条记录
func getIstioDataResponse ( ) * esapi. Response {
query := `
{
"_source": [
"x_forwarded_for",
"@timestamp",
"path",
"user_agent_a",
"response_code",
"method",
"upstream_cluster"
],
"query": {
"bool": {
"filter": [
{
"range": {
"@timestamp": {
"gte": "now-1h/h",
"lte": "now/d"
}
}
}
]
}
},
"size": 2000
}
`
result, _ := esinit. EsClient. Search (
esinit. EsClient. Search. WithIndex ( "k8s-istio-ingress*" ) ,
esinit. EsClient. Search. WithBody ( strings. NewReader ( query) ) ,
)
return result
}
上面的eql函数,会return 一个 []byte切片,可以进行 json.Unmarshal 或其他struct转json的模块进行处理,就能够得到数据。然后便可进行渲染