apache-spark - Spark 如何并行处理 1TB 文件?

标签 apache-spark dataframe parallel-processing apache-spark-sql

假想问题

  • 一个巨大的 CSV 日志文件,假设大小为 1 TB,该文件位于 USB 驱动器
  • 该日志包含世界各地用户的事件日志,假设该行包含 50 列,其中包括 Country。
  • 我们想要按国家/地区降序排列的行数。
  • 假设 Spark 集群有足够的节点和 RAM 来处理整个 1TB 内存(20 个节点,4 核 CPU,每个节点有 64GB RAM)

  • 我的Poorman 的概念解决方案
    使用 SparkSQL & Databricks spark-csv
    $ ./spark-shell --packages com.databricks:spark-csv_2.10:1.4.0
    

    val dfBigLog = sqlContext.read
      .format("com.databricks.spark.csv")
      .option("header", "true")
      .load("/media/username/myUSBdrive/bogusBigLog1TB.log")
    
    dfBigLog.select("Country")
      .groupBy("Country")
      .agg(count($"Country") as "CountryCount")
      .orderBy($"CountryCount".desc).show
    

    问题一:Spark如何并行处理?

    我想上述解决方案的大部分执行时间(99%?)是将 1TB 文件从 USB 驱动器读取到 Spark 集群中。从 USB 驱动器读取文件不可并行化。但是在读取了整个文件之后,Spark 在后台做了什么来并行化处理?
  • 有多少个节点用于创建 DataFrame? (也许只有一个?)
  • 多少节点用于 groupBy & count ?假设有 100 多个国家/地区(但 Spark 还不知道)。 Spark 如何分区以在 20 个节点上分配 100 多个国家/地区值?

  • 问题 2:如何使 Spark 应用程序尽可能快?
    我想改进的领域是并行读取 1TB 文件。
  • 将 CSV 文件转换为 Parquet 文件格式 + 使用 Snappy 压缩。让我们假设这可以提前完成。
  • 在 HDFS 上复制 Parquet 文件。假设 Spark 集群在同一个 Hadoop 集群中,并且数据节点独立于 20 个节点的 Spark 集群。
  • 将 Spark 应用程序更改为从 HDFS 读取。我想 Spark 现在会使用多个节点来读取文件,因为 Parquet 是可拆分的。
  • 假设 Snappy 压缩的 Parquet 文件小 10 倍,大小 = 100GB,HDFS 块大小 = 128 MB。总共 782 个 HDFS 块。

  • 但是 Spark 如何设法使用所有 20 个节点来创建 DataFrame 和处理( groupBycount )? Spark 是否每次都使用所有节点?

    最佳答案

    Question 1: How does Spark parallelize the processing (of reading a file from a USB drive)?



    这种情况是不可能的。

    Spark 依赖于符合 hadoop 的文件系统来读取文件。挂载U盘时,只能从本地主机访问。试图执行
    .load("/media/username/myUSBdrive/bogusBigLog1TB.log")
    

    将在集群配置中失败,因为集群中的执行程序将无法访问该本地路径。

    可以在本地模式 ( master=local[*] ) 下使用 Spark 读取文件,在这种情况下,您将只有 1 个主机,因此其余问题将不适用。

    Question 2: How to make the Spark application the fastest possible?



    分而治之。
    问题中概述的策略很好。使用 Parquet 将允许 Spark 对数据进行投影,并且只有 .select("Country")列,进一步减少需要摄取的数据量,从而加快速度。

    Spark 并行性的基石是分区。同样,当我们从文件中读取数据时,Spark 依赖于 Hadoop 文件系统。从 HDFS 读取时,分区将由 HDFS 上的文件拆分决定。这些分割将在执行者之间平均分配。这就是 Spark 最初将工作分配给作业的所有可用执行器的方式。

    我对 Catalist 优化不是很熟悉,但我想我可以假设 .groupBy("Country").agg(count($"Country")将变成类似于:rdd.map(country => (country,1)).reduceByKey(_+_)map操作不会影响分区,可以现场应用。
    reduceByKey 将首先在每个分区本地应用,部分结果将在驱动程序上组合。所以大多数计数发生在集群中,并且将其加起来是集中的。

    关于apache-spark - Spark 如何并行处理 1TB 文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36522417/

    相关文章:

    r - GA算法并行计算时返回 "non-numeric argument to binary operator"

    hadoop - java.io.IOException : org. apache.hadoop.security.AccessControlException : Client cannot authenticate via:[TOKEN, KERBEROS]

    arrays - Spark 卡拉: Convert Array of Struct column to String column

    apache-spark - 如何从 Java String 数组创建 Spark 广播变量?

    r - 将 DF 结构更改为虚拟结构

    python - Pandas 数据帧 : create new ID variable based on number of modalities of an existing one

    python - 对 Pandas Dataframe 的行求和

    clojure - 如何使用Clojure并行计算大量数字的总和

    apache-spark - Kubernetes 上的 Spark

    scala - Scala 中的执行上下文是什么?