sink的invoke
方法似乎没有办法进行异步io?例如返回 future
?
例如redis连接器使用jedis lib同步执行redis命令:
那么它会阻塞 flink 的任务线程等待每个命令的 redis 服务器的网络响应?!其他操作符是否可以与接收器在同一线程中运行?如果是这样,那么它也会阻止他们吗?
我知道flink有asyncio api,但它似乎不适用于sink impl?
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html
最佳答案
正如 @Dexter 提到的,您可以使用 RichAsyncFunction
。这是一个示例代码(可能需要进一步更新才能使其工作;)
AsyncDataStream.orderedWait(ds, new RichAsyncFunction<Tuple2<String,MyEvent>, String>() {
transient private RedisClient client;
transient private RedisAsyncCommands<String, String> commands;
transient private ExecutorService executor;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
client = RedisClient.create("redis://localhost");
commands = client.connect().async();
executor = Executors.newFixedThreadPool(10);
}
@Override
public void close() throws Exception {
// shut down the connection and thread pool.
client.shutdown();
executor.shutdown();
super.close();
}
public void asyncInvoke(Tuple2<String, MyEvent> input, final AsyncCollector<String> collector) throws Exception {
// eg.g get something from redis in async
final RedisFuture<String> future = commands.get("key");
future.thenAccept(new Consumer<String>() {
@Override
public void accept(String value) {
collector.collect(Collections.singletonList(future.get()));
}
});
}
}, 1000, TimeUnit.MILLISECONDS);
关于asynchronous - flink的sink只支持bio吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47707550/