我有批量导入的需求。文件可以包含 1000 条记录,每条记录都需要验证。用户希望知道有多少记录无效。最初我是用 Ruby 的 Mutex 和 Redis 的发布/订阅来做到这一点的。请注意,我有 20 个并发线程通过 Sidekiq 处理每条记录:
class Record < ActiveRecord::Base
class << self
# invalidated_records is SHARED memory for the Sidekiq worker threads
attr_accessor :invalidated_records
attr_accessor :semaphore
end
def self.batch_import
self.semaphore = Mutex.new
self.invalid_records = []
redis.subscribe_with_timeout(180, 'validation_update') do |on|
on.message do |channel, message|
if message.to_s =~ /\d+|import_.+/
self.semaphore.synchronize {
self.invalidated_records << message
}
elsif message == 'exit'
redis.unsubscribe
end
end
end
end
end
Sidekiq 会发布到 Record 对象:
Redis.current.publish 'validation_update', 'import_invalid_address'
问题是发生了一些奇怪的事情。 Record.invalidated_records 中不会填充所有无效导入。他们中有许多人,但不是全部。我认为这是因为多个线程试图同时更新对象,它污染了对象。而且我认为互斥锁可以解决这个问题。但是仍然在添加 Mutex 锁之后,并不是所有的 invalid 都被填充到 Record.invalidated_records 中。
最终,我使用了 Redis 原子递减和递增来跟踪无效导入,这非常有效。但我很好奇 Ruby Mutex 和多个线程试图更新 Record.invalidated_records 的问题是什么?
最佳答案
我没有使用互斥锁,但我认为发生的事情是线程看到信号量被锁定并跳过保存 << 消息 你需要使用 https://apidock.com/ruby/ConditionVariable 等待互斥锁解锁,然后保存数据
关于ruby-on-rails - Ruby 中的互斥锁不适用于 Redis?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49826254/