asynchronous - 进行 REST 调用的 Flink 转换(异步、Future、Netty)

标签 asynchronous netty apache-flink

让我们假设 Flink 每秒接收 1000 条推文流,并且在这个过程中的某个地方,它需要将它们分类为垃圾邮件。我有一个集群,例如20 台机器通过 REST API 提供“分类”微服务,它们可以提供每秒 10k 条推文的最大吞吐量,延迟为 3 秒。这意味着在最坏的情况下,我可能有 3 万条动态推文,这没关系。我猜想从 Flink 中使用这个服务,一个实现将是这样的:

public class Classifier implements MapFunction<Tweet, TweetWithClass> {
  @Override
  public TweetWithClass map(Tweet tweet) {
    TweetWithClass twc = new TweetWithClass(tweet);
    twc.classes = (new Post('http://my.classifier.com', data = tweet.body)).bodyAsStringArrayFromJson();
    return twc;
  }
}

DataSet<TweetWithClass> outTweets = inTweets.map(new Classifier()).setParallelism(30000);

现在,鉴于此 API,我的猜测是 Flink 除了启动 30k 线程外别无选择,这可能很糟糕。我在源代码中看到 Flink 使用了 Netty,我想它可以通过使用异步调用更有效地支持这种操作......如果存在虚构的漂亮的 Netty,Flink 和 Java API,这将是这样的:

public class Classifier implements MapFunction<Tweet, TweetWithClass> {
  @Override
  public Future<TweetWithClass> map(Tweet tweet) {
    Future<String[]> classes = (new NettyPost('http://my.classifier.com', data = tweet.body)).asyncBodyAsStringArrayFromJson();
    return classes.onGet( (String[] classes) -> new TweetWithClass(tweet, twc.classes) );
  }
}

DataSet<TweetWithClass> outTweets = inTweets.nettyMap(new Classifier()).setMaxParallelism(30000);

有没有一种方法可以使用异步调用在 Flink 中以很少的线程实现巨大的可扩展性?

最佳答案

我知道这是一个相对较老的问题,但从 Flink 1.2(2017 年 2 月发布)开始,Flink 提供了一个用于此目的的 API。 它称为异步 I/O。

使用异步 I/O,您可以执行对外部数据库或外部 Web 服务的异步调用,并通过 future 内部的回调获取结果。

可在此处找到更多信息:https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/asyncio.html

关于asynchronous - 进行 REST 调用的 Flink 转换(异步、Future、Netty),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38810393/

相关文章:

scala - 将 scala 延续与 netty/NIO 监听器结合使用

java - FLINK CEP (Java 8) - 通过匹配模式持久化 "identity"

ruby - JRuby 多核处理用法

c# - await 运算符只能在异步方法中使用 - 但方法是异步的

node.js - Node.js 中 API 的异步调用模式

java - 如何配置 Netty 以拥有单个工作线程?

c# - 使用 ThreadPool.QueueUserWorkItem - 线程意外退出

java - Netty 无法在 channel 中发送第二个请求

java - 如何使用 Apache Flink 按属性和时间窗口进行计数?

scala - 使用 scala sbt 对 kafka + flink 示例进行故障排除?