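I am trying to bucket objects from an incoming stream based on two criteria:
- If the total number of objects reaches N, bucket them and send them downstream.
- If the time since the last N objects is >= a timeout, bucket them and send them downstream.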
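To make the two criteria concrete, here is a minimal plain-Java sketch with no Flink involved. It assumes the timeout is measured from the most recently seen element, which is how the answer further down interprets it. The class `CountOrGapBucketer` and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (not Flink): emits a bucket on reaching maxCount or after an idle gap. */
final class CountOrGapBucketer<T> {
    private final int maxCount;
    private final long gapMillis;
    private final List<T> current = new ArrayList<>();
    private final List<List<T>> emitted = new ArrayList<>();
    private long lastSeenMillis;

    CountOrGapBucketer(int maxCount, long gapMillis) {
        this.maxCount = maxCount;
        this.gapMillis = gapMillis;
    }

    /** Criterion 2: flush the open bucket if nothing has arrived for at least gapMillis. */
    void onTime(long nowMillis) {
        if (!current.isEmpty() && nowMillis - lastSeenMillis >= gapMillis) {
            flush();
        }
    }

    /** Criterion 1: flush as soon as the open bucket holds maxCount elements. */
    void onElement(T element, long nowMillis) {
        onTime(nowMillis); // the gap may already have elapsed before this element arrived
        current.add(element);
        lastSeenMillis = nowMillis;
        if (current.size() >= maxCount) {
            flush();
        }
    }

    private void flush() {
        emitted.add(new ArrayList<>(current));
        current.clear();
    }

    /** Buckets sent "downstream" so far, in emission order. */
    List<List<T>> emitted() {
        return emitted;
    }
}
```

With `maxCount = 3` and `gapMillis = 1000`, three elements in quick succession are emitted as one bucket, and a fourth element followed by one second of silence is emitted on its own.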
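Flink provides these two features separately, as CountTrigger and ProcessingTimeSessionWindows.
I tried to combine the two by creating a custom trigger and extending ProcessingTimeSessionWindows to use that trigger. It fires on the second condition but not on the first. Since the stream is not a keyed stream, I cannot use ValueState to store the count, so I would like to know what the alternatives are.
The code is as follows: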
public class ProcessingTimeCountSessionWindow extends ProcessingTimeSessionWindows {

    private static final long serialVersionUID = 786L;

    private final int count;

    private ProcessingTimeCountSessionWindow(int count, long timeout) {
        super(timeout);
        this.count = count;
    }

    // Use the count-aware trigger instead of the default ProcessingTimeTrigger.
    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeCountTrigger.create(count);
    }

    /**
     * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
     * elements to sessions based on the current processing time.
     *
     * @param count Max count of elements in session i.e. the upper bound on count gap between sessions
     * @param size The session timeout, i.e. the time gap between sessions
     * @return The policy.
     */
    public static ProcessingTimeCountSessionWindow withCountAndGap(int count, Time size) {
        return new ProcessingTimeCountSessionWindow(count, size.toMilliseconds());
    }
}
The custom trigger is as follows.
CountTrigger uses ReducingState, but my stream is not keyed, so that does not work:
public class ProcessingTimeCountTrigger extends Trigger<Object, TimeWindow> {

    private static final long serialVersionUID = 786L;

    private final int maxCount;

    private final ReducingStateDescriptor<Integer> countStateDesc =
            new ReducingStateDescriptor<>("window-count", new ReduceFunctions.IntSum(), IntSerializer.INSTANCE);

    private ProcessingTimeCountTrigger(int maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
        ReducingState<Integer> count = ctx.getPartitionedState(countStateDesc);
        count.add(1);
        if (count.get() >= maxCount) {
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
        return TriggerResult.CONTINUE;
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
        ctx.registerProcessingTimeTimer(window.maxTimestamp());
    }

    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(countStateDesc).clear();
    }

    public static ProcessingTimeCountTrigger create(int maxCount) {
        return new ProcessingTimeCountTrigger(maxCount);
    }

    @Override
    public String toString() {
        return "ProcessingTimeCountTrigger(" + maxCount + ")";
    }
}
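Best answer
I was able to solve this by copying CountTrigger verbatim and overriding the following:
    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }
I also did not need to extend ProcessingTimeSessionWindows, since I can simply use the custom trigger I created. Unfortunately, CountTrigger cannot be extended because its constructor is private; otherwise that would have been the best solution.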
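A private constructor rules out subclassing, but wrapping a trigger and overriding a single callback is another pattern worth knowing; Flink's own PurgingTrigger wraps a nested trigger in a similar way. The sketch below is a plain-Java model of that idea only, not Flink code. `MiniResult`, `MiniTrigger`, `MiniCountTrigger` and `FireOnTimeout` are hypothetical names, and no claim is made that this is preferable to the copy-based approach used here.

```java
/** Hypothetical stand-in for a trigger result; not a Flink type. */
enum MiniResult { CONTINUE, FIRE_AND_PURGE }

/** Hypothetical, heavily reduced trigger interface; not Flink's Trigger. */
interface MiniTrigger {
    MiniResult onElement();
    MiniResult onProcessingTime();
}

/** Models a count trigger that cannot be subclassed (final class, private constructor). */
final class MiniCountTrigger implements MiniTrigger {
    private final long maxCount;
    private long count;

    private MiniCountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    static MiniCountTrigger of(long maxCount) {
        return new MiniCountTrigger(maxCount);
    }

    @Override
    public MiniResult onElement() {
        count++;
        if (count >= maxCount) {
            count = 0; // reset for the next bucket
            return MiniResult.FIRE_AND_PURGE;
        }
        return MiniResult.CONTINUE;
    }

    @Override
    public MiniResult onProcessingTime() {
        return MiniResult.CONTINUE; // a pure count trigger ignores time
    }
}

/** Wrapper: delegates element handling, but fires whenever the timer callback runs. */
final class FireOnTimeout implements MiniTrigger {
    private final MiniTrigger delegate;

    FireOnTimeout(MiniTrigger delegate) {
        this.delegate = delegate;
    }

    @Override
    public MiniResult onElement() {
        return delegate.onElement();
    }

    @Override
    public MiniResult onProcessingTime() {
        return MiniResult.FIRE_AND_PURGE;
    }
}
```

In this model, `new FireOnTimeout(MiniCountTrigger.of(10))` fires on the tenth element through the delegate and on every timer callback through the wrapper.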
So the final code looks like this:
consoleInput.windowAll(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
        .trigger(ProcessingTimeCountTrigger.of(10L))
        .process(new ProcessAllWindowFunction<String, String, TimeWindow>() {
            @Override
            public void process(Context context, Iterable<String> elements, Collector<String> out) throws Exception {
                // Collect the window contents so they can be counted.
                List<String> alphaList = new ArrayList<>();
                elements.forEach(alphaList::add);
                out.collect("Time is " + new Date().toString());
                out.collect("Total " + alphaList.size() + " elements in window");
            }
        });
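This sends the bucketed data downstream once we have 10 elements, or once 10 seconds have passed since we last saw an element.
The custom trigger code is as follows: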
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.ReducingState;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.Window;

public class ProcessingTimeCountTrigger<W extends Window> extends Trigger<Object, W> {

    private static final long serialVersionUID = 1L;

    private final long maxCount;

    // Per-window element counter, kept in partitioned trigger state.
    private final ReducingStateDescriptor<Long> stateDesc =
            new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private ProcessingTimeCountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    @Override
    public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
        ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
        count.add(1L);
        if (count.get() >= maxCount) {
            // Count reached: reset the counter and emit the bucket.
            count.clear();
            return TriggerResult.FIRE_AND_PURGE;
        }
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    // The only change relative to CountTrigger: emit the bucket when a processing-time timer fires.
    @Override
    public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
        return TriggerResult.FIRE_AND_PURGE;
    }

    @Override
    public void clear(W window, TriggerContext ctx) throws Exception {
        ctx.getPartitionedState(stateDesc).clear();
    }

    @Override
    public boolean canMerge() {
        return true;
    }

    // Carry the counts of merged session windows over to the merged window.
    @Override
    public void onMerge(W window, OnMergeContext ctx) throws Exception {
        ctx.mergePartitionedState(stateDesc);
    }

    @Override
    public String toString() {
        return "ProcessingTimeCountTrigger(" + maxCount + ")";
    }

    /**
     * Creates a trigger that fires once the number of elements in a pane reaches the given count.
     *
     * @param maxCount The count of elements at which to fire.
     * @param <W> The type of {@link Window Windows} on which this trigger can operate.
     */
    public static <W extends Window> ProcessingTimeCountTrigger<W> of(long maxCount) {
        return new ProcessingTimeCountTrigger<>(maxCount);
    }

    private static class Sum implements ReduceFunction<Long> {
        private static final long serialVersionUID = 1L;

        @Override
        public Long reduce(Long value1, Long value2) throws Exception {
            return value1 + value2;
        }
    }
}
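One visible difference between this trigger and the one in the question is that `onMerge` calls `ctx.mergePartitionedState(stateDesc)`. The source does not say this is why the count condition never fired in the question, so treat the following as an assumption. With session windows, each new element typically extends the session into a new merged window. If the counter is stored per window and is not carried over on merge, it restarts at 1 every time. The plain-Java model below illustrates that effect without Flink. `SessionCountModel` is a hypothetical name, and the model assumes non-negative, non-decreasing timestamps and a single open session.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model (not Flink) of a per-window counter under session-window merging. */
final class SessionCountModel {
    private final long gap;
    private final boolean mergeCounts;
    private final Map<String, Long> countByWindow = new HashMap<>();
    private long start = -1; // -1 means no session has been opened yet
    private long end = -1;

    SessionCountModel(long gap, boolean mergeCounts) {
        this.gap = gap;
        this.mergeCounts = mergeCounts;
    }

    /** Adds an element at time t and returns the count the trigger would see afterwards. */
    long onElement(long t) {
        long newStart = t;
        long newEnd = t + gap;
        if (start >= 0 && t < end) {
            // The element falls inside the open session, so the two windows merge.
            String oldKey = key(start, end);
            newStart = start;
            newEnd = Math.max(end, t + gap);
            String mergedKey = key(newStart, newEnd);
            if (!mergedKey.equals(oldKey)) {
                Long old = countByWindow.remove(oldKey);
                if (mergeCounts && old != null) {
                    // Models carrying the old window's count over to the merged window.
                    countByWindow.merge(mergedKey, old, Long::sum);
                }
            }
        }
        start = newStart;
        end = newEnd;
        return countByWindow.merge(key(start, end), 1L, Long::sum);
    }

    private static String key(long s, long e) {
        return s + "-" + e;
    }
}
```

Feeding five elements one second apart with a 10-second gap, the model with `mergeCounts = false` never sees a count above 1, while the model with `mergeCounts = true` counts up to 5.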
Regarding java - Apache Flink custom trigger with ProcessingTimeSessionWindow, a similar question was found on Stack Overflow: https://stackoverflow.com/questions/55287008/