apache-spark - 在 Spark Streaming 中使用 updateStateByKey() 从原始事件流生成状态更改流

标签 apache-spark spark-streaming

当我遇到 updateStateByKey() 函数时,我刚刚开始寻找使用 Spark Streaming 进行有状态计算的解决方案。

我试图解决的问题:
10,000 个传感器每分钟产生一个二进制值。

如果传感器报告的连续值彼此不同,我想标记它并将其作为状态更改事件发送到 Kafka。

我的假设是 updateStateByKey() 可以在这个例子中使用,但是我并不完全了解实现相同的推荐方法。

最佳答案

我假设您将从传感器获得 (String, Int) 对流,其中 String 是传感器的 ID,Int 是传感器返回的二进制值。有了这个假设,你可以尝试这样的事情:

val sensorData: DStream[(String, Int)] = ???

val state = sensorData.updateStateByKey[(String, Int)](updateFunction _)

def updateFunction(newValues: Seq[(String, Int)], currentValues: Seq[(String, Int)]) = {
    val newValuesMap = newValues.toMap
    val currentValuesMap = currentValues.toMap

    currentValuesMap.keys.foreach ( (id) =>
            if(currrentValuesMap.get(id) != newValuesMap.getOrElse(id, -1)) {
                //send to Kafka
            }
    )       
    Some(newValues)
}

关于apache-spark - 在 Spark Streaming 中使用 updateStateByKey() 从原始事件流生成状态更改流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32088242/

相关文章:

hadoop - Spark Hive Context - 带有分区和大写字段名称的 Avro 表

python - Pyspark 窗口函数计算站点之间的公交数量

java - 如何在java中处理来自Apache Spark Streaming的Json数据

apache-spark - 启用检查点的 Spark Streaming 中的 java.io.NotSerializedException

scala - Spark Scala UDP 在监听端口上接收

amazon-web-services - 使用本地机器从 s3 读取数据 - pyspark

java - 来自 Spark 的集合库作为具有快速 HashMap 的单独项目

clojure - Spark 会支持 Clojure 吗?

java - Spark序列化错误: When I insert Spark Stream data into HBase

java - 使用 Spark SQL Row 在 Java 中访问多维 WrappedArray 元素