我有两种数据类型。
type1 and type2
我有一个数据流 type1
。
DataStream<type1> stream1 =...
内部stream1
我想创建 type2
的对象我想收集 type1
的对象和type2
。
一个数据流是否可以有一种输入类型和两种输出类型?或者是否可以在 DataStream<type2> stream2
内部创建一个新的数据流( stream1
) ?
或者是否有其他方法来收集从一种类型评估的两种不同类型的数据?
最佳答案
您需要先创建一个包装器类型,然后再拆分并选择您的流。对于包装器,只有一个成员not-null;
class TypeWrapper {
// keeping this short for brevity
public TypeA firstType;
public TypeB secondType;
}
分割并选择:
DataStream<TypeWrapper> stream1 = ...
DataStream<TypeA> streamA = stream1.filter(new FilterFunction<TypeWrapper>() {
@Override
public boolean filter(TypeWrapper value) throws Exception {
return value.firstType != null;
}
})
.map(new MapFunction<TypeWrapper, TypeA>() {
@Override
public TypeA map(TypeWrapper value) throws Exception {
return value.firstType;
}
});
DataStream<TypeB> streamB = stream1.filter(new FilterFunction<TypeWrapper>() {
@Override
public boolean filter(TypeWrapper value) throws Exception {
return value.secondType != null;
}
})
.map(new MapFunction<TypeWrapper, TypeB>() {
@Override
public TypeB map(TypeWrapper value) throws Exception {
return value.secondType;
}
});
因为 filter()
和 map()
将链接到 stream1
,两者都在同一个线程上执行,并且操作很便宜。
关于java - 在另一个数据流中创建新的数据流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40870532/