asynchronous - flink的sink只支持bio吗?

标签 asynchronous nio apache-flink flink-streaming

sink的invoke方法似乎没有办法进行异步io?例如返回 future

例如redis连接器使用jedis lib同步执行redis命令:

https://github.com/apache/bahir-flink/blob/master/flink-connector-redis/src/main/java/org/apache/flink/streaming/connectors/redis/RedisSink.java

那么它会阻塞 flink 的任务线程等待每个命令的 redis 服务器的网络响应?!其他操作符是否可以与接收器在同一线程中运行?如果是这样,那么它也会阻止他们吗?

我知道flink有asyncio api,但它似乎不适用于sink impl?

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html

最佳答案

正如 @Dexter 提到的,您可以使用 RichAsyncFunction。这是一个示例代码(可能需要进一步更新才能使其工作;)

    AsyncDataStream.orderedWait(ds, new RichAsyncFunction<Tuple2<String,MyEvent>, String>() {
        transient private RedisClient client;
        transient private RedisAsyncCommands<String, String> commands;
        transient private ExecutorService executor;

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            client = RedisClient.create("redis://localhost");
            commands = client.connect().async();
            executor = Executors.newFixedThreadPool(10);
        }

        @Override
        public void close() throws Exception {
            // shut down the connection and thread pool.
            client.shutdown();
            executor.shutdown();

            super.close();
        }

        public void asyncInvoke(Tuple2<String, MyEvent> input, final AsyncCollector<String> collector) throws Exception {
            // eg.g get something from redis in async
            final RedisFuture<String> future = commands.get("key");
            future.thenAccept(new Consumer<String>() {
                @Override
                public void accept(String value) {
                     collector.collect(Collections.singletonList(future.get()));
                }
            });
        }
    }, 1000, TimeUnit.MILLISECONDS);

关于asynchronous - flink的sink只支持bio吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47707550/

相关文章:

multithreading - Lwt.async() 未按预期工作

javascript - 在 AngularJS 中异步调用 hprose.httpclient

java - 如何检测流中的第一个文件

debugging - 如何调试 Apache Flink?

apache-flink - 链接映射状态大小和键数

java - 如何将Apache flink连接到rabbitmq?

c# - 在 .Net 中阻塞一个线程

javascript - NodeJS : How to use async. js处理数据库中的项目列表

java - 重用内存 MappedByteBuffer

java - 使用 Netty 提供文件 - 响应被截断一个字节