因此,我尝试通过 Heroku Worker 实例上的 Sidekiq 后台作业处理来处理 CSV 文件。虽然我可以完成这个过程,但我觉得它肯定可以比我现在做的更快/更有效。这个问题包含两个部分 - 首先是数据库池设置是否正确,其次是如何优化流程。
应用环境:
- Rails 4 应用程序
- unicorn
- Sidekiq
- Redis-to-go(迷你计划,最多 50 个连接)
- CarrierWave S3 实现
- Heroku Postgres(标准 Yanari,最多 60 个连接)
- 1 Heroku Web dyno
- 1 Heroku Worker dyno
- NewRelic 监控
配置/unicorn.rb
worker_processes 3
timeout 15
preload_app true
before_fork do |server, worker|
Signal.trap 'TERM' do
puts 'Unicorn master intercepting TERM and sending myself QUIT instead'
Process.kill 'QUIT', Process.pid
end
if defined?(ActiveRecord::Base)
ActiveRecord::Base.connection.disconnect!
end
end
after_fork do |server, worker|
Signal.trap 'TERM' do
puts 'Unicorn worker intercepting TERM and doing nothing. Wait for master to send QUIT'
end
if defined?(ActiveRecord::Base)
config = ActiveRecord::Base.configurations[Rails.env] ||
Rails.application.config.database_configuration[Rails.env]
config['reaping_frequency'] = ENV['DB_REAP_FREQ'] || 10 # seconds
config['pool'] = ENV['DB_POOL'] || 2
ActiveRecord::Base.establish_connection(config)
end
end
配置/sidekiq.yml
---
:concurrency: 5
staging:
:concurrency: 5
production:
:concurrency: 35
:queues:
- [default, 1]
- [imports, 10]
- [validators, 10]
- [send, 5]
- [clean_up_tasks, 30]
- [contact_generator, 20]
config/initializers/sidekiq.rb
ENV["REDISTOGO_URL"] ||= "redis://localhost:6379"
Sidekiq.configure_server do |config|
config.redis = { url: ENV["REDISTOGO_URL"] }
database_url = ENV['DATABASE_URL']
if database_url
ENV['DATABASE_URL'] = "#{database_url}?pool=50"
ActiveRecord::Base.establish_connection
end
end
Sidekiq.configure_client do |config|
config.redis = { url: ENV["REDISTOGO_URL"] }
end
数据库连接池的计算方式如下:
我有 3 个 Web 进程 (unicorn worker_processes),我通过 after_fork Hook (config/unicorn.rb) 为每个进程分配 2 个 ActiveRecord 连接,以分配给 Web 的 60 个可用 Postgres 连接中的 6 个(最多)动力。在 Sidekiq 初始化程序中,我通过附加到 ENV['DATABASE_URL'] 的 ?pool=50 参数分配 50 个 Postgres 连接,如文档中(某处)所述。我将 Sidekiq 并发值保持在 35 (sidekiq.yml),以确保保持在 50 个 Redis 连接和 60 个 Postgres 连接限制之下。这仍然需要更细粒度的调整,但我宁愿在进一步处理之前先对数据处理本身进行排序。
现在,假设上述内容是正确的(如果不正确,我也不会感到惊讶),我正在处理以下场景:
用户上传要通过浏览器处理的 CSV 文件。该文件的行数可以在 50 行到 1000 万行之间。文件通过 CarrierWave gem 上传到 S3。
然后,用户通过 UI 配置一些导入设置,最终将 FileImporter 作业添加到 Sidekiq 队列中,以开始根据行创建各种模型。
导入工作程序看起来像:
class FileImporter
include Sidekiq::Worker
sidekiq_options :queue => :imports
def perform(import_id)
import = Import.find_by_id import_id
CSV.foreach(open(import.csv_data), headers: true) do |row|
# import.csv_data is the S3 URL of the file
# here I do some validation against a prebuilt redis table
# to validate the row without making any activerecord calls
# (business logic validations rather than straight DB ones anyway)
unless invalid_record # invalid_record being the product of the previous validations
# queue another job to actually create the AR models for this row
ImportValidator.perform_async(import_id, row)
# increment some redis counters
end
end
end
这很慢 - 我试图限制 FileImporter 工作线程中对 ActiveRecord 的调用,所以我假设这是因为我正在从 S3 流式传输文件。它处理行的速度不够快,无法建立队列,因此我从不利用所有工作线程(通常 35 个可用线程中的 15-20 个处于事件状态。我尝试将此作业拆分并提供 100 个行)一次进入一个中间工作人员,然后该工作人员以更并行的方式创建 ImportValidator 作业,但效果并没有好多少。
所以我的问题是,完成这样的任务的最佳/最有效的方法是什么?
最佳答案
有可能您的 CPU 利用率为 100%,有 20 个线程。您需要另一个测功机。
关于ruby-on-rails - 优化 Sidekiq、Redis、Heroku 和 Rails,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24403843/