ElasticSearch:如何制作聚合管道?

标签 elasticsearch elasticsearch-aggregation

想象以下用例:

我们在斯塔克航空公司工作,我们的营销团队希望对乘客进行分割,以便为他们提供折扣或礼品卡。他们决定需要两组乘客:

  • 每周至少飞行 3 次的乘客
  • 至少飞过一次但两周未飞过的乘客

  • 有了这个,他们可以为我们的乘客进行不同的营销事件!

    因此,在 Elasticsearch 中,我们有一个 trip代表乘客购买的机票的索引:
    {
       "_index" : "trip",
       "_type" : "_doc",
       "_id" : "1",
       "_score" : 1.0,
       "_source" : {
           "total_amount" : 300,
           "trip_date" : "2020/03/24 13:30:00",
           "status" : "completed",
           "passenger" : {
              "id" : 11,
              "name" : "Thiago nunes"
           }
         }
    }
    
    trip索引包含 status可能具有其他值的字段,例如:pendingopencanceled
    这意味着我们只能考虑具有 completed 的行程。状态(表示乘客确实旅行了)。

    那么,考虑到这一切......我如何通过 Elasticsearch 获得这两组乘客?

    我已经尝试了一段时间,但没有成功。

    到现在为止我做了什么:
  • 我建立了一个查询,获取所有有效行程(状态为 completed 的行程)
  • GET /trip/_search
    {
      "query": {
        "bool": {
          "must": [
            {
              "term": {
                "status": {
                  "value": "completed"
                }
              }
            }
          ]
        }
      },
      "aggs": {
        "status_viagem": {
          "terms": {
            "field": "status.keyword"
          }
        }
      }
    }
    
  • 此查询返回以下内容:
  • {
      "took" : 3,
      "timed_out" : false,
      "_shards" : {
        "total" : 1,
        "successful" : 1,
        "skipped" : 0,
        "failed" : 0
      },
      "hits" : {
        "total" : {
          "value" : 200,
          "relation" : "eq"
        },
        "max_score" : 0.18232156,
        "hits" : [...]
      },
      "aggregations" : {
        "status_viagem" : {
          "doc_count_error_upper_bound" : 0,
          "sum_other_doc_count" : 0,
          "buckets" : [
            {
              "key" : "completed",
              "doc_count" : 200
            }
          ]
        }
      }
    }
    
    

    但我被卡住了,无法弄清楚下一步。我知道接下来要做的应该是创建包含乘客的存储桶,然后将它们过滤到代表我们所需数据集的两个存储桶中。但我不知道怎么做。

    有人可以帮忙吗?

    PS:
  • 我并不完全需要这是一个单一的查询,只是关于如何构建这样一个查询的提示会很有帮助
  • 输出应该是一个数组 乘客 ID 的
  • 注意:我已缩短 trip为简单起见索引
  • 最佳答案

    根据我对你的问题的理解。

    我用过date_histogram以一周为间隔,以在哪一周收集乘客。只保留那些在一周内拥有三份文件的乘客。这将为您提供一周内旅行三次的所有乘客。

    在另一个聚合中,我使用了 terms aggregation获取乘客和他们的最后旅行日期。使用 bucket selector保留了最后一次旅行未超过特定日期的乘客。

    映射

    {
      "index87" : {
        "mappings" : {
          "properties" : {
            "passengerid" : {
              "type" : "long"
            },
            "passengername" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "status" : {
              "type" : "text",
              "fields" : {
                "keyword" : {
                  "type" : "keyword",
                  "ignore_above" : 256
                }
              }
            },
            "total_amount" : {
              "type" : "long"
            },
            "trip_date" : {
              "type" : "date"
            }
          }
        }
      }
    }
    
    

    询问
    {
      "query": {
        "bool": {
          "must": [
            {
              "term": {
                "status": {
                  "value": "completed"
                }
              }
            }
          ]
        }
      },
      "aggs": {
        "travel_thrice_week": {
          "date_histogram": {
            "field": "trip_date",
            "interval": "week"
          },
          "aggs": {
            "passenger": {
              "terms": {
                "field": "passengername.keyword",
                "min_doc_count": 3,
                "size": 10
              }
            },
            "select_bucket_with_user": {-->to keep weeks which have a pasenger with thrice 
                                        --> a day travel
              "bucket_selector": {
                "buckets_path": {
                  "passenger": "passenger._bucket_count"
                },
                "script": "if(params['passenger']>=1) {return true;} else{ return false;} "
              }
            }
          }
        },
        "not_flown_last_two_week": {
          "terms": {
            "field": "passengername.keyword",
            "size": 10
          },
          "aggs": {
            "last_travel": {
              "max": {
                "field": "trip_date" -->  most recent travel
              }
            },
            "last_travel_before_two_week": {
              "bucket_selector": {
                "buckets_path": {
                  "traveldate": "last_travel"
                },
                "script":{
                  "source": "if(params['traveldate']< params['date_epoch']) return true; else return false;",
                  "params": {
                    "date_epoch":1586408336000 --> unix epoc of cutt off date
                  }
                }
    
              }
            }
          }
        }
      }
    }
    

    结果:
    "aggregations" : {
        "not_flown_last_two_week" : {
          "doc_count_error_upper_bound" : 0,
          "sum_other_doc_count" : 0,
          "buckets" : [
            {
              "key" : "Thiago nunes",
              "doc_count" : 3,
              "last_travel" : {
                "value" : 1.5851808E12,
                "value_as_string" : "2020-03-26T00:00:00.000Z"
              }
            },
            {
              "key" : "john doe",
              "doc_count" : 1,
              "last_travel" : {
                "value" : 1.5799968E12,
                "value_as_string" : "2020-01-26T00:00:00.000Z"
              }
            }
          ]
        },
        "travel_thrice_week" : {
          "buckets" : [
            {
              "key_as_string" : "2020-03-23T00:00:00.000Z",
              "key" : 1584921600000,
              "doc_count" : 3,
              "passenger" : {
                "doc_count_error_upper_bound" : 0,
                "sum_other_doc_count" : 0,
                "buckets" : [
                  {
                    "key" : "Thiago nunes",
                    "doc_count" : 3
                  }
                ]
              }
            }
          ]
        }
      }
    

    关于ElasticSearch:如何制作聚合管道?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61099910/

    相关文章:

    angularjs - 如何用elasticui突出显示搜索结果?

    elasticsearch - Elasticsearch:随机字段的聚合

    elasticsearch - 需要在文档内部数组对象上聚合-ElasticSearch

    java - Elasticsearch - 如何使用 Java 在 JSON 对象中添加或编辑字符串数组?

    elasticsearch - 获取 Elasticsearch 聚合中的桶计数

    elasticsearch - 如何使用grok在logstash中对日志消息进行分组?

    java - Elasticsearch组/聚合响应按搜索条件

    elasticsearch - 通过 Elasticsearch 中的查询进行文档计数聚合(如 solr 中的facet.query)

    elasticsearch - Elasticsearch 总和聚合中的问题

    elasticsearch - 无法将Kibana连接到Elasticsearch