python - Elasticsearch analytics()与Python中的Spark不兼容?

标签 python elasticsearch apache-spark elasticsearch-py

我正在使用Python 3在PySpark中使用elasticsearch-py客户端,并且将带有ES的analytics()函数与RDD结合使用时遇到了问题。特别是,RDD中的每个记录都是文本字符串,我试图对其进行分析以获取 token 信息,但是在Spark的map函数中尝试使用它时遇到错误。

例如,这可以很好地工作:

from elasticsearch import Elasticsearch
es = Elasticsearch()
t = 'the quick brown fox'
es.indices.analyze(text=t)['tokens'][0]

{'end_offset': 3,
'position': 1,
'start_offset': 0,
'token': 'the',
'type': '<ALPHANUM>'}

但是,当我尝试这样做:
trdd = sc.parallelize(['the quick brown fox'])
trdd.map(lambda x: es.indices.analyze(text=x)['tokens'][0]).collect()

我收到与腌制相关的非常长的错误消息(到此结束):
(self, obj)    109if'recursion'in.[0]:    110="""Could not pickle object as excessively deep recursion required."""--> 111                  picklePicklingErrormsg

  save_memoryviewself obj

: Could not pickle object as excessively deep recursion required.

raise.()    112    113def(,):PicklingError

我不确定该错误是什么意思。难道我做错了什么?有没有办法将ES分析功能映射到RDD的记录上?

编辑:当也从elasticsearch-py应用其他功能时,例如在es.termvector()时,我也遇到这种情况。

最佳答案

本质上,Elasticsearch客户端不可序列化。因此,您需要为每个分区创建一个客户端实例,并对其进行处理:
def get_tokens(part): es = Elasticsearch() yield [es.indices.analyze(text=x)['tokens'][0] for x in part] rdd = sc.parallelize([['the quick brown fox'], ['brown quick dog']], numSlices=2) rdd.mapPartitions(lambda p: get_tokens(p)).collect()
应给出以下结果:Out[17]: [[{u'end_offset': 3, u'position': 1, u'start_offset': 0, u'token': u'the', u'type': u'<ALPHANUM>'}], [{u'end_offset': 5, u'position': 1, u'start_offset': 0, u'token': u'brown', u'type': u'<ALPHANUM>'}]]
请注意,对于大型数据集,这将非常低效,因为它涉及对数据集中每个元素的REST调用。

关于python - Elasticsearch analytics()与Python中的Spark不兼容?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32161865/

相关文章:

javascript - 如何在我的 Django 表单输入中添加额外的属性?

python - 如何将 .swf 文件嵌入到我的 Google App Engine 应用程序中?

elasticsearch - 如何为Elasticsearch产生Confluent的Kafka虚拟数据生成器(datagen)消息?

python - 寻找下一个素数

python - 将默认值设置为稀疏 scipy 矩阵

elasticsearch - 如何检查 ElasticSearch 索引是否存在并准备就绪?

elasticsearch - 当按特定字段搜索时,ElasticSearch不返回结果

scala - Spark 中的每文档字数统计

java.lang.Long 和 scala.Long

apache-spark - 手动停止 Spark Worker