java - 如何在 Flink 服务器上连续运行 apache flink streaming job

标签 java postgresql streaming apache-flink

您好, 我为流式作业编写代码,其中源和目标是 PostgreSQL 数据库。我使用 JDBCInputFormat/JDBCOutputFormat 来读取和写入记录(Referenced example)。 代码:

StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

    JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
            .setDrivername(JDBCConfig.DRIVER_CLASS)
            .setDBUrl(JDBCConfig.DB_URL)
            .setQuery(JDBCConfig.SELECT_FROM_SOURCE)
            .setRowTypeInfo(JDBCConfig.ROW_TYPE_INFO);

    SingleOutputStreamOperator<Row> source = environment.createInput(inputBuilder.finish())
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Row>() {
                @Override
                public long extractAscendingTimestamp(Row row) {
                    Date dt = (Date) row.getField(2);
                    return dt.getTime();
                }
            })
            .keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)))
            .fold(null, new FoldFunction<Row, Row>(){
                @Override
                public Row fold(Row row1, Row row) throws Exception {
                    return row;
                }
            });

    source.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
            .setDrivername(JDBCConfig.DRIVER_CLASS)
            .setDBUrl(JDBCConfig.DB_URL)
            .setQuery("insert into tablename(id, name) values (?,?)")
            .setSqlTypes(new int[]{Types.BIGINT, Types.VARCHAR})
            .finish());

此代码正确执行但未在 Flink 服务器上连续运行(选择查询仅执行一次。) 期望在 flink 服务器上持续运行

最佳答案

您可能必须定义自己的 Flink Source 或 JDBCInputFormat,因为您在此处使用的将在从 DB 获取所有结果时停止 SourceTask。解决这个问题的一种方法是根据 JDBCInputFormat 创建您自己的 jdbc 输入格式。 ,尝试在从 nextRecord 中的数据库读取最后一行时重新执行 SQL 查询.

关于java - 如何在 Flink 服务器上连续运行 apache flink streaming job,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48151881/

相关文章:

php - 如何以数据库函数为源创建一个 Eloquent 模型?

postgresql - 如何使用 Liquibase 强制执行列顺序?

android - 帮助在 android 中从 IP 摄像机流式传输视频

c# - 使用 HttpWebRequest 流式传输大文件时出现内存不足异常

java - Hadoop 构建在 Windows 7、Maven 3.1.1、Jdk 1.7.0_45 上失败(Hadoop src 2.2.0)

java - 我可以使用什么来代替 "this"作为传递参数?

java - jUnit 中的多个 RunWith 语句

mysql - 数据库可移植性(sql server 到 mysql、postgresql)

java - 麦克风直播不清晰

java - 如何在使用lucene分析器时添加短语作为停用词?