hadoop - 如何将 PySpark worker 中的 numpy 数组保存到 HDFS 或共享文件系统?

标签 hadoop apache-spark hdfs pyspark shared-file

我想在 PySpark 中高效地将 numpy 数组从工作机器(函数)保存到 HDFS 或从工作机器(函数)读取 numpy 数组。我有两台机器 A 和 B。A 有 master 和 worker。 B 有一名 worker 。例如我想实现如下目标:

if __name__ == "__main__":
    conf = SparkConf().setMaster("local").setAppName("Test")
    sc = SparkContext(conf = conf)
    sc.parallelize([0,1,2,3], 2).foreachPartition(func)

def func(iterator):
    P = << LOAD from HDFS or Shared Memory as numpy array>>
    for x in iterator:
        P = P + x

    << SAVE P (numpy array) to HDFS/ shared file system >>

什么是快速有效的方法?

最佳答案

我遇到了同样的问题。并最终使用了使用 HdfsCli module 的解决方法和 Python3.4 的临时文件。

  1. 进口:
from hdfs import InsecureClient
from tempfile import TemporaryFile
  1. 创建一个 hdfs 客户端。在大多数情况下,最好在脚本中的某处使用实用函数,例如:
def get_hdfs_client():
    return InsecureClient("<your webhdfs uri>", user="<hdfs user>",
         root="<hdfs base path>")
  1. 在工作函数中加载并保存您的 numpy:
hdfs_client = get_hdfs_client()

# load from file.npy
path = "/whatever/hdfs/file.npy"
tf = TemporaryFile()

with hdfs_client.read(path) as reader:
    tf.write(reader.read())
    tf.seek(0) # important, set cursor to beginning of file

np_array = numpy.load(tf)

...

# save to file.npy
tf = TemporaryFile()
numpy.save(tf, np_array)
tf.seek(0) # important ! set the cursor to the beginning of the file
# with overwrite=False, an exception is thrown if the file already exists
hdfs_client.write("/whatever/output/file.npy", tf.read(),  overwrite=True) 

注意事项:

  • 用于创建hdfs客户端的uri以http://开头,因为它使用了hdfs文件系统的web界面;
  • 确保你传给hdfs客户端的用户有读写权限
  • 根据我的经验,开销并不大(至少在执行时间方面)
  • 使用临时文件(相对于 /tmp 中的常规文件)的优点是您可以确保在脚本结束后集群机器中没有垃圾文件,无论是否正常

关于hadoop - 如何将 PySpark worker 中的 numpy 数组保存到 HDFS 或共享文件系统?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33791535/

相关文章:

java - 当我在 Ubuntu 14.04 中运行 make-distribution.sh 时,Spark 1.3.1 在 MLlib 中安装失败

python - PySpark 数据帧管道抛出 No plan for MetastoreRelation 错误

amazon-web-services - 在亚马逊。我将结果整理到主节点上,然后准备写入 S3,出现以下错误 :

hadoop - 如何在 Solr 中索引 HDFS pdf 文件?

python - 路易吉的任务去哪儿了?

hadoop - 如何在 Mesos 集群上运行 Hadoop?

dataframe - 如何连接两个数据帧,其中列与第二个数据帧中的两列匹配?

hadoop - HDFS数据节点可以是抽象的吗?

hadoop - 从 PIG 包中提取元组

java - 从 Java 写入 HDFS,得到 "could only be replicated to 0 nodes instead of minReplication"