scala - Spark 作业未在本地并行化(使用本地文件系统中的 Parquet + Avro)

标签 scala hadoop parallel-processing bigdata apache-spark

编辑2

通过将RDD重新分区为8个分区间接解决了问题。遇到 avro 对象不是“java serialisable”的障碍,找到了一个片段 here to delegate avro serialisation to kryo.原来的问题依然存在。

编辑 1: 删除了 map 函数中的局部变量引用

我正在编写一个驱动程序,使用 parquet 和 avro for io/schema 在 spark 上运行计算繁重的作业。我似乎无法得到 Spark 来使用我所有的核心。我究竟做错了什么 ?是因为我已将键设置为 null 吗?

我刚刚开始了解 hadoop 如何组织文件。据我所知,因为我的文件有 1 GB 的原始数据,我应该期望看到与默认 block 和页面大小并行的东西。

对我的输入进行 ETL 处理的函数如下所示:

def genForum {
    class MyWriter extends AvroParquetWriter[Topic](new Path("posts.parq"), Topic.getClassSchema) {
      override def write(t: Topic) {
        synchronized {
          super.write(t)
        }
      }
    }

    def makeTopic(x: ForumTopic): Topic = {
      // Ommited to save space
    }

    val writer = new MyWriter

    val q =
      DBCrawler.db.withSession {
        Query(ForumTopics).filter(x => x.crawlState === TopicCrawlState.Done).list()
      }

    val sz = q.size
    val c = new AtomicInteger(0)

    q.par.foreach {
      x =>
        writer.write(makeTopic(x))
        val count = c.incrementAndGet()
        print(f"\r${count.toFloat * 100 / sz}%4.2f%%")
    }
    writer.close()
  }

我的转换如下所示:

def sparkNLPTransformation() {
    val sc = new SparkContext("local[8]", "forumAddNlp")

    // io configuration
    val job = new Job()
    ParquetInputFormat.setReadSupportClass(job, classOf[AvroReadSupport[Topic]])
    ParquetOutputFormat.setWriteSupportClass(job,classOf[AvroWriteSupport])
    AvroParquetOutputFormat.setSchema(job, Topic.getClassSchema)


    // configure annotator
    val props = new Properties()
    props.put("annotators", "tokenize,ssplit,pos,lemma,parse")
    val an = DAnnotator(props)


    // annotator function
    def annotatePosts(ann : DAnnotator, top : Topic) : Topic = {
      val new_p = top.getPosts.map{ x=>
        val at = new Annotation(x.getPostText.toString)
        ann.annotator.annotate(at)
        val t = at.get(classOf[SentencesAnnotation]).map(_.get(classOf[TreeAnnotation])).toList

        val r = SpecificData.get().deepCopy[Post](x.getSchema,x)
        if(t.nonEmpty) r.setTrees(t)
        r
      }
      val new_t = SpecificData.get().deepCopy[Topic](top.getSchema,top)
      new_t.setPosts(new_p)
      new_t
    }

    // transformation
    val ds = sc.newAPIHadoopFile("forum_dataset.parq", classOf[ParquetInputFormat[Topic]], classOf[Void], classOf[Topic], job.getConfiguration)
    val new_ds = ds.map(x=> ( null, annotatePosts(x._2) ) )

    new_ds.saveAsNewAPIHadoopFile("annotated_posts.parq",
      classOf[Void],
      classOf[Topic],
      classOf[ParquetOutputFormat[Topic]],
      job.getConfiguration
    )
  }

最佳答案

能否确认数据确实在HDFS的多个block中? forum_dataset.parq 文件上的总 block 数

关于scala - Spark 作业未在本地并行化(使用本地文件系统中的 Parquet + Avro),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21510508/

相关文章:

scala - 异步 I/O 是否消耗线程?

java - 您如何运行 SecureSocial 演示?

scala - Akka Actor isTermied 已弃用

R 脚本在没有性能提升的情况下运行在外部集群上

mysql - 异常Mysql存储过程调用

hadoop - 数值数据 - 高容量 + 高速度 + 快速检索

serialization - Avro 替代 Writables

r - { : task 1 failed - "could not find function "knn""中的错误

java - 为什么线程不同时运行?

hadoop - 在字符串中搜索特定文本 - Hive