java - Apache 弗林克 : Process data in order with mapPartition

标签 java apache-flink

我正在尝试一个简单的 Flink 程序,它只需要一个文件,反转文件中的字符串并将其写出来。

程序可以运行,只是个别行出现了问题。

例如

文件输入

Thing,Name
Person,Vineet
Fish,Karp
Dog,Fido

输出文件

Fish,praK
Thing,emaN
Person,teeniV
Dog,odiF

我期待的是:

Thing,emaN
Person,teeniV
Fish,praK
Dog,odiF

下面是我为实现此目的而编写的程序:

package testflink;

import java.util.Iterator;
import java.util.StringJoiner;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.core.fs.FileSystem.WriteMode;
import org.apache.flink.util.Collector;

public class BatchJob {

    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        System.err.println(env.getParallelism());
        DataSource<String> file = env.readTextFile("./data.csv");
        file.mapPartition((Iterable<String> values, Collector<String> out) -> {
            System.err.println("************* " + out.hashCode() + " Begin");
            Iterator<String> iterator = values.iterator();
            while (iterator.hasNext()) {
                String tuple = iterator.next();
                System.err.println("************* " + out.hashCode() + tuple);
                String[] split = tuple.split(",");
                String tuple1Rev = new StringBuilder(split[1]).reverse().toString();
                out.collect(new StringJoiner(",").add(split[0]).add(tuple1Rev).toString());
            }
            System.err.println("************* " + out.hashCode() + " End");
        }).returns(String.class).writeAsText("./dataO.csv", WriteMode.OVERWRITE).setParallelism(1);
        env.execute("Flink Batch Java API Skeleton");
        System.out.println("Done");
    }
}
  • 是否可以保持输入顺序?有什么好的解决办法吗?
  • 我知道我正在读取 csv 并在存在 readAsCsv() 时分割字符串方法可用。问题是 csv 每行/元组可以有动态数量的列。我无法弄清楚如何将其转换为每个元组具有动态列数的数据源。 MapPartition 需要定义的类型 - 我如何替换 Tuple0 -Tuple25在运行时?
  • 最后一个问题 - 我能否限制分区在 Iterable<String> values 中不采用超过 n 个值?参数?

提前致谢! :)

最佳答案

Flink 的 mapPartition 维护每个并行分区内记录的顺序。但是,您的用例中的问题是如何将数据分发到 MapPartition 运算符的并行任务。

您正在使用TextInputFormat,它将输入文件划分为多个输入分割,这些分割由数据源运算符的并行实例独立处理。每个数据源实例在本地将其所有记录转发到后续的 MapPartition 运算符,并将其结果记录转发到接收器。管道如下所示:

source_1 -> mapPartition_1 -> sink_1
source_2 -> mapPartition_2 -> sink_2
source_3 -> mapPartition_3 -> sink_3
...

因此从源头开始,所有记录都是按顺序处理的。但是,由于输入 split 是随机分配给源任务的,并且接收器独立运行(无协调),因此输出仅是部分排序的(从同一 split 读取的记录是有序的)。

将源的并行度设置为 1 不会有帮助,因为它将以循环方式将其结果记录发送到后续任务,以利用后续运​​算符的并行度。此外,将整个作业的并行度设置为 1 也没有帮助,因为拆分仍然可以由单个源任务以随机顺序处理。我知道的唯一解决方案是对每个输入记录进行编号和 sorting on that number (with range partitioning for parallel processing)在写结果之前。

关于java - Apache 弗林克 : Process data in order with mapPartition,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44840029/

相关文章:

apache-kafka - Flink Kafka Producer 中的 Exactly-once 语义

java - Spring data JPA - 封装的表达式不是有效的表达式

java - 在 Java 中防止在 String.format ("%.2f", doubleValue) 中四舍五入

apache-flink - 创建具有通用返回类型的 FlinkSQL UDF

ubuntu - Zeppelin - Flink 问题运行基础教程

apache-flink - 在 Apache Flink 中,Job Manager 和 Job Master 有什么区别?

java - 使用 Robospice 在 Android 上固定证书

java - Spring MVC 3 中的 MarshallingView

java - 国际化 JRE6 或 JDK6 或以 "cp037"编码方案读取文件

java - 使用 DataStream API 进行批处理的 Flink Consumer - 我们如何知道何时停止以及如何停止处理 [ 2 倍 ]