scala - value tail is not a member of (String, String)

Tags: scala hadoop apache-spark apache-kafka

I am using spark-shell. I have stored tweets in a Kafka topic and want to run sentiment analysis on them from spark-shell.

I added the following dependencies:
org.apache.spark:spark-streaming-kafka_2.10:1.6.2
edu.stanford.nlp:stanford-corenlp:3.5.1

This is the code I am running:

import org.apache.spark._
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.kafka._

val conf = new SparkConf().setMaster("local[4]").setAppName("KafkaReceiver")
val ssc = new StreamingContext(conf, Seconds(5))

val kafkaStream = KafkaUtils.createStream(ssc, "sandbox.hortonworks.com:2181", "test-consumer-group", Map("test12" -> 5))

val topCounts60 = kafkaStream.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
  .map { case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false))

topCounts60.foreachRDD(rdd => {
  val topList = rdd.take(10)
  println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
  topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
})

kafkaStream.count().map(cnt => "Received " + cnt + " kafka messages.").print()

val wordSentimentFilePath = "hdfs://sandbox.hortonworks.com:8020/TwitterData/AFINN.txt"
val wordSentiments = ssc.sparkContext.textFile(wordSentimentFilePath).map { line =>
  val Array(word, happiness) = line.split("\t")
  (word, happiness)
}.cache()

val happiest60 = kafkaStream.map(hashTag => (hashTag.tail, 1)).reduceByKeyAndWindow(_ + _, Seconds(60))
  .transform(topicCount => wordSentiments.join(topicCount))
  .map { case (topic, tuple) => (topic, tuple._1 * tuple._2) }
  .map { case (topic, happinessValue) => (happinessValue, topic) }
  .transform(_.sortByKey(false))

ssc.start()
ssc.stop()

But when executing this line:
val happiest60 = kafkaStream.map(hashTag => (hashTag.tail,1)).reduceByKeyAndWindow(_ + _, Seconds(60)). transform{topicCount => wordSentiments.join(topicCount)}.map{case (topic, tuple) => (topic, tuple._1 * tuple._2)}.map{case (topic, happinessValue) => (happinessValue, topic)}.transform(_.sortByKey(false))

it throws the error:

error: value tail is not a member of (String, String)

Best Answer

The type of hashTag is (String, String), so no tail operation is defined on it. tail is a method of collections (such as String or List), not of tuples.
The function passed to map is applied to each individual item received from the stream, and KafkaUtils.createStream produces a stream of (key, message) pairs, so it is expected that each item has type (String, String).
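A minimal sketch of one possible fix, assuming the hashtag text is in the message half of the (key, message) pair and that each Kafka message is a single hashtag such as "#spark" (both are assumptions about the producer, not something the question states); it also converts the AFINN happiness score to a Double before multiplying, since multiplying a String by an Int would not yield a numeric sentiment value:

// assumption: the hashtag text is the Kafka message value, one hashtag per message
val messages = kafkaStream.map(_._2)                 // keep only the message part: DStream[String]
val happiest60 = messages
  .map(hashTag => (hashTag.tail, 1))                 // tail now works on a String: drops the leading '#'
  .reduceByKeyAndWindow(_ + _, Seconds(60))
  .transform(topicCount => wordSentiments.join(topicCount))
  .map { case (topic, (happiness, count)) => (topic, happiness.toDouble * count) }
  .map { case (topic, happinessValue) => (happinessValue, topic) }
  .transform(_.sortByKey(false))

If the messages are full tweet texts rather than single hashtags, you would first need to extract the hashtags, for example with messages.flatMap(_.split(" ")).filter(_.startsWith("#")), before the map above.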

This question, "scala - value tail is not a member of (String, String)", corresponds to a similar question on Stack Overflow: https://stackoverflow.com/questions/46543435/
