我一直在使用 Spark 从 kafka 流式传输数据,这非常简单。
我认为使用 MQTT utils 也很容易,但出于某种原因并非如此。
我正在尝试执行以下代码。
val sparkConf = new SparkConf(true).setAppName("amqStream").setMaster("local")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val actorSystem = ActorSystem()
implicit val kafkaProducerActor = actorSystem.actorOf(Props[KafkaProducerActor])
MQTTUtils.createStream(ssc, "tcp://localhost:1883", "AkkaTest")
.foreachRDD { rdd =>
println("got rdd: " + rdd.toString())
rdd.foreach { msg =>
println("got msg: " + msg)
}
}
ssc.start()
ssc.awaitTermination()
奇怪的是,spark 记录了我在控制台中发送的消息,但没有记录我的 println。
它记录如下内容:
19:38:18.803 [RecurringTimer - BlockGenerator] DEBUG o.a.s.s.receiver.BlockGenerator - Last element in input-0-1435790298600 is SOME MESSAGE
最佳答案
foreach
是一个分布式操作,因此您的 println 可能在工作线程上执行。如果您想查看本地打印的一些消息,您可以使用 DStream 上内置的 print
函数,或者代替 foreachRDD
收集(或获取)一些消息将元素返回给驱动程序并在那里打印它们。希望对 Spark Streaming 有帮助,祝您好运:)
关于scala - Spark 流 MQTT,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31172858/