apache-flink - Flink 可重扩展键控流状态函数

标签 apache-flink flink-streaming

我有以下 Flink 作业,我尝试在后端类型 RockDB 中使用键控流状态函数 (MapState),

environment
.addSource(consumer).name("MyKafkaSource").uid("kafka-id")
.flatMap(pojoMapper).name("MyMapFunction").uid("map-id")
.keyBy(new MyKeyExtractor())
.map(new MyRichMapFunction()).name("MyRichMapFunction").uid("rich-map-id")
.addSink(sink).name("MyFileSink").uid("sink-id")

MyRichMapFunction 是一个有状态函数,它扩展了具有以下代码的 RichMapFunction,

public static class MyRichMapFunction extends RichMapFunction<MyEvent, MyEvent> {
    private transient MapState<String, Boolean> cache;
    @Override
    public void open(Configuration config) {
        MapStateDescriptor<String, Boolean> descriptor =
                new MapStateDescriptor("seen-values", TypeInformation.of(new TypeHint<String>() {}), TypeInformation.of(new TypeHint<Boolean>() {}));
        cache = getRuntimeContext().getMapState(descriptor);
    }
    @Override
    public MyEvent map(MyEvent value) throws Exception {
        if (cache.contains(value.getEventId())) {
            value.setIsSeenAlready(Boolean.TRUE);
            return value;
        }
        value.setIsSeenAlready(Boolean.FALSE);
        cache.put(value.getEventId(), Boolean.TRUE)
        return value;
    }
}

将来,我想重新缩放并行度(从 2 到 4),所以我的问题是,如何实现可重新缩放的键控状态,以便在更改并行度后我可以将相应的缓存键控数据获取到其对应的任务槽。我试图探索这个,在那里我找到了文档 here .据此,可以通过使用 ListCheckPointed 接口(interface)实现可重新缩放的操作状态,该接口(interface)为此提供了 snapshotState/restoreState 方法。但不确定如何实现可重新缩放的键控状态(MyRichMapFunction)?我是否需要为我的 MyRichMapFunction 类实现 ListCheckPointed 接口(interface)?如果是,我如何根据 restoreState 方法上的新并行 key 散列重新分配缓存(我的 MapState 将在启用 TTL 的情况下保存大量 key ,假设最多它将在任何时间点保存 10 亿个 key )?有人可以帮我解决这个问题吗,或者如果你给我指出任何例子也很好。

最佳答案

您编写的代码已经可以重新缩放; Flink 的 managed keyed state 在设计上是可扩展的。通过重新平衡键对实例的分配来重新缩放键控状态。 (您可以将键控状态视为分片键/值存储。从技术上讲,一致性哈希用于将键映射到键组,并且每个并行实例负责一些键组. 重新缩放只涉及在实例之间重新分配 key 组。)

ListCheckpointed接口(interface)用于非键控上下文中使用的状态,因此它不适合您正在做的事情。还要注意 ListCheckpointed将在 Flink 1.11 中弃用,取而代之的是更通用的 CheckpointedFunction .

还有一件事:如果MyKeyExtractorvalue.getEventId() 键入, 那么你可以使用 ValueState<Boolean>用于您的缓存,而不是 MapState<String, Boolean> .这是有效的,因为对于键控状态,每个键都有一个单独的 ValueState 值。只有当您需要为流中的每个键存储多个属性/值对时,您才需要使用 MapState。

大部分内容在 Flink 文档中讨论 Hands-on Training , 其中包括 an example这与您正在做的非常接近。

关于apache-flink - Flink 可重扩展键控流状态函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61601786/

相关文章:

apache-flink - Flink BucketingSink 使用自定义 AvroParquetWriter 创建空文件

java - 如何仅允许单个连接(url/端口)从 flink 应用程序读取和写入

maven - 如何通过 bazel 使用/导入 Flink 的 TestHarness 类?

logback - Flink 日志记录限制 : How to pass logging configuration to a flink job

java - 如何在 java 中使用 protobuf 获取和解析序列化字符串?

java - 使用基于计数的窗口连接两个流

java - Apache Flink CEP 如何检测事件是否在 x 秒内没有发生?

java - 使用 Flink DataStream 计算窗口持续时间的平均值

apache-flink - 如何在其他流的基础上过滤Apache flink流?

optimization - 如何知道哪些运算符可以在 Apache Flink 中链接