apache-flink - 如何使用可查询状态客户端在 flink 中获取多个 keyBy 的状态?

标签 apache-flink flink-streaming

我正在使用 Flink 1.4.2,我有一个场景需要使用两个键。
例如

KeyedStream<UsageStatistics, Tuple> keyedStream = stream.keyBy("clusterId", "ssid");
usageCounts = keyedStream.process(new CustomProcessFunction(windowSize,queryableStateName));

值说明将
ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor(queryableStateName, SsidTotalUsage.class);
        descriptor.setQueryable(queryableStateName);

任何人都可以建议我使用可查询状态客户端获取 flink 中多个键的状态吗?

下面的 QueryableClient 对于单个键“clusterId”运行良好。
kvState = queryableStateClient.getKvState(JobID.fromHexString(jobId), queryableStateName, clusterId, BasicTypeInfo.STRING_TYPE_INFO, descriptor);

应该是什么类型信息对于多个键?任何与此相关的建议/示例或引用将非常有帮助?

最佳答案

我找到了解决方案。

我在 valueStateDescription 中给出了 TypeHint。

在 Flink 工作中:

TypeInformation<SsidTotalUsage> typeInformation = TypeInformation.of(new TypeHint<SsidTotalUsage>() {});

ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor(queryableStateName, typeInformation);

客户端:
ValueStateDescriptor<SsidTotalUsage> descriptor = new ValueStateDescriptor(queryableStateName, typeInformation);

我有两把 key 所以我用过 元组2 类并设置我的键的值,如下所示。
注:如果您有两个以上的键,则必须根据您的键选择 Tuple3、Tuple4 类。
 Tuple2<String, String> tuple = new Tuple2<>();
 tuple.f0 = clusterId;
 tuple.f1 = ssid;

然后我提供了 TypeHint。
TypeHint<Tuple2<String, String>> typeHint = new TypeHint<Tuple2<String, String>>() {};

CompletableFuture<ValueState<SsidTotalUsage>> kvState = queryableStateClient.getKvState(JobID.fromHexString(jobId), queryableStateName, tuple, typeHint, descriptor);

在上面的代码中, getState 方法将返回 ImmutableValueState
所以我需要像下面这样得到我的 pojo。
ImmutableValueState<SsidTotalUsage> state = (ImmutableValueState<SsidTotalUsage>) kvState.get();

totalUsage = state.value();

关于apache-flink - 如何使用可查询状态客户端在 flink 中获取多个 keyBy 的状态?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49766903/

相关文章:

apache-flink - 如何迭代 Flink DataStream 中的每条消息?

java - Apache Flink fromCollection java.lang.IllegalStateException : unread block data

java - flink - 集群不使用集群

java - Apache 弗林克 : How to count the total number of events in a DataStream

apache-flink - 使用 Flink SQL 向表添加列的语法是什么

java - Flink 应用程序在 Java 中抛出类未找到异常

google-cloud-platform - 使用 Dataproc 将 Google Pub/Sub 与 Flink Streaming 结合使用的正确方法是什么?

apache-flink - Flink 中 Lookup 和 Processing Time Temporal join 有什么区别?

apache-spark - 使用 Spark 进行流式传输时查询数据库是一种好习惯吗

apache-flink - 在 Flink Streaming 中按键分组并收集到一个 ListBuffer 中