python - 无法 pickle _thread.rlock 对象 Pyspark 向 elasticsearch 发送请求

标签 python apache-spark elasticsearch pyspark

我正在使用 pyspark 流从 tweepy 收集数据。完成所有设置后,我通过 elasticsearch.index() 将 dict(json) 发送到 elasticsearch。但是我收到“can't pickle_thread.lock objects”错误和其他 63 个错误。 track back 日志太长,无法在我的控制台中显示!

设计是我得到一个 json/dict 类型的文件,将其转换为 DStream,通过在 map() 函数中调用 TextBlob 添加另一个名为“情感”的特征。一切正常,但是当我添加另一个映射函数来调用 elasticsearch.index() 时,出现错误。

下面是我的控制台超长错误日志的一部分。

Blockquote During handling of the above exception, another exception occurred: Traceback (most recent call last): File "/Users/ayane/anaconda/lib/python3.6/site-packages/pyspark/streaming/util.py", line 105, in dumps func.func, func.rdd_wrap_func, func.deserializers))) File "/Users/ayane/anaconda/lib/python3.6/site-packages/pyspark/serializers.py", line 460, in dumps return cloudpickle.dumps(obj, 2) File "/Users/ayane/anaconda/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 704, in dumps cp.dump(obj) File "/Users/ayane/anaconda/lib/python3.6/site-packages/pyspark/cloudpickle.py", line 162, in dump raise pickle.PicklingError(msg) _pickle.PicklingError: Could not serialize object: TypeError: can't pickle _thread.lock objects at org.apache.spark.streaming.api.python.PythonTransformFunctionSerializer$.serialize(PythonDStream.scala:144) at org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply$mcV$sp(PythonDStream.scala:101) at org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply(PythonDStream.scala:100) at org.apache.spark.streaming.api.python.TransformFunction$$anonfun$writeObject$1.apply(PythonDStream.scala:100) at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1303) ... 63 more

我的部分代码如下所示:

def sendPut(doc):
  res = es.index(index = "tweetrepository", doc_type= 'tweet', body = doc)
  return doc
myJson = dataStream.map(decodeJson).map(addSentiment).map(sendPut)
myJson.pprint()

这里是 decodeJson 函数:

def decodeJson(str):
  return json.loads(str)

这是 addSentiment 函数:

def addSentiment(dic):
  dic['Sentiment'] = get_tweet_sentiment(dic['Text'])
  return dic

这里是 get_tweet_sentiment 函数:

def get_tweet_sentiment(tweet):
  analysis = TextBlob(tweet)
  if analysis.sentiment.polarity > 0:
    return 'positive'
  elif analysis.sentiment.polarity == 0:
    return 'neutral'
  else:
    return 'negative'

最佳答案

Connections 对象通常是不可序列化的,因此不能通过闭包传递。你必须使用 foreachPartition pattern :

def sendPut(docs):
    es = ... # Initialize es object
    for doc in docs
        es.index(index = "tweetrepository", doc_type= 'tweet', body = doc)

myJson = (dataStream
    .map(decodeJson)
    .map(addSentiment)
    # Here you need an action.
    # `map` is lazy, and `pprint` doesn't guarantee complete execution
    .foreachPartition(sendPut))

如果你想返回一些东西,使用mapPartitions:

def sendPut(docs):
    es = ... # Initialize es object
    for doc in docs
        yield es.index(index = "tweetrepository", doc_type= 'tweet', body = doc)


myJson = (dataStream
   .map(decodeJson)
   .map(addSentiment)
   .mapPartitions(sendPut))

但是您需要一个额外的操作来强制执行。

关于python - 无法 pickle _thread.rlock 对象 Pyspark 向 elasticsearch 发送请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49891686/

相关文章:

python - Python Matplotlib 动画绘图的更新速度缓慢。我怎样才能让它更快?

python - 如何在 Heroku 上设置 ArangoDB?

python - 像 turbogears/sqlalchemy 中独一无二的 django

java - Spark - 为什么在打印 RDD 之前需要收集()到驱动程序节点?不能并行吗?

elasticsearch - 如何使用 Logstash 从 S3 解析数据并推送到 Elastic Search,然后推送到 Kibana

elasticsearch - Kibana-我可以在脚本字段中添加监视器吗?

python flask ImmutableMultiDict

apache-spark - 在 PySpark 中获取列的名称/别名

python Spark根据值对元素进行排序

elasticsearch - Elasticsearch:如何禁止对某种数据类型(例如字符串)进行分析