java - 如何从 Apache Flink 的数据库中查找和更新记录的状态?

标签 java stream apache-flink

我正在开发一个数据流应用程序,我正在研究在这个项目中使用 Apache Flink 的可能性。这样做的主要原因是它支持很好的高级流构造,与 Java 8 的 Stream API 非常相似。

我将接收与数据库中特定记录相对应的事件,我希望能够处理这些事件(来自 RabbitMQ 或 Kafka 等消息代理)并最终更新数据库中的记录并推送处理/转换的事件到另一个接收器(可能是另一个消息代理)。

理想情况下,与特定记录相关的事件需要以 FIFO 顺序处理(尽管会有一个时间戳也有助于检测乱序事件),但与不同记录相关的事件可以并行处理。我打算使用 keyBy() 构造按记录对流进行分区。

需要进行的处理取决于数据库中有关记录的当前信息。但是,我无法找到一个示例或推荐的方法来查询数据库中的此类记录,以使用我需要处理它的附加信息来丰富正在处理的事件。

我想到的流水线如下:

-> 收到的id上的keyBy() -> 从数据库中检索对应id的记录 -> 对记录执行处理步骤 -> 将处理后的事件推送到外部队列并更新数据库记录

数据库记录将需要更新,因为另一个应用程序将查询数据。

在实现此管道后,可能还可以进行其他优化。例如,可以将(更新的)记录缓存在托管状态中,以便同一记录上的下一个事件不需要另一个数据库查询。但是,如果应用程序不知道特定记录,则需要从数据库中检索它。

在 Apache Flink 中使用这种场景的最佳方法是什么?

最佳答案

您可以通过扩展rich 函数来执行数据库查找,例如一个 RichFlatMap 函数,在它的 open() 方法中初始化一次数据库连接,然后在 flatMap() 方法中处理每个事件:

public static class DatabaseMapper extends RichFlatMapFunction<Event, EncrichedEvent> {

    // Declare DB coonection and query statements

    @Override
    public void open(Configuration parameters) throws Exception {
      // Initialize Database connection
      // Prepare Query statements
    }

    @Override
    public void flatMap(Event currentEvent, Collector<EncrichedEvent> out) throws Exception {
      // look up the Database, update record, enrich event
      out.collect(enrichedEvent);        
    }
})

然后您可以按如下方式使用DatabaseMapper:

stream.keyby(id)
      .flatmap(new DatabaseMapper())
      .addSink(..);

你可以找到here使用来自 Redis 的缓存数据的示例。

关于java - 如何从 Apache Flink 的数据库中查找和更新记录的状态?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38866078/

相关文章:

apache-flink - 如何确保flink作业已完成执行然后执行一些任务

java - 弗林克 : Window evaluation

使用 Apache Flink 对数据流进行排序

java - 无法使用 SAF 检索自定义额外内容

java - 使用最多五位数字来格式化 double 值,必要时四舍五入小数位

java - 多行字符串搜索

c++ - 指针数组困惑

c# - 如何在 C# 中将文件流直接保存到 excel 文件?

java - Web 应用程序似乎启动了一个名为 [Timer-0] 的线程,但未能停止它

c# - 在 C# 中使用流进行单元测试