apache-flink - Flink流: Data stream that gets controlled by control stream

标签 apache-flink flink-streaming

我有一个问题,它是这个问题的变体:Flink: how to store state and use in another stream?

我有两个流:

  1. val ipStream: DataStream[IPAddress] = ???
  2. valrouteStream: DataStream[RoutingTable] = ???

我想找出哪个包使用了哪条路由。通常这可以通过以下方式完成:

val ip = IPAddress("10.10.10.10")
val table = RoutingTable(Seq("10.10.10.0/24", "5.5.5.0/24"))
val route = table.lookup(ip) // == "10.10.10.0/24"

这里的问题是我无法真正在此处输入流的键,因为这需要完整的表和 IP 地址(并​​且键必须单独计算)。

对于 ipStream 中的每个元素,我需要最新的 routeStream 元素。现在我正在使用一种 hack,所有这些都是非并行处理的:

ipStream
  .connect(routeStream)
  .keyBy(_ => 0, _ => 0)
  .flatMap(new MyRichCoFlatMapFunction) // with ValueState[RoutingTable]

这听起来像是广播策略的用例。然而,routeStream会被更新并且不会固定在文件中。问题仍然是:是否有办法拥有两个流,其中一个包含另一个流的更改控制数据?

最佳答案

既然我解决了这个问题,我不妨在这里写一个答案:)

我像这样设置了两个流:

  1. RoutingTable 流使用网络路由的第一个字节进行键入
  2. IPAddress 也由地址的第一个字节作为 key

这在 IP 包通常在具有相同/8 前缀的网络中路由的情况下起作用,这对于大多数流量都是假设的。

然后,通过拥有一个有状态的 RichCoFlatMap,可以将路由表状态构建为 key 。当收到新的IP包时,在路由表中进行查找。现在有两种可能的情况:

  1. 未找到匹配的路线。我们可以将包存储在这里以供稍后使用,但丢弃它也可以。
  2. 如果找到路由,则输出[IPAddress, RoutingTableEntry]元组。

这样,我们就有了两个流,其中一个流的控制数据不断变化。

关于apache-flink - Flink流: Data stream that gets controlled by control stream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41511048/

相关文章:

apache-flink - Flink 广播状态如何初始化?

java - flink - 使用 Dagger 注入(inject) - 不可序列化?

java - 使用Flink时Kafka中json数据不清楚如何反序列化

apache-flink - 在 Flink 中检查点时,计时器过多会花费太多时间

amazon-s3 - 无法执行HTTP请求: Timeout waiting for connection from pool in Flink

java - 在 flink 流中使用 grok

java - flink SourceFunction<> 在 StreamExecutionEnvironment.addSource() 中被替换?

apache-flink - Flink时间特性和AutoWatermarkInterval

java - 为什么运行示例 flink 应用程序会抛出此错误?

java - 为 flink 输出流提供类型提示的未弃用方法是什么?