elasticsearch - Logstash -> Elasticsearch - 更新非规范化数据

标签 elasticsearch logstash event-sourcing denormalization

用例说明

我们有一个关系数据库,其中包含有关我们日常运营的数据。目标是让用户使用全文搜索引擎搜索重要数据。数据是规范化的,因此不是进行全文查询的最佳形式,所以我们的想法是对数据的一个子集进行非规范化,并将其实时复制到 Elasticsearch,这使我们能够创建一个快速准确的搜索应用程序.

我们已经有一个系统可以启用 Event Sourcing我们的数据库操作(插入、更新、删除)。事件只包含更改的列和主键(在更新时我们没有得到整行)。 Logstash 已收到每个事件的通知,因此这部分已处理。


实际问题

现在我们开始解决问题了。由于计划是对我们的数据进行非规范化,因此我们必须确保父对象的更新传播到 Elasticsearch 中的非规范化子对象。我们如何配置 logstash 来执行此操作?

例子

假设我们在 Elasticsearch 中维护一个 Employees 列表。每个 Employee 都分配给一个 Company。由于数据是非规范化的(为了加快搜索速度),每个 Employee 还带有 Company 的名称和地址。更新更改了 Company 的名称 - 我们如何配置 logstash 以更新分配给 Company 的所有 Employees 中的公司名称>?


补充说明

@Darth_Vader: 我们面临的问题是,我们收到一个事件,表明 Company 发生了变化,但是我们想在 Elasticsearch 中修改类型为 Employee 的文档,因为它们携带了关于公司本身。您的回答预计我们将为每个 Employee 获取一个事件,但事实并非如此。

也许这会让事情变得更清楚。我们在 Elasticsearch 有 3 名员工:

{type:'employee',id:'1',name:'Person 1',company.cmp_id:'1',company.name:'Company A'}
{type:'employee',id:'2',name:'Person 2',company.cmp_id:'1',company.name:'Company A'}
{type:'employee',id:'3',name:'Person 3',company.cmp_id:'2',company.name:'Company B'}

然后在源数据库中发生更新。

UPDATE company SET name = 'Company NEW' WHERE cmp_id = 1;

我们在 logstash 中得到一个事件,它说的是这样的:

{type:'company',cmp_id:'1',old.name:'Company A',new.name:'Company NEW'}

然后应将其传播到 Elasticsearch,以便生成的员工:

{type:'employee',id:'1',name:'Person 1',company.cmp_id:'1',company.name:'Company NEW'}
{type:'employee',id:'2',name:'Person 2',company.cmp_id:'1',company.name:'Company NEW'}
{type:'employee',id:'3',name:'Person 3',company.cmp_id:'2',company.name:'Company B'}

请注意字段 company.name 已更改。

最佳答案

我建议采用与我发布的内容类似的解决方案 here ,即使用 http 输出插件,以便通过对 Employee 索引的查询调用来发布更新。查询需要如下所示:

POST employees/_update_by_query
{
  "script": {
    "source": "ctx._source.company.name = params.name",
    "lang": "painless",
    "params": {
      "name": "Company NEW"
    }
  },
  "query": {
    "term": {
      "company.cmp_id": "1"
    }
  }
}

因此您的 Logstash 配置应如下所示:

input {
  ... 
}
filter {
  mutate {
    add_field => {
      "[script][lang]" => "painless"
      "[script][source]" => "ctx._source.company.name = params.name"
      "[script][params][name]" => "%{new.name}"
      "[query][term][company.cmp_id]" => "%{cmp_id}"
    }
    remove_field => ["host", "@version", "@timestamp", "type", "cmp_id", "old.name", "new.name"]
  }
}
output {
  http {
    url => "http://localhost:9200/employees/_update_by_query"
    http_method => "post"
    format => "json"
  }
}

关于elasticsearch - Logstash -> Elasticsearch - 更新非规范化数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41976143/

相关文章:

mysql - Logstash 输入 MySQL

Elasticsearch 从日期字段按年份过滤

elasticsearch - 节点不会加入集群 : NotMasterException (Weird master election bug)

java - Logstash由于错误而停止处理:(NameError)无法加载(ext)(org.jruby.ext.openssl.OpenSSL)

elasticsearch - 无法在端口 5601 上加载 Kibana

cqrs - 带有 CQRS 和事件溯源的 SO 风格声誉系统

docker - Elasticsearch 错误 : "Native controller process has stopped - no new native processes can be started"

elasticsearch - 在发送到Logstash/ Elasticsearch 之前,可以编写grok表达式来丰富FileBeat中的日志文件

cqrs - 事件溯源和事件/聚合版本

apache-kafka - Apache Kafka 中的事件外包