python-3.x - Bulk (helpers) index error in Elasticsearch Python

Tags: python-3.x elasticsearch bulkinsert elasticsearch-5

I am pulling data from Twitter, filtering it, building a generator, and trying to bulk-index the documents into Elasticsearch with the helpers module, but I get the error below and can't pinpoint exactly where the problem is.

Traceback (most recent call last):
  File "/Users/aqm1152/_acert_/basic/test_collection_dump.py", line 245, in <module>
    sinceid, complete, api_counter, maxid = search_tweets(qu=query_word, cnt=cnt, sinceid=x , maxitr= 149 , fname=query_word)
  File "/Users/aqm1152/_acert_/basic/test_collection_dump.py", line 138, in search_tweets
    res = elastic_search.bulk_es(actions=bulk_content, )
  File "/Users/aqm1152/_acert_/basic/elasticsearch/acert_basic_elastic_functions.py", line 68, in bulk_es
    return helpers.bulk(self.es, actions=actions ,stats_only=True)
  File "/Users/aqm1152/anaconda/lib/python3.5/site-packages/elasticsearch/helpers/__init__.py", line 194, in bulk
    for ok, item in streaming_bulk(client, actions, **kwargs):
  File "/Users/aqm1152/anaconda/lib/python3.5/site-packages/elasticsearch/helpers/__init__.py", line 162, in streaming_bulk
    for result in _process_bulk_chunk(client, bulk_actions, raise_on_exception, raise_on_error, **kwargs):
  File "/Users/aqm1152/anaconda/lib/python3.5/site-packages/elasticsearch/helpers/__init__.py", line 134, in _process_bulk_chunk
    raise BulkIndexError('%i document(s) failed to index.' % len(errors), errors)
elasticsearch.helpers.BulkIndexError: ('46 document(s) failed to index.', [{'index': {'_index': 'twitter', '_id': '866553007252488192', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, {'index': {'_index': 'twitter', '_id': '866552145507700736', '_type': 'tweet', 'error': {'type': 'mapper_parsing_exception', 'reason': 'failed to parse', 'caused_by': {'type': 'parse_exception', 'reason': 'field must be either [lat], [lon] or [geohash]'}}, 'status': 400}}, ...])

(the remaining 44 entries are identical except for '_id')

Elasticsearch seems to have trouble ingesting what appears to be geolocation data in many of these documents. Also, I collected about 671 tweets, but when I run a count in Elasticsearch only 454 of them appear to have been indexed. On top of that, 46 documents failed because of this geo data, and I can't tell exactly which field is responsible.
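When `helpers.bulk` raises `BulkIndexError`, its second argument (also available as `e.errors`) carries the full per-item error list, so the failure reasons can be tallied instead of read one by one out of the traceback. A minimal sketch (the `summarize_bulk_errors` name is mine, not from the original code):

```python
from collections import Counter

def summarize_bulk_errors(errors):
    """Tally the per-item error list from BulkIndexError.errors by
    root-cause reason, so 46 repeated failures collapse into one line."""
    reasons = Counter()
    for item in errors:
        # each item looks like {'index': {..., 'error': {...}, 'status': 400}}
        info = next(iter(item.values()))
        caused_by = info.get('error', {}).get('caused_by', {})
        reasons[caused_by.get('reason', 'unknown')] += 1
    return dict(reasons)
```

Wrapping the bulk call in `except BulkIndexError as e: print(summarize_bulk_errors(e.errors))` would show at a glance that all 46 failures share the same `[lat], [lon] or [geohash]` parse error.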

Here is the template I am using for the index:

{
   "template": "twitter",
   "settings": {
      "number_of_shards": 1,
      "number_of_replicas": 0
   },
   "mappings": {
      "tweet": {
         "properties": {
            "coordinates": {
               "type": "geo_point"
            },
            "created_at": {
               "format": "EEE MMM dd HH:mm:ss Z YYYY",
               "type": "date"
            },
            "entities": {
               "properties": {
                  "hashtags": {
                     "properties": {
                        "indices": {
                           "type": "long",
                           "index": "not_analyzed"
                        },
                        "text": {
                           "type": "text"
                        }
                     }
                  },
                  "urls": {
                     "properties": {
                        "display_url": {
                           "type": "text",
                           "index": "not_analyzed"
                        },
                        "expanded_url": {
                           "type": "text",
                           "index": "not_analyzed"
                        },
                        "indices": {
                           "type": "long",
                           "index": "not_analyzed"
                        },
                        "url": {
                           "type": "text",
                           "index": "not_analyzed"
                        }
                     }
                  }
               }
            },
            "symbols": {
               "type": "integer",
               "index": "not_analyzed"
            },
            "favorite_count": {
               "type": "double",
               "index": "not_analyzed"
            },
            "id": {
               "type": "long"
            },
            "lang": {
               "type": "text",
               "index": "analyzed"
            },
            "place": {
               "properties": {
                  "attributes": {
                     "type": "object"
                  },
                  "bounding_box": {
                     "type": "geo_point"
                  },
                  "country": {
                     "type": "text",
                     "index": "no"
                  },
                  "country_code": {
                     "type": "text"
                  },
                  "full_name": {
                     "type": "text",
                     "index": "no"
                  },
                  "id": {
                     "type": "text"
                  },
                  "name": {
                     "type": "text",
                     "index": "not_analyzed"
                  },
                  "place_type": {
                     "type": "text"
                  },
                  "url": {
                     "type": "text"
                  }
               }
            },
            "retweet_count": {
               "type": "long"
            },
            "source": {
               "type": "text"
            },
            "text": {
               "type": "text"
            },
            "user": {
              "type":"object",
               "properties": {
                  "id": {
                     "type": "long"
                  },
                  "screen_name": {
                     "type": "text"
                  }
               }
            }
         }
      }
   }
}
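One thing worth noting about the mapping above: the Twitter API delivers `coordinates` as a GeoJSON object (`{'type': 'Point', 'coordinates': [lon, lat]}`) and `place.bounding_box` as a GeoJSON polygon, while a `geo_point` field only accepts a `{lat, lon}` object, a `"lat,lon"` string, a geohash, or a `[lon, lat]` array — which is exactly what the `field must be either [lat], [lon] or [geohash]` parse error complains about. A small sketch of a converter (the `to_geo_point` name is mine, not part of the original code):

```python
def to_geo_point(geojson):
    """Convert a Twitter GeoJSON Point into the {lat, lon} object form
    that an Elasticsearch geo_point field accepts. Returns None for
    missing or non-Point shapes (e.g. a bounding_box polygon) so the
    caller can drop the field instead of sending an unparseable object."""
    if not geojson or geojson.get('type') != 'Point':
        return None
    lon, lat = geojson['coordinates']  # GeoJSON order is [lon, lat]
    return {'lat': lat, 'lon': lon}
```

Fields like `bounding_box` that are genuinely polygons would need a `geo_shape` mapping rather than `geo_point`, if they are to be indexed at all.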

Here is the code I use to build the generator and ingest with helpers.bulk:

from collections import defaultdict
from elasticsearch import helpers

# Global variable
tweet_attributes = ['text','source','retweeted', 'retweet_count','place','lang','favorite_count','entities','id','created_at','user:id','user:screen_name','coordinates']
def _get_necessary_fields(tweets):
    doc = defaultdict(dict)
    fieldInfo = tweet_attributes
    for tweet in tweets:
        for fields in fieldInfo :
            if (len(fields.split(':'))) == 2 :
                keys = fields.split(':')
                # nested array at one level
                doc[keys[0]][keys[1]] = tweet[keys[0]][keys[1]]
                #TODO implement for more than one level, needs better algo
            else:
                # for each field
                if fields in tweet:
                    doc[fields] = tweet[fields]
        yield doc

def _json_for_bulk_body(tweets,el):
    # TODO refactor this code when you have time :
    # http://stackoverflow.com/questions/20288770/how-to-use-bulk-api-to-store-the-keywords-in-es-by-using-python
    structured_json_body = ({
                "_op_type" : "index",
                "_index": el[0],  # index name Twitter
                "_type": el[1][0],  # type is tweet
                "_id": doc['id'],  # id of the tweet
                "_source" :doc} for  doc in _get_necessary_fields(tweets))
    return structured_json_body


    helpers.bulk(self.es, actions=structured_json_body, stats_only=True)

Can someone explain to me why every document except those 46 was ingested, and why the 46 documents failed?

Best Answer

In your code, you need to check for that null case and, when it occurs, simply not set the coordinates field:

            # for each field
            if fields in tweet:
                if tweet[fields] is not None:                  <--- add this check
                    doc[fields] = tweet[fields]
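Putting that check into the generator, a minimal self-contained sketch might look like the following (the field list is trimmed for illustration; note also that the original creates `doc` once outside the loop, so building a fresh dict per tweet avoids leaking fields from one document into the next):

```python
from collections import defaultdict

def get_necessary_fields(tweets, field_info=('text', 'coordinates', 'user:screen_name')):
    for tweet in tweets:
        doc = defaultdict(dict)  # fresh doc per tweet, unlike the original
        for field in field_info:
            keys = field.split(':')
            if len(keys) == 2:
                # one level of nesting, e.g. user:screen_name
                doc[keys[0]][keys[1]] = tweet[keys[0]][keys[1]]
            elif field in tweet and tweet[field] is not None:
                # the null check: never emit a None geo field
                doc[field] = tweet[field]
        yield dict(doc)
```

With this, a tweet whose `coordinates` is null simply has no `coordinates` key in its bulk action, so Elasticsearch has nothing to fail to parse.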

This post on python-3.x - Bulk (helpers) index error in Elasticsearch Python is based on a similar question found on Stack Overflow: https://stackoverflow.com/questions/44115558/
