stream - 如何在 Flink 中使用 ListState 进行 BroadcastProcessFunction

标签 stream apache-flink flink-streaming

我们有一个包含交易的非键控数据流和一个包含规则的广播流。事实上,我们希望根据最后看到的规则来处理交易。如果我们最后看到的规则是每日,我们必须将当前交易添加到每日TrnsList。此外,如果 dailyTrnsList 大小大于阈值,我们必须清除列表并将事务写入数据库。如果最后看到的规则是 temp,我们会执行相同的操作。

代码如下:

public class TransactionProcess extends BroadcastProcessFunction<String, String, String>{
private List<String> dailyTrnsList = new ArrayList<>();
private List<String> tempTrnsList = new ArrayList<>();

private final static int threshold = 100;

private final MapStateDescriptor<String, String> ruleStateDesc =
        new MapStateDescriptor<>(
                "ControlMapState",
                BasicTypeInfo.STRING_TYPE_INFO,
                BasicTypeInfo.STRING_TYPE_INFO);

  @Override
  public void processElement(String s,
                           ReadOnlyContext readOnlyContext,
                           Collector<Transaction> collector) throws Exception
 {
    String ruleName = readOnlyContext.getBroadcastState(ruleStateDesc).get("rule");

    if(ruleName.equals("daily"))
        {
            dailyTrnsList.add(s);
            if(dailyTrnsList.size()>=threshold)
                {
                    List<String> buffer = dailyTrnsList;
                    dailyTrnsList = new ArrayList<>();
                    insert_to_db(buffer,"daily");
                }
        }
    else if(ruleName.equals("temp"))
        {
            tempTrnsList.add(s);
            if(tempTrnsList.size()>=threshold)
                {
                    List<String> buffer = tempTrnsList;
                    tempTrnsList = new ArrayList<>();
                    insert_to_db(buffer,"temp");
                }
        }

    collector.collect(s);

   }
  @Override
  public void processBroadcastElement(String s,
                                    Context context,
                                    Collector<CardTransaction> collector) throws Exception
  {
    if (s.equals("temp"))
    {
        context.getBroadcastState(ruleStateDesc).put("rule", "temp");
    List<String> buffer = dailyTrnsList;
        dailyTrnsList = new ArrayList<>();
        insert_to_db(buffer,"daily");
    }
    else if (s.equals("daily"))
    {
        context.getBroadcastState(ruleStateDesc).put("rule", "daily");
        List<String> buffer = tempTrnsList;
        tempTrnsList = new ArrayList<>();
        insert_to_db(buffer,"temp");
      }
    }
  }

我们的问题是编写容错方法。我们不知道如何使用 ListState 来解决我们的问题。到目前为止,我们找到的唯一解决方案是实现 Working with State 下的 CheckpointedFunction 接口(interface)。 Flink 文档中的部分。

private ListState<String> dailyTrns;
private ListState<String> tempTrns;

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
    dailyTrns.clear();
    tempTrns.clear();
    for (String[] element : dailyTrnsList)
        dailyTrns.add(element);
    for (String[] element : tempTrnsList)
        tempTrns.add(element);
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {

    dailyTrns = context.getOperatorStateStore().getListState(dailyDescriptor);
    tempTrns = context.getOperatorStateStore().getListState(tempDescriptor);
    if (context.isRestored()) {
        for (String[] element : dailyTrns.get())
            dailyTrnsList.add(element);
        for (String[] element : tempTrns.get())
            tempTrnsList.add(element);
    }
}

请您指导我们,如果这种方法不是正确的解决方案,我们还能做什么?如果解决方案正确,则未从 dailyTrnsListtempTrnsList 传输到 dailyTrnstempTrns 的元素会发生什么情况>?

如有任何帮助,我们将不胜感激。

提前谢谢您。

最佳答案

您可以简化您的实现,这样就不必担心这个问题。您可以执行以下操作:

(1) 简化BroadcastProcessFunction,使其所做的只是将传入流分成两个流:日常事务流和临时事务流。它通过根据最新规则选择两个侧面输出之一来实现此目的。

(2) 按照 BroadcastProcessFunction 的计数窗口创建批处理并将其写入数据库。

或者,BroadcastProcessFunction 可以写出(规则、事务)的元组,而不是使用侧面输出,然后您可以通过规则对流进行键入。不管怎样,我们的想法是让窗口 API 为您管理容错列表。

关于stream - 如何在 Flink 中使用 ListState 进行 BroadcastProcessFunction,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59343157/

相关文章:

java - 如何在 Apache Beam 中使用流输入 PCollection 请求 Redis 服务器?

apache-spark - Kappa 架构 : when insert to batch/analytic serving layer happens

apache-flink - Apache Flink : guideliness for setting parallelism?

apache-flink - Flink BucketingSink 使用自定义 AvroParquetWriter 创建空文件

hadoop - 使用 S3AFileSystem 的 Flink 不会从 S3 读取子文件夹

java - 如何在apache flink Streaming中从关系数据库读取数据

node.js - 如何更改 Node 中 PCM 音频流的音量?

c# - 尝试在C#中异步读取非结束字节流

java - Apache Flink,任务槽数与 env.setParallelism

c# - 从 C# 中的流对象创建一个临时文件