java - Apache 弗林克 : How to implement a SourceFunction?

标签 java streaming apache-flink flink-streaming

我已经实现了一个 SourceFunction,它从 URL 中获取数据(一个字符串)。然后我对该数据执行 keyBy() 并应用 10 分钟的窗口。现在 SourceFunction 只被调用一次,窗口对数据进行 10 分钟的操作。如何从 SourceFunction 中连续获取数据?

DataStream<String> = env.addSource(MySource())   // This runs only once
                        .keyBy(some keyby function)
                        .window(for 10 minutes)  // This runs for 10 minutes for the data obtained once by Source function
                        .process(some process function)

我想在一定的时间间隔内重复运行 SourceFunction,让窗口处理连续获取的数据。

最佳答案

您的 SourceFunction 的 run() 方法应该是一个循环,它通过 hibernate (或任何其他调度机制)来完成工作。

一种常见的模式是使用某种原子 boolean 值,您在首次调用 run 时设置为 true,在调用 cancel 时设置为 false。

所以你的 run 方法中有这样的东西:

while (running) {
   // fetch some data, can be async
   ctx.collect(data);
   Thread.sleep(period);
}

您可以按照您认为合适的方式执行该部分,但重要的是在您实际完成之前不要退出 SourceFunctionrun 方法或者您有被取消了。

关于java - Apache 弗林克 : How to implement a SourceFunction?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49674520/

相关文章:

java - 如何仅允许单个连接(url/端口)从 flink 应用程序读取和写入

java - Apache Flink 错误 java.lang.ClassNotFoundException

apache-flink - Flink 任务槽在设置算子并行度大于默认并行度时分布不均匀

java - Java Rest api 中的资源实例

ffmpeg - 使用 FFMPEG 将 Socket 上的 RTP 音频数据重新采样为 PCM 数据

c# - c#中的视频通话

具有 Hive 流的 Python 2.7 模块

Java - 在主类中放置多少逻辑?

java - 并行素因数分解

java - 对随机生成的数字 vector 进行排序