java - Apache Flink 测试中是否有像 Reactor 和 RxJava 中那样的虚拟时间概念

标签 java rx-java apache-flink flink-streaming project-reactor

在 RxJava 和 Reactor 中,存在虚拟时间的概念来测试依赖于时间的运算符。我不知道如何在 Flink 中做到这一点。例如,我整理了以下示例,我想尝试一下迟到事件以了解它们是如何处理的。但是我无法理解这样的测试会是什么样子?有没有办法将Flink和Reactor结合起来,让测试变得更好?

public class PlayWithFlink {

    public static void main(String[] args) throws Exception {

        final OutputTag<MyEvent> lateOutputTag = new OutputTag<MyEvent>("late-data"){};

        // TODO understand how BoundedOutOfOrderness is related to allowedLateness
        BoundedOutOfOrdernessTimestampExtractor<MyEvent> eventTimeFunction = new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(MyEvent element) {
                return element.getEventTime();
            }
        };

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<MyEvent> events = env.fromCollection(MyEvent.examples())
                .assignTimestampsAndWatermarks(eventTimeFunction);

        AggregateFunction<MyEvent, MyAggregate, MyAggregate> aggregateFn = new AggregateFunction<MyEvent, MyAggregate, MyAggregate>() {
            @Override
            public MyAggregate createAccumulator() {
                return new MyAggregate();
            }

            @Override
            public MyAggregate add(MyEvent myEvent, MyAggregate myAggregate) {
                if (myEvent.getTracingId().equals("trace1")) {
                    myAggregate.getTrace1().add(myEvent);
                    return myAggregate;
                }
                myAggregate.getTrace2().add(myEvent);
                return myAggregate;
            }

            @Override
            public MyAggregate getResult(MyAggregate myAggregate) {
                return myAggregate;
            }

            @Override
            public MyAggregate merge(MyAggregate myAggregate, MyAggregate acc1) {
                acc1.getTrace1().addAll(myAggregate.getTrace1());
                acc1.getTrace2().addAll(myAggregate.getTrace2());
                return acc1;
            }
        };

        KeySelector<MyEvent, String> keyFn = new KeySelector<MyEvent, String>() {
            @Override
            public String getKey(MyEvent myEvent) throws Exception {
                return myEvent.getTracingId();
            }
        };

        SingleOutputStreamOperator<MyAggregate> result = events
                .keyBy(keyFn)
                .window(EventTimeSessionWindows.withGap(Time.seconds(10)))
                .allowedLateness(Time.seconds(20))
                .sideOutputLateData(lateOutputTag)
                .aggregate(aggregateFn);


        DataStream lateStream = result.getSideOutput(lateOutputTag);

        result.print("SessionData");

        lateStream.print("LateData");

        env.execute();
    }
}

class MyEvent {
    private final String tracingId;
    private final Integer count;
    private final long eventTime;

    public MyEvent(String tracingId, Integer count, long eventTime) {
        this.tracingId = tracingId;
        this.count = count;
        this.eventTime = eventTime;
    }

    public String getTracingId() {
        return tracingId;
    }

    public Integer getCount() {
        return count;
    }

    public long getEventTime() {
        return eventTime;
    }

    public static List<MyEvent> examples() {
        long now = System.currentTimeMillis();
        MyEvent e1 = new MyEvent("trace1", 1, now);
        MyEvent e2 = new MyEvent("trace2", 1, now);
        MyEvent e3 = new MyEvent("trace2", 1, now - 1000);
        MyEvent e4 = new MyEvent("trace1", 1, now - 200);
        MyEvent e5 = new MyEvent("trace1", 1, now - 50000);
        return Arrays.asList(e1,e2,e3,e4, e5);
    }

    @Override
    public String toString() {
        return "MyEvent{" +
                "tracingId='" + tracingId + '\'' +
                ", count=" + count +
                ", eventTime=" + eventTime +
                '}';
    }
}

class MyAggregate {
    private final List<MyEvent> trace1 = new ArrayList<>();
    private final List<MyEvent> trace2 = new ArrayList<>();


    public List<MyEvent> getTrace1() {
        return trace1;
    }

    public List<MyEvent> getTrace2() {
        return trace2;
    }

    @Override
    public String toString() {
        return "MyAggregate{" +
                "trace1=" + trace1 +
                ", trace2=" + trace2 +
                '}';
    }
}

运行的输出是:

SessionData:1> MyAggregate{trace1=[], trace2=[MyEvent{tracingId='trace2', count=1, eventTime=1551034666081}, MyEvent{tracingId='trace2', count=1, eventTime=1551034665081}]}
SessionData:3> MyAggregate{trace1=[MyEvent{tracingId='trace1', count=1, eventTime=1551034166081}], trace2=[]}
SessionData:3> MyAggregate{trace1=[MyEvent{tracingId='trace1', count=1, eventTime=1551034666081}, MyEvent{tracingId='trace1', count=1, eventTime=1551034665881}], trace2=[]}

但是,我希望看到 e5 事件的 LateStream 触发器,该触发器应该在第一个事件触发之前 50 秒。

最佳答案

如果您将水印分配器修改为这样

AssignerWithPunctuatedWatermarks eventTimeFunction = new AssignerWithPunctuatedWatermarks<MyEvent>() {
    long maxTs = 0;

    @Override
    public long extractTimestamp(MyEvent myEvent, long l) {
        long ts = myEvent.getEventTime();
        if (ts > maxTs) {
            maxTs = ts;
        }
        return ts;
    }

    @Override
    public Watermark checkAndGetNextWatermark(MyEvent event, long extractedTimestamp) {
        return new Watermark(maxTs - 10000);
    }
};

然后你就会得到你期望的结果。我不推荐这样做——只是用它来说明正在发生的事情。

这里发生的是 BoundedOutOfOrdernessTimestampExtractor是一个周期性水印生成器,仅每 200 毫秒(默认情况下)将水印插入到流中。因为您的作业在此之前很久就完成了,所以您的作业遇到的唯一水印是 Flink 在每个有限流末尾注入(inject)的水印(值为 MAX_WATERMARK)。迟到与水印有关,您预计迟到的事件正在设法在该水印之前到达。

通过切换到标点水印,您可以强制在流中的特定点更频繁或更精确地添加水印。这通常是不必要的(太频繁的水印会导致开销),但当您想要对水印的顺序进行强有力的控制时,这很有帮助。

至于如何编写测试,你可以看看test harnesses用于 Flink 自己的测试,或者在 flink-spector .

更新:

与 BoundedOutOfOrdernessTimestampExtractor 关联的时间间隔是流预计的无序程度的规范。在此范围内到达的事件不会被视为延迟,并且事件时间计时器不会触发,直到经过此延迟,从而为无序事件的到达提供了时间。 allowedLateness 仅适用于窗口 API,并描述了框架在正常窗口触发时间之后多长时间保持窗口状态,以便事件仍然可以添加到窗口并导致延迟触发。在此额外间隔之后,窗口状态将被清除,后续事件将发送到侧面输出(如果已配置)。

enter image description here

所以当你使用BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10))时您不是说“在每个事件之后等待 10 秒,以防更早的事件可能仍然到达”。但你说你的事件最多应该有 10 秒的困惑。因此,如果您正在处理实时事件流,这意味着您最多将等待 10 秒,以防较早的事件到达。 (如果您正在处理历史数据,那么您可能能够在 1 秒内处理 10 秒的数据,或者不能——知道您将等待 n 秒的事件时间过去,但这并不能说明实际需要多长时间。 )

有关此主题的更多信息,请参阅 Event Time and Watermarks .

关于java - Apache Flink 测试中是否有像 Reactor 和 RxJava 中那样的虚拟时间概念,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54855358/

相关文章:

java - TreeSet 中的值更改后不会重新排序

java - 通过自行实现的 crypter 类加密和解密对象

java - 使用 RxJava 异步线程实现防止竞争条件

apache-flink - 为什么我们在 flink 源代码中有 flink-streaming-java 和 flink-streaming-scala 模块

java - pom.xml 中的依赖项在 flink kafka 连接器示例中不起作用

java - 使用多个 JFrame 导出 JAR

java - RabbitTemplate 扩展 DirectReplyToMessageListenerContainer

java - 使用 OutOfMemoryException 调用订阅者 onError() 时出现 IllegalStateException

android - 如何将 BehaviorSubject<Optional<List<File>>> 转换为 Kotlin?

apache-flink - 从 IDE 运行时的 Flink webui