apache-flink - 如何在 Apache Flink 中并行写入接收器

标签 apache-flink flink-streaming

我有一个并行度为 8 的 map DataStream。我向 DataStream 添加了两个接收器。一个很慢(Elasticsearch),另一个很快(HDFS)。但是,我的事件仅在刷新到 ES 后才写入 HDFS,因此使用 ES 所需的时间比不使用 ES 所需的时间长。

dataStream.setParallelism(8);
dataStream.addSink(elasticsearchSink);
dataStream.addSink(hdfsSink);

在我看来,两个水槽都使用相同的线程。是否可以通过对两个接收器使用相同的源,或者我是否必须添加另一个作业,一个用于每个接收器,以并行写入输出?

我检查了 Map(1/8) 到 Map(8/8) 正在部署和接收数据的日志。

最佳答案

如果 Elasticsearch sink 跟不上它的输入产生的速度,它就会减慢它的输入操作符。这个概念称为背压,这意味着缓慢的消费者会阻止快速的生产者进行处理。

使您的程序按预期运行的唯一方法(HDFS 接收器的写入速度比 Elasticsearch 接收器的写入速度快)是缓冲 HDFS 接收器写入但 Elasticsearch 接收器尚未写入的所有记录。如果 Elasticsearch 接收器始终较慢,您将在某个时间点耗尽内存/磁盘空间。

Flink 解决慢消费者问题的方法是背压。

我看到了两种解决此问题的方法:

  • 增加 ElasticsearchSink 的并行度。这可能有帮助,也可能无济于事,具体取决于您的 Elasticsearch 设置的功能。
  • 将两个作业作为独立的管道运行。在这种情况下,您必须计算所有结果两次。
  • 关于apache-flink - 如何在 Apache Flink 中并行写入接收器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39791975/

    相关文章:

    apache-flink - Flink 窗口状态大小和状态管理

    apache-flink - 如何确保flink作业已完成执行然后执行一些任务

    apache-flink - 如何迭代 Flink DataStream 中的每条消息?

    kubernetes - Flink-在Kubernetes上部署Flink应用程序的多个实例

    java - 如何在 Apache Flink 中注册 java.util.List 类型

    apache-flink - 从 Kafka 读取时使用 KeyBy 与 reinterpretAsKeyedStream()

    amazon-s3 - 无法执行HTTP请求: Timeout waiting for connection from pool in Flink

    elasticsearch - 如何从 Apache Flink 写入 Elasticsearch

    java - Flink readCsvFile 方法与 pojoTypes

    apache-flink - 弗林克 : How to cancel the correct job using the command line interface?