java - Apache Flink: Custom InputFormat only runs with parallelism of 1

Tags: java elasticsearch apache-flink

I am implementing a custom input format for Apache Flink. I created a dummy input format that returns 3 rows.

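import java.io.IOException;

import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.types.Row;

// Dummy input format that emits 3 rows per input split.
public class ElasticsearchInputFormat extends GenericInputFormat<Row> {

    // Rows emitted from the current split. Reset in open(), because one
    // parallel instance may be handed more than one split.
    private int emitted = 0;

    @Override
    public void configure(Configuration parameters) {
        System.out.println("configuring");
    }

    @Override
    public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOException {
        return cachedStatistics;
    }

    @Override
    public void open(GenericInputSplit split) throws IOException {
        System.out.println("opening: " + split);
        super.open(split);
        emitted = 0;
    }

    @Override
    public void close() throws IOException {
        System.out.println("closing");
        super.close();
    }

    // Side-effect free: the split is exhausted after 3 rows.
    @Override
    public boolean reachedEnd() throws IOException {
        return emitted >= 3;
    }

    @Override
    public Row nextRecord(Row reuse) throws IOException {
        emitted++;
        Row r = new Row(2);
        r.setField(0, "osman");
        r.setField(1, "wow");
        return r;
    }
}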

My example code is as follows:

final ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
env.setParallelism(8);

DataSource<Row> input = env.createInput(new ElasticsearchInputFormat());

input.print();

However, even though the parallelism is set to 8, it prints:

configuring
opening: GenericSplit (0/1)
closing
osman,wow
osman,wow
osman,wow

Why is it not parallelized? I want to have multiple splits so that the input can be consumed in parallel by other operators.
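The single "GenericSplit (0/1)" line in the output follows from how GenericInputFormat creates splits: its inherited createInputSplits(numSplits) produces one split per requested parallel instance, and the runtime requests as many splits as the source's parallelism. The sketch below is a simplified plain-Java model of that behavior, not Flink's own code; the class SplitModel and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

public class SplitModel {

    // Simplified model (not Flink source) of GenericInputFormat#createInputSplits:
    // one split per requested parallel instance, labelled like GenericInputSplit#toString.
    public static List<String> createSplits(int numSplits) {
        if (numSplits < 1) {
            throw new IllegalArgumentException("numSplits must be at least 1");
        }
        List<String> splits = new ArrayList<>();
        for (int i = 0; i < numSplits; i++) {
            splits.add("GenericSplit (" + i + "/" + numSplits + ")");
        }
        return splits;
    }

    // The question's dummy format emits 3 rows per split.
    public static int totalRows(int parallelism) {
        return createSplits(parallelism).size() * 3;
    }

    public static void main(String[] args) {
        // Collections environment: parallelism is always 1, so a single split.
        System.out.println(createSplits(1)); // [GenericSplit (0/1)]
        // Parallelism 8 in a local or cluster environment.
        System.out.println(createSplits(8).size() + " splits, " + totalRows(8) + " rows"); // 8 splits, 24 rows
    }
}
```

In other words, the input format itself is already able to run in parallel; what limits it to one split is the parallelism the environment asks for.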

Best answer

createCollectionsEnvironment() returns a special environment whose parallelism is implicitly 1, regardless of setParallelism(). From the Javadocs:

Creates a {@link CollectionEnvironment} that uses Java Collections underneath. This will execute in a single thread in the current JVM. It is very fast but will fail if the data does not fit into memory. parallelism will always be 1. This is useful during implementation and for debugging.
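A minimal sketch of the fix, assuming the Flink DataSet API (deprecated in recent Flink releases) with flink-java, plus flink-clients for local execution, on the classpath: replace createCollectionsEnvironment() with ExecutionEnvironment.createLocalEnvironment(8), or with getExecutionEnvironment() when the job is submitted to a cluster. The names ParallelSourceExample, DummyInputFormat and countRecords are hypothetical, and the format emits Strings instead of Rows only to keep type extraction simple.

```java
import java.io.IOException;

import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.io.GenericInputSplit;

public class ParallelSourceExample {

    // Hypothetical stand-in for the question's format: emits 3 records per split.
    public static class DummyInputFormat extends GenericInputFormat<String> {
        private int splitNumber;
        private int emitted;

        @Override
        public void open(GenericInputSplit split) throws IOException {
            super.open(split);
            splitNumber = split.getSplitNumber();
            emitted = 0; // reset per split; one instance may process several splits
        }

        @Override
        public boolean reachedEnd() {
            return emitted >= 3;
        }

        @Override
        public String nextRecord(String reuse) {
            emitted++;
            return "split " + splitNumber + ", record " + emitted;
        }
    }

    // Runs the source with the given parallelism and counts the emitted records.
    public static long countRecords(int parallelism) throws Exception {
        // A local environment runs on an embedded mini cluster and honours the
        // requested parallelism, unlike the collections environment.
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment(parallelism);
        DataSet<String> input = env.createInput(new DummyInputFormat());
        return input.count(); // triggers job execution
    }

    public static void main(String[] args) throws Exception {
        System.out.println(countRecords(8));
    }
}
```

With parallelism 8, GenericInputFormat should create 8 splits, so the count is expected to be 8 × 3 = 24, and the "opening: ..." log lines of the question's format would show splits such as GenericSplit (3/8).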

A similar question about "java - Apache Flink: Custom InputFormat only runs with parallelism of 1" can be found on Stack Overflow: https://stackoverflow.com/questions/49543205/
