Spring Webflux Async PostgreSQL Publisher 在第一个结果后停止

标签 spring postgresql websocket rx-java spring-webflux

我正在尝试用响应式(Reactive)异步 postgres-async-driver 替换 PostgreSQL 数据库轮询器,并将新插入的行流式传输到 Spring 5 Webflux Reactive websocket 客户端,如 Josh Long 的精彩示例演示 here 并基于 Sébastien Deleuze's spring-reactive-playground

我的 Publisher 获得第一 row,但随后不返回后续行。 是我的 Observable、我的 Publisher 还是我使用 postgres-async-driver Db 的方式有问题?

public Observable<WebSocketMessage> getObservableWSM(WebSocketSession session){
    return
        // com.github.pgasync.Db
        db.queryRows(sql)
        // ~RowMapper method
        .map(row -> mapRowToDto(row))
        // serialize dto to String for websocket
        .map(dto -> { return objectMapper.writeValueAsString(dto); })
        // finally, write to websocket session 
        .map(str -> { return session.textMessage((String) str);
        });
}

然后,我使用 RxReactiveStream.toPublisher 转换器将 Observable 连接到我的 WebSocketHandler:

@Bean
WebSocketHandler dbWebSocketHandler() {
    return session -> {
        Observable<WebSocketMessage> o = getObservableWSM(session);
        return session.send(Flux.from(RxReactiveStreams.toPublisher(o)));
    };
}

这将从我的 sql 语句中获取第一个 row,但不会获取其他行。如何继续流式传输其他行?

理想情况下,我想我想要 PostgreSQL 相当于 MongoDB Tailable cursor

最佳答案

我创建了一个 Postgres Trigger触发 INSERTs 到我的表基于 this example :

CREATE OR REPLACE FUNCTION table_update_notify() RETURNS trigger AS $$
DECLARE
  id bigint;
BEGIN
  IF TG_OP = 'INSERT' THEN
    id = NEW.id;
  ELSE
    id = OLD.id;
  END IF;
  PERFORM pg_notify('my_trigger_name', json_build_object('table', TG_TABLE_NAME, 'id', id, 'type', TG_OP)::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

然后我使用 reactive-pg-client 订阅了那个 Postgres 触发器。 . 以下是他们的 Pub/Sub 示例中的代码:

@Bean
PgPool subscribedNotificationHandler() {
    PgPool client = pgPool();
    client.getConnection(asyncResult -> {
        if (asyncResult.succeeded()) {
            PgConnection connection = asyncResult.result();
            connection.notificationHandler(notification -> {
                notification.getPayload();
                // do things with payload
            });
            connection.query("LISTEN my_trigger_name", ar -> {
                log.info("Subscribed to channel");
            });
        }
    });
    return client;
}

关于Spring Webflux Async PostgreSQL Publisher 在第一个结果后停止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49410703/

相关文章:

database - 如何在 Postgres 中使用 Blob 数据类型

node.js - 我可以浏览 nodejs ws 模块以便浏览器成为 Web 套接字连接的服务器端吗

javascript - NodeJS、WebSockets、升级头

java - 使用 CorsFilter 和 spring security 时出现 Cors 错误

spring - 由于 facebook 范围,无法使 spring-social-showcase-boot 工作?

postgresql - DEALLOCATE pdo_stmt 需要很长时间(几天),这正常吗?

laravel - 如何在Ubuntu Server 18.04中使用系统启动来运行网络套接字?

spring - 试图让 Spring/Consul/Vault 一起工作

spring - 将 spring-boot 配置为仅针对特定依赖项自动配置以进行测试

ruby-on-rails - Multi-Tenancy rails 应用程序 : what are the pros and cons of different techniques?