java - 如何在apache flink Streaming中从关系数据库读取数据

标签 java streaming apache-flink flink-streaming

我们如何使用自定义数据源从关系数据库读取数据。我是 flink 流媒体新手。我在添加新的自定义数据源时遇到问题。因此,请帮助我添加自定义数据源并从源数据库连续读取数据

最佳答案

正如程志所建议的,关系数据库并不是为了以流式处理方式而设计的,最好使用 Kafka、Kinesis 或其他一些系统。

但是,您可以编写一个使用 JDBC 连接来获取数据的自定义源函数。它必须不断查询数据库以获取任何新数据。这里的问题是,您需要一种方法来确定哪些数据已经读取/处理,哪些数据没有读取/处理。从我的想法来看,您可以使用一些东西,例如记住最后处理的主键是什么,并在后续查询中使用它,例如:

从事件中选择 * WHERE event_id > $last_processed_event_id;

或者,您可以清除某些事务中的events表,例如:

从未处理的事件中选择*; DELETE FROM unprocessed_events WHERE event_id IN $PROCESSED_EVENT_IDS;

event_id 可以是任何可以让您唯一标识记录的内容,也许它可以是某个时间戳或一组字段。

要考虑的另一件事是,如果您想提供任何合理的at-least-once,则必须手动处理检查点(last_processed_even_id 偏移量)或恰好一次保证。

关于java - 如何在apache flink Streaming中从关系数据库读取数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48162464/

相关文章:

streaming - 是否可以从 Elasticsearch 流式传输文档?

java - 如何将实时数据(数字)传输到 Android 应用程序?

java - 如何在消息驱动 Bean 中确认消息

Java - 无法看到包内的类

iphone - iPhone 上的流式 SAX XML 处理

apache-flink - 在远程集群上调试

docker - 如何从 Kubernetes 中的 flink docker 镜像启动我的 jar 应用程序?

java - 如何使用java制作背景会逐渐变化的html文件?

java - 从另一个表创建表时出现语法错误

scala - Apache 弗林克 : Count window with timeout