go读写文件总结

news2025/1/10 16:00:05

别人的经验:

如今任何计算机系统每天都会产生大量的日志或数据。随着系统的增长,将调试数据存储到数据库中是不可行的,因为它们是不可变的,主要用于分析和解决故障的目的。因此,企业倾向于将其存储在文件中,并保存在本地磁盘中。

我们将使用Golang从大小为16 GB的.txt或.log文件中提取日志,该文件有数百万行。

直接上代码,首先打开文件,将使用标准Go os.File来读取文件IO:

f, err := os.Open(fileName)
 if err != nil {
   fmt.Println("cannot able to read the file", err)
   return
 }
// UPDATE: close after checking error
defer file.Close()  //Do not forget to close the file

一旦打开文件后,我们有以下两个选择进行下一步处理。
1、逐行读取文件,这有助于减少系统内存压力,但会在IO中花费更多时间。
2、一次性将整个文件都读入内存并处理该文件,这会消耗很大内存,但会节约时间。
由于文件太大例如16GB,无法将整个文件加载到内存中。但是第一个选择也不可行,因为我们希望几秒钟内处理该文件。

但是你猜怎么着,还有第三种选择。不是加载整个文件到内存,而是使用bufio.NewReader()以块的形式加载文件。

r := bufio.NewReader(f)
for {
buf := make([]byte,4*1024) //the chunk size
n, err := r.Read(buf) //loading chunk into buffer
   buf = buf[:n]
if n == 0 {
   
     if err != nil {
       fmt.Println(err)
       break
     }
     if err == io.EOF {
       break
     }
     return err
  }
}

一旦我们读取到文件块,我们将fork一个线程,也就是Go协程,来同时处理每个块。上面的代码将改为如下:

//sync pools to reuse the memory and decrease the preassure on //Garbage Collector
linesPool := sync.Pool{New: func() interface{} {
        lines := make([]byte, 500*1024)
        return lines
}}
stringPool := sync.Pool{New: func() interface{} {
          lines := ""
          return lines
}}
slicePool := sync.Pool{New: func() interface{} {
           lines := make([]string, 100)
           return lines
}}
r := bufio.NewReader(f)
var wg sync.WaitGroup //wait group to keep track off all threads
for {
     
     buf := linesPool.Get().([]byte)
     n, err := r.Read(buf)
     buf = buf[:n]
if n == 0 {
        if err != nil {
            fmt.Println(err)
            break
        }
        if err == io.EOF {
            break
        }
        return err
     }
nextUntillNewline, err := r.ReadBytes('\n')//read entire line
     
     if err != io.EOF {
         buf = append(buf, nextUntillNewline...)
     }
     
     wg.Add(1)
     go func() { 
      
        //process each chunk concurrently
        //start -> log start time, end -> log end time
        
        ProcessChunk(buf, &linesPool, &stringPool, &slicePool,     start, end)
wg.Done()
     
     }()
}
wg.Wait()
}

上面的代码引入了两个新的优化:
1、sync.Pool是一个强大的实例池,可以重用实例来减少垃圾收集器的压力。我们将重用分配的内存片。这将减少内存消耗,使代码优化更高效。
2、Go协程并发处理缓冲区块,大大提高了处理速度。
下面来实现ProcessChunk函数,处理如下格式的日志信息:

2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n

我们将根据命令行提供的时间戳提取日志:

func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {
//another wait group to process every chunk further                             
      var wg2 sync.WaitGroup
logs := stringPool.Get().(string)
logs = string(chunk)
linesPool.Put(chunk) //put back the chunk in pool
//split the string by "\n", so that we have slice of logs
      logsSlice := strings.Split(logs, "\n")
stringPool.Put(logs) //put back the string pool
chunkSize := 100 //process the bunch of 100 logs in thread
n := len(logsSlice)
noOfThread := n / chunkSize
if n%chunkSize != 0 { //check for overflow 
         noOfThread++
      }
length := len(logsSlice)
//traverse the chunk
     for i := 0; i < length; i += chunkSize {
         
         wg2.Add(1)
//process each chunk in saperate chunk
         go func(s int, e int) {
            for i:= s; i<e;i++{
               text := logsSlice[i]
if len(text) == 0 {
                  continue
               }
           
            logParts := strings.SplitN(text, ",", 2)
            logCreationTimeString := logParts[0]
            logCreationTime, err := time.Parse("2006-01-  02T15:04:05.0000Z", logCreationTimeString)
if err != nil {
                 fmt.Printf("\n Could not able to parse the time :%s       for log : %v", logCreationTimeString, text)
                 return
            }
// check if log's timestamp is inbetween our desired period
          if logCreationTime.After(start) && logCreationTime.Before(end) {
          
            fmt.Println(text)
           }
        }
        textSlice = nil
        wg2.Done()
     
     }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
   //passing the indexes for processing
}  
   wg2.Wait() //wait for a chunk to finish
   logsSlice = nil
}

上面的代码使用16GB的日志文件进行基础测试,提取日志所花费的时间约25秒。
下面是完成代码:

func main() {

    s := time.Now()
    args := os.Args[1:]
    if len(args) != 6 { // for format  LogExtractor.exe -f "From Time" -t "To Time" -i "Log file directory location"
        fmt.Println("Please give proper command line arguments")
        return
    }
    startTimeArg := args[1]
    finishTimeArg := args[3]
    fileName := args[5]

    file, err := os.Open(fileName)
    
    if err != nil {
        fmt.Println("cannot able to read the file", err)
        return
    }
    
    defer file.Close() //close after checking err
    
    queryStartTime, err := time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)
    if err != nil {
        fmt.Println("Could not able to parse the start time", startTimeArg)
        return
    }

    queryFinishTime, err := time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)
    if err != nil {
        fmt.Println("Could not able to parse the finish time", finishTimeArg)
        return
    }

    filestat, err := file.Stat()
    if err != nil {
        fmt.Println("Could not able to get the file stat")
        return
    }

    fileSize := filestat.Size()
    offset := fileSize - 1
    lastLineSize := 0

    for {
        b := make([]byte, 1)
        n, err := file.ReadAt(b, offset)
        if err != nil {
            fmt.Println("Error reading file ", err)
            break
        }
        char := string(b[0])
        if char == "\n" {
            break
        }
        offset--
        lastLineSize += n
    }

    lastLine := make([]byte, lastLineSize)
    _, err = file.ReadAt(lastLine, offset+1)

    if err != nil {
        fmt.Println("Could not able to read last line with offset", offset, "and lastline size", lastLineSize)
        return
    }

    logSlice := strings.SplitN(string(lastLine), ",", 2)
    logCreationTimeString := logSlice[0]

    lastLogCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
    if err != nil {
        fmt.Println("can not able to parse time : ", err)
    }

    if lastLogCreationTime.After(queryStartTime) && lastLogCreationTime.Before(queryFinishTime) {
        Process(file, queryStartTime, queryFinishTime)
    }

    fmt.Println("\nTime taken - ", time.Since(s))
}

func Process(f *os.File, start time.Time, end time.Time) error {

    linesPool := sync.Pool{New: func() interface{} {
        lines := make([]byte, 250*1024)
        return lines
    }}

    stringPool := sync.Pool{New: func() interface{} {
        lines := ""
        return lines
    }}

    r := bufio.NewReader(f)

    var wg sync.WaitGroup

    for {
        buf := linesPool.Get().([]byte)

        n, err := r.Read(buf)
        buf = buf[:n]

        if n == 0 {
            if err != nil {
                fmt.Println(err)
                break
            }
            if err == io.EOF {
                break
            }
            return err
        }

        nextUntillNewline, err := r.ReadBytes('\n')

        if err != io.EOF {
            buf = append(buf, nextUntillNewline...)
        }

        wg.Add(1)
        go func() {
            ProcessChunk(buf, &linesPool, &stringPool, start, end)
            wg.Done()
        }()

    }

    wg.Wait()
    return nil
}

func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) {

    var wg2 sync.WaitGroup

    logs := stringPool.Get().(string)
    logs = string(chunk)

    linesPool.Put(chunk)

    logsSlice := strings.Split(logs, "\n")

    stringPool.Put(logs)

    chunkSize := 300
    n := len(logsSlice)
    noOfThread := n / chunkSize

    if n%chunkSize != 0 {
        noOfThread++
    }

    for i := 0; i < (noOfThread); i++ {

        wg2.Add(1)
        go func(s int, e int) {
            defer wg2.Done() //to avaoid deadlocks
            for i := s; i < e; i++ {
                text := logsSlice[i]
                if len(text) == 0 {
                    continue
                }
                logSlice := strings.SplitN(text, ",", 2)
                logCreationTimeString := logSlice[0]

                logCreationTime, err := time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)
                if err != nil {
                    fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text)
                    return
                }

                if logCreationTime.After(start) && logCreationTime.Before(end) {
                    //fmt.Println(text)
                }
            }
            

        }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))
    }

    wg2.Wait()
    logsSlice = nil
}

自己实践

server/server.go

package main

import (
        "bufio"
        "fmt"
        "net"
        "os"
        "sync"
        //"time"
)



func SendFile(con net.Conn,fileName string){
    file, err := os.Open(fileName)
    
    if err != nil {
        fmt.Println("cannot able to read the file", err)
        return
    }
    
    defer file.Close() 
    filestat, err := file.Stat()
    if err != nil {
        fmt.Println("Could not able to get the file stat")
        return
    }
 
    fileSize := filestat.Size()
    bufPool := sync.Pool{New: func() interface{} {
        buf := make([]byte, 100*1024*1024) //100M per block to mem
        return buf
    }}
    r := bufio.NewReader(file)
    var wg sync.WaitGroup
    block := 0
    var sendSize int = 0
    for {
        buf := bufPool.Get().([]byte)
        n, err := r.Read(buf)
        buf = buf[:n]
 

        if err != nil {
            fmt.Println(err)
            break
        }
        
 
        // nextUntillNewline, err := r.ReadBytes('\n')
        // if err != io.EOF {
        //     buf = append(buf, nextUntillNewline...)
        // }
 
        wg.Add(1) //just test wg not multi goroutine
        go func() {
            defer wg.Done()
            n1, err := con.Write(buf)
            if err != nil{
                fmt.Println(err)
            }else if n1 != n{
                fmt.Printf("send len n1 =%d != n %d\n",n1,n)
            }
            return
        }()
        wg.Wait()
        block++
        sendSize += n
        fmt.Printf("send block num = %d\n",block)
    }
    fmt.Printf("file size = %d ,send size = %d\n",fileSize,sendSize)
    
}


//处理客户端连接请求
func process(coon net.Conn) {
        defer coon.Close()
        //定义接收信息的字节数组
        var buf [1024]byte
        //读取数据
        n, err := coon.Read(buf[:])
        if err != nil {
                fmt.Println("获取信息失败,err:", err)
                return
        }
        filename := string(buf[:n])
        fmt.Printf("对方获取文件是:%s", filename)
        SendFile(coon, filename)
        fmt.Printf("发送文件文件完毕:%s", filename)
}

//TCP服务端配置
func main() {
        //1:启用监听
        listener, err := net.Listen("tcp", "127.0.0.1:20000")
        //连接失败处理
        if err != nil {
                fmt.Println("启动服务失败,err:", err)
                return
        }

        //程序退出时释放端口
        defer listener.Close()
        for {
                conn, err := listener.Accept() //2.建立连接
                if err != nil {
                        fmt.Println("接收客户连接失败,err:", err)
                        continue
                }
                //3.启动一个人goroutine处理客户端连接
                go process(conn)
        }
}

client/client.go

package main

import (
        "fmt"
        "net"
		"os"
		"bufio"
		"sync"
		"time"
)

//TCP客户端
func main() {
	args := os.Args[1:]
    if len(args) != 1 { // for format  clien fileName
        fmt.Println("Please give proper command line arguments")
        return
    }
	fileName := args[0]
	f, err := os.Create(fileName) //创建文件
    if err != nil{
        fmt.Println("create file fail" + fileName)
    }
	defer f.Close()
	//1:拨号方式建立与服务端连接
	conn, err := net.Dial("tcp", "127.0.0.1:20000")
	if err != nil {
			fmt.Println("连接服务端失败,err:", err)
			return
	}

	//注意:关闭连接位置,不能写在连接失败判断上面
	defer conn.Close()
	//2:向服务器发送信息
	_, err = conn.Write([]byte(fileName))
	if err != nil {
			fmt.Println("发送信息失败,err:", err)
			return
	}
	s := time.Now()
	bufPool := sync.Pool{New: func() interface{} {
        buf := make([]byte, 100*1024*1024) //100M per block to mem
        return buf
    }}
	w := bufio.NewWriterSize(f,100*1024*1024)
	block := 0
    var reveiveSize int = 0
	for {
		buf := bufPool.Get().([]byte)
		n, err := conn.Read(buf)
        if err != nil {
                fmt.Println("获取结束", err)
                break
        }
		buf = buf[:n]
		wn , err := w.Write(buf)
		if err != nil{
			fmt.Println(err)
			break
		}else if wn != n{
			fmt.Printf("write len wn =%d != n %d\n",wn,n)
			break
		}
		
		block++
        reveiveSize += n
        fmt.Printf("reveive block num = %d,size = %d\n",block,reveiveSize)
	}
	w.Flush()
	spend := time.Since(s)
	fmt.Printf("receive end size = %d \n",reveiveSize)
	fmt.Println("time spend ",spend)
}

测试2.4G本地go tcp 发送接收花费17秒

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/704384.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

2023一建建筑市政全彩图文教材

本《建筑学全彩图文教材》改编自2022年官方指定的最新版本一级建造师《建筑工程管理与实务》教材。适用于2023年的第一次建造考试。2023年版教材出版上市后&#xff0c;我们将重点关注教材。补充电子版在变更处提供。 本书忠于官方原版教材&#xff0c;并不删除任何知识点&…

当大模型遇到数据仓库 HashData助力LLM规模化应用

6月30日&#xff0c;由 IT168主办的第十六届中国系统架构师大会&#xff08;SACC2023&#xff09;在北京开幕。本届大会以“数字转型 架构演进”为主题&#xff0c;议题涵盖AIGC大数据、多云多活、云成本等多个热门领域。 在会上&#xff0c;酷克数据首席科学家杨胜文发表了题…

UWB超宽带定位技术的原理及定位方法

uwb定位技术即超宽带技术&#xff0c;它是一种无载波通信技术&#xff0c;利用纳秒级的非正弦波窄脉冲传输数据&#xff0c;因此其所占的频谱范围很宽。传统的定位技术是根据信号强弱来判别物体位置&#xff0c;信号强弱受外界 影响较大&#xff0c;因此定位出的物体位置与实际…

JAVA-编程基础-08-Java异常处理全面解析

Lison <dreamlison163.com>, v1.0.0, 2023.04.01 JAVA-编程基础-08-Java异常处理全面解析 文章目录 JAVA-编程基础-08-Java异常处理全面解析什么是异常Exception和Error的区别checked和unchecked异常”关于 throw 和 throws关于 try-catch-finally小结 try-with-resourc…

升级Xcode14.3,项目无法运行解决

报错&#xff1a;link command failed with exit code 1(use -v to see invocaiton) 原因&#xff1a;新版本Xcode删除了特定目录下的一些文件 解决&#xff1a; post_install do |installer|installer.pods_project.targets.each do |target|target.build_configurations.e…

O2OA(翱途)开发平台如何在流程表单中使用基于Vue的ElementUI组件?

本文主要介绍如何在O2OA中进行审批流程表单或者工作流表单设计&#xff0c;O2OA主要采用拖拽可视化开发的方式完成流程表单的设计和配置&#xff0c;不需要过多的代码编写&#xff0c;业务人员可以直接进行修改操作。 在流程表单设计界面&#xff0c;可以在左边的工具栏找到Ele…

Linux--显示当前路径下的所有文件指令:ls

一、ls是list的简写 二、语法&#xff1a; ls [选项] [目录或文件] 三、功能&#xff1a; ①对于目录&#xff0c;该命令列出当前目录下的所有子目录与文件。 ②对于文件&#xff0c;将列出文件名以及其他信息。 四、常用选项&#xff1a; 1.-a 列出目录下的所有文件&…

Java实现将数据转成xmind脑图(附有工具类)。

&#x1f61c;作 者&#xff1a;是江迪呀✒️本文关键词&#xff1a;Java、工具类、xmind、脑图、转换☀️每日 一言&#xff1a;昨日已成过去,未来充满可能,唯有珍惜现在。 [TOPC] 前言 当谈到Xmind时&#xff0c;这是一个非常流行的思维导图工具&#xff0c;可…

超级实用!详解Node.js中的path模块和events模块

文章目录 3. path 模块路径操作方法路径格式化方法路径拆分方法 4. events 模块EventEmitter 类创建事件对象注册事件处理函数触发事件一次性事件处理函数异步事件处理函数移除事件处理函数 继承 EventEmitter 类 3. path 模块 用于处理文件路径&#xff0c;包括解析、拼接、规…

删除MySQL中名字首尾固定关键字相同的表

删除MySQL中名字首尾固定关键字相同的表 SELECT CONCAT(drop table , group_concat(TABLE_NAME), ;) FROM information_schema.TABLES WHERE table_schema test AND TABLE_NAME LIKE t_%_history ;查看表列表 SHOW TABLES;通过上图观察发现所有的表都是以 t_ 开头 和以 _his…

Mybatis-Plus学习1

mybatis-plus需要两个依赖&#xff0c;一个lombok&#xff0c;一个mybatis-plus <dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.1</version> </dependency> …

Service 基础

今天开始来分享Service 的基础知识&#xff0c;后续我们可以慢慢打磨&#xff0c;分享 Service 的进阶知识和原理 Service 基本概念 Service 是 K8S 最核心的概念了 我们可以通过创建 Service &#xff0c;为一组具有相同功能的容器应用提供一个统一的入口地址&#xff0c;并…

linux上搭建ftp服务

linux上搭建ftp服务简要过程。 1.安装 在目标主机上安装对应软件。 rpm -qa | grep vsftpd # 查看是否已经安装了vsftpd软件 yum install -y vsftpd # 安装2.配置 安装好了之后可在这个路径下编辑配置文件&#xff0c;按需配置&#xff0c;这里采用默认。 vi /etc/vsftp…

螺杆支撑座要怎么选?

螺杆支撑座是连接螺杆和电机的轴承固定座&#xff0c;使用螺杆支撑座可以获得高刚性、高精度的稳定的回转性能&#xff0c;这也是大部分厂商愿意使用的原因之一。 目前&#xff0c;市面上做螺杆支撑座的品牌还比较少&#xff0c;给大家选择的空间也不多&#xff0c;那么我们如何…

centos8运行cloudstack4.18

安装软件&#xff1a; mysql 5.7.42 node v10.24.0 git 2.34.1 jdk openjdk version "11.0.19" 2023-04-18 maven Apache Maven 3.8.3 (ff8e977a158738155dc465c6a97ffaf31982d739)自行配置&#xff1a;nfs 代码克隆地址&#xff1a; git clone https://git-wip-us…

Lesson1-1:OpenCV简介

图像处理 学习目标 了解图像的起源知道数字图像的表示 1 图像的起源 1.1 图像是什么 图像是人类视觉的基础&#xff0c;是自然景物的客观反映&#xff0c;是人类认识世界和人类本身的重要源泉。“图”是物体反射或透射光的分布&#xff0c;“像“是人的视觉系统所接受的图在…

ss客服让您在Facebook 的客户服务更便捷

ss客服让您在Facebook Messenger 的客户服务更便捷 在这个信息时代&#xff0c;新兴通讯软件蓬勃兴起&#xff0c;比如Facebook Messenger。事实证明&#xff0c;这对企业来说非常有利&#xff0c;同时突出了电子邮件、网络聊天和电话等传统渠道的局限性。在传统渠道上&#xf…

fastadmin表格列表内部自定义按钮

效果图&#xff1a; 直接上代码&#xff1a; 打开js渲染文件---》找到渲染原生的按钮&#xff1a; {field: "operate",title: __("Operate"),table: table,events: Table.api.events.operate,buttons: [//可多个按钮{name: "record", //名称tex…

vue + js 实现导出excel

效果如下图所示&#xff1a; 下面是具体的步骤&#xff1a; 第一步&#xff1a;安装依赖 **注意&#xff1a;**安装的时候注意版本号 npm install --save file-saver xlsx第二步&#xff1a;新建导出文件 Export2Excel.js /* eslint-disable */ import { saveAs } from f…

C#winform listBox组件批量删除

修改listBox组件属性&#xff1a;可以选中多个板坯号 选中板坯列表&#xff0c;在界面上点击删除按钮&#xff0c;触发删除方法deleteList&#xff1a; private void deleteList() { ListBox.SelectedIndexCollection sic listBoxProducts.SelectedIndice…