java - 在另一个数据流中创建新的数据流

标签 java apache-flink flink-streaming

我有两种数据类型。

type1 and type2 

我有一个数据流 type1

DataStream<type1> stream1 =... 

内部stream1我想创建 type2 的对象我想收集 type1 的对象和type2

一个数据流是否可以有一种输入类型和两种输出类型?或者是否可以在 DataStream<type2> stream2 内部创建一个新的数据流( stream1 ) ?

或者是否有其他方法来收集从一种类型评估的两种不同类型的数据?

最佳答案

您需要先创建一个包装器类型,然后再拆分并选择您的流。对于包装器,只有一个成员not-null

class TypeWrapper {
    // keeping this short for brevity
    public TypeA firstType;
    public TypeB secondType;
}

分割并选择:

DataStream<TypeWrapper> stream1 = ...

DataStream<TypeA> streamA = stream1.filter(new FilterFunction<TypeWrapper>() {
    @Override
    public boolean filter(TypeWrapper value) throws Exception {
        return value.firstType != null;
    }
})
.map(new MapFunction<TypeWrapper, TypeA>() {
    @Override
    public TypeA map(TypeWrapper value) throws Exception {
        return value.firstType;
    }
});

DataStream<TypeB> streamB = stream1.filter(new FilterFunction<TypeWrapper>() {
    @Override
    public boolean filter(TypeWrapper value) throws Exception {
        return value.secondType != null;
    }
})
.map(new MapFunction<TypeWrapper, TypeB>() {
    @Override
    public TypeB map(TypeWrapper value) throws Exception {
        return value.secondType;
    }
});

因为 filter()map() 将链接到 stream1 ,两者都在同一个线程上执行,并且操作很便宜。

关于java - 在另一个数据流中创建新的数据流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40870532/

相关文章:

apache-flink - Flink 一次性消息处理

scala - 使用 Flink 从 kafka 主题的开头进行消费

java - OpenCSV 不会将数据库的第一行写入 CSV 文件

java - 无法解析的日期异常 Windows Azure 移动服务

apache-flink - 为什么在申请任务管理器时 '-n' 或 '-yn' 之类的参数不起作用

hadoop - 在 Amazon EMR 上配置 Flink Rest API

java - 弗林克 : How to read from multiple kafka cluster using same StreamExecutionEnvironment

java - 如何在不使用 Collections.sort() 的情况下获得 Java 中的排序列表行为?

java - CellTable 的 GWT RPC 匹配数据

scala - Flink 获取 RuntimeContext