concurrency - Go lang关闭管道死锁

标签 concurrency go deadlock pipeline

我正在使用Go语言进行数据导入作业,我想将每个步骤编写为闭包,并使用 channel 进行通信,即每个步骤都是并发的。该问题可以通过以下结构来定义。

  1. 从数据源获取小部件
    1. 将源 1 的翻译添加到小部件
    2. 将源 2 的翻译添加到小工具
    3. 将来源 1 的定价添加到小部件
    4. WidgetRevisions添加到Widget
      1. 将源 1 的翻译添加到 WidgetRevisions
      2. 将源 2 的翻译添加到 WidgetRevisions

就本问题而言,我仅处理必须在新小部件上执行的前三个步骤。在此基础上,我假设第四步可以作为管道步骤来实现,该步骤本身是按照子三步管道来实现的,以控制 *WidgetRevision*s

为此,我编写了一小段代码来提供以下 API:

// A Pipeline is just a list of closures, and a smart 
// function to set them all off, keeping channels of
// communication between them.
p, e, d := NewPipeline()

// Add the three steps of the process
p.Add(whizWidgets)
p.Add(popWidgets)
p.Add(bangWidgets)

// Start putting things on the channel, kick off
// the pipeline, and drain the output channel
// (probably to disk, or a database somewhere)
go emit(e)
p.Execute()
drain(d)

我已经实现了它(代码位于 GistGo Playground ),但它陷入僵局,成功失败率为 100%

调用p.Execute()时出现死锁,因为可能其中一个 channel 最终无事可做,没有在任何 channel 上发送任何内容,也没有任何工作可做...

将几行调试输出添加到 emit()drain() ,我看到以下输出,我相信闭包调用之间的管道是正确的,并且我看到一些小部件被省略。

Emitting A Widget
Input Will Be Emitted On 0x420fdc80
Emitting A Widget
Emitting A Widget
Emitting A Widget
Output Will Drain From 0x420fdcd0
Pipeline reading from 0x420fdc80 writing to 0x420fdd20
Pipeline reading from 0x420fdd20 writing to 0x420fddc0
Pipeline reading from 0x420fddc0 writing to 0x42157000

以下是我对这种方法的一些了解:

  • 我相信这种设计让一个或另一个协程“挨饿”的情况并不罕见,我相信这就是陷入僵局的原因
  • 我希望管道首先能够将内容输入其中(API 将实现 Pipeline.Process(*Widget)
    • 如果我能做到这一点,排水可能是一个“步骤”,只是没有将任何内容传递给下一个函数,这可能是一个更干净的 API
  • 我知道我还没有实现任何类型的梯级缓冲区,因此完全有可能使机器的可用内存重载
  • 我真的不相信这是好的 Go 风格...但它似乎利用了很多 Go 功能,但这并不是真正的好处
  • 由于 WidgetRevisions 也需要管道,我想让我的管道更加通用,也许是 interface{} type 是解决方案,我不知道 Go 是否足够好来确定这是否明智。
  • 有人建议我考虑实现互斥锁来防止竞争条件,但我相信我可以保存,因为每个闭包都会在 Widget 结构的一个特定单元上运行,但是我很乐意接受教育关于这个话题。

总结:我怎样才能修复这个代码,应该我修复这个代码,如果你是一个比我更有经验的 go 程序员,你会如何解决这个问题“连续工作单元”问题?

最佳答案

我只是认为我不会构建远离 channel 的抽象。显式管道传输。

您可以很容易地为所有实际的管道操作创建一个函数,如下所示:

type StageMangler func(*Widget)

func stage(f StageMangler, chi <-chan *Widget, cho chan<- *Widget) {
    for widget := range chi {
                f(widget)
                cho <- widget
    }
    close(cho)
}

然后你可以传入 func(w *Widget) { w.Whiz = true} 或类似于阶段构建器。

此时您的添加可以拥有这些及其工作人员数量的集合,因此特定阶段可以更轻松地拥有n个工作人员。

我只是不确定这比直接将 channel 拼接在一起更容易,除非您在运行时构建这些管道。

关于concurrency - Go lang关闭管道死锁,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/13810261/

相关文章:

Java:并发读取 InputStream

javascript - 运行minio服务器时不会出现对minio浏览器的更改

python - 如何用 gevent 和 oursql 解决这个死锁

java - Java 未检测到死锁

c# - Process.StandardOutput.ReadToEnd() 中的死锁问题;

java - 实现在执行过程中返回结果的多个并发线程的最佳方法?

c++ - 在 C++ 中使用多线程时是否可以读取写了一半、损坏的原始变量?

java - 基于另一个线程结果的数据库回滚

file - 我应该在同时写入/读取时关闭文件吗?

enums - 在 Go 中表示枚举的惯用方式是什么?