elasticsearch - 排队机制和 Elasticsearch 1.4.0

标签 elasticsearch rabbitmq spring-integration spring-amqp

我有一个 RabbitMQ 代理,我在其上发布不同的消息,这些消息最终将作为 Elasticsearch 中的文档。代理有多个消费者,它们实际上是分配给 amqp 入站网关的任务执行器中的不同线程(此处使用 spring 集成和 spring amqp)。

考虑以下场景:我在 ES 中创建了一个具有结构的文档

{
   "field1" : "value1",
   "field2" : "value2"
}

之后我发送两个更新请求,都更新同一个字段,比如说 field1 .如果我一个接一个地发送此消息(生产中的常见用例),我的消费者线程将以正确的顺序获取消息(amqp 允许这样做),但处理可能以错误的顺序发生,并且稍后更新的值可能是被第一个覆盖。我最终会得到扭曲的数据。

如何确保我的数据不会损坏? => 拥有 1 个单一消费者线程是不够的,因为如果我想通过添加更多机器来扩展我的消费应用程序,我最终仍然会拥有多个消费者。我可能需要对消息进行排序,但是拥有多台机器我可能需要创建某种集群感知组件,我正在使用 SI,所以在我看来这似乎很难做到。

在 ES 的 1.2 之前的版本中,我们使用了外部版本,例如时间戳,ES 会抛出 VersionConflictException在我的场景中:第一次更新将有版本 10000 比方说,第二个 10001,如果首先处理第一个,ES 将拒绝版本为 10000 的请求,因为它低于现有版本。但是从最新版本开始,ES 家伙 have removed this functionality用于更新操作。

最佳答案

一种解决方案可能是使用多个队列并在每个队列上有一个消费者;使用散列函数总是将同一文档的更新路由到同一队列,参见 RabbitMQ Tutorials对于各种选项。

您可以通过添加更多队列(并更改哈希函数)来扩展。

为了弹性,请考虑在 Spring XD 中运行您的消费者。 .您可以拥有每个 rabbit 源的单个实例(对于每个队列),如果它发生故障,XD 将负责将其故障转移到另一个容器节点。

否则,您可以通过热备用来自己动手 - 使用 auto-startup="false" 配置的入站适配器并有一些监控和使用<control-bus/>如果事件实例关闭,则启动一个新实例。

编辑:

回应下面的第四条评论。

正如我上面所说,要向外扩展,您必须更改散列函数。所以在运行时自动添加消费者会很棘手。

您不必在 jar 中硬编码队列名称,您可以使用属性占位符并从属性、系统属性或环境变量中填充它。

此解决方案是最简单的,但确实有这些限制。

但是,您可以构建一个可以扩展它的管理应用程序 - 停止生产者,等待所有队列静默,重新配置消费者并重新启动生产者 - Spring Integration 提供了 <control-bus/>启动/停止适配器;你也可以通过 JMX 来实现。

替代解决方案是可能的,但通常需要在集群中维护一些共享状态(可能使用 zookeeper 等),因此要复杂得多;而你 还是 必须处理竞争条件(第二次更新可能在第一次更新之前到达某个消费者)。

关于elasticsearch - 排队机制和 Elasticsearch 1.4.0,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27124663/

相关文章:

Elasticsearch:多个带有快速矢量荧光笔的 pre_tags/post_tags

elasticsearch - 根据嵌套数组中的字段对结果进行排名

RabbitMQ:下一条消息只能在前一条消息被确认后才能出队

RabbitMQ——最佳实践

java - 自动启动 false 不适用于 DefaultMessageListenerContainer

lucene - not_indexed 字段存储在索引中

c# - RabbitMQ 和 Web 服务

java - 使用 Spring Integration 5 上的 Spring Integration Java DSL 在入站 channel 上配置目录扫描器

java - 如何通过重试将 Java Spring Flow 转换为 Spring Integration Work Flow?

java - Java中ElasticSearch中的字段搜索?