java - 弗林克 : Broadcasted Operator chaining

标签 java scala apache-flink flink-streaming

假设我有一个事件数据流,并且我想将其广播到链接到另一个(丰富) map 运算符(map2)的(丰富) map 运算符(map1)。两个 map 的并行度是相同的。我想要的是map1的每个并行实例的输出都转到map2的一个并行实例(即两个映射之间没有广播)。这是我到目前为止所做的,但我不确定它在逻辑上是否正确。可以吗?

val trainedStream = events.broadcast.map(new Mapper1(...)).setParallelism(par)
trainedStream.startNewChain.map(new Mapper2(...)).setParallelism(par)

后续问题:map1和map2的两个链接子任务/并行实例的SubtaskIndex(从RuntimeContext.getIndexOfThisSubtask接收)是否相同?有办法检查吗?

代码是Scala,但我猜这同样适用于Java

最佳答案

只要有可能,Flink 中就会自动进行链接。因此,在您的示例中,只需使用

val trainedStream = events.broadcast.map(new Mapper1(...)).map(new Mapper2(...))

然后我会在 env 上设置并行度。

顺便说一句,您确定要广播这些 Activity 吗?默认情况下,Datastream 是并行处理的。广播事件非常不寻常,因为它们会根据并行性被多次处理。

Followup Question: Is the SubtaskIndex (received from RuntimeContext.getIndexOfThisSubtask) of two chained subtasks/parallel instances of map1 and map2 the same? Is there a way to check this?

子任务索引对于链式操作符来说是相同的,因为它们驻留在同一任务中(因此它们甚至不能有不同的索引)。如果您有任务 mapper1 -> mapper2,您可以看到链接成功。

关于java - 弗林克 : Broadcasted Operator chaining,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60620916/

相关文章:

java - 将输入默认格式更改为 D 月,YYYY

scala - free monad 和 AST 的关系

apache-flink - 如何在 Apache Flink 中对数据集进行排序?

docker - Compose-Docker拉取特定镜像:tag from a yml file service

apache-flink - 在 Flink Streaming 中按键分组并收集到一个 ListBuffer 中

java - Corba python 与 web 服务 java 集成

java - 滚动或调整主框架大小时,JPanel 中的绘图消失

java - 开源数据流引擎

scala - 在 Scala 中使用 Jedis 类型不匹配

java - 在scala中从json中提取数据