别人的经验:
如今任何计算机系统每天都会产生大量的日志或数据。随着系统的增长,将调试数据存储到数据库中是不可行的,因为它们是不可变的,主要用于分析和解决故障的目的。因此,企业倾向于将其存储在文件中,并保存在本地磁盘中。
我们将使用Golang从大小为16 GB的.txt或.log文件中提取日志,该文件有数百万行。
直接上代码,首先打开文件,将使用标准Go os.File来读取文件IO:
f, err := os.Open(fileName)
if err != nil {
fmt.Println("cannot able to read the file", err)
return
}
// UPDATE: close after checking error
defer file.Close() //Do not forget to close the file
一旦打开文件后,我们有以下两个选择进行下一步处理。
1、逐行读取文件,这有助于减少系统内存压力,但会在IO中花费更多时间。
2、一次性将整个文件都读入内存并处理该文件,这会消耗很大内存,但会节约时间。
由于文件太大例如16GB,无法将整个文件加载到内存中。但是第一个选择也不可行,因为我们希望几秒钟内处理该文件。
但是你猜怎么着,还有第三种选择。不是加载整个文件到内存,而是使用bufio.NewReader()以块的形式加载文件。
r := bufio.NewReader(f)
for {
buf := make([]byte,4*1024) //the chunk size
n, err := r.Read(buf) //loading chunk into buffer
buf = buf[:n]
if n == 0 {
if err != nil {
fmt.Println(err)
break
}
if err == io.EOF {
break
}
return err
}
}
一旦我们读取到文件块,我们将fork一个线程,也就是Go协程,来同时处理每个块。上面的代码将改为如下:
//sync pools to reuse the memory and decrease the preassure on //Garbage Collector
linesPool := sync.Pool{New: func() interface{} {
lines := make([]byte, 500*1024)
return lines
}}
stringPool := sync.Pool{New: func() interface{} {
lines := ""
return lines
}}
slicePool := sync.Pool{New: func() interface{} {
lines := make([]string, 100)
return lines
}}
r := bufio.NewReader(f)
var wg sync.WaitGroup //wait group to keep track off all threads
for {
buf := linesPool.Get().([]byte)
n, err := r.Read(buf)
buf = buf[:n]
if n == 0 {
if err != nil {
fmt.Println(err)
break
}
if err == io.EOF {
break
}
return err
}
nextUntillNewline, err := r.ReadBytes('\n')//read entire line
if err != io.EOF {
buf = append(buf, nextUntillNewline...)
}
wg.Add(1)
go func() {
//process each chunk concurrently
//start -> log start time, end -> log end time
ProcessChunk(buf, &linesPool, &stringPool, &slicePool, start, end)
wg.Done()
}()
}
wg.Wait()
}
上面的代码引入了两个新的优化:
1、sync.Pool是一个强大的实例池,可以重用实例来减少垃圾收集器的压力。我们将重用分配的内存片。这将减少内存消耗,使代码优化更高效。
2、Go协程并发处理缓冲区块,大大提高了处理速度。
下面来实现ProcessChunk函数,处理如下格式的日志信息:
2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n
我们将根据命令行提供的时间戳提取日志:
func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {
//another wait group to process every chunk further
var wg2 sync.WaitGroup
logs := stringPool.Get().(string)
logs = string(chunk)
linesPool.Put(chunk) //put back the chunk in pool
//split the string by "\n", so that we have slice of logs
logsSlice := strings.Split(logs, "\n")
stringPool.Put(logs) //put back the string pool
chunkSize := 100 //process the bunch of 100 logs in thread
n := len(logsSlice)
noOfThread := n / chunkSize
if n%chunkSize != 0 { //check for overflow
noOfThread++
}
length := len(logsSlice)
//traverse the chunk
for i := 0; i < length; i += chunkSize {
wg2.Add(1)
//process each chunk in saperate chunk
go func(s int, e int) {
for i:= s; i<e;i++{
text := logsSlice[i]
if len(text) == 0 {
continue
}
logParts := strings.SplitN(text, ",", 2)
logCreationTimeString := logParts[0]
logCreationTime, err := time.Parse("2006-01- 02T15:04:05.0000Z", logCreationTimeString)
if err != nil {
fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text)
return
}
// check if log's timestamp is inbetween our desired period
if logCreationTime.After(start) && logCreationTime.Before(end) {
fmt.Println(text)
}
}
textSlice = nil
wg2.Done()
}(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
//passing the indexes for processing
}
wg2.Wait() //wait for a chunk to finish
logsSlice = nil
}
上面的代码使用16GB的日志文件进行基础测试,提取日志所花费的时间约25秒。
下面是完成代码:
func main() {
s := time.Now()
args := os.Args[1:]
if len(args) != 6 { // for format LogExtractor.exe -f "From Time" -t "To Time" -i "Log file directory location"
fmt.Println("Please give proper command line arguments")
return
}
startTimeArg := args[1]
finishTimeArg := args[3]
fileName := args[5]
file, err := os.Open(fileName)
if err != nil {
fmt.Println("cannot able to read the file", err)
return
}
defer file.Close() //close after checking err
queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)
if err != nil {
fmt.Println("Could not able to parse the start time", startTimeArg)
return
}
queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)
if err != nil {
fmt.Println("Could not able to parse the finish time", finishTimeArg)
return
}
filestat, err := file.Stat()
if err != nil {
fmt.Println("Could not able to get the file stat")
return
}
fileSize := filestat.Size()
offset := fileSize - 1
lastLineSize := 0
for {
b := make([]byte, 1)
n, err := file.ReadAt(b, offset)
if err != nil {
fmt.Println("Error reading file ", err)
break
}
char := string(b[0])
if char == "\n" {
break
}
offset--
lastLineSize += n
}
lastLine := make([]byte, lastLineSize)
_, err = file.ReadAt(lastLine, offset+1)
if err != nil {
fmt.Println("Could not able to read last line with offset", offset, "and lastline size", lastLineSize)
return
}
logSlice := strings.SplitN(string(lastLine), ",", 2)
logCreationTimeString := logSlice[0]
lastLogCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
if err != nil {
fmt.Println("can not able to parse time : ", err)
}
if lastLogCreationTime.After(queryStartTime) && lastLogCreationTime.Before(queryFinishTime) {
Process(file, queryStartTime, queryFinishTime)
}
fmt.Println("\nTime taken - ", time.Since(s))
}
func Process(f *os.File, start time.Time, end time.Time) error {
linesPool := sync.Pool{New: func() interface{} {
lines := make([]byte, 250*1024)
return lines
}}
stringPool := sync.Pool{New: func() interface{} {
lines := ""
return lines
}}
r := bufio.NewReader(f)
var wg sync.WaitGroup
for {
buf := linesPool.Get().([]byte)
n, err := r.Read(buf)
buf = buf[:n]
if n == 0 {
if err != nil {
fmt.Println(err)
break
}
if err == io.EOF {
break
}
return err
}
nextUntillNewline, err := r.ReadBytes('\n')
if err != io.EOF {
buf = append(buf, nextUntillNewline...)
}
wg.Add(1)
go func() {
ProcessChunk(buf, &linesPool, &stringPool, start, end)
wg.Done()
}()
}
wg.Wait()
return nil
}
func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) {
var wg2 sync.WaitGroup
logs := stringPool.Get().(string)
logs = string(chunk)
linesPool.Put(chunk)
logsSlice := strings.Split(logs, "\n")
stringPool.Put(logs)
chunkSize := 300
n := len(logsSlice)
noOfThread := n / chunkSize
if n%chunkSize != 0 {
noOfThread++
}
for i := 0; i < (noOfThread); i++ {
wg2.Add(1)
go func(s int, e int) {
defer wg2.Done() //to avaoid deadlocks
for i := s; i < e; i++ {
text := logsSlice[i]
if len(text) == 0 {
continue
}
logSlice := strings.SplitN(text, ",", 2)
logCreationTimeString := logSlice[0]
logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
if err != nil {
fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text)
return
}
if logCreationTime.After(start) && logCreationTime.Before(end) {
//fmt.Println(text)
}
}
}(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
}
wg2.Wait()
logsSlice = nil
}
自己实践
server/server.go
package main
import (
"bufio"
"fmt"
"net"
"os"
"sync"
//"time"
)
func SendFile(con net.Conn,fileName string){
file, err := os.Open(fileName)
if err != nil {
fmt.Println("cannot able to read the file", err)
return
}
defer file.Close()
filestat, err := file.Stat()
if err != nil {
fmt.Println("Could not able to get the file stat")
return
}
fileSize := filestat.Size()
bufPool := sync.Pool{New: func() interface{} {
buf := make([]byte, 100*1024*1024) //100M per block to mem
return buf
}}
r := bufio.NewReader(file)
var wg sync.WaitGroup
block := 0
var sendSize int = 0
for {
buf := bufPool.Get().([]byte)
n, err := r.Read(buf)
buf = buf[:n]
if err != nil {
fmt.Println(err)
break
}
// nextUntillNewline, err := r.ReadBytes('\n')
// if err != io.EOF {
// buf = append(buf, nextUntillNewline...)
// }
wg.Add(1) //just test wg not multi goroutine
go func() {
defer wg.Done()
n1, err := con.Write(buf)
if err != nil{
fmt.Println(err)
}else if n1 != n{
fmt.Printf("send len n1 =%d != n %d\n",n1,n)
}
return
}()
wg.Wait()
block++
sendSize += n
fmt.Printf("send block num = %d\n",block)
}
fmt.Printf("file size = %d ,send size = %d\n",fileSize,sendSize)
}
//处理客户端连接请求
func process(coon net.Conn) {
defer coon.Close()
//定义接收信息的字节数组
var buf [1024]byte
//读取数据
n, err := coon.Read(buf[:])
if err != nil {
fmt.Println("获取信息失败,err:", err)
return
}
filename := string(buf[:n])
fmt.Printf("对方获取文件是:%s", filename)
SendFile(coon, filename)
fmt.Printf("发送文件文件完毕:%s", filename)
}
//TCP服务端配置
func main() {
//1:启用监听
listener, err := net.Listen("tcp", "127.0.0.1:20000")
//连接失败处理
if err != nil {
fmt.Println("启动服务失败,err:", err)
return
}
//程序退出时释放端口
defer listener.Close()
for {
conn, err := listener.Accept() //2.建立连接
if err != nil {
fmt.Println("接收客户连接失败,err:", err)
continue
}
//3.启动一个人goroutine处理客户端连接
go process(conn)
}
}
client/client.go
package main
import (
"fmt"
"net"
"os"
"bufio"
"sync"
"time"
)
//TCP客户端
func main() {
args := os.Args[1:]
if len(args) != 1 { // for format clien fileName
fmt.Println("Please give proper command line arguments")
return
}
fileName := args[0]
f, err := os.Create(fileName) //创建文件
if err != nil{
fmt.Println("create file fail" + fileName)
}
defer f.Close()
//1:拨号方式建立与服务端连接
conn, err := net.Dial("tcp", "127.0.0.1:20000")
if err != nil {
fmt.Println("连接服务端失败,err:", err)
return
}
//注意:关闭连接位置,不能写在连接失败判断上面
defer conn.Close()
//2:向服务器发送信息
_, err = conn.Write([]byte(fileName))
if err != nil {
fmt.Println("发送信息失败,err:", err)
return
}
s := time.Now()
bufPool := sync.Pool{New: func() interface{} {
buf := make([]byte, 100*1024*1024) //100M per block to mem
return buf
}}
w := bufio.NewWriterSize(f,100*1024*1024)
block := 0
var reveiveSize int = 0
for {
buf := bufPool.Get().([]byte)
n, err := conn.Read(buf)
if err != nil {
fmt.Println("获取结束", err)
break
}
buf = buf[:n]
wn , err := w.Write(buf)
if err != nil{
fmt.Println(err)
break
}else if wn != n{
fmt.Printf("write len wn =%d != n %d\n",wn,n)
break
}
block++
reveiveSize += n
fmt.Printf("reveive block num = %d,size = %d\n",block,reveiveSize)
}
w.Flush()
spend := time.Since(s)
fmt.Printf("receive end size = %d \n",reveiveSize)
fmt.Println("time spend ",spend)
}
测试2.4G本地go tcp 发送接收花费17秒