ruby - Ruby-Elastic Search和RabbitMQ-数据导入丢失,脚本无提示崩溃

标签 ruby json elasticsearch rabbitmq bunny

堆垛机

我在RabbitMQ队列中有很多消息(在我的开发环境中的localhost上运行)。消息的有效负载是一个JSON字符串,我想直接加载到Elastic Search中(目前也正在localhost上运行)。我编写了一个快速的ruby脚本,以将消息从队列中拉出并将它们加载到ES中,如下所示:

#! /usr/bin/ruby
require 'bunny'
require 'json'
require 'elasticsearch'

# Connect to RabbitMQ to collect data
mq_conn = Bunny.new
mq_conn.start
mq_ch = mq_conn.create_channel
mq_q  = mq_ch.queue("test.data")

# Connect to ElasticSearch to post the data
es = Elasticsearch::Client.new log: true

# Main loop - collect the message and stuff it into the db.
mq_q.subscribe do |delivery_info, metadata, payload|
    begin
            es.index index: "indexname",
                     type:  "relationship",
                     body:  payload

    rescue
            puts "Received #{payload} - #{delivery_info} - #{metadata}"
            puts "Exception raised"
            exit
    end
end
mq_conn.close

队列中大约有4,000,000条消息。

运行脚本时,我看到一堆消息(例如30条)被很好地加载到Elastic Search中。但是,我看到大约500条消息正在排队。
root@beep:~# rabbitmqctl list_queues
Listing queues ...
test.data    4333080
...done.
root@beep:~# rabbitmqctl list_queues
Listing queues ...
test.data    4332580
...done.

然后,脚本会悄悄退出,而不会告诉我一个异常。 begin / rescue块永远不会触发异常,因此我不知道为什么脚本要提早完成或丢失这么多消息。任何提示我下一步应该如何调试。

一种

最佳答案

我在这里添加了一个简单的工作示例:

https://github.com/elasticsearch/elasticsearch-ruby/blob/master/examples/rabbitmq/consumer-publisher.rb

如果不提供测试数据示例,则很难调试示例。

Elasticsearch的“河”功能已被弃用,最终将被删除。如果RabbitMQ和Elasticsearch是基础架构的核心部分,那么您绝对应该花时间编写自己的自定义馈送器。

关于ruby - Ruby-Elastic Search和RabbitMQ-数据导入丢失,脚本无提示崩溃,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22172975/

相关文章:

java - 使用 Jackson API 在 Java 中解析 JSON 文件

elasticsearch - Elasticsearch 中的非条件

elasticsearch - 无法创建有效的geo_shape

ruby - 打包编译好的二进制文件/带 Ruby Gem

ruby - 检查一个数组中的任何数字是否小于另一个数组中的某个数字

c++ - 在循环中从 C++ 调用 Ruby 函数导致 "stack level too deep"

json - 如何存储参数化方法的类型参数,然后使用它们将 JSON 对象转换为泛型类型的普通对象?

ruby-on-rails - 无法在centos上安装therubyracer(V8和GCC出错)

sql-server - 不使用 DTU 的 Azure SQL Server JSON

parsing - Logstash将同一单词的多个实例解析为Elasticsearch Array