假设我有 200 个昂贵的方法调用(每个都有不同的参数)。出于某种原因,我可以并行执行其中的 5 个调用,但不能更多。我可以一次执行一个,但一次执行 5 个要快 5 倍。
我想一直执行五件事。不想排五个,等五个都排完了,再排五个。如果我排队 A、B、C、D、E 并且 C 先完成,我想立即用 F 替换它,即使 A 和 B 还没有完成。
我一直在研究这个问题,因为我可以想象它会定期发生。解决方案似乎是生产者-消费者模式,Ruby 在其标准库中内置了一些用于该模式的结构(Queue
和 SizedQueue
)。我玩过代码示例,阅读了一些文档,我想我对它有一个粗略的了解。但是我有一些问题我对我的解决方案没有信心,而且多线程的整个领域对我来说都是新领域,所以我想我会在这里问以确保我不是,你知道的,完全错误而且只是幸运.
所以这是我写的一个测试程序:
q = Queue.new
q << 'balloon'
q << 'sandwich'
q << 'clown'
q << 'fairy floss'
q << 'ferris wheel'
q << 'magician'
q << 'cake'
q << 'present'
q << 'chip'
q << 'game'
q << 'animal'
consumer_1 = Thread.new do
until q.empty?
sleep rand(0..10)
print "#{q.pop}\n"
end
end
# consumer 2 and 3 are identical to consumer 1
[consumer_1, consumer_2, consumer_3].map(&:join)
队列包含生日聚会所需的元素 list 。 3 个消费者线程处理列表。
这有效,我的问题是:
如果消费者的数量决定了并行处理的项目数量,那么拥有一个大小队列有什么意义?
是否只有在任务无限、未知或数量庞大并且您想在填满队列之前暂停的情况下,大小队列才有用?
我是否未能正确实现问题?手动创建多个线程,然后对它们调用 join
看起来有点乱。有更好的解决方案吗?
最佳答案
SizedQueue 可防止生产者添加项目的速度快于消费者可以消费的速度。
If there is no space left in the queue, waits until space becomes available
If the queue is empty, the calling thread is suspended until data is pushed onto the queue.
SizedQueue 与队列
require 'thread'
require 'logger'
queue = SizedQueue.new(3)
#queue = Queue.new # goes berzerk
logger = Logger.new(STDOUT)
Thread.new do
item = 0
loop do
item += 1
queue << item
logger.info "#{item} produced"
end
end
consumers = 2.times.map do |i|
Thread.new do
loop do
item = queue.pop
logger.info "consumed #{item}"
sleep item
end
end
end
consumers.each(&:join)
如何停止工作线程
require 'thread'
require 'logger'
queue = Queue.new
logger = Logger.new(STDOUT)
consumers_count = 5
end_object = BasicObject.new
consumers = consumers_count.times.map do
Thread.new do
until (item = queue.pop) == end_object
logger.info "consumed #{item}"
end
end
end
1000.times.each { |item| queue << item }
consumers_count.times { queue << end_object }
consumers.each(&:join)
进一步阅读:
关于ruby - 在 Ruby 中实现生产者消费者模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29762173/