apache-flink - flink key通过添加延迟;我怎样才能减少这个延迟?

标签 apache-flink flink-streaming low-latency

当我使用 KeyedStream 运行一个简单的 flink 应用程序时,我观察到事件的时间延迟从 0 到 100 毫秒不等。下面是程序

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Long> source = env.addSource(new SourceFunction<Long>() {
        public void run(SourceContext<Long> sourceContext) throws Exception {
            while(true) {
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(System.currentTimeMillis());
                    Thread.sleep(1000);
                }
            }
        }

        public void cancel() {}
    }).keyBy(new KeySelector<Long, Long>() {
        @Override
        public Long getKey(Long l) throws Exception {
            return l;
        }
    }).addSink(new SinkFunction<Long>() {
        @Override
        public void invoke(Long l) throws Exception {
            long diff = System.currentTimeMillis() - l;
            System.out.println("in Sink: diff=" + diff);
        }
    });
    env.execute();

输出是:

in Sink: diff=0
in Sink: diff=2
in Sink: diff=4
in Sink: diff=4
in Sink: diff=5
in Sink: diff=7
in Sink: diff=9
in Sink: diff=9
in Sink: diff=11
in Sink: diff=12
in Sink: diff=14
in Sink: diff=14
in Sink: diff=16
in Sink: diff=17
in Sink: diff=18
in Sink: diff=19
in Sink: diff=21
in Sink: diff=22
in Sink: diff=24
in Sink: diff=24
in Sink: diff=26
in Sink: diff=27
in Sink: diff=29
in Sink: diff=29
in Sink: diff=31
in Sink: diff=32
in Sink: diff=34
in Sink: diff=34
in Sink: diff=36
in Sink: diff=37
in Sink: diff=39
in Sink: diff=40
in Sink: diff=41
in Sink: diff=43
in Sink: diff=45
in Sink: diff=45
in Sink: diff=47
in Sink: diff=48
in Sink: diff=50
in Sink: diff=50
in Sink: diff=52
in Sink: diff=53
in Sink: diff=55
in Sink: diff=57
in Sink: diff=57
in Sink: diff=59
in Sink: diff=60
in Sink: diff=61
in Sink: diff=62
in Sink: diff=63
in Sink: diff=65
in Sink: diff=66
in Sink: diff=67
in Sink: diff=69
in Sink: diff=70
in Sink: diff=72
in Sink: diff=72
in Sink: diff=74
in Sink: diff=76
in Sink: diff=77
in Sink: diff=78
in Sink: diff=79
in Sink: diff=81
in Sink: diff=82
in Sink: diff=83
in Sink: diff=84
in Sink: diff=86
in Sink: diff=87
in Sink: diff=88
in Sink: diff=89
in Sink: diff=91
in Sink: diff=92
in Sink: diff=94
in Sink: diff=94
in Sink: diff=96
in Sink: diff=97
in Sink: diff=99
in Sink: diff=99
in Sink: diff=0
in Sink: diff=2
in Sink: diff=3
in Sink: diff=4
in Sink: diff=4
in Sink: diff=5
in Sink: diff=7
in Sink: diff=9
in Sink: diff=9
in Sink: diff=11
in Sink: diff=12
in Sink: diff=14
in Sink: diff=14
in Sink: diff=16
in Sink: diff=17
in Sink: diff=18
in Sink: diff=19
in Sink: diff=21
in Sink: diff=22
in Sink: diff=24
in Sink: diff=24
in Sink: diff=26
in Sink: diff=46
in Sink: diff=48
in Sink: diff=50
in Sink: diff=52
in Sink: diff=53
in Sink: diff=54
in Sink: diff=56
in Sink: diff=58
in Sink: diff=59
in Sink: diff=60
in Sink: diff=62
in Sink: diff=64
in Sink: diff=65
in Sink: diff=66
in Sink: diff=68
in Sink: diff=70
in Sink: diff=71
in Sink: diff=73
in Sink: diff=74
in Sink: diff=76
in Sink: diff=77
in Sink: diff=79
in Sink: diff=81
in Sink: diff=82
in Sink: diff=83
in Sink: diff=85
in Sink: diff=86
in Sink: diff=88
in Sink: diff=88
in Sink: diff=90
in Sink: diff=92
in Sink: diff=92
in Sink: diff=94
in Sink: diff=95
in Sink: diff=97
in Sink: diff=98
in Sink: diff=99
in Sink: diff=0
in Sink: diff=2
in Sink: diff=4
in Sink: diff=4
in Sink: diff=5
in Sink: diff=7
in Sink: diff=9

正如您所看到的,延迟是一种逐渐增加到 100 然后下降并从 0 开始的模式,如此循环重复。我需要尽可能低的延迟。这个例子是我的真实应用程序的简化版本。有人可以向我解释延迟的原因以及如何将其降低到尽可能低的水平吗?

最佳答案

造成这种延迟的原因是,通过添加 keyBy,您将强制进行网络洗牌以及序列化/反序列化。延迟变化如此之大的原因是由于网络缓冲。

您需要阅读文档中名为 Controlling Latency 的部分。 。 tl;dr 是您想要将网络缓冲区超时设置为较小的值:

env.setBufferTimeout(timeoutMillis);

如果需要,您可以将缓冲区超时设置为零,但这比将其设置为较小的值(例如 1 毫秒或 5 毫秒)对吞吐量的影响更大。默认值为 100 毫秒。有关 Flink 中网络堆栈如何组织的详细信息,请参阅A Deep-Dive into Flink's Network Stack在 Flink 项目博客上。

当我们讨论这个主题时,其他延迟来源可能包括检查点屏障对齐和垃圾收集。

env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);

将禁用屏障对齐,代价是放弃一次处理语义。

使用 RocksDB 状态后端将减少垃圾收集的对象数量(因为 RocksDB 将其状态保持在堆外),在某些情况下,以更差的平均延迟为代价改善最坏情况的延迟。然而,对于现代垃圾收集器来说,使用 RocksDB 来改善最坏情况下的延迟可能是一个错误。

另外,

env.getConfig().enableObjectReuse();

将指示运行时重用用户对象以获得更好的性能。请记住,当用户代码函数不知道此行为时,这可能会导致错误。

如果您使用水印,水印延迟会影响触发事件时间计时器(包括窗口)的延迟,并且 autoWatermarkInterval也会对延迟产生影响。

最后,事务接收器的使用会增加端到端延迟,因为这些接收器的下游消费者在事务完成之前不会看到提交的结果。预期延迟大约是检查点间隔的一半。

如果您对测量延迟感兴趣,请查看 Latency Tracking以及 Monitoring Apache Flink Applications 101 中有关延迟的部分.

关于apache-flink - flink key通过添加延迟;我怎样才能减少这个延迟?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57394914/

相关文章:

batch-processing - Flink 批处理接收器

monitoring - Flink 应用中的延迟监控

c# - 学习如何编写延迟关键、快速的 C++/Java/C# 代码的最佳方法?

c++ - ZeroMQ 并发发布和订阅

audio-streaming - Icecast 和 Darkice 的高音频延迟

apache-flink - 从 Apache Flink 中的 DataStream 类扩展的 SingleOutputStreamOperator 背后的想法是什么?

apache-flink - Apache Flink(如何唯一标记作业)

apache-spark - Flink 或 Sparks vs Akka 流中的非阻塞操作

scala - FlinkML:加入 LabeledVector 的数据集不起作用

scala - 无法从 JAR 文件构建程序