scala - Spark 流 MQTT

标签 scala apache-spark mqtt spark-streaming

我一直在使用 Spark 从 kafka 流式传输数据,这非常简单。

我认为使用 MQTT utils 也很容易,但出于某种原因并非如此。

我正在尝试执行以下代码。

  val sparkConf = new SparkConf(true).setAppName("amqStream").setMaster("local")
  val ssc = new StreamingContext(sparkConf, Seconds(10))

  val actorSystem = ActorSystem()
  implicit val kafkaProducerActor = actorSystem.actorOf(Props[KafkaProducerActor])

  MQTTUtils.createStream(ssc, "tcp://localhost:1883", "AkkaTest")
    .foreachRDD { rdd =>
      println("got rdd: " + rdd.toString())
      rdd.foreach { msg =>
        println("got msg: " + msg)
      }
    }

  ssc.start()
  ssc.awaitTermination()

奇怪的是,spark 记录了我在控制台中发送的消息,但没有记录我的 println。

它记录如下内容:

19:38:18.803 [RecurringTimer - BlockGenerator] DEBUG o.a.s.s.receiver.BlockGenerator - Last element in input-0-1435790298600 is SOME MESSAGE

最佳答案

foreach 是一个分布式操作,因此您的 println 可能在工作线程上执行。如果您想查看本地打印的一些消息,您可以使用 DStream 上内置的 print 函数,或者代替 foreachRDD 收集(或获取)一些消息将元素返回给驱动程序并在那里打印它们。希望对 Spark Streaming 有帮助,祝您好运:)

关于scala - Spark 流 MQTT,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31172858/

相关文章:

apache-spark - spark redis 键列映射不起作用 - 返回 null

mqtt - Mosquitto 2.0.14 MQTT 与 TLS1.2 连接问题客户端 <unknown> 由于数据包格式错误而断开连接

scala - Cats Effect IO - 在使用尝试/兑换 "inside"顶级平面图时,如何确保将引发的异常捕获为值?

python - 如何在scala中实现Python的norm.expect

apache-spark - Spark MLlib 和 Spark ML 中的 PCA

java - 如何通过 Spark Listener 检查 Databricks 上是否安装了类?

golang mqtt 发布和订阅

node.js - 使用相同的 JWT token 进行 MQTT 身份验证

scala - 使用 Scalas 新动态类型的动态代理

java - 在 View 中 Play Framework 帮助