apache-spark - Kappa 架构 : when insert to batch/analytic serving layer happens

标签 apache-spark architecture streaming apache-flink lambda-architecture

如您所知,Kappa 架构是 Lambda 架构的某种简化。 Kappa 不需要批处理层,而是速度层必须保证历史数据重新计算的计算精度和足够的吞吐量(更多的并行性/资源)。

仍然 Kappa 架构需要两个服务层,以防您需要根据历史数据进行分析。例如,年龄 < 2 周的数据存储在 Redis(流服务层)中,而所有较旧的数据存储在 HBase(批处理服务层)的某个地方。

什么时候(由于 Kappa 架构)我必须将数据插入到批处理服务层?
如果流层立即将数据插入批处理和流服务层 - 那么延迟数据到达呢?或者流层应该定期备份速度服务层到批量服务层?

示例:假设数据源是 Kafka,数据由 Spark Structured Streaming 或 Flink 处理,Sinks 是 Redis 和 HBase。何时写入 Redis & HBase 应该发生?

最佳答案

如果我们执行流处理,我们要确保首先将输出数据作为数据流提供。在您的示例中,这意味着我们将 Kafka 作为主要接收器写入。

现在你有两个选择:

  • 具有从该 Kafka 主题读取并写入 Redis 和 HBase 的辅助作业。这就是 Kafka 方式,因为 Kafka Streams 不支持直接写入这些系统中的任何一个,并且您设置了 Kafka 连接作业。然后可以针对特定的接收器定制这些辅助作业,但它们会增加额外的操作开销。 (这是您提到的一些备份选项)。
  • 使用 Spark 和 Flink,您还可以选择直接在您的工作中使用辅助接收器。您可以添加额外的处理步骤来将 Kafka 输出转换为更适合接收器的形式,但是在配置作业时您会受到更多限制。例如在 Flink 中,您需要对 Kafka sink 和 Redis/HBase sink 使用相同的检查点设置。然而,如果设置成功,您只需要运行一个流式作业而不是 2 或 3。

  • 迟到的事件

    现在的问题是如何处理迟到的数据。最好的解决方案是让框架通过水印来处理。也就是说,只有在框架确定没有延迟数据到达时,才会在所有接收器上提交数据。如果这不起作用,因为您确实需要处理延迟事件,即使它们到达很晚很晚并且仍然希望获得临时结果,则必须使用更新事件。

    更新事件

    (根据 OP 的要求,我将在更新事件中添加更多详细信息)

    在 Kafka Streams 中,默认情况下,元素是通过持续细化机制发出的。这意味着,窗口聚合一旦拥有任何有效数据点就会发出结果,并在接收新数据时更新该结果。因此,处理任何迟到的事件并产生更新的结果。虽然这种方法很好地减轻了用户的负担,因为他们不需要理解水印,但它有一些严重的缺点,导致 Kafka Streams 开发人员添加 Suppression在 2.1 及以后。

    主要问题是它对向下用户处理中间结果提出了相当大的挑战,这在关于 Suppression 的文章中也有解释。如果结果是临时的还是“最终的”(在所有预期事件都已处理的意义上)并不明显,那么许多应用程序就很难实现。特别是,需要在消费者端复制窗口操作以获得“最终”值。

    另一个问题是数据量被炸毁。如果你有一个强大的聚合因子,使用基于水印的发射将在第一次操作后大量减少你的数据量。但是,连续细化将添加一个恒定的体积因子,因为每个记录都会为所有中间步骤触发一个新的(中间)记录。

    最后,对您来说特别有趣的是,如果您有更新事件,如何将数据卸载到外部系统。理想情况下,您会连续或定期卸载具有一定时间延迟的数据。该方法再次在消费者端模拟基于水印的发射。

    混合选项

    可以对初始发射使用水印,然后对后期事件使用更新事件。然后减少所有“准时”事件的音量。例如,Flink 提供 allowed lateness使窗口再次触发延迟事件。

    这种设置使卸载数据变得更加容易,因为只有在延迟事件实际发生时才需要将数据重新发送到外部系统。应该对系统进行调整,以便延迟事件是一种罕见的情况。

    关于apache-spark - Kappa 架构 : when insert to batch/analytic serving layer happens,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58390639/

    相关文章:

    hadoop - sqlContext.read...load() 和 sqlContext.write...save() 代码在 Spark Cluster 上运行在哪里?

    apache-spark - 将 mqtt 与 pyspark 流结合使用

    c# - 如何设计与不可测试函数绑定(bind)的可测试代码

    swift - Apple 公司是否允许应用程序从 YouTube 流式传输视频?

    java - response.flushBuffer() 不工作

    apache-spark - 计算pyspark中的分组中位数

    scala - 如何在spark中为diff文件名调用单独的逻辑

    optimization - 使用CMake编译openCV:设置架构和优化标志

    .net - 关于 "remoting"的最佳实践,一个带有 WCF 的 .NET 类库

    json - 为什么一些带有地理标签的推文为空? (Twitter 流 API)