我正在尝试用响应式(Reactive)异步 postgres-async-driver 替换 PostgreSQL 数据库轮询器,并将新插入的行流式传输到 Spring 5 Webflux Reactive websocket 客户端,如 Josh Long 的精彩示例演示 here 并基于 Sébastien Deleuze's spring-reactive-playground。
我的 Publisher
获得第一 row
,但随后不返回后续行。
是我的 Observable
、我的 Publisher
还是我使用 postgres-async-driver Db
的方式有问题?
public Observable<WebSocketMessage> getObservableWSM(WebSocketSession session){
return
// com.github.pgasync.Db
db.queryRows(sql)
// ~RowMapper method
.map(row -> mapRowToDto(row))
// serialize dto to String for websocket
.map(dto -> { return objectMapper.writeValueAsString(dto); })
// finally, write to websocket session
.map(str -> { return session.textMessage((String) str);
});
}
然后,我使用 RxReactiveStream.toPublisher
转换器将 Observable
连接到我的 WebSocketHandler
:
@Bean
WebSocketHandler dbWebSocketHandler() {
return session -> {
Observable<WebSocketMessage> o = getObservableWSM(session);
return session.send(Flux.from(RxReactiveStreams.toPublisher(o)));
};
}
这将从我的 sql
语句中获取第一个 row
,但不会获取其他行。如何继续流式传输其他行?
理想情况下,我想我想要 PostgreSQL 相当于 MongoDB Tailable cursor 。
最佳答案
我创建了一个 Postgres Trigger触发 INSERT
s 到我的表基于
this example :
CREATE OR REPLACE FUNCTION table_update_notify() RETURNS trigger AS $$
DECLARE
id bigint;
BEGIN
IF TG_OP = 'INSERT' THEN
id = NEW.id;
ELSE
id = OLD.id;
END IF;
PERFORM pg_notify('my_trigger_name', json_build_object('table', TG_TABLE_NAME, 'id', id, 'type', TG_OP)::text);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
然后我使用 reactive-pg-client 订阅了那个 Postgres 触发器。 . 以下是他们的 Pub/Sub 示例中的代码:
@Bean
PgPool subscribedNotificationHandler() {
PgPool client = pgPool();
client.getConnection(asyncResult -> {
if (asyncResult.succeeded()) {
PgConnection connection = asyncResult.result();
connection.notificationHandler(notification -> {
notification.getPayload();
// do things with payload
});
connection.query("LISTEN my_trigger_name", ar -> {
log.info("Subscribed to channel");
});
}
});
return client;
}
关于Spring Webflux Async PostgreSQL Publisher 在第一个结果后停止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49410703/