Javadoc of the SingleOutputStreamOperator#returns(TypeHint<T> typeHint) method

Tags: java apache-flink flink-streaming

I am reading the source code of SingleOutputStreamOperator#returns. Its Javadoc is:

/**
 * Adds a type information hint about the return type of this operator. This method
 * can be used in cases where Flink cannot determine automatically what the produced
 * type of a function is. That can be the case if the function uses generic type variables
 * in the return type that cannot be inferred from the input type.
 *
 * <p>Use this method the following way:
 * <pre>{@code
 *     DataStream<Tuple2<String, Double>> result =
 *         stream.flatMap(new FunctionWithNonInferrableReturnType())
 *               .returns(new TypeHint<Tuple2<String, Double>>(){});
 * }</pre>
 *
 * @param typeHint The type hint for the returned data type.
 * @return This operator with the type information corresponding to the given type hint.
 */

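It mentions FunctionWithNonInferrableReturnType to illustrate why the returns method is needed, but I have not been able to write such a class myself. How can I write a simple one?

Best answer

The name FunctionWithNonInferrableReturnType in the documentation refers to a function whose return type is a type variable such as <T> (any letter works). So you can create a MapFunction that returns T. You then have to tell Flink the concrete type yourself, for example with .returns(TypeInformation.of(String.class)) if your goal is to return String.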

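import org.apache.flink.api.common.functions.MapFunction;

// The output type is the free type variable T, so Flink cannot infer it from the input type.
public class MyMapFunctionNonInferrableReturnType<T> implements MapFunction<AbstractDataModel, T> {
    @Override
    @SuppressWarnings("unchecked")
    public T map(AbstractDataModel value) throws Exception {
        // Unchecked cast: the caller must declare the real type via returns(...).
        return (T) value.getValue();
    }
}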

Here I use the class from your previous question, "Compiling fails when creating MapFunction with super type". The same code without .returns(TypeInformation.of(String.class)) compiles but throws a runtime exception:

could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
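The "type erasure" named in that message can be reproduced without Flink. The sketch below is only an illustration under stated assumptions: `Mapper`, `GenericMapper`, `StringMapper` and `ErasureDemo` are hypothetical names made up for this example, and `Mapper` is a stand-in for `MapFunction`, not the real interface. It uses plain Java reflection to show that a class with a fixed output type keeps that type in its metadata, while a class with a free type variable only exposes the variable itself. It is not Flink's actual type extraction logic.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for Flink's MapFunction<IN, OUT>; not the real interface.
interface Mapper<I, O> {
    O map(I value) throws Exception;
}

// Output type is a free type variable, like MyMapFunctionNonInferrableReturnType<T>.
class GenericMapper<T> implements Mapper<Object, T> {
    @Override
    @SuppressWarnings("unchecked")
    public T map(Object value) {
        return (T) value; // unchecked: T is erased at runtime
    }
}

// Output type is fixed in the class declaration, so it is kept in the class metadata.
class StringMapper implements Mapper<Object, String> {
    @Override
    public String map(Object value) {
        return String.valueOf(value);
    }
}

class ErasureDemo {
    // Returns the second type argument that the mapper's class declares for Mapper.
    public static Type declaredOutputType(Mapper<?, ?> mapper) {
        for (Type t : mapper.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                ParameterizedType p = (ParameterizedType) t;
                if (p.getRawType() == Mapper.class) {
                    return p.getActualTypeArguments()[1];
                }
            }
        }
        throw new IllegalArgumentException("not a direct Mapper implementation");
    }

    public static void main(String[] args) {
        // Concrete output type: reflection finds String.
        System.out.println(declaredOutputType(new StringMapper()));
        // Free type variable: the <String> at the call site is erased, only T remains.
        System.out.println(declaredOutputType(new GenericMapper<String>()));
    }
}
```

In the second call the `<String>` written at the call site leaves no trace on the object, which is the same situation the job is in when `returns(...)` is missing.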

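The complete job, with the returns(...) call in place:

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.sense.flink.examples.stream.valencia.ValenciaSinkFunction;

import java.util.ArrayList;
import java.util.List;

public class NonInferrableReturnTypeStreamJob {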

    private final List<AbstractDataModel> abstractDataModelList;
    private final ValenciaSinkFunction sink;

    public NonInferrableReturnTypeStreamJob() {
        this.abstractDataModelList = new ArrayList<AbstractDataModel>();
        this.abstractDataModelList.add(new ConcreteModel("a", "1"));
        this.abstractDataModelList.add(new ConcreteModel("a", "2"));
        this.sink = new ValenciaSinkFunction();
    }

    public NonInferrableReturnTypeStreamJob(List<AbstractDataModel> abstractDataModelList, ValenciaSinkFunction sink) {
        this.abstractDataModelList = abstractDataModelList;
        this.sink = sink;
    }

    public static void main(String[] args) throws Exception {
        NonInferrableReturnTypeStreamJob concreteModelTest = new NonInferrableReturnTypeStreamJob();
        concreteModelTest.execute();
    }

    public void execute() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromCollection(this.abstractDataModelList)
                .map(new MyMapFunctionNonInferrableReturnType())
                .returns(TypeInformation.of(String.class))
                .addSink(sink);

        env.execute();
    }
}
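The Javadoc in the question passes the hint as `new TypeHint<...>(){}`, with a trailing `{}`. That creates an anonymous subclass, and a subclass records its type argument where reflection can read it back. The sketch below imitates that idea with plain Java only. `SimpleTypeHint` and `TypeHintDemo` are hypothetical names invented for this illustration, and the real `TypeHint` class in Flink does more than this.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.List;

// Hypothetical, simplified imitation of a type hint; not Flink's TypeHint.
abstract class SimpleTypeHint<T> {
    private final Type captured;

    protected SimpleTypeHint() {
        // An anonymous subclass created by `new SimpleTypeHint<X>() {}` stores X
        // in its generic superclass signature, which survives erasure.
        Type superType = getClass().getGenericSuperclass();
        if (!(superType instanceof ParameterizedType)) {
            throw new IllegalStateException(
                    "create the hint as an anonymous subclass with a type argument");
        }
        this.captured = ((ParameterizedType) superType).getActualTypeArguments()[0];
    }

    public Type getCapturedType() {
        return captured;
    }
}

class TypeHintDemo {
    // Renders the captured type, including nested type arguments.
    public static String describe(SimpleTypeHint<?> hint) {
        return hint.getCapturedType().getTypeName();
    }

    public static void main(String[] args) {
        // The trailing {} is what makes the type argument recoverable.
        System.out.println(describe(new SimpleTypeHint<List<String>>() {}));
    }
}
```

This is why a hint of this kind can describe nested generic types such as `Tuple2<String, Double>`, which a plain `Class` object like `String.class` cannot express.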

If you want, here is an integration test for this example:

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;
import org.junit.Test;
import org.sense.flink.examples.stream.valencia.ValenciaSinkFunction;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import static junit.framework.TestCase.assertEquals;
import static org.junit.Assert.assertTrue;

public class NonInferrableReturnTypeStreamJobTest {

    @ClassRule
    public static MiniClusterWithClientResource flinkCluster;
    private final int minAvailableProcessors = 4;
    private final boolean runInParallel;

    public NonInferrableReturnTypeStreamJobTest() {
        int availableProcessors = Runtime.getRuntime().availableProcessors();
        this.runInParallel = availableProcessors >= minAvailableProcessors;
        if (this.runInParallel) {
            flinkCluster = new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberSlotsPerTaskManager(minAvailableProcessors)
                            .setNumberTaskManagers(1)
                            .build());
        }
    }

    @Test
    public void execute() throws Exception {
        List<AbstractDataModel> abstractDataModelList = new ArrayList<AbstractDataModel>();
        abstractDataModelList.add(new ConcreteModel("a", "1"));
        abstractDataModelList.add(new ConcreteModel("a", "2"));
        ValenciaSinkFunction.values.clear();

        NonInferrableReturnTypeStreamJob streamJob = new NonInferrableReturnTypeStreamJob(abstractDataModelList, new ValenciaSinkFunction());
        streamJob.execute();

        List<String> results = ValenciaSinkFunction.values;
        assertEquals(2, results.size());
        assertTrue(results.containsAll(Arrays.asList("1", "2")));
    }
}

A similar question about the Javadoc of the SingleOutputStreamOperator#returns(TypeHint<T> typeHint) method can be found on Stack Overflow: https://stackoverflow.com/questions/68187973/
