您好, 我为流式作业编写代码,其中源和目标是 PostgreSQL 数据库。我使用 JDBCInputFormat/JDBCOutputFormat 来读取和写入记录(Referenced example)。 代码:
StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
JDBCInputFormatBuilder inputBuilder = JDBCInputFormat.buildJDBCInputFormat()
.setDrivername(JDBCConfig.DRIVER_CLASS)
.setDBUrl(JDBCConfig.DB_URL)
.setQuery(JDBCConfig.SELECT_FROM_SOURCE)
.setRowTypeInfo(JDBCConfig.ROW_TYPE_INFO);
SingleOutputStreamOperator<Row> source = environment.createInput(inputBuilder.finish())
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Row>() {
@Override
public long extractAscendingTimestamp(Row row) {
Date dt = (Date) row.getField(2);
return dt.getTime();
}
})
.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5)))
.fold(null, new FoldFunction<Row, Row>(){
@Override
public Row fold(Row row1, Row row) throws Exception {
return row;
}
});
source.writeUsingOutputFormat(JDBCOutputFormat.buildJDBCOutputFormat()
.setDrivername(JDBCConfig.DRIVER_CLASS)
.setDBUrl(JDBCConfig.DB_URL)
.setQuery("insert into tablename(id, name) values (?,?)")
.setSqlTypes(new int[]{Types.BIGINT, Types.VARCHAR})
.finish());
此代码正确执行但未在 Flink 服务器上连续运行(选择查询仅执行一次。) 期望在 flink 服务器上持续运行。
最佳答案
您可能必须定义自己的 Flink Source 或 JDBCInputFormat,因为您在此处使用的将在从 DB 获取所有结果时停止 SourceTask。解决这个问题的一种方法是根据 JDBCInputFormat
创建您自己的 jdbc 输入格式。 ,尝试在从 nextRecord
中的数据库读取最后一行时重新执行 SQL 查询.
关于java - 如何在 Flink 服务器上连续运行 apache flink streaming job,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48151881/