scala - Spark Streaming and Kafka: value reduceByKey is not a member of org.apache.spark.streaming.dstream.DStream[Any]

Tags: scala, apache-spark, apache-kafka, spark-streaming

I am trying to do ETL on a DStream using a Kafka consumer with Spark Streaming, but I get the following error. Can you help me solve this problem? Thanks.

KafkaCardCount.scala:56:28: value reduceByKey is not a member of org.apache.spark.streaming.dstream.DStream[Any]
[error]       val wordCounts = etl.reduceByKey(_ + _)
[error]                            ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 7 s, completed Jan 14, 2018 2:52:23 PM

Here is my sample code. Many articles I found suggest adding import org.apache.spark.streaming.StreamingContext._, but it does not seem to work for me.

package example

import org.apache.spark.streaming.StreamingContext._
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.{Durations, StreamingContext}

// Assumed setup -- the original post does not show these definitions
val sparkConf = new SparkConf().setAppName("KafkaCardCount")
val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> "localhost:9092",
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> "KafkaCardCount"
)
val topics = Array("cards")

val ssc = new StreamingContext(sparkConf, Durations.seconds(5))

val stream = KafkaUtils.createDirectStream[String, String](
    ssc,
    PreferConsistent,
    Subscribe[String, String](topics, kafkaParams)
)

val etl = stream.map(r => {
    val split = r.value.split("\t")
    val id = split(1)
    val numStr = split(4)
    if (numStr.matches("\\d+")) {
        val num = numStr.toInt
        val tpl = (id, num)
        tpl
    } else {
        ()
    }
})

// Create the counts per game
val wordCounts = etl.reduceByKey(_ + _)

wordCounts.print()

Here is my build.sbt.

lazy val root = (project in file(".")).
  settings(
    inThisBuild(List(
      organization := "example",
      scalaVersion := "2.11.8",
      version      := "0.1.0-SNAPSHOT"
    )),
    name := "KafkaCardCount",
    libraryDependencies ++= Seq (
      "org.apache.spark" %% "spark-core" % "2.1.0",
      "org.apache.spark" % "spark-streaming_2.11" % "2.1.0",
      "org.apache.spark" %% "spark-streaming-kafka-0-10-assembly" % "2.1.0"
    )
  )

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}
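
Note that the assemblyMergeStrategy block only compiles if the sbt-assembly plugin is enabled. A typical project/plugins.sbt would look like the line below (the version shown is an assumption; use whichever release matches your sbt):

// project/plugins.sbt -- required for the assembly settings above
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")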

Best Answer

Your problem is here:

else {
    ()
}

The common supertype of (String, Int) and Unit is Any, so your map produces a DStream[Any]. reduceByKey is only available (via the implicit conversion to PairDStreamFunctions) on a DStream of key/value pairs, which is why it is not a member of DStream[Any]. In Spark 2.x that conversion is provided automatically, so adding the StreamingContext._ import makes no difference.
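
A minimal illustration of the type inference (the cond flag is hypothetical):

val cond = scala.util.Random.nextBoolean()
// One branch has type (String, Int), the other Unit; the least upper
// bound of the two is Any, so x is inferred as Any:
val x = if (cond) ("id", 1) else ()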

What you need to do is signal a processing failure with a value of the same type as the success (if) branch. For example:

else ("-1", -1)
 .filter { case (id, res) => id != "-1" && res != -1 }
 .reduceByKey(_ + _)
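
Put together, the corrected pipeline might look like this (a sketch; it assumes the sentinel values "-1" and -1 never occur in real records):

val etl = stream.map { r =>
  val split = r.value.split("\t")
  val id = split(1)
  val numStr = split(4)
  if (numStr.matches("\\d+")) (id, numStr.toInt) // success: (String, Int)
  else ("-1", -1)                                // failure: same type, so etl is DStream[(String, Int)]
}

val wordCounts = etl
  .filter { case (id, res) => id != "-1" && res != -1 }
  .reduceByKey(_ + _)

wordCounts.print()

An alternative that avoids sentinel values entirely is to return an Option[(String, Int)] and use flatMap instead of map, but the sentinel approach above is the smallest change to the original code.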

Regarding scala - Spark Streaming and Kafka: value reduceByKey is not a member of org.apache.spark.streaming.dstream.DStream[Any], we found a similar question on Stack Overflow: https://stackoverflow.com/questions/48247123/
