join - Compute a new field and aggregate after joining 2 Elasticsearch indices

Tags: join elasticsearch aggregation

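I have already posted another question about correlating 2 Elasticsearch indices: "Join elasticsearch indices while matching fields in nested/inner objects". I am now trying to build on that. Below is the code I created based on the answer given to that post.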

Data creation:

curl -XPUT http://localhost:9200/currencylookup/inr/1 -d '{
  "conv": [
    {
      "currency": "usd",
      "currencyname": "US Dollar",
      "units_per_inr": "0.016155969",
      "inr_per_unit": "61.89662756"
    },
    {
      "currency": "inr",
      "currencyname": "Indian Rupee",
      "units_per_inr": "1",
      "inr_per_unit": "1"
    },
    {
      "currency": "idr",
      "currencyname": "Indonesian Rupiah",
      "units_per_inr": "199.2576913",
      "inr_per_unit": "0.005018627"
    }
  ]
}'

curl -XPUT "http://localhost:9200/expenses/overseas/1" -d '{ "amount":"100", "currency":"usd", "location":"USA" }'

curl -XPUT "http://localhost:9200/expenses/overseas/2" -d '{ "amount":"50", "currency":"JPY", "location":"JAPAN" }'

curl -XPUT "http://localhost:9200/expenses/overseas/3" -d '{ "amount":"50", "currency":"inr", "location":"INDIA" }'

curl -XPUT "http://localhost:9200/expenses/overseas/4" -d '{ "amount":"30", "currency" : "IDR", "location": "Indonesia"}'

curl -XPUT "http://localhost:9200/expenses/overseas/5" -d '{ "amount":"89", "currency":"USD", "location":"USA" }'

Query:
curl -XPOST "http://localhost:9200/expenses/overseas/_search?pretty" -d '{
   "query" : {
 "filtered" : {
   "filter" : {
     "terms" : {
       "currency" : {
        "index" : "currencylookup",
         "type" : "inr",
         "id" : "1",
         "path" : "conv.currency"
       },
       "_cache_key" : "currencyexchange"
     }
   }
 }
   }
 }'

I get the expected result: 4 records from the expenses index. The JPY expense is excluded because JPY is not in the currency lookup.
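One way to read this result: the terms lookup fetches the values found at `conv.currency` in the lookup document and keeps only the expenses whose `currency` term appears in that list. Below is a minimal Python sketch of that logic, not of Elasticsearch internals. The in-memory data mirrors the documents above, `terms_lookup` is a hypothetical helper, and the lowercasing rests on the assumption that `currency` is an analyzed string field, which would explain why "USD" and "IDR" match.

```python
# In-memory copies of the documents indexed above (only the fields needed here).
lookup_doc = {"conv": [{"currency": "usd"}, {"currency": "inr"}, {"currency": "idr"}]}
expenses = {
    "1": {"amount": "100", "currency": "usd", "location": "USA"},
    "2": {"amount": "50", "currency": "JPY", "location": "JAPAN"},
    "3": {"amount": "50", "currency": "inr", "location": "INDIA"},
    "4": {"amount": "30", "currency": "IDR", "location": "Indonesia"},
    "5": {"amount": "89", "currency": "USD", "location": "USA"},
}


def terms_lookup(doc, path):
    """Collect the values found at a dotted path such as 'conv.currency'."""
    values = [doc]
    for key in path.split("."):
        next_values = []
        for value in values:
            child = value[key]
            # Arrays of inner objects are flattened into one list of values.
            next_values.extend(child if isinstance(child, list) else [child])
        values = next_values
    return values


allowed = set(terms_lookup(lookup_doc, "conv.currency"))

# Assumption: lowercasing approximates what an analyzed string field stores.
matching_ids = sorted(
    doc_id for doc_id, e in expenses.items() if e["currency"].lower() in allowed
)
print(matching_ids)  # ['1', '3', '4', '5']
```

Expense 2 drops out because "jpy" never appears in the looked-up list.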

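However, what I ultimately want is all the expense data expressed in one currency, which means I have to run the lookup in the other direction, and that is where the problem starts.
curl -XPOST "http://localhost:9200/currencylookup/inr/_search?pretty" -d '{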
   "query" : {
 "filtered" : {
   "filter" : {
     "terms" : {
         "conv.currency" : {
         "index" : "expenses",
         "type" : "overseas",
         "id" : "2",
         "path" : "currency"
       },
       "_cache_key" : "currencyexchange6"
     }
   }
 }
   }
 }'

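The lookup against conv.currency does not seem to work. I cannot specify a path for it. I tried making currencylookup a flat structure, but that did not work either. I do not want to store my expenses as an array of nested/inner objects.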

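So, given an expense ID in the expenses index, how can I look up the appropriate exchange rate in the currencylookup index and compute a new field holding the amount in the target currency? For example, for expense ID 1 I have to look up "usd" in currencylookup, get the field inr_per_unit, and compute expenseAmountInINR.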
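To make the desired calculation concrete, here is a minimal Python sketch of the per-expense arithmetic. It is not an Elasticsearch query. The rates are copied from the lookup document above, the function name `expense_amount_in_inr` is hypothetical, and lowercasing the currency code is an assumption made so that "USD" and "usd" resolve to the same rate.

```python
# Rates copied from the currencylookup document above, keyed by currency code.
inr_per_unit = {"usd": 61.89662756, "inr": 1.0, "idr": 0.005018627}


def expense_amount_in_inr(expense):
    """Return amount * inr_per_unit for the expense's currency."""
    # Assumption: currency codes are compared case-insensitively.
    rate = inr_per_unit[expense["currency"].lower()]
    # Amounts are stored as strings in the question's data, so convert first.
    return float(expense["amount"]) * rate


expense_1 = {"amount": "100", "currency": "usd", "location": "USA"}
print(expense_amount_in_inr(expense_1))
```

For expense 1 this is 100 multiplied by 61.89662756, roughly 6189.66 INR.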

If I can do that, I would then like to aggregate the converted expense amounts by some parameters. Is this possible?

Best answer

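This may not be the right way to approach the problem, and what I did is a total hack, but you can accomplish what you are asking with a scripted metric aggregation. This aggregation is new in v1.4.x and still experimental (so use it with caution in production).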

I modified the currencylookup index slightly, creating one document per conversion factor:

curl -XDELETE "http://localhost:9200/currencylookup"

curl -XPUT "http://localhost:9200/currencylookup/inr/usd" -d'
{
   "currency": "usd",
   "currencyname": "US Dollar",
   "units_per_inr": 0.016155969,
   "inr_per_unit": 61.89662756
}'
curl -XPUT "http://localhost:9200/currencylookup/inr/inr" -d'
{
   "currency": "inr",
   "currencyname": "Indian Rupee",
   "units_per_inr": 1,
   "inr_per_unit": 1
}'
curl -XPUT "http://localhost:9200/currencylookup/inr/idr" -d'
{
   "currency": "idr",
   "currencyname": "Indonesian Rupiah",
   "units_per_inr": 199.2576913,
   "inr_per_unit": 0.005018627
}'

I set up the expenses index the way you had it, except that amount is stored as a number rather than a string:
curl -XDELETE "http://localhost:9200/expenses"

curl -XPUT "http://localhost:9200/expenses/overseas/1" -d'
{ "amount":100, "currency":"usd", "location":"USA" }'
curl -XPUT "http://localhost:9200/expenses/overseas/2" -d'
{ "amount":50, "currency":"JPY", "location":"JAPAN" }'
curl -XPUT "http://localhost:9200/expenses/overseas/3" -d'
{ "amount":50, "currency":"inr", "location":"INDIA" }'
curl -XPUT "http://localhost:9200/expenses/overseas/4" -d'
{ "amount":30, "currency" : "IDR", "location": "Indonesia"}'
curl -XPUT "http://localhost:9200/expenses/overseas/5" -d'
{ "amount":89, "currency":"USD", "location":"USA" }'

Then I queried both indices together with a scripted metric aggregation, as follows:
curl -XPOST "http://localhost:9200/expenses,currencylookup/_search?search_type=count" -d'
{
    "aggs": {
        "results": {
            "scripted_metric": {
                "init_script" : "_agg[\"exp\"] = []; _agg[\"cur\"] = []",
                "map_script" : "if (doc[\"_type\"].value == \"inr\") { _agg.cur.add([doc[\"currency\"].value, doc[\"inr_per_unit\"].value]) } else { _agg.exp.add([doc[\"currency\"].value, doc[\"amount\"].value]) }",
                "reduce_script" : "exp=[]; cur=[]; for (item in _aggs) { exp += item.exp; cur += item.cur }; results=[]; for (c in cur) { for (e in exp) { if (e[0] == c[0]) { results.add(e[1]*c[1]) } } }; return results;"
            }
        }
    }
}'

This yields the converted values:
{
   "took": 3,
   "timed_out": false,
   "_shards": {
      "total": 10,
      "successful": 10,
      "failed": 0
   },
   "hits": {
      "total": 8,
      "max_score": 0,
      "hits": []
   },
   "aggregations": {
      "results": {
         "value": [
            5508.79985284,
            6189.662756,
            50,
            0.15055881000000002
         ]
      }
   }
}
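To see why the response holds four values, the three script phases can be mirrored in plain Python. This is only a sketch of the logic, not how Elasticsearch runs it. The split of documents across two shards is made up for illustration, the function names are hypothetical, and the lowercase currency codes assume that `doc["currency"].value` returns the analyzed term.

```python
def init_agg():
    # init_script: one accumulator per shard.
    return {"exp": [], "cur": []}


def map_doc(agg, doc):
    # map_script: route each document by its _type.
    if doc["_type"] == "inr":
        agg["cur"].append((doc["currency"], doc["inr_per_unit"]))
    else:
        agg["exp"].append((doc["currency"], doc["amount"]))


def reduce_aggs(shard_aggs):
    # reduce_script: merge the per-shard lists, then nested-loop join on currency.
    exp, cur = [], []
    for item in shard_aggs:
        exp += item["exp"]
        cur += item["cur"]
    return [amount * rate
            for code, rate in cur
            for exp_code, amount in exp
            if exp_code == code]


# Hypothetical distribution of the eight documents over two shards.
shards = [
    [
        {"_type": "inr", "currency": "usd", "inr_per_unit": 61.89662756},
        {"_type": "overseas", "currency": "usd", "amount": 100},
        {"_type": "overseas", "currency": "jpy", "amount": 50},
    ],
    [
        {"_type": "inr", "currency": "inr", "inr_per_unit": 1},
        {"_type": "inr", "currency": "idr", "inr_per_unit": 0.005018627},
        {"_type": "overseas", "currency": "inr", "amount": 50},
        {"_type": "overseas", "currency": "idr", "amount": 30},
        {"_type": "overseas", "currency": "usd", "amount": 89},
    ],
]

shard_aggs = []
for docs in shards:
    agg = init_agg()
    for doc in docs:
        map_doc(agg, doc)
    shard_aggs.append(agg)

results = reduce_aggs(shard_aggs)
print(sorted(results))
```

The JPY expense produces no value because no rate document matches it, and the nested loop in the reduce step compares every rate with every expense, so its cost grows with the product of the two list sizes.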

To restrict the results to expenses in usd only, we can use a filtered query:
curl -XPOST "http://localhost:9200/expenses,currencylookup/_search?search_type=count" -d'
{
   "query": {
      "filtered": {
         "query": {
            "match_all": {}
         },
         "filter": {
            "term": {
               "currency": "usd"
            }
         }
      }
   },
   "aggs": {
      "converted_expenses": {
         "scripted_metric": {
            "init_script": "_agg[\"exp\"] = []; _agg[\"cur\"] = []",
            "map_script": "if (doc[\"_type\"].value == \"inr\") { _agg.cur.add([doc[\"currency\"].value, doc[\"inr_per_unit\"].value]) } else { _agg.exp.add([doc[\"currency\"].value, doc[\"amount\"].value]) }",
            "reduce_script": "exp=[]; cur=[]; for (item in _aggs) { exp += item.exp; cur += item.cur }; results=[]; for (c in cur) { for (e in exp) { if (e[0] == c[0]) { results.add(e[1]*c[1]) } } }; return results;"
         }
      }
   }
}'

This yields:
{
   "took": 4,
   "timed_out": false,
   "_shards": {
      "total": 10,
      "successful": 10,
      "failed": 0
   },
   "hits": {
      "total": 3,
      "max_score": 0,
      "hits": []
   },
   "aggregations": {
      "converted_expenses": {
         "value": [
            5508.79985284,
            6189.662756
         ]
      }
   }
}

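I doubt this approach will scale well. As I said, it is probably not the best way to solve the problem. If it were me, I would look for a way to do the conversion in application code rather than in Elasticsearch. But there you go.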

Here is the code I used to work through the problem:

http://sense.qbox.io/gist/6e7c8467ad7732c296448cec86e5c25e3c3c7326

(To use this code in a browser, you must set http.cors.enabled: true in your elasticsearch instance; cross-origin access is off by default in v1.4.2)

Edit: As far as I can tell, there does not seem to be a way to aggregate the results any further using this technique.
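If the conversion is moved into application code, as suggested above, the further aggregation becomes an ordinary group-and-sum. The following Python sketch assumes the rate and expense documents have already been fetched from the two indices (the fetching is not shown, and the data is hard-coded from the examples above). The helper name `total_inr_by` is hypothetical, and grouping by `location` is just one example parameter.

```python
from collections import defaultdict

# Rates keyed by currency code, taken from the currencylookup documents above.
rates = {"usd": 61.89662756, "inr": 1.0, "idr": 0.005018627}

# Expense documents as indexed above.
expenses = [
    {"amount": 100, "currency": "usd", "location": "USA"},
    {"amount": 50, "currency": "JPY", "location": "JAPAN"},
    {"amount": 50, "currency": "inr", "location": "INDIA"},
    {"amount": 30, "currency": "IDR", "location": "Indonesia"},
    {"amount": 89, "currency": "USD", "location": "USA"},
]


def total_inr_by(expenses, rates, key):
    """Sum converted amounts per value of `key`; also return unconvertible expenses."""
    totals = defaultdict(float)
    skipped = []
    for expense in expenses:
        # Assumption: currency codes are compared case-insensitively.
        rate = rates.get(expense["currency"].lower())
        if rate is None:
            skipped.append(expense)  # no rate on file, e.g. JPY here
            continue
        totals[expense[key]] += expense["amount"] * rate
    return dict(totals), skipped


totals, skipped = total_inr_by(expenses, rates, "location")
print(totals)
print(skipped)
```

The dictionary lookup replaces the nested loop of the reduce script, and collecting the skipped expenses makes a missing rate visible instead of silently dropping the record.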

For this question (join - Compute a new field and aggregate after joining 2 Elasticsearch indices), we found a similar question on Stack Overflow: https://stackoverflow.com/questions/27565951/
