ruby-on-rails - Ruby 中的互斥锁不适用于 Redis?

标签 ruby-on-rails ruby redis mutex sidekiq

我有批量导入的需求。文件可以包含 1000 条记录,每条记录都需要验证。用户希望知道有多少记录无效。最初我是用 Ruby 的 Mutex 和 Redis 的发布/订阅来做到这一点的。请注意,我有 20 个并发线程通过 Sidekiq 处理每条记录:

class Record < ActiveRecord::Base
  class << self
    # invalidated_records is SHARED memory for the Sidekiq worker threads
    attr_accessor :invalidated_records
    attr_accessor :semaphore
  end

  def self.batch_import
  self.semaphore = Mutex.new  
  self.invalid_records = []    
  redis.subscribe_with_timeout(180, 'validation_update') do |on|
    on.message do |channel, message|
      if message.to_s =~ /\d+|import_.+/
        self.semaphore.synchronize {
          self.invalidated_records << message
        }  
      elsif message == 'exit'
        redis.unsubscribe
      end
    end
  end
  end
end

Sidekiq 会发布到 Record 对象:

Redis.current.publish 'validation_update', 'import_invalid_address'

问题是发生了一些奇怪的事情。 Record.invalidated_records 中不会填充所有无效导入。他们中有许多人,但不是全部。我认为这是因为多个线程试图同时更新对象,它污染了对象。而且我认为互斥锁可以解决这个问题。但是仍然在添加 Mutex 锁之后,并不是所有的 invalid 都被填充到 Record.invalidated_records 中。

最终,我使用了 Redis 原子递减和递增来跟踪无效导入,这非常有效。但我很好奇 Ruby Mutex 和多个线程试图更新 Record.invalidated_records 的问题是什么?

最佳答案

我没有使用互斥锁,但我认为发生的事情是线程看到信号量被锁定并跳过保存 << 消息 你需要使用 https://apidock.com/ruby/ConditionVariable 等待互斥锁解锁,然后保存数据

关于ruby-on-rails - Ruby 中的互斥锁不适用于 Redis?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49826254/

相关文章:

ruby-on-rails - 如何让 Rspec2 支持不同路径中的模型和规范?

ruby-on-rails - 生成用于识别记录的唯一随机字符串

ruby - 使用 FasterCSV 解析这一行的正确方法?

ruby - ABC 大小太高,即使没有分支、赋值或条件

node.js - 我应该如何将 JSON 存储在 redis 中?

ruby-on-rails - 有没有办法让 resque 作业将生成的 pdf 直接推送到浏览器?

ruby-on-rails - Sidekiq/Redis 对不存在的作业进行排队

带通配符表达式的 Ruby 'require'

redis - 当redis-server守护进程存在时如何关闭redis-server?

redis - 检查并增加 Redis 中的计数器