apache-spark - Spark Streaming:无状态重叠窗口与保持状态

标签 apache-spark spark-streaming

在使用Spark Streaming处理连续的有限事件 session 流时,选择无状态滑动窗口操作(例如reduceByKeyAndWindow)与选择保持状态(例如通过updateStateByKey或新的mapStateByKey)会有什么考虑?

例如,考虑以下情形:

A wearable device tracks physical exercises performed by the wearer. The device automatically detects when an exercise starts, and emits a message; emits additional messages while the exercise is undergoing (e.g. heart rate); and finally, emits a message when the exercise is done.



理想的结果是每个练习的汇总记录流。也就是说,同一 session 的所有事件都应该汇总在一起(例如,以便每个 session 都可以保存在单个数据库行中)。请注意,每个 session 的长度是有限的,但是来自多个设备的整个流是连续的。为了方便起见,让我们假设设备为每次锻炼生成一个GUID。

我可以看到两种通过Spark Streaming处理此用例的方法:
  • 使用不重叠的窗口,并保持状态。每个GUID都会保存状态,所有事件都与之匹配。当新事件到达时,状态被更新(例如,使用mapWithState),并且如果该事件是“运动 session 结束”,则将基于该状态发出汇总记录,并且移除密钥。
  • 使用重叠的滑动窗口,并仅保留第一个 session 。假设滑动窗口的长度为2,间隔为1(请参见下图)。还要假设窗口长度为2 X(最大可能的运动时间)。在每个窗口上,事件都由GUID合并,例如使用reduceByKeyAndWindow。然后,将从窗口后半部分开始的所有 session 都转储,并发射其余 session 。这样一来,每个事件就可以使用一次,并确保将属于同一 session 的所有事件汇总到一起。

  • 方法2的示意图:

    Only sessions starting in the areas marked with \\\ will be emitted. 
    -----------
    |window 1 |
    |\\\\|    |
    -----------
         ----------
         |window 2 |
         |\\\\|    |  
         -----------
              ----------
              |window 3 |
              |\\\\|    |
              -----------
    


    我看到的利弊:

    方法#1的计算开销较小,但是需要保存和管理状态(例如,如果并发 session 数增加,则状态可能会大于内存)。但是,如果最大并发 session 数是有界的,这可能不是问题。

    方法2的成本是两倍(每个事件被处理两次),并且具有更高的延迟(最大运动时间是2倍),但是由于没有保留任何状态,因此更简单易管理。

    处理该用例的最佳方法是什么-这些方法中的任何一种是“正确的”方法,还是有更好的方法?

    应该考虑哪些其他优点/缺点?

    最佳答案

    通常没有正确的方法,每个方法都有权衡。因此,我将添加其他方法,并概述我对他们的利弊的看法。因此,您可以决定哪一个更适合您。

    外部状态方法(方法3)

    您可以在外部存储中累积事件的状态。 Cassandra 经常被用于此。您可以分别处理最终事件和正在进行的事件,如下所示:

    val stream = ...
    
    val ongoingEventsStream = stream.filter(!isFinalEvent)
    val finalEventsStream = stream.filter(isFinalEvent)
    
    ongoingEventsStream.foreachRDD { /*accumulate state in casssandra*/ }
    finalEventsStream.foreachRDD { /*finalize state in casssandra, move to final destination if needed*/ }
    

    trackStateByKey方法(方法#1.1)

    这可能是您潜在的最佳解决方案,因为它消除了updateStateByKey的缺点,但考虑到它只是作为Spark 1.6版本的一部分发布的,因此它也可能存在风险(由于某种原因,它并不十分广告)。如果您想了解更多信息,可以使用link作为起点

    优点缺点

    方法1(updateStateByKey)

    优点
  • 易于理解或解释(对团队其他成员,新手等)(主观)
  • 存储:更好地利用内存仅存储锻炼的最新状态
  • 存储:仅保留正在进行的练习,并在完成
  • 后立即将其丢弃
  • 延迟仅受每个微批处理性能的限制

  • 缺点
  • 存储:如果键(并行练习)的数量很大,则可能不适合您的群集的内存
  • 处理:它将为状态图中的每个键运行updateState函数,因此,如果并发练习的数量很大-性能将受到
  • 的影响

    方法2(窗口)

    尽管可以通过Windows实现所需的功能,但在您的情况下看起来自然度大大降低。

    优点
  • 在某些情况下(取决于数据),处理可能比updateStateByKey更有效,因为即使没有实际更新
  • ,updateStateByKey也会对每个键运行更新

    缺点
  • “最大可能的运动时间”-听起来像是巨大的风险-基于人类的行为,它可能是相当随意的持续时间。有些人可能会忘记“完成运动”。还取决于锻炼的种类,但是可能需要几秒钟到几小时不等,当您想降低快速锻炼的潜伏期,而又不得不将潜伏期保持在尽可能长的锻炼时,
  • 觉得很难向其他人解释它如何工作(主观)
  • 存储:必须将所有数据保留在窗口框架内,而不仅仅是最新数据。也只有在窗口从该时隙滑出时才释放内存,而实际上没有完成锻炼时才释放。如果只保留最后两个时隙,可能不会有太大的区别-如果您尝试通过更频繁地滑动窗口来获得更大的灵活性,则差异会增加。

  • 方法3(外部状态)

    优点
  • 易于解释,等等。(主观)
  • 纯流处理方法,意味着spark负责对每个事件进行操作,但不尝试存储状态等。(主观)
  • 存储:不受群集内存的限制以存储状态-可以处理大量并发练习
  • 处理:仅当状态有实际更新时才更新状态(与updateStateByKey不同)
  • Latency与updateStateByKey相似,仅受处理每个微批处理
  • 所需的时间限制

    缺点
  • 体系结构中的额外组件(除非您已经将Cassandra用于最终输出)
  • 处理:默认情况下比仅在spark中处理要慢,因为不是内存中+您需要通过网络
  • 传输数据
  • 您必须实现一次语义,才能将数据输出到cassandra中(对于在foreachRDD期间工作人员失败的情况)

    建议的方法

    我会尝试以下方法:
  • 对数据和集群
  • 测试updateStateByKey方法
  • 查看即使进行大量并发练习(预计在高峰时间),内存消耗和处理是否可以接受
  • 如果不使用
  • ,则退回与Cassandra接触

    关于apache-spark - Spark Streaming:无状态重叠窗口与保持状态,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34630251/

    相关文章:

    hadoop - Apache Spark JavaSchemaRDD 是空的,即使它的输入 RDD 有数据

    twitter - 启动 Spark 流上下文时出错

    python - 如何使用 Pyspark Streaming 模块实现 RabbitMQ 消费者?

    java - Spark Streaming/Kafka 偏移量处理

    hadoop - Apache Spark上的Apache Hive

    python-2.7 - Spark安装错误--pyspark

    performance - 为什么 Spark 不将作业分配给所有执行程序,而是仅分配给一个执行程序?

    java - Spark : Two SparkContexts in a single Application Best Practice

    scala - 在出现一定数量的错误后停止在 Apache Spark 中处理大型文本文件

    java - 使用 Marathon 运行 Spark 作业