multithreading - 半异步代码逻辑

标签 multithreading asynchronous go goroutine

我正在努力找出一种可行的设计,将同步流与异步行为混合在一起。

我有 4 个组件:

  1. 播种者
  2. worker
  3. 出版商
  4. 更新者

我唯一的限制是,一旦 Seeder 播种数据,它必须被阻止,直到 Updater 没有完全完成所有任务的处理。前 3 个组件可以很容易地同步,但更新程序必须并行工作,否则将永远无法完成任务。

所以流程是:

播种机 -> worker -> 发布者 -> 更新者 --> 播种者 -> worker -> 发布者 -> 更新者 ...

并且这个流必须永远旋转。

播种和更新是针对数据库的。不幸的是,这个特定的数据库不允许不同的设计。

我最好的办法是使用 sync.WaitGroup 来同步 Updater goroutines 并将其他所有内容保持在同步状态。更新器的数据通过 channel 提供。

这是一个简化的代码(没有错误,逻辑不多)。

func main() {
    var wg sync.WaitGroup
    c := make(chan Result, 100)

    for {
        data := Seeder()
        msgs := Worker(data)
        results := Publisher(msgs)

        for i := 0; i < 10; i++ {
            wg.Add(1)
            go func(){
                defer wg.Done()
                data := <- c

                // this is the updater
            }(&wg)
        }

        for _, result := range results {
            c <- result
        }
        wg.Wait()
    }
}

结果是代码一直运行直到它在某个周期停止并且永远不会向前移动。我玩过很多变量,加载 100 行而不是 10k,结果没有太大区别。

我还尝试传递一个包含 channel 的结构并异步运行所有内容,但我更难弄清楚更新程序何时完成以便我可以解锁播种器。

感谢任何指点。

最佳答案

不好说,因为你的代码无法编译运行,不清楚你是怎么用c的。至少有一件事是肯定的:wg 应该通过引用传递,而不是通过值传递(sync.WaitGroup 有 nocopy 注释)。然后,我假设您使用 c 将值发送到更新程序。但是你没有提供他们的代码,所以我只能猜测。例如,假设调度的发生使得前 9 个 goroutine 占用了 channel 中的所有内容;然后,最后一个例程将永远阻塞,永远不会释放 WaitGroup。在这种情况下,一个简单的解决方案是在最外层的 for 循环(将第 3 行向下移动两行)和 close c 的每次迭代中创建一个新 channel 。在调用 wg.Wait() 之前。您的更新程序必须能够处理 read from a close channel .

[edit] 我想你正在寻找的是这样的东西:

package main

import (
    "fmt"
    "sync"
)

// Result is a type
type Result struct {
    I int
}

// Seeder is a function
func Seeder() []int {
    fmt.Println("Seeding")
    return []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13}
}

// Worker is a function
func Worker(data []int) []int {
    return data
}

// Publisher is a function
func Publisher(data []int) []Result {
    var r []Result
    for i := 0; i < len(data); i++ {
        r = append(r, Result{I: data[i]})
    }
    return r
}

func updater(c chan Result, wg *sync.WaitGroup) {
    for _ = range c {
        // update here
        wg.Done()
    }
}

func main() {
    var wg sync.WaitGroup

    c := make(chan Result, 100)
    for i := 0; i < 10; i++ {
        go updater(c, &wg)
    }

    for {
        data := Seeder()
        msgs := Worker(data)
        results := Publisher(msgs)

        wg.Add(len(results))
        for _, result := range results {
            c <- result
        }
        wg.Wait()
    }
}

关于multithreading - 半异步代码逻辑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42730506/

相关文章:

java - 如何让Java使用多线程线程?

ruby-on-rails - Ruby 中的纤维有什么意义?

Goroutines被for循环阻塞?

google-app-engine - 未知数量的数据存储过滤器的 Golang 实现(需要附加过滤器的功能)

go - 检查所有 goroutine 是否已完成而不使用 wg.Wait()

c - 为什么用户级线程(纤程)的实现需要为每个纤程分配一个新的堆栈?

Django 和 Channels 以及 ASGI 线程问题

javascript - 使用 Q/promises/异步函数时,如何将值从一个类方法返回到另一个类方法?

c# - 使用 Async & Await 的 ASP.NET C#5 异步 Web 应用程序

c# - Rx 和异步 nunit 测试