文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

使用Go语言,25秒读取16GB文件

2024-12-03 01:52

关注

[[410141]]

我们将使用Go语言,从一个大小为16GB的.txt或.log文件中提取日志。

让我们开始编码……

首先,我们打开文件。对于任何文件的IO,我们都将使用标准的Go os.File。 

  1. f, err :os.Open(fileName)  
  2.  if err != nil {  
  3.    fmt.Println("cannot able to read the file", err)  
  4.    return  
  5.  }  
  6. // UPDATE: close after checking error  
  7. defer file.Close()  //Do not forget to close the file 

打开文件后,我们有以下两个选项可以选择:

逐行读取文件,这有助于减少内存紧张,但需要更多的时间。一次将整个文件读入内存并处理该文件,这将消耗更多内存,但会显著减少时间。

由于文件太大,即16 GB,因此无法将整个文件加载到内存中。但是第一种选择对我们来说也是不可行的,因为我们希望在几秒钟内处理文件。

但你猜怎么着,还有第三种选择。瞧……相比于将整个文件加载到内存中,在Go语言中,我们还可以使用bufio.NewReader()将文件分块加载。 

  1. :bufio.NewReader(f)  
  2. for {  
  3. buf :make([]byte,4*1024) //the chunk size  
  4. n, err :r.Read(buf) //loading chunk into buffer  
  5.    bufbuf = buf[:n]  
  6. if n == 0 { 
  7.       if err != nil {  
  8.        fmt.Println(err)  
  9.        break  
  10.      }  
  11.      if err == io.EOF {  
  12.        break  
  13.      }  
  14.      return err  
  15.   }  

一旦我们将文件分块,我们就可以分叉一个线程,即Go routine,同时处理多个文件区块。上述代码将修改为: 

  1. //sync pools to reuse the memory and decrease the preassure on Garbage Collector  
  2. linesPool :sync.Pool{New: func() interface{} {  
  3.         lines :make([]byte, 500*1024)  
  4.         return lines  
  5. }}  
  6. stringPool :sync.Pool{New: func() interface{} {  
  7.           lines :""  
  8.           return lines  
  9. }}  
  10. slicePool :sync.Pool{New: func() interface{} {  
  11.            lines :make([]string, 100)  
  12.            return lines  
  13. }}  
  14. :bufio.NewReader(f)  
  15. var wg sync.WaitGroup //wait group to keep track off all threads  
  16. for {   
  17.       buf :linesPool.Get().([]byte)  
  18.      n, err :r.Read(buf)  
  19.      bufbuf = buf[:n]  
  20. if n == 0 {  
  21.         if err != nil {  
  22.             fmt.Println(err)  
  23.             break  
  24.         }  
  25.         if err == io.EOF {  
  26.             break  
  27.         }  
  28.         return err  
  29.      }  
  30. nextUntillNewline, err :r.ReadBytes('\n')//read entire line     
  31.       if err != io.EOF { 
  32.          buf = append(buf, nextUntillNewline...)  
  33.      }     
  34.       wg.Add(1)  
  35.      go func() {       
  36.          //process each chunk concurrently  
  37.         //start -> log start time, end -> log end time        
  38.          ProcessChunk(buf, &linesPool, &stringPool, &slicePool,     start, end)  
  39. wg.Done()    
  40.       }()  
  41.  
  42. wg.Wait()  

上面的代码,引入了两个优化点:

sync.Pool是一个强大的对象池,可以重用对象来减轻垃圾收集器的压力。我们将重用各个分片的内存,以减少内存消耗,大大加快我们的工作。Go Routines帮助我们同时处理缓冲区块,这大大提高了处理速度。

现在让我们实现ProcessChunk函数,它将处理以下格式的日志行。 

  1. 2020-01-31T20:12:38.1234Z, Some Field, Other Field, And so on, Till new line,...\n 

我们将根据命令行提供的时间戳提取日志。 

  1. func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, slicePool *sync.Pool, start time.Time, end time.Time) {  
  2. //another wait group to process every chunk further                            
  3.        var wg2 sync.WaitGroup  
  4. logs :stringPool.Get().(string)  
  5. logs = string(chunk)  
  6. linesPool.Put(chunk) //put back the chunk in pool  
  7. //split the string by "\n", so that we have slice of logs  
  8.       logsSlice :strings.Split(logs, "\n")  
  9. stringPool.Put(logs) //put back the string pool  
  10. chunkSize :100 //process the bunch of 100 logs in thread  
  11. :len(logsSlice)  
  12. noOfThread :n / chunkSize  
  13. if n%chunkSize != 0 { //check for overflow   
  14.          noOfThread++  
  15.       }  
  16. length :len(logsSlice)  
  17. //traverse the chunk  
  18.      for i :0; i < length; i += chunkSize {         
  19.           wg2.Add(1) 
  20. //process each chunk in saperate chunk  
  21.          go func(s int, e int) {  
  22.             for i:s; i<e;i++{  
  23.                text :logsSlice[i]  
  24. if len(text) == 0 {  
  25.                   continue  
  26.                }            
  27.              logParts :strings.SplitN(text, ",", 2)  
  28.             logCreationTimeString :logParts[0]  
  29.             logCreationTime, err :time.Parse("2006-01-  02T15:04:05.0000Z", logCreationTimeString)  
  30. if err != nil {  
  31.                  fmt.Printf("\n Could not able to parse the time :%s       for log : %v", logCreationTimeString, text) 
  32.                   return  
  33.             }  
  34. // check if log's timestamp is inbetween our desired period  
  35.           if logCreationTime.After(start) && logCreationTime.Before(end) { 
  36.             fmt.Println(text)  
  37.            }  
  38.         }  
  39.         textSlice = nil 
  40.          wg2.Done()   
  41.       }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))  
  42.    //passing the indexes for processing  
  43. }    
  44.    wg2.Wait() //wait for a chunk to finish  
  45.    logsSlice = nil  

对上面的代码进行基准测试。以16 GB的日志文件为例,提取日志所需的时间约为25秒。

完整的代码示例如下: 

  1. func main() {  
  2.  s :time.Now()  
  3.  args :os.Args[1:]  
  4.  if len(args) != 6 { // for format  LogExtractor.exe -f "From Time" -t "To Time" -i "Log file directory location"  
  5.   fmt.Println("Please give proper command line arguments")  
  6.   return  
  7.  }  
  8.  startTimeArg :args[1]  
  9.  finishTimeArg :args[3]  
  10.  fileName :args[5]  
  11.  file, err :os.Open(fileName)  
  12.  if err != nil {  
  13.   fmt.Println("cannot able to read the file", err)  
  14.   return  
  15.  }  
  16.  defer file.Close() //close after checking err  
  17.  queryStartTime, err :time.Parse("2006-01-02T15:04:05.0000Z", startTimeArg)  
  18.  if err != nil { 
  19.    fmt.Println("Could not able to parse the start time", startTimeArg)  
  20.   return  
  21.  }  
  22.  queryFinishTime, err :time.Parse("2006-01-02T15:04:05.0000Z", finishTimeArg)  
  23.  if err != nil {  
  24.   fmt.Println("Could not able to parse the finish time", finishTimeArg)  
  25.   return  
  26.  }  
  27.  filestat, err :file.Stat()  
  28.  if err != nil {  
  29.   fmt.Println("Could not able to get the file stat")  
  30.   return  
  31.  } 
  32.  fileSize :filestat.Size()  
  33.  offset :fileSize - 1  
  34.  lastLineSize :0  
  35.  for {  
  36.   b :make([]byte, 1)  
  37.   n, err :file.ReadAt(b, offset)  
  38.   if err != nil {  
  39.    fmt.Println("Error reading file ", err)  
  40.    break  
  41.   }  
  42.   char :string(b[0])  
  43.   if char == "\n" { 
  44.     break  
  45.   }  
  46.   offset--  
  47.   lastLineSize += n  
  48.  }  
  49.  lastLine :make([]byte, lastLineSize)  
  50.  _, err = file.ReadAt(lastLine, offset+1) 
  51.  if err != nil {  
  52.   fmt.Println("Could not able to read last line with offset", offset, "and lastline size", lastLineSize)  
  53.   return  
  54.  }  
  55.  logSlice :strings.SplitN(string(lastLine), ",", 2)  
  56.  logCreationTimeString :logSlice[0]  
  57.  lastLogCreationTime, err :time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)  
  58.  if err != nil {  
  59.   fmt.Println("can not able to parse time : ", err)  
  60.  }  
  61.  if lastLogCreationTime.After(queryStartTime) && lastLogCreationTime.Before(queryFinishTime) {  
  62.   Process(file, queryStartTime, queryFinishTime) 
  63.   }  
  64.  fmt.Println("\nTime taken - ", time.Since(s))  
  65. func Process(f *os.File, start time.Time, end time.Time) error {  
  66.  linesPool :sync.Pool{New: func() interface{} {  
  67.   lines :make([]byte, 250*1024)  
  68.   return lines  
  69.  }} 
  70.  stringPool :sync.Pool{New: func() interface{} {  
  71.   lines :""  
  72.   return lines  
  73.  }}  
  74.  r :bufio.NewReader(f)  
  75.  var wg sync.WaitGroup  
  76.  for {  
  77.   buf :linesPool.Get().([]byte)  
  78.   n, err :r.Read(buf)  
  79.   bufbuf = buf[:n]   
  80.   if n == 0 { 
  81.     if err != nil {  
  82.     fmt.Println(err)  
  83.     break  
  84.    }  
  85.    if err == io.EOF {  
  86.     break  
  87.    }  
  88.    return err  
  89.   }  
  90.   nextUntillNewline, err :r.ReadBytes('\n')  
  91.   if err != io.EOF {  
  92.    buf = append(buf, nextUntillNewline...)  
  93.   }  
  94.   wg.Add(1)  
  95.   go func() {  
  96.    ProcessChunk(buf, &linesPool, &stringPool, start, end)  
  97.    wg.Done()  
  98.   }()  
  99.  }  
  100.  wg.Wait()  
  101.  return nil  
  102.   
  103. func ProcessChunk(chunk []byte, linesPool *sync.Pool, stringPool *sync.Pool, start time.Time, end time.Time) { 
  104.   var wg2 sync.WaitGroup  
  105.  logs :stringPool.Get().(string)  
  106.  logs = string(chunk)  
  107.  linesPool.Put(chunk)  
  108.  logsSlice :strings.Split(logs, "\n")  
  109.  stringPool.Put(logs) 
  110.  chunkSize :300  
  111.  n :len(logsSlice)  
  112.  noOfThread :n / chunkSize  
  113.  if n%chunkSize != 0 {  
  114.   noOfThread++ 
  115.  }  
  116.  for i :0; i < (noOfThread); i++ {  
  117.   wg2.Add(1)  
  118.   go func(s int, e int) {  
  119.    defer wg2.Done() //to avaoid deadlocks  
  120.    for i :s; i < e; i++ {  
  121.     text :logsSlice[i]  
  122.     if len(text) == 0 {  
  123.      continue  
  124.     }  
  125.     logSlice :strings.SplitN(text, ",", 2)  
  126.     logCreationTimeString :logSlice[0]  
  127.     logCreationTime, err :time.Parse("2006-01-02T15:04:05.0000Z", logCreationTimeString)  
  128.     if err != nil {  
  129.      fmt.Printf("\n Could not able to parse the time :%s for log : %v", logCreationTimeString, text)  
  130.      return  
  131.     }  
  132.     if logCreationTime.After(start) && logCreationTime.Before(end) {  
  133.      //fmt.Println(text)  
  134.     }  
  135.    }  
  136.   }(i*chunkSize, int(math.Min(float64((i+1)*chunkSize), float64(len(logsSlice)))))  
  137.  }  
  138.  wg2.Wait()  
  139.  logsSlice = nil  
  140.  

 

来源:运维派内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯