java - Flink 中的 windowAll 运算符是否将并行度缩减为 1?

标签 java apache-flink flink-streaming stream-processing

我在 Flink 中有一个流,它从源发送立方体,对立方体进行转换(向立方体中的每个元素加 1),然后最后将它发送到下游以打印每秒的吞吐量。

流在 4 个线程上并行化。

如果我理解正确,windowAll 运算符是一个非并行转换,因此应该将并行化缩小回 1,并将其与 TumblingProcessingTimeWindows.of(Time.seconds (1)),将最近一秒内所有并行子任务的吞吐量求和并打印。我不确定我是否得到正确的输出,因为每秒的吞吐量是这样打印的:

1> 25
2> 226
3> 354
4> 372
1> 382
2> 403
3> 363
...

问题:流式打印机是打印每个线程(1、2、3 和 4)的吞吐量,还是只是它选择的,例如线程 3 打印所有子任务的吞吐量总和?

当我在开始时将环境的并行度设置为 1env.setParallelism(1) 时,我在吞吐量之前没有得到“x>”,但我似乎得到了与设置为 4 时相同(甚至更好)的吞吐量。像这样:

45
429
499
505
1
503
524
530
...

这是程序的代码片段:

imports...

public class StreamingCase {
    public static void main(String[] args) throws Exception {
        int parallelism = 4;

        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setParallelism(parallelism);

        DataStream<Cube> start = env
                .addSource(new CubeSource());

        DataStream<Cube> adder = start
                .map(new MapFunction<Cube, Cube>() {
                    @Override
                    public Cube map(Cube cube) throws Exception {
                        return cube.cubeAdd(1);
                    }
                });

        DataStream<Integer> throughput = ((SingleOutputStreamOperator<Cube>) adder)
                .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(1)))
                .apply(new AllWindowFunction<Cube, Integer, TimeWindow>() {
                    @Override
                    public void apply(TimeWindow tw,
                                      Iterable<Cube> values,
                                      Collector<Integer> out) throws Exception {
                        int sum = 0;
                        for (Cube c : values)
                            sum++;
                        out.collect(sum);
                    }
                });
        throughput.print();
        env.execute("Cube Stream of Sweetness");
    }
}

最佳答案

如果环境的并行度设置为 3 并且您正在使用 WindowAll 运算符,则只有窗口运算符以并行度 1 运行。接收器仍将以并行度 3 运行。因此,计划如下所示:

In_1 -\               /- Out_1
In_2 --- WindowAll_1 --- Out_2
In_3 -/               \- Out_3

WindowAll 运算符使用循环策略将其输出发送到后续任务。这就是不同线程发出程序结果记录的原因。

当您将环境并行度设置为 1 时,所有运算符都运行一个任务。

关于java - Flink 中的 windowAll 运算符是否将并行度缩减为 1?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50837463/

相关文章:

Java HashMap 内容好像变了没变

java - 数据库到 GUI 或数据库到对象到 GUI

java - Apache 弗林克 : Extract TypeInformation of Tuple

java - 如何创建每隔一段时间进入 Apache Beam 管道的虚假数据流?

java - JpaRepository 不在自定义 RichSinkFunction 中 Autowiring

java - JAVA 中动态从 JSON 读取和存储具有相同起始名称的字段

java - 发送嵌入的签名电子邮件并设置签名顺序

java - 为两个输入流分配时间戳和水印,稍后使用 'EventTime' 连接以进行动态警报

apache-flink - Flink,使用多个Kafka源时如何正确设置并行性?

java - 如何在 Flink 中连接两个流并进行操作?