ruby - 在 Ruby 中实现生产者消费者模式

标签 ruby multithreading

假设我有 200 个昂贵的方法调用(每个都有不同的参数)。出于某种原因,我可以并行执行其中的 5 个调用,但不能更多。我可以一次执行一个,但一次执行 5 个要快 5 倍。

我想一直执行五件事。不想排五个,等五个都排完了,再排五个。如果我排队 A、B、C、D、E 并且 C 先完成,我想立即用 F 替换它,即使 A 和 B 还没有完成。

我一直在研究这个问题,因为我可以想象它会定期发生。解决方案似乎是生产者-消费者模式,Ruby 在其标准库中内置了一些用于该模式的结构(QueueSizedQueue)。我玩过代码示例,阅读了一些文档,我想我对它有一个粗略的了解。但是我有一些问题我对我的解决方案没有信心,而且多线程的整个领域对我来说都是新领域,所以我想我会在这里问以确保我不是,你知道的,完全错误而且只是幸运.

所以这是我写的一个测试程序:

q = Queue.new
q << 'balloon'
q << 'sandwich'
q << 'clown'
q << 'fairy floss'
q << 'ferris wheel'
q << 'magician'
q << 'cake'
q << 'present'
q << 'chip'
q << 'game'
q << 'animal'

consumer_1 = Thread.new do
  until q.empty?
    sleep rand(0..10)
    print "#{q.pop}\n"
  end
end

# consumer 2 and 3 are identical to consumer 1

[consumer_1, consumer_2, consumer_3].map(&:join)

队列包含生日聚会所需的元素 list 。 3 个消费者线程处理列表。

这有效,我的问题是:

如果消费者的数量决定了并行处理的项目数量,那么拥有一个大小队列有什么意义?

是否只有在任务无限、未知或数量庞大并且您想在填满队列之前暂停的情况下,大小队列才有用?

我是否未能正确实现问题?手动创建多个线程,然后对它们调用 join 看起来有点乱。有更好的解决方案吗?

最佳答案

SizedQueue 可防止生产者添加项目的速度快于消费者可以消费的速度。

SizedQueue#push

If there is no space left in the queue, waits until space becomes available

Queue#pop/SizedQueue#pop

If the queue is empty, the calling thread is suspended until data is pushed onto the queue.

SizedQueue 与队列

require 'thread'
require 'logger'

queue = SizedQueue.new(3)
#queue = Queue.new # goes berzerk
logger = Logger.new(STDOUT)

Thread.new do
  item = 0
  loop do
    item += 1
    queue << item
    logger.info "#{item} produced"
  end
end

consumers = 2.times.map do |i|
  Thread.new do
    loop do
      item = queue.pop
      logger.info "consumed #{item}"
      sleep item
    end
  end
end

consumers.each(&:join)

如何停止工作线程

require 'thread'
require 'logger'

queue = Queue.new
logger = Logger.new(STDOUT)
consumers_count = 5

end_object = BasicObject.new

consumers = consumers_count.times.map do
  Thread.new do
    until (item = queue.pop) == end_object
      logger.info "consumed #{item}"
    end
  end
end

1000.times.each { |item| queue << item }
consumers_count.times { queue << end_object }

consumers.each(&:join)

进一步阅读:

关于ruby - 在 Ruby 中实现生产者消费者模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29762173/

相关文章:

ruby-on-rails - Rails 3 和 PDFKit。如何指定页面大小?

ruby-on-rails - Resque - 类的未定义方法 'perform'

c# - 在多线程中更新 UI 的最佳方法是什么

java - 在线程中读取时的 BufferedReader readline

ios - 什么可能导致此代码不允许重新进入跳板?

ruby-on-rails - 嵌套资源错误 : Circular dependency detected while autoloading constant

ruby - 有没有人让 Padrino 与 Sprockets 和 Compass 一起工作?

java - 如何在 JavaFX 线程之外更新 TableView 项目

ruby-on-rails - 如何进行 Ruby on Rails 并发测试?

android - 除非我在停止后留出足够的时间,否则为什么不会在下一场比赛中调用 onMarkerReached?