scala - Exception in ETL Scala script

Tags: scala hadoop apache-spark etl

I am writing a simple ETL process as a Scala script and running it with "spark-shell -i rawetl.scala", but I get a "Table not found" exception. I have also checked the input file, and it is being picked up correctly.

Here is the sample code:

import java.io.File
import sqlContext.implicits._
import scala.io.Source
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import sys.process._
case class pageRow(affiliateid : String , pageurl : String, alertedbrandsafety : String, blockedbrandsafety : String, grossimpressions : String, spider_bot : String, invalidbrowser : String ,outlieractivity : String , day : String)

object batch_nht {
    def main() {
        processRawNHT()
    }

    def processRawNHT() {

        val rawFile = "hadoop fs -ls /tmp/XXX/rawDB/" #| "tail -1" !!
        val fileName = rawFile.substring(rawFile.indexOf("/"))
        val filePathName = "hdfs://AAAAA:8020" + fileName.trim()
        println(filePathName)

        val sc = new SparkContext(new SparkConf().setAppName("analyzeBlog"))
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)

        val inviewraw = sc.textFile(filePathName).map(_.split(",")).map(x=>x.map(_.replace("\"","")))
        val base_people = inviewraw.map{r => if(r(13) == null || r(13).trim.isEmpty) (r(5) ,r(32), r(48), r(49),r(14), r(71), r(72), r(73),r(0)) else (r(5) ,r(32), r(48), r(49),r(14), r(71), r(72), r(73),r(0))}

        val logs_base_page_schemaRDD = base_people.map(p => pageRow(p._1, p._2, p._3, p._4,p._5, p._6, p._7, p._8,p._9)).toDF()

        logs_base_page_schemaRDD.registerTempTable("baseTable")

        sqlContext.sql("select * from baseTable").collect().foreach(println)
    }
}
batch_nht.main()

Note: if I run the commands below one by one in the spark shell (without the script), I get the correct output without any exception.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._

val inviewraw = sc.textFile("hdfs://AAAAA:8020/tmp/XXX/rawDB/rawFile.csv").map(_.split(",")).map(x=>x.map(_.replace("\"","")))
val base_people = inviewraw.map{r => if(r(13) == null || r(13).trim.isEmpty) (r(5) ,r(32), r(48), r(49),r(14), r(71), r(72), r(73),r(0)) else (r(5) ,r(32), r(48), r(49),r(14), r(71), r(72), r(73),r(0))}

case class pageRow(affiliateid : String , pageurl : String, alertedbrandsafety : String, blockedbrandsafety : String, grossimpressions : String, spider_bot : String, invalidbrowser : String ,outlieractivity : String , day : String)

val logs_base_page_schemaRDD = base_people.map(p => pageRow(p._1, p._2, p._3, p._4,p._5, p._6, p._7, p._8,p._9)).toDF()

// create table
logs_base_page_schemaRDD.registerTempTable("baseTable")
sqlContext.sql("select * from baseTable").collect().foreach(println)

Please point out what is going wrong in the script.

Best Answer

Here is a tested code snippet. Your approach does not work when the code is run programmatically as a script. The likely culprit: the `import sqlContext.implicits._` at the top of your script binds `toDF()` to the shell's predefined `sqlContext`, so the temp table is registered against a different `SQLContext` than the one you create inside the method and then query, and temp tables are only visible through the `SQLContext` instance that registered them. The snippet below builds the DataFrame explicitly with `createDataFrame` so that registration and query go through the same context.

import java.io.File
import sqlContext.implicits._
import scala.io.Source
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext
import sys.process._
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType,StructField,StringType};

object batch_nht {
    def main() {
        processRawNHT()
    }

    def processRawNHT() {
        // Pick the most recent file listed in the HDFS directory
        val rawFile = "hadoop fs -ls /user/cloudera/cards/" #| "tail -1" !!
        val fileName = rawFile.substring(rawFile.indexOf("/"))
        val filePathName = "hdfs://quickstart.cloudera:8020" + fileName.trim()
        println(filePathName)
        val schemaString = "color|suit|pip"
        // Reuse the shell-provided sc; this sqlContext is used for BOTH
        // registering the temp table and querying it
        val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        val deck = sc.textFile(filePathName).map(_.split("\\|"))
        // Build the schema programmatically instead of relying on implicits
        val schema =
            StructType(
            schemaString.split("\\|").map(fieldName => StructField(fieldName, StringType, true)))
        val base_deckRDD = deck.map{r => Row(r(0), r(1), r(2))}
        val cardsDataFrame = sqlContext.createDataFrame(base_deckRDD, schema)
        cardsDataFrame.registerTempTable("deck_of_cards")
        val firstTen = sqlContext.sql("select * from deck_of_cards limit 10")
        firstTen.map(r => (r(0), r(1), r(2))).collect().foreach(println)
    }
}

batch_nht.main()
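The reason the interactive run works while the script fails comes down to context scoping: a temp table registered through one `SQLContext` is not visible through another. A minimal sketch of an alternative fix (an illustration, not the answer's code; it assumes Spark 1.x and that the script is still launched with `spark-shell -i`, where `sc` and `sqlContext` are predefined) is to pass the shell's contexts into the method instead of creating new ones:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{StructType, StructField, StringType}

object batch_nht_alt {
  // Hypothetical variant: take the shell-provided contexts as parameters,
  // so the temp table is registered and queried via the same SQLContext.
  def processRawNHT(sc: SparkContext, sqlContext: SQLContext): Unit = {
    val deck = sc.textFile("hdfs://quickstart.cloudera:8020/user/cloudera/cards/")
      .map(_.split("\\|"))
    val schema = StructType(
      "color|suit|pip".split("\\|").map(StructField(_, StringType, true)))
    val df = sqlContext.createDataFrame(deck.map(r => Row(r(0), r(1), r(2))), schema)
    df.registerTempTable("deck_of_cards")           // registered on sqlContext...
    sqlContext.sql("select * from deck_of_cards")   // ...and queried on the same one
      .collect().foreach(println)
  }
}

// In spark-shell, sc and sqlContext already exist at top level:
// batch_nht_alt.processRawNHT(sc, sqlContext)
```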

Regarding "scala - Exception in ETL Scala script", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/34066311/
