如何从mysql并行读取数据到flink?我想构建一个sourceFunction,每隔一段时间从mysql并行连续读取数据,如何实现?
最佳答案
这个问题的答案包括两个方面:
- 并行读取 MySQL(或任何其他 JDBC 源)
- 定期从 MySQL(或任何其他 JDBC 源)读取数据
并行读取MySQL
为了并行读取 MySQL,您需要发送多个不同的查询。查询必须以其结果的并集等于预期结果的方式组合。例如,您可以使用范围谓词在数字属性之间拆分查询:
Q1: SELECT * FROM sourceT WHERE num < 10;
Q2: SELECT * FROM sourceT WHERE num >= 10 AND num < 20;
Q3: SELECT * FROM sourceT WHERE num >= 20;
还有其他方法可以对查询进行分区。但是为了真正有所收获,DBMS 必须能够比查询整个表的单个查询更有效地处理多个查询。因此,通常情况下,您希望确保对您进行分区的属性(上例中的 num
)进行了索引。尽管如此,在单个数据库实例上执行多个查询会导致开销。因此,找到提供最佳性能的并行机制并非易事。
定期从 MySQL 中读取
这类似于并行读取。同样,您需要对查询进行分区。但现在您想根据描述记录时间的属性来执行此操作。因此,在每个时间间隔中,您想要询问自上次时间间隔以来插入的行。同样,这将通过时间属性的范围谓词来完成。
Q at T1: SELECT * FROM sourceT WHERE rowtime < T1;
Q at T2: SELECT * FROM sourceT WHERE rowtime < T2;
和以前一样,只有当表在 rowtime
属性上建立索引时,这才有效。否则,您将进行全表扫描,并且随着插入的数据越来越多,查询会变得越来越慢。
定期从 MySQL 并行读取
为此,您“只需”结合这两种方法并为每个查询添加两个谓词。实际上,您要做的是将表划分为分离部分,并随着时间的推移并行读取它们。
然而,正如我之前指出的,确切的分区取决于您的数据和用例。此外,您需要创建适当的索引以避免全表扫描。另请注意,使用上述方法,您不会看到在读取后修改的行的任何更新。
关于mysql - 如何并行读取mysql的数据到flink?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46445085/