apache-flink - "Buffer pool is destroyed" when I use Flink SlidingEventTimeWindows

Tags: apache-flink flink-streaming

When I use SlidingEventTimeWindows, Flink throws "java.lang.IllegalStateException: Buffer pool is destroyed", but when I switch to SlidingProcessingTimeWindows everything works fine.

The stack trace is as follows:

18:37:53,728 WARN  org.apache.flink.streaming.api.operators.AbstractStreamOperator  - Error while emitting latency marker.
java.lang.RuntimeException: Buffer pool is destroyed.
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:147)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:683)
	at org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:151)
	at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:330)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Buffer pool is destroyed.
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegment(LocalBufferPool.java:230)
	at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:204)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:213)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:144)
	at org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:125)
	at org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:93)
	at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:144)
	... 10 more

I finally solved it with the following steps.

First, in my DataMockSource, which generates the stream data, I replaced collect with collectWithTimestamp. After that, the "Error while emitting latency marker" warning disappeared from the console.
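The change itself is a single line inside the source's run() loop; roughly (mockData and its getTimestamp() come from the DataMockSource shown further down):

// before: the record is emitted without an event timestamp
// sourceContext.collect(mockData);

// after: the event timestamp travels with the record itself
sourceContext.collectWithTimestamp(mockData, mockData.getTimestamp());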

Second, I replaced BoundedOutOfOrdernessTimestampExtractor with AscendingTimestampExtractor for event-time processing. In my DataMockSource I create a record and emit it immediately, so the timestamps are ascending and AscendingTimestampExtractor is the right way to generate watermarks.
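For reference, this is roughly what the two assigners look like side by side (the one-second out-of-orderness bound is just an illustrative value, not what the project uses):

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// What I used originally: tolerates events arriving up to 1 second out of order,
// so the watermark trails the highest timestamp seen so far by that bound.
AssignerWithPeriodicWatermarks<MockData> outOfOrder =
    new BoundedOutOfOrdernessTimestampExtractor<MockData>(Time.seconds(1)) {
      @Override
      public long extractTimestamp(MockData element) {
        return element.getTimestamp();
      }
    };

// What I switched to: assumes timestamps are ascending per source instance,
// which holds here because the mock source stamps each record at emission time.
AssignerWithPeriodicWatermarks<MockData> ascending =
    new AscendingTimestampExtractor<MockData>() {
      @Override
      public long extractAscendingTimestamp(MockData element) {
        return element.getTimestamp();
      }
    };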

I'm posting the main code here; the complete project is on GitHub. I hope it helps.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.enableCheckpointing(10000); // checkpoint every 10 seconds

DataStreamSource<MockData> mockDataDataStreamSource = env.addSource(new DataMockSource());

// assignTimestampsAndWatermarks returns a new stream; the window must be built on that
// stream, otherwise the watermarks never reach the window operator
DataStream<MockData> withTimestamps = mockDataDataStreamSource
    .assignTimestampsAndWatermarks(
        new AscendingTimestampExtractor<MockData>() {
          @Override
          public long extractAscendingTimestamp(MockData element) {
            return element.getTimestamp();
          }
        });

SingleOutputStreamOperator<Tuple2<String, Long>> countStream = withTimestamps
    .keyBy("country")
    .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(10)))
//  .allowedLateness(Time.seconds(5))
    .process(new FlinkEventTimeCountFunction()).name("count elements");

countStream.addSink(new SinkFunction<Tuple2<String, Long>>() {
  @Override
  public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
    System.out.println(value);
  }
});

env.execute("count test ");

My DataMockSource is here:

  private volatile boolean running = true;

  @Override
  public void run(SourceContext<MockData> sourceContext) throws Exception {
    while (running) {
      MockData mockData = new MockData();
      mockData.setAge(ThreadLocalRandom.current().nextInt(1, 99));
      mockData.setCountry("country " + ThreadLocalRandom.current().nextInt(2, 5));
      mockData.setId(ThreadLocalRandom.current().nextLong());
      mockData.setTimestamp(Instant.now().toEpochMilli());
      // emit the record together with its event timestamp
      sourceContext.collectWithTimestamp(mockData, mockData.getTimestamp());
      // sourceContext.collect(mockData); // the old call, without a timestamp

      TimeUnit.SECONDS.sleep(3);
    }
  }

  @Override
  public void cancel() {
    running = false;
  }

Best Answer

When working with event time, you need to arrange for timestamp extraction and watermarking to happen, either in the source or with assignTimestampsAndWatermarks. It looks like you are not doing this, which would explain why you don't get any output (the event-time windows never fire).
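For example, doing it in the source would mean emitting both the timestamp and a watermark from run(); a rough sketch (the watermark-per-record strategy, the record creation, and the sleep interval are only illustrative):

@Override
public void run(SourceContext<MockData> ctx) throws Exception {
    while (running) {
        MockData data = new MockData();
        long ts = System.currentTimeMillis();
        data.setTimestamp(ts);
        ctx.collectWithTimestamp(data, ts);   // attach the event timestamp to the record
        ctx.emitWatermark(new Watermark(ts)); // advance event time so the windows can fire
        Thread.sleep(1000);
    }
}

(Watermark here is org.apache.flink.streaming.api.watermark.Watermark.)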

Also, your source should have a cancel method. Something like this:

private volatile boolean running = true;

@Override
public void run(SourceContext ctx) throws Exception {
    while (running) {
        ...
    }
}

@Override
public void cancel() {
    running = false;
}

I think that may explain the exception you are seeing. The source probably kept running and sending latency markers after the job had started to shut itself down.

This post on apache-flink - "Buffer pool is destroyed" when I use Flink SlidingEventTimeWindows is based on a similar question on Stack Overflow: https://stackoverflow.com/questions/53316332/
