java - 使用基于计数的窗口连接两个流

标签 java apache-flink flink-streaming

我是 Flink Streaming API 的新手,我想完成以下简单的(IMO)任务。我有两个流,我想使用基于计数的窗口加入它们。到目前为止我的代码如下:

public class BaselineCategoryEquiJoin {

private static final String recordFile = "some_file.txt";

private static class ParseRecordFunction implements MapFunction<String, Tuple2<String[], MyRecord>> {
    public Tuple2<String[], MyRecord> map(String s) throws Exception {
        MyRecord myRecord = parse(s);
        return new Tuple2<String[], myRecord>(myRecord.attributes, myRecord);
    }
}

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.createLocalEnvironment();
    ExecutionConfig config = environment.getConfig();
    config.setParallelism(8);
    DataStream<Tuple2<String[], MyRecord>> dataStream = environment.readTextFile(recordFile)
            .map(new ParseRecordFunction());
    DataStream<Tuple2<String[], MyRecord>> dataStream1 = environment.readTextFile(recordFile)
            .map(new ParseRecordFunction());
    DataStreamSink<Tuple2<String[], String[]>> joinedStream = dataStream1
            .join(dataStream)
            .where(new KeySelector<Tuple2<String[],MyRecord>, String[]>() {
                public String[] getKey(Tuple2<String[], MyRecord> recordTuple2) throws Exception {
                    return recordTuple2.f0;
                }
            }).equalTo(new KeySelector<Tuple2<String[], MyRecord>, String[]>() {
                public String[] getKey(Tuple2<String[], MyRecord> recordTuple2) throws Exception {
                    return recordTuple2.f0;
                }
            }).window(TumblingProcessingTimeWindows.of(Time.seconds(1)))
            .apply(new JoinFunction<Tuple2<String[],MyRecord>, Tuple2<String[],MyRecord>, Tuple2<String[], String[]>>() {
                public Tuple2<String[], String[]> join(Tuple2<String[], MyRecord> tuple1, Tuple2<String[], MyRecord> tuple2) throws Exception {
                    return new Tuple2<String[], String[]>(tuple1.f0, tuple1.f0);
                }
            }).print();
    environment.execute();
}
}

我的代码运行没有错误,但没有产生任何结果。事实上,对apply方法的调用永远不会被调用(通过在 Debug模式下添加断点来验证)。我认为,前者的主要原因是我的数据没有时间属性。因此,窗口化(通过window具体化)没有正确完成。因此,我的问题是如何表明我希望基于计数窗口进行加入。例如,我希望连接具体化每个流中的每 100 个元组。前面的在Flink中可行吗?如果是,我应该改变我的代码来实现它。

此时,我必须通知您,我尝试调用 countWindow() 方法,但由于某种原因,Flink 的 JoinedStreams 不提供该方法。

谢谢

最佳答案

不支持基于计数的联接。您可以通过使用“事件时间”语义来模拟基于计数的窗口,并将唯一的 seq-id 作为时间戳应用于每个记录。因此,时间窗口“5”实际上是计数窗口 5。

关于java - 使用基于计数的窗口连接两个流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37732978/

相关文章:

java - 窗口应用程序无法在 Eclipse 中打开

java - 可以使用 JAXX 吗?

sql - 由带标题的 CSV 支持的 Flink SQL 表

apache-flink - Flink CEP 状态存储

apache-flink - 如何正确实现 HTTP 接收器?

apache-flink - Flink - 如何同时计算总和和平均值?

java - 插入 } 来完成 ClassBody

java - 检查类型安全配置中的部分 key

java - 连接到远程任务管理器失败。这可能表明远程任务管理器已丢失

apache-flink - Flink 检查点不断失败