go - 同时读取文件

标签 go

读取部分不是并发的,但处理是并发的。我以这种方式表述标题是因为我最有可能使用该短语再次搜索此问题。 :)

在尝试超越示例之后我遇到了僵局,所以这对我来说是一次学习经历。我的目标是:

  1. 逐行读取文件(最终使用缓冲区来做多行)。
  2. 将文本传递给执行一些正则表达式工作的 func()
  3. 将结果发送到某处,但避免使用互斥锁或共享变量。我正在向 channel 发送整数(始终为数字 1)。这有点傻,但如果它没有引起问题,我想保持这种状态,除非你们有更整洁的选择。
  4. 使用工作池来执行此操作。我不知道如何告诉 worker 自己重新排队?

这里是 playground link 。我试图写有用的评论,希望这是有道理的。我的设计可能完全错误,所以请不要犹豫重构。

package main

import (
  "bufio"
  "fmt"
  "regexp"
  "strings"
  "sync"
)

func telephoneNumbersInFile(path string) int {
  file := strings.NewReader(path)

  var telephone = regexp.MustCompile(`\(\d+\)\s\d+-\d+`)

  // do I need buffered channels here?
  jobs := make(chan string)
  results := make(chan int)

  // I think we need a wait group, not sure.
  wg := new(sync.WaitGroup)

  // start up some workers that will block and wait?
  for w := 1; w <= 3; w++ {
    wg.Add(1)
    go matchTelephoneNumbers(jobs, results, wg, telephone)
  }

  // go over a file line by line and queue up a ton of work
  scanner := bufio.NewScanner(file)
  for scanner.Scan() {
    // Later I want to create a buffer of lines, not just line-by-line here ...
    jobs <- scanner.Text()
  }

  close(jobs)
  wg.Wait()

  // Add up the results from the results channel.
  // The rest of this isn't even working so ignore for now.
  counts := 0
  // for v := range results {
  //   counts += v
  // }

  return counts
}

func matchTelephoneNumbers(jobs <-chan string, results chan<- int, wg *sync.WaitGroup, telephone *regexp.Regexp) {
  // Decreasing internal counter for wait-group as soon as goroutine finishes
  defer wg.Done()

  // eventually I want to have a []string channel to work on a chunk of lines not just one line of text
  for j := range jobs {
    if telephone.MatchString(j) {
      results <- 1
    }
  }
}

func main() {
  // An artificial input source.  Normally this is a file passed on the command line.
  const input = "Foo\n(555) 123-3456\nBar\nBaz"
  numberOfTelephoneNumbers := telephoneNumbersInFile(input)
  fmt.Println(numberOfTelephoneNumbers)
}

最佳答案

你快到了,只需要在 goroutine 的同步上做一些工作。您的问题是您试图在同一个例程中输入解析器并收集结果,但这无法完成。

我提出以下建议:

  1. 在单独的例程中运行扫描仪,读取所有内容后关闭输入 channel 。
  2. 运行单独的例程等待解析器完成其工作,然后关闭输出 channel 。
  3. 在您的主程序中收集所有结果。

相关更改可能如下所示:

// Go over a file line by line and queue up a ton of work
go func() {
    scanner := bufio.NewScanner(file)
    for scanner.Scan() {
        jobs <- scanner.Text()
    }
    close(jobs)
}()

// Collect all the results...
// First, make sure we close the result channel when everything was processed
go func() {
    wg.Wait()
    close(results)
}()

// Now, add up the results from the results channel until closed
counts := 0
for v := range results {
    counts += v
}

Playground 上的完整示例:http://play.golang.org/p/coja1_w-fY

值得补充的是,您不一定需要 WaitGroup 来实现相同的功能,您只需要知道何时停止接收结果即可。这可以通过扫描器广告(在 channel 上)读取多少行然后收集器只读取指定数量的结果来实现(尽管您也需要发送零)。

关于go - 同时读取文件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27217428/

相关文章:

mongodb - 根据 map 值更新mongo文档并删除该值

go - 包装 io.PipeReader 来存储进度

go - Systemd 未检测到 GOPATH(在没有二进制文件的情况下运行)

go - 有彩色背景的按钮

functional-programming - go-lang curry 怎么做?

go - 如何对 channel /互斥内存消耗/分配进行基准测试?

go - 阅读 channel 的不同方式

json - 隐藏 JSON 中的属性

java - 在Java中验证golang生成的rsa.SignPKCS1v15签名

go - Golang 中的调用函数