我有一个问题,它是这个问题的变体:Flink: how to store state and use in another stream?
我有两个流:
val ipStream: DataStream[IPAddress] = ???
valrouteStream: DataStream[RoutingTable] = ???
我想找出哪个包使用了哪条路由。通常这可以通过以下方式完成:
val ip = IPAddress("10.10.10.10")
val table = RoutingTable(Seq("10.10.10.0/24", "5.5.5.0/24"))
val route = table.lookup(ip) // == "10.10.10.0/24"
这里的问题是我无法真正在此处输入流的键,因为这需要完整的表和 IP 地址(并且键必须单独计算)。
对于 ipStream
中的每个元素,我需要最新的 routeStream
元素。现在我正在使用一种 hack,所有这些都是非并行处理的:
ipStream
.connect(routeStream)
.keyBy(_ => 0, _ => 0)
.flatMap(new MyRichCoFlatMapFunction) // with ValueState[RoutingTable]
这听起来像是广播策略的用例。然而,routeStream会被更新并且不会固定在文件中。问题仍然是:是否有办法拥有两个流,其中一个包含另一个流的更改控制数据?
最佳答案
既然我解决了这个问题,我不妨在这里写一个答案:)
我像这样设置了两个流:
- RoutingTable 流使用网络路由的第一个字节进行键入
- IPAddress 也由地址的第一个字节作为 key
这在 IP 包通常在具有相同/8 前缀的网络中路由的情况下起作用,这对于大多数流量都是假设的。
然后,通过拥有一个有状态的 RichCoFlatMap
,可以将路由表状态构建为 key 。当收到新的IP包时,在路由表中进行查找。现在有两种可能的情况:
- 未找到匹配的路线。我们可以将包存储在这里以供稍后使用,但丢弃它也可以。
- 如果找到路由,则输出[IPAddress, RoutingTableEntry]元组。
这样,我们就有了两个流,其中一个流的控制数据不断变化。
关于apache-flink - Flink流: Data stream that gets controlled by control stream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41511048/