multithreading - 每个唯一 id 不超过一个并发线程运行代码的算法

标签 multithreading go concurrency

我有一个 Go web 应用程序,它需要在每个唯一 ID 仅在一个 goroutine 中执行给定的代码部分。场景是我的请求带有代表某种交易的各种 ID。需要保证对这些操作的特定子集对于给定 ID 仅“一次一个”运行(并且其他竞争请求应该阻塞,直到前一个处理/为该 ID 完成)。

我可以想出几种方法来做到这一点,但簿记似乎很棘手 - 需要保留一个全局互斥锁来锁定对正在发生的并发请求的映射的访问,然后从那里使用互斥锁或计数器,并且然后确保它不会死锁,然后垃圾收集(或仔细引用计数)旧的请求条目。我可以做到这一点,但听起来很容易出错。

在这种情况下,标准库中是否有模式或其他东西可以很容易地使用并取得良好效果?没有看到任何明显的东西。

编辑: 我认为在我上面的解释中令人困惑的一件事是“交易”一词的使用。在我的例子中,每一个都不需要明确的关闭——它只是一个标识符来关联多个操作。由于我对这些没有明确的“关闭”或“结束”概念,我可能会在同一秒内收到 3 个请求,每个操作需要 2 秒——我需要序列化这些请求,因为同时运行它们会造成严重破坏;但是一周后我可能会收到一个具有相同 ID 的请求,它指的是同一组操作(ID 只是数据库表中的 PK)。

最佳答案

need to keep a global mutex to lock access to a map of what concurrent requests are happening and then use a mutex or a counter from there, and then make sure it doesn't deadlock, and then garbage collect (or carefully reference count) old request entries

这似乎太复杂了。以下是我的做法:

  • 所有 map 内容都应由一个线程(您的调度程序)处理,这样您就不必处理锁定问题。这假设工作时间远大于 dispatch 时间。调度员跟踪每个 ID 的 channel 和计数器(显然是在 map 中)。
  • 唯一复杂的是如何处理“goroutine 认为它已经完成 ID 工作”“调度程序刚刚找到更多工作” 的竞争。答案是工作人员请求进行清理,但调度程序决定清理请求是否可行。

下面是代码的工作原理:

1) 调度过程从单个输入 channel 读取。它收到两种类型的请求:“新工作”(来自外部)和“完成工作”(来自工作人员)。两个请求都包含一个 ID。

2) 调度员收到一条“新工作”消息:按 ID 在 map 中查找。如果您找到一个 channel + 一个计数,请将作品发送到该 channel 并增加计数。 (*) 如果您什么也没找到,请在 map 中创建一个新 channel + 计数,将工作发送到 channel (也增加计数),然后创建一个在该 channel 上读取的工作程序(go-routine)。

3) worker goroutine 显然会从 channel 中拉取“新工作”并完成工作。完成后,它会向 Dispatcher 发送“完成工作”请求。

4) 调度员收到“完成工作”消息。在 map 中查找并找到 channel +计数器。递减计数器。如果为零,则向工作人员发送“退出”消息,并删除 map 中的条目

5) 如果 worker goroutine 收到“退出”消息(而不是工作消息),它就会退出。 (请注意,在一个很小的比赛中,可以在旧 worker 退出时创建该 ID 上的第二个 worker 。但是旧 worker 只会处理退出消息,所以没关系。旧 worker 会清理自己向上,包括旧 channel 。)

如果您的请求足够慢, map 中一次只会有一个条目。另一个极端是,如果您对同一 ID 的请求足够快,则该 ID 的 channel 将保持事件状态(只是计数器会上升和下降)。

(*) 注意:如果您使 channel 深度为 5,并且有 6 条消息排队,调度程序将停止。我认为您可以在这种情况下扩展 channel 深度,但我不确定。

关于multithreading - 每个唯一 id 不超过一个并发线程运行代码的算法,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31594150/

相关文章:

go - 如何通知另一个 goroutine 停止?

java在服务器上并行运行服务

c++ - 多线程 C++ 程序未使用 vector<thread> 和 .join() 并行运行

python - Python 脚本中的多线程

console - 如何从Golang创建可执行文件,该可执行文件在运行时不会打开控制台窗口?

json - 在 Go 中将映射转换为字符串

java - Java Web 应用程序中的线程

c - 关键部分中使用的资源是否需要 volatile?

java - 指令重新排序和发生之前的关系

multithreading - 一个程序怎么可能只包含线程安全的类,但不是线程安全的?