scala - 如何使用无限 Scala 流作为 Spark Streaming 中的源?

标签 scala apache-spark spark-streaming

假设我本质上希望将 Stream.from(0) 作为 InputDStream。我该怎么办呢?我能看到的唯一方法是使用 StreamingContext#queueStream ,但我必须将另一个线程或子类 Queue 中的元素排入队列以创建一个行为类似于无限流,这两者都感觉像是黑客。

正确的做法是什么?

最佳答案

我认为默认情况下它在 Spark 中不可用,但使用 ReceiverInputDStream 很容易实现它。

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

class InfiniteStreamInputDStream[T](
       @transient ssc_ : StreamingContext,
       stream: Stream[T],
       storageLevel: StorageLevel
      ) extends ReceiverInputDStream[T](ssc_)  {

  override def getReceiver(): Receiver[T] = {
    new InfiniteStreamReceiver(stream, storageLevel)
  }
}

class InfiniteStreamReceiver[T](stream: Stream[T], storageLevel: StorageLevel) extends Receiver[T](storageLevel) {

  // Stateful iterator
  private val streamIterator = stream.iterator

  private class ReadAndStore extends Runnable {
    def run(): Unit = {
      while (streamIterator.hasNext) {
        val next = streamIterator.next()
        store(next)
      }
    }
  }

  override def onStart(): Unit = {
    new Thread(new ReadAndStore).run()    
  }

  override def onStop(): Unit = { }
}

关于scala - 如何使用无限 Scala 流作为 Spark Streaming 中的源?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27742044/

相关文章:

apache-spark - 齐柏林飞艇 : Not Showing Hive Database/tables in HDP3. 0

scala - 优化案例类用作符号

scala - 按类型选择字段

scala - Spark Submit无法从jar中选择类路径

apache-spark-sql - 如何在 zeppelin 中自动更新 %spark.sql 结果以进行结构化流查询

java - Spark Streaming reduceByKeyAndWindow 示例

java - Spark + Kafka 流 NoClassDefFoundError kafka/serializer/StringDecoder

scala - 类型安全的 Scala Actor

java - 读取 Artifact 描述符失败 : IntelliJ

hadoop - Spark/YARN-并非所有节点都在Spark提交中使用