我们有一个包含交易的非键控数据流和一个包含规则的广播流。事实上,我们希望根据最后看到的规则来处理交易。如果我们最后看到的规则是每日,我们必须将当前交易添加到每日TrnsList。此外,如果 dailyTrnsList 大小大于阈值,我们必须清除列表并将事务写入数据库。如果最后看到的规则是 temp
,我们会执行相同的操作。
代码如下:
public class TransactionProcess extends BroadcastProcessFunction<String, String, String>{
private List<String> dailyTrnsList = new ArrayList<>();
private List<String> tempTrnsList = new ArrayList<>();
private final static int threshold = 100;
private final MapStateDescriptor<String, String> ruleStateDesc =
new MapStateDescriptor<>(
"ControlMapState",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO);
@Override
public void processElement(String s,
ReadOnlyContext readOnlyContext,
Collector<Transaction> collector) throws Exception
{
String ruleName = readOnlyContext.getBroadcastState(ruleStateDesc).get("rule");
if(ruleName.equals("daily"))
{
dailyTrnsList.add(s);
if(dailyTrnsList.size()>=threshold)
{
List<String> buffer = dailyTrnsList;
dailyTrnsList = new ArrayList<>();
insert_to_db(buffer,"daily");
}
}
else if(ruleName.equals("temp"))
{
tempTrnsList.add(s);
if(tempTrnsList.size()>=threshold)
{
List<String> buffer = tempTrnsList;
tempTrnsList = new ArrayList<>();
insert_to_db(buffer,"temp");
}
}
collector.collect(s);
}
@Override
public void processBroadcastElement(String s,
Context context,
Collector<CardTransaction> collector) throws Exception
{
if (s.equals("temp"))
{
context.getBroadcastState(ruleStateDesc).put("rule", "temp");
List<String> buffer = dailyTrnsList;
dailyTrnsList = new ArrayList<>();
insert_to_db(buffer,"daily");
}
else if (s.equals("daily"))
{
context.getBroadcastState(ruleStateDesc).put("rule", "daily");
List<String> buffer = tempTrnsList;
tempTrnsList = new ArrayList<>();
insert_to_db(buffer,"temp");
}
}
}
我们的问题是编写容错方法。我们不知道如何使用 ListState
来解决我们的问题。到目前为止,我们找到的唯一解决方案是实现 Working with State 下的 CheckpointedFunction
接口(interface)。 Flink 文档中的部分。
private ListState<String> dailyTrns;
private ListState<String> tempTrns;
@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
dailyTrns.clear();
tempTrns.clear();
for (String[] element : dailyTrnsList)
dailyTrns.add(element);
for (String[] element : tempTrnsList)
tempTrns.add(element);
}
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
dailyTrns = context.getOperatorStateStore().getListState(dailyDescriptor);
tempTrns = context.getOperatorStateStore().getListState(tempDescriptor);
if (context.isRestored()) {
for (String[] element : dailyTrns.get())
dailyTrnsList.add(element);
for (String[] element : tempTrns.get())
tempTrnsList.add(element);
}
}
请您指导我们,如果这种方法不是正确的解决方案,我们还能做什么?如果解决方案正确,则未从 dailyTrnsList
和 tempTrnsList
传输到 dailyTrns
和 tempTrns
的元素会发生什么情况>?
如有任何帮助,我们将不胜感激。
提前谢谢您。
最佳答案
您可以简化您的实现,这样就不必担心这个问题。您可以执行以下操作:
(1) 简化BroadcastProcessFunction,使其所做的只是将传入流分成两个流:日常事务流和临时事务流。它通过根据最新规则选择两个侧面输出之一来实现此目的。
(2) 按照 BroadcastProcessFunction 的计数窗口创建批处理并将其写入数据库。
或者,BroadcastProcessFunction 可以写出(规则、事务)的元组,而不是使用侧面输出,然后您可以通过规则对流进行键入。不管怎样,我们的想法是让窗口 API 为您管理容错列表。
关于stream - 如何在 Flink 中使用 ListState 进行 BroadcastProcessFunction,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59343157/