hadoop - sqlContext.read...load() 和 sqlContext.write...save() 代码在 Spark Cluster 上运行在哪里?

标签 hadoop apache-spark apache-spark-sql spark-dataframe hadoop2

我正在使用 Spark Dataframe API 从 NFS 共享加载/读取文件,然后将该文件的数据保存/写入 HDFS。

我有一个包含一个主节点和两个工作节点的三节点 Spark 集群。我的 Spark 集群使用 YARN 作为集群管理器,因此两个工作节点是 YARN NodeManager 节点,主节点是 Yarn ResourceManager 节点。

我有一个远程位置,比如/data/files,它安装到所有三个 YARN/SPARK 节点,因为它是 [/data/files],其中存在我想要读取的所有 csv 文件 [多个]从并最终写入 HDFS。

我在我的主节点上运行以下代码

import java.io.File
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.SQLContext

object TestMoreThan1CSV2DF {
  private val source: String = "file:///data/files/"
  private val destination = "hdfs://<myHostIP>:8020/raw/"
  private val fileFormat : String = "com.databricks.spark.csv"

  def main(args:Array[String]):Unit={
    val conf = new SparkConf().setAppName("TestMoreThan1CSV2DF").setMaster("local")
    val sc = new SparkContext(conf)

    val sqlContext = new SQLContext(sc)

    val fileArray: Array[File] = new java.io.File(source).listFiles.filter(_.getName.endsWith(".csv"))

    for(file<-fileArray){
//  reading csv file from shared location and taking whole data in a dataframe
    var df = loadCSV2DF(sqlContext, fileFormat, "true", "true", file.getName)

//      variable for holding destination location : HDFS Location
    var finalDestination: String = destination+file.getName

//  saving data into HDFS
    writeDF2HDFS(df,fileFormat,"true",finalDestination) /// saved using default number of partition = 1
    }
  }

 def loadCSV2DF(sqlContext : SQLContext, fileFormat: String, header : String, inferSchema: String, source: String) : DataFrame = {
   try{
       sqlContext.read.format(fileFormat)
                       .option("header", header) // Use first line of all files as header
                       .option("inferSchema", inferSchema) // Automatically infer data types
                       .load(source)
   }
   catch{
     case ex: OnboardingException => {
            throw ex;
        }
   }
 }

 def writeDF2HDFS(df: DataFrame, fileFormat: String, header: String, destination: String, partitions: Integer = 1){
   try{
       df.repartition(partitions).write.format(fileFormat).option("header",header).save(destination)
   }
   catch{
     Case ez : OnboardingException => {
            throw ez;
        }
   }
 }
}

This code reads all the csv files present at shared location /data/files/ and write each one of them to HDFS. Ex: /data/files/f1.csv will get loaded into HDFS as /raw/f1.csv/part-xxxxx file

在运行这段代码时,我无法弄清楚:

1) Where this whole code is running? Is it running on driver? or using both workers?

2) Does load() and save() API runs on worker nodes, does it work in parallel? If yes then how does two workers keeps track of the portion of while which it has read or written?

3) As of now I am reading each file sequentially in "for" loop and working on each one of them sequentially, is it possible to make it a multi threaded application, where each file is allocated to one thread for performing end to end read and write in parallel. Will disk IO be any constraint while doing this?

任何快速响应/引用/指针将不胜感激。

问候, 布佩什

最佳答案

为我的查询从另一个线程复制的非常好的解释: differentiate driver code and work code in Apache Spark

这里也复制其中的一部分: 转换创建的闭包内发生的所有事情都发生在 worker 上。这意味着如果在 map(...)、filter(...)、mapPartitions(...)、groupBy*(...) 内部传递某些内容,则在 worker 上执行 aggregateBy*(...)。它包括从持久存储或远程源读取数据。

count、reduce(...)、fold(...) 等操作通常在驱动程序和工作程序上执行。繁重的工作由工作人员并行执行,一些最终步骤(例如减少从工作人员收到的输出)在驱动程序上按顺序执行。

其他一切,比如触发 Action 或转换,都发生在驱动程序上。特别是,它表示需要访问 SparkContext 的每个操作。

就我的查询而言: 1) 是的,main() 方法的一部分在驱动程序上运行,但转换发生在

2) load() 和 save() 在 worker 上运行,因为我们可以看到加载创建数据帧 [存储在分区的内存中] 并保存在 hdfs 中创建 part-xxxx 文件,这表明 worker 正在这样做

3) 仍在努力实现这一点,一旦完成就会回答这个问题。

谢谢

关于hadoop - sqlContext.read...load() 和 sqlContext.write...save() 代码在 Spark Cluster 上运行在哪里?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45027385/

相关文章:

hadoop - 我在运行hadoop-core-1.2.1.jar文件时遇到问题

具有 HDFS 文件读/写的 Java

apache-spark - PySpark:减去数据框并忽略某些列

apache-spark - 按列分区但保持固定分区计数的有效方法是什么?

scala - 将每个 json 行转换为表

hadoop - Wordcount程序卡在hadoop-2.3.0

hadoop - 如何从 EXPLAIN 中为不同的配置单元阶段设置资源?

sql - SELECT 子句中忽略转义单引号

python - 如何在 Django 中管理 Apache Spark 上下文?

apache-spark - 将类型安全配置 conf 文件传递​​给 DataProcSparkOperator