我正在阅读SingleOutputStreamOperator#returns
的源代码,它的javadoc是:
/**
* Adds a type information hint about the return type of this operator. This method
* can be used in cases where Flink cannot determine automatically what the produced
* type of a function is. That can be the case if the function uses generic type variables
* in the return type that cannot be inferred from the input type.
*
* <p>Use this method the following way:
* <pre>{@code
* DataStream<Tuple2<String, Double>> result =
* stream.flatMap(new FunctionWithNonInferrableReturnType())
* .returns(new TypeHint<Tuple2<String, Double>>(){});
* }</pre>
*
* @param typeHint The type hint for the returned data type.
* @return This operator with the type information corresponding to the given type hint.
*/
它提到FunctionWithNonInferrableReturnType
来说明返回方法的必要性,但我无法编写这样一个NonInferrableReturnType
类。如何写一个简单的?
最佳答案
当文档说 NonInferrableReturnType
时这意味着我们可以使用类型变量 <T>
,或您喜欢的任何其他字母。所以你可以创建一个MapFunction
返回 T
。但是你必须使用 .returns(TypeInformation.of(String.class)
例如,如果您的目标是返回 String
.
public class MyMapFunctionNonInferrableReturnType<T> implements MapFunction<AbstractDataModel, T> {
@Override
public T map(AbstractDataModel value) throws Exception {
return (T) value.getValue();
}
}
这里我使用你上一个问题的类Compiling fails when creating MapFunction with super type 。相同的代码没有 .returns(TypeInformation.of(String.class))
编译但抛出运行时异常:
could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
public class NonInferrableReturnTypeStreamJob {
private final List<AbstractDataModel> abstractDataModelList;
private final ValenciaSinkFunction sink;
public NonInferrableReturnTypeStreamJob() {
this.abstractDataModelList = new ArrayList<AbstractDataModel>();
this.abstractDataModelList.add(new ConcreteModel("a", "1"));
this.abstractDataModelList.add(new ConcreteModel("a", "2"));
this.sink = new ValenciaSinkFunction();
}
public NonInferrableReturnTypeStreamJob(List<AbstractDataModel> abstractDataModelList, ValenciaSinkFunction sink) {
this.abstractDataModelList = abstractDataModelList;
this.sink = sink;
}
public static void main(String[] args) throws Exception {
NonInferrableReturnTypeStreamJob concreteModelTest = new NonInferrableReturnTypeStreamJob();
concreteModelTest.execute();
}
public void execute() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromCollection(this.abstractDataModelList)
.map(new MyMapFunctionNonInferrableReturnType())
.returns(TypeInformation.of(String.class))
.addSink(sink);
env.execute();
}
}
如果您愿意,这里是此示例的集成测试:
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;
import org.sense.flink.examples.stream.valencia.ValenciaSinkFunction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import static junit.framework.TestCase.assertEquals;
import static org.junit.Assert.assertTrue;
public class NonInferrableReturnTypeStreamJobTest {
@ClassRule
public static MiniClusterWithClientResource flinkCluster;
private final int minAvailableProcessors = 4;
private final boolean runInParallel;
public NonInferrableReturnTypeStreamJobTest() {
int availableProcessors = Runtime.getRuntime().availableProcessors();
this.runInParallel = availableProcessors >= minAvailableProcessors;
if (this.runInParallel) {
flinkCluster = new MiniClusterWithClientResource(
new MiniClusterResourceConfiguration.Builder()
.setNumberSlotsPerTaskManager(minAvailableProcessors)
.setNumberTaskManagers(1)
.build());
}
}
@Test
public void execute() throws Exception {
List<AbstractDataModel> abstractDataModelList = new ArrayList<AbstractDataModel>();
abstractDataModelList.add(new ConcreteModel("a", "1"));
abstractDataModelList.add(new ConcreteModel("a", "2"));
ValenciaSinkFunction.values.clear();
NonInferrableReturnTypeStreamJob streamJob = new NonInferrableReturnTypeStreamJob(abstractDataModelList, new ValenciaSinkFunction());
streamJob.execute();
List<String> results = ValenciaSinkFunction.values;
assertEquals(2, results.size());
assertTrue(results.containsAll(Arrays.asList("1", "2")));
}
}
关于SingleOutputStreamOperator#returns(TypeHint<T> typeHint) 方法的 javadoc,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/68187973/