multithreading - How to wait for execution to finish

Tags: multithreading go concurrency

I have a large log file that I want to parse in parallel.

I have this code:

package main

import (
    "bufio"
    "fmt"
    "os"
    "time"
)

func main() {
    filename := "log.txt"
    threads := 10

    // Open the log file
    file, err := os.Open(filename)
    if err != nil {
        fmt.Println("Could not open file with the database.")
        os.Exit(1)
    }
    defer file.Close()

    scanner := bufio.NewScanner(file)

    // Channel for strings
    tasks := make(chan string)

    // Run the threads that receive lines from the channel and each parse one line of the log file
    for i := 0; i < threads; i++ {
        go parseStrings(tasks)
    }

    // Start a thread that loads lines from the file into the channel
    go getStrings(scanner, tasks)

    // At this point I have to wait until all of the threads have finished
    // As a placeholder, I just sleep in a loop
    for {
        time.Sleep(1 * time.Second)
    }
}

func getStrings(scanner *bufio.Scanner, tasks chan<- string) {
    for scanner.Scan() {
        s := scanner.Text()
        tasks <- s
    }
}

func parseStrings(tasks <-chan string) {
    for {
        s := <-tasks
        event := parseLine(s)
        fmt.Println(event)
    }
}

func parseLine(line string) string {
    return line
}

So how do I actually wait for all the threads to finish? I was advised to create a separate thread that collects the results, but how do I do that?

Best answer

Use the pipeline pattern together with the "fan-out / fan-in" pattern:

package main

import (
    "bufio"
    "fmt"
    "strings"
    "sync"
)

func main() {
    file := "here is first line\n" +
        "here is second line\n" +
        "here is line 3\n" +
        "here is line 4\n" +
        "here is line 5\n" +
        "here is line 6\n" +
        "here is line 7\n"
    scanner := bufio.NewScanner(strings.NewReader(file))

    // all lines onto one channel
    in := getStrings(scanner)

    // FAN OUT
    // Multiple functions reading from the same channel until that channel is closed
    // Distribute work across multiple functions (ten goroutines) that all read from in.
    xc := fanOut(in, 10)

    // FAN IN
    // multiplex multiple channels onto a single channel
    // merge the channels from c0 through c9 onto a single channel
    for n := range merge(xc) {
        fmt.Println(n)
    }
}

func getStrings(scanner *bufio.Scanner) <-chan string {
    out := make(chan string)
    go func() {
        for scanner.Scan() {
            out <- scanner.Text()
        }
        close(out)
    }()
    return out
}

func fanOut(in <-chan string, n int) []<-chan string {
    var xc []<-chan string
    for i := 0; i < n; i++ {
        xc = append(xc, parseStrings(in))
    }
    return xc
}

func parseStrings(in <-chan string) <-chan string {
    out := make(chan string)
    go func() {
        for n := range in {
            out <- parseLine(n)
        }
        close(out)
    }()
    return out
}

func parseLine(line string) string {
    return line
}

func merge(cs []<-chan string) <-chan string {
    var wg sync.WaitGroup
    wg.Add(len(cs))

    out := make(chan string)
    for _, c := range cs {
        go func(c <-chan string) {
            for n := range c {
                out <- n
            }
            wg.Done()
        }(c)
    }

    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}


Regarding "multithreading - How to wait for execution to finish", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/34417872/
