我已经实现了一个 SourceFunction
,它从 URL 中获取数据(一个字符串)。然后我对该数据执行 keyBy()
并应用 10 分钟的窗口。现在 SourceFunction
只被调用一次,窗口对数据进行 10 分钟的操作。如何从 SourceFunction
中连续获取数据?
DataStream<String> = env.addSource(MySource()) // This runs only once
.keyBy(some keyby function)
.window(for 10 minutes) // This runs for 10 minutes for the data obtained once by Source function
.process(some process function)
我想在一定的时间间隔内重复运行 SourceFunction
,让窗口处理连续获取的数据。
最佳答案
您的 SourceFunction
的 run() 方法应该是一个循环,它通过 hibernate (或任何其他调度机制)来完成工作。
一种常见的模式是使用某种原子 boolean 值,您在首次调用 run
时设置为 true,在调用 cancel
时设置为 false。
所以你的 run
方法中有这样的东西:
while (running) {
// fetch some data, can be async
ctx.collect(data);
Thread.sleep(period);
}
您可以按照您认为合适的方式执行该部分,但重要的是在您实际完成之前不要退出 SourceFunction
的 run
方法或者您有被取消了。
关于java - Apache 弗林克 : How to implement a SourceFunction?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49674520/