将以下 colab python 代码(参见下面的链接)部署到 Google Cloud 上的 Dataproc,它仅在 input_list 是包含一项的数组时有效,当 input_list有两个项目,然后 PySpark 作业在下面的 get_similarity 方法中的“for r in result.collect()”行出现以下错误而终止:
java.io.IOException: Premature EOF from inputStream
at org.apache.hadoop.io.IOUtils.readFully(IOUtils.java:194)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doReadFully(PacketReceiver.java:213)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.doRead(PacketReceiver.java:134)
at org.apache.hadoop.hdfs.protocol.datatransfer.PacketReceiver.receiveNextPacket(PacketReceiver.java:109)
at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receivePacket(BlockReceiver.java:446)
at org.apache.hadoop.hdfs.server.datanode.BlockReceiver.receiveBlock(BlockReceiver.java:702)
at org.apache.hadoop.hdfs.server.datanode.DataXceiver.writeBlock(DataXceiver.java:739)
at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.opWriteBlock(Receiver.java:124)
at org.apache.hadoop.hdfs.protocol.datatransfer.Receiver.processOp(Receiver.java:71)
at org.apache.hadoop.hdfs.server.datanode.DataXceiver.run(DataXceiver.java:232)
at java.lang.Thread.run(Thread.java:745)
input_list=["no error"] <---- works
input_list=["this", "throws EOF error"] <---- does not work
使用 spark-nlp 链接到 colab 以获得句子相似度: https://colab.research.google.com/github/JohnSnowLabs/spark-nlp-workshop/blob/master/tutorials/streamlit_notebooks/SENTENCE_SIMILARITY.ipynb#scrollTo=6E0Y5wtunFi4
def get_similarity(input_list):
df = spark.createDataFrame(pd.DataFrame({'text': input_list}))
result = light_pipeline.transform(df)
embeddings = []
for r in result.collect():
embeddings.append(r.sentence_embeddings[0].embeddings)
embeddings_matrix = np.array(embeddings)
return np.matmul(embeddings_matrix, embeddings_matrix.transpose())
我已经尝试在 hadoop 集群配置中将 dfs.datanode.max.transfer.threads
更改为 8192
但仍然没有成功:
hadoop_config.set('dfs.datanode.max.transfer.threads', "8192")
当 input_list 数组中有多个项目时,如何让这段代码正常工作?
最佳答案
java.io.IOException:来自 inputStream 的过早 EOF
可能表示磁盘带宽不足、HDFS DataNode 过载或许多其他问题:Hadoop MapReduce job I/O Exception due to premature EOF from inputStream .
在 Spark 应用程序中增加 DataNode 传输线程的数量并没有改变任何东西,因为您需要在每个集群 worker 上的 HDFS 配置中更改此属性并在每个 worker 上重新启动 DataNode 服务。最简单的方法是使用 hdfs:dfs.datanode.max.transfer.threads=8192
cluster property 重新创建一个集群。 .
请注意,如果问题的根本原因是磁盘带宽不足,那么增加 DataNode 中的传输线程数量只会加剧问题,而不是解决问题。
您有多种选择来尝试解决此问题:
关于apache-spark - 与 SparkNLP 的句子相似性仅适用于带有一个句子的 Google Dataproc,当提供多个句子时失败,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64901343/