scala - Spark Scala UDP receive on a listening port

Tags: scala sockets apache-spark udp spark-streaming

The example in http://spark.apache.org/docs/latest/streaming-programming-guide.html lets me receive data packets over a TCP stream, listening on port 9999:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two working threads and a batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))


// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

I can send data over TCP by creating a data server on my Linux system with $ nc -lk 9999

The problem

I need to receive the stream over UDP from an Android phone using Scala/Spark.

val lines = ssc.socketTextStream("localhost", 9999)

only receives TCP streams.

How can I receive a UDP stream just as simply with Scala+Spark and create a Spark DStream from it?

Best answer

There is nothing built in, but it doesn't take too much work to do it yourself. Here is a simple solution I made based on a custom UdpSocketInputDStream[T]:

import java.io._
import java.net.{DatagramPacket, DatagramSocket, InetAddress, SocketException}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

import scala.reflect.ClassTag
import scala.util.control.NonFatal

class UdpSocketInputDStream[T: ClassTag](
                                          _ssc: StreamingContext,
                                          host: String,
                                          port: Int,
                                          bytesToObjects: InputStream => Iterator[T],
                                          storageLevel: StorageLevel
                                        ) extends ReceiverInputDStream[T](_ssc) {

  def getReceiver(): Receiver[T] = {
    new UdpSocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

class UdpSocketReceiver[T: ClassTag](host: String,
                                     port: Int,
                                     bytesToObjects: InputStream => Iterator[T],
                                     storageLevel: StorageLevel) extends Receiver[T](storageLevel) {

  var udpSocket: DatagramSocket = _

  override def onStart(): Unit = {

    try {
      // Bind a UDP socket on the given host and port
      udpSocket = new DatagramSocket(port, InetAddress.getByName(host))
    } catch {
      case e: SocketException =>
        restart(s"Error binding to port $port", e)
        return
    }

    // Start the thread that receives data over a connection
    new Thread("Udp Socket Receiver") {
      setDaemon(true)

      override def run() {
        receive()
      }
    }.start()
  }

  /** Create a socket connection and receive data until receiver is stopped */
  def receive() {
    try {
      val buffer = new Array[Byte](2048)

      // Create a packet to receive data into the buffer
      val packet = new DatagramPacket(buffer, buffer.length)

      // Block until a single datagram arrives
      udpSocket.receive(packet)

      val iterator = bytesToObjects(new ByteArrayInputStream(packet.getData, packet.getOffset, packet.getLength))
      // Store everything the converter decodes from this packet
      while (!isStopped() && iterator.hasNext) {
        store(iterator.next())
      }

      if (!isStopped()) {
        // Restart the receiver so it binds again and waits for the next packet
        restart("Udp socket data stream had no more data")
      }
    } catch {
      case NonFatal(e) =>
        restart("Error receiving data", e)
    } finally {
      onStop()
    }
  }

  override def onStop(): Unit = {
    synchronized {
      if (udpSocket != null) {
        udpSocket.close()
        udpSocket = null
      }
    }
  }
}

To have the method available on the StreamingContext itself, we enrich it with an implicit class:

import java.io.InputStream

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream

import scala.reflect.ClassTag

object Implicits {
  implicit class StreamingContextOps(val ssc: StreamingContext) extends AnyVal {
    def udpSocketStream[T: ClassTag](host: String,
                                     port: Int,
                                     converter: InputStream => Iterator[T],
                                     storageLevel: StorageLevel): InputDStream[T] = {
      new UdpSocketInputDStream(ssc, host, port, converter, storageLevel)
    }
  }
}

And this is how you call it:

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.reflect.ClassTag

object TestRunner {
  import Implicits._

  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext("local[*]", "udpTest")
    val ssc = new StreamingContext(sparkContext, Seconds(4))

    val stream = ssc.udpSocketStream("localhost", 
                                     3003, 
                                     bytesToLines, 
                                     StorageLevel.MEMORY_AND_DISK_SER_2)
    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }

  def bytesToLines(inputStream: InputStream): Iterator[String] = {
    val dataInputStream = new BufferedReader(
      new InputStreamReader(inputStream, StandardCharsets.UTF_8))
    new NextIterator[String] {
      protected override def getNext(): String = {
        val nextValue = dataInputStream.readLine()
        if (nextValue == null) {
          finished = true
        }
        nextValue
      }

      protected override def close() {
        dataInputStream.close()
      }
    }
  }

  // Adapted from Spark's internal NextIterator: computes elements lazily
  // and closes the underlying resource once the stream is exhausted.
  abstract class NextIterator[U] extends Iterator[U] {
    protected var finished = false
    private var gotNext = false
    private var nextValue: U = _
    private var closed = false

    override def next(): U = {
      if (!hasNext) {
        throw new NoSuchElementException("End of stream")
      }
      gotNext = false
      nextValue
    }

    override def hasNext: Boolean = {
      if (!finished) {
        if (!gotNext) {
          nextValue = getNext()
          if (finished) {
            closeIfNeeded()
          }
          gotNext = true
        }
      }
      !finished
    }

    def closeIfNeeded() {
      if (!closed) {
        closed = true
        close()
      }
    }

    protected def getNext(): U
    protected def close()
  }
}

Most of this code is taken from the SocketInputDStream[T] that ships with Spark; I simply reused it. I also took the code for the NextIterator that bytesToLines uses: all it does is consume the lines in the packet and turn them into Strings. If you have more complex logic, you can supply your own converter: InputStream => Iterator[T] implementation and pass it in instead, as sketched below.
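For example, a minimal sketch of an alternative converter (my own illustration, not from the original answer) that decodes each whole datagram as a single UTF-8 record instead of splitting it into lines:

import java.io.InputStream

import scala.io.Source

// Hypothetical converter: treat the entire packet payload as one record.
// The ByteArrayInputStream built in receive() ends at the packet boundary,
// so reading to EOF yields exactly one datagram's worth of text.
def bytesToWholePacket(inputStream: InputStream): Iterator[String] =
  Iterator.single(Source.fromInputStream(inputStream, "UTF-8").mkString)

It plugs into the same call site as bytesToLines, e.g. ssc.udpSocketStream("localhost", 3003, bytesToWholePacket, StorageLevel.MEMORY_AND_DISK_SER_2).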

Testing it with a simple UDP packet:

echo -n "hello hello hello!" >/dev/udp/localhost/3003

yields:

-------------------------------------------
Time: 1482676728000 ms
-------------------------------------------
hello hello hello!

Of course, this code still needs more testing. There is also a hidden assumption here: the buffer each DatagramPacket is created with is 2048 bytes, which may well be something you want to change.
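To see why that buffer size matters, here is a small standalone sketch (my own illustration, not part of the original answer): DatagramSocket.receive silently truncates any datagram larger than the supplied buffer, so sizing the buffer to the 65507-byte UDP payload maximum avoids losing the tail of larger packets.

import java.net.{DatagramPacket, DatagramSocket, InetAddress}

object BufferSizeCheck {
  def main(args: Array[String]): Unit = {
    val socket = new DatagramSocket(3003, InetAddress.getByName("localhost"))
    try {
      // 65507 bytes is the maximum UDP payload; using it instead of 2048
      // guarantees no datagram is truncated on receive.
      val buffer = new Array[Byte](65507)
      val packet = new DatagramPacket(buffer, buffer.length)
      socket.receive(packet) // blocks until one datagram arrives
      // packet.getLength is the actual number of bytes received
      println(s"received ${packet.getLength} bytes")
    } finally {
      socket.close()
    }
  }
}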

This answer to scala - Spark Scala UDP receive on a listening port comes from a similar question on Stack Overflow: https://stackoverflow.com/questions/41320483/
