apache-spark - 基于流的应用程序中的受控/手动错误/恢复处理

标签 apache-spark error-handling apache-kafka stream apache-flink

我正在开发一个基于 Apache Flink 的应用程序,它使用 Apache Kafka 进行输入和输出。可能此应用程序将移植到 Apache Spark,因此我也将其添加为标记,问题仍然存在。

我要求所有通过 kafka 收到的传入消息必须按顺序处理,并安全地存储在持久层(数据库)中,并且不能丢失任何消息。

此应用程序中的流媒体部分相当微不足道/很小,因为主要逻辑将归结为如下内容:

environment.addSource(consumer)    // 1) DataStream[Option[Elem]]
  .filter(_.isDefined)             // 2) discard unparsable messages
  .map(_.get)                      // 3) unwrap Option
  .map(InputEvent.fromXml(_))      // 4) convert from XML to internal representation
  .keyBy(_.id)                     // 5) assure in-order processing on logical-key level
  .map(new DBFunction)             // 6) database lookup, store of update and additional enrichment
  .map(InputEvent.toXml(_))        // 7) convert back to XML
  .addSink(producer)               // 8) attach kafka producer sink

现在,在此管道中,可能会发生几种错误情况:

  • 数据库变得不可用(关闭,表空间已满,...)
  • 由于逻辑错误(来自列格式)而无法存储更改
  • 由于代理不可用,kafka 生产者无法发送消息

可能还有其他情况。

现在我的问题是,在这些情况下,如何确保一致性,而实际上我必须做类似的事情:

  1. Stream-Operator 6) 检测到问题(数据库不可用)
  2. 必须恢复 DBFunction 对象的 DB 连接,这可能只会在几分钟后成功
  3. 这意味着必须暂停整个处理过程,最好是暂停整个管道,以便将传入的消息大量加载到内存中
  4. 数据库恢复后继续处理。处理必须完全恢复到在 1) 遇到问题的消息

现在我知道至少有 2 个关于故障处理的工具:

  1. kafka 消费者补偿
  2. apache flink 检查点

但是,在搜索文档时,我没有看到如何在单个运算符中的流处理过程中使用其中任何一个。

那么,在流式应用程序中进行细粒度错误处理和恢复的推荐策略是什么?

最佳答案

几点:

keyBy 不会帮助确保按顺序处理。如果有的话,它可能会交错来自不同 Kafka 分区的事件(这些分区在每个分区中可能是有序的),从而在以前不存在的地方造成乱序。在不了解您打算使用多少个 FlinkKafkaConsumer 实例、每个实例将从多少个分区、 key 如何分布在 Kafka 分区以及您认为的原因的情况下,很难更具体地评论如何保证按顺序处理一个 keyBy 是必要的——但如果你设置正确,保留顺序是可以实现的。 reinterpretAsKeyedStream在这里可能会有帮助,但此功能很难理解,并且很难正确使用。

你可以使用 Flink 的 AsyncFunction以容错、恰好一次的方式管理与外部数据库的连接。

Flink 不以系统的方式支持细粒度恢复——它的检查点是整个分布式集群状态的全局快照,旨在在恢复期间用作整体、自洽的快照。如果您的作业失败,通常唯一的办法是从检查点重新启动,这将涉及倒带输入队列(到检查点中存储的偏移量),重放自这些偏移量以来的事件,重新发出数据库查找(异步函数会自动执行),并使用kafka事务来实现端到端的exactly once语义。然而,在令人尴尬的并行作业的情况下,有时可以利用 fine-grained recovery .

关于apache-spark - 基于流的应用程序中的受控/手动错误/恢复处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54444938/

相关文章:

r - 在 SparkR 中删除 DataFrame 的列

hadoop - Spark : multiple spark-submit in parallel

apache-spark - Pyspark 中带有 IN 子句的语句时的情况

php - 将PHP异常和错误记录到数据库表PHP Slim 3中

javascript - javascript 中返回错误的纯同步函数

playframework - 与 websocket 连接时使用 akka-stream-kafka 从 kafka Topic 获取最后一条消息

python - 如何在Kafka中处理一次消息,以便服务重新启动时不会处理所有消息

apache-spark - 流式场景的 Spark UI 上的 "Stages"是什么意思

jquery - 当我使用 .ajax 和 JSONP 时,如何处理失败的 JSON 请求?

elasticsearch - Kafka Elasticsearch 连接器 - 'Flush timeout expired with unflushed records:'