java - 使用 jOOQ 从 Postgres 获取的流不从类返回结果

标签 java spring postgresql stream jooq

问题

我正在尝试将 postgres 查询的结果流式传输到前端应用程序,而不是急切地获取所有结果。问题是我只能在终端中看到流式结果(即首先在 "org.jooq.tools.LoggerListener : Record fetched: ..." 中看到,然后在 stream 中看到。 get().forEach(s -> debug)),并且引用此流的类在调用查看 ResultSet 时仅生成 null 值前端。

该数据也可用于其他任务(例如可视化、下载/导出、汇总统计等)。我一直在浏览有关 jOOQ 的文档和帖子,我将其用作我的 ORM,并且我尝试使用以下方法:

使用以下内容进行急切获取目前效果很好,但这将返回一个巨大的 ResponseEntity 中的所有内容,并且不会传输结果:


当前类(class)

DataController.java

@RestController
@RequestMapping(value = "/v3")
@Validated
public class DataController {

  @Autowired private QueryService queryService;

  @PostMapping(value = "/data", produces = MediaType.APPLICATION_JSON_UTF8_VALUE)
  @ApiOperation(value = "Query the data")
  @ResponseStatus(HttpStatus.CREATED)
  public ResponseEntity<QueryResult> getQueryResults(
      @RequestBody @ValidQuery Query query, HttpServletRequest request) {

    QueryResult res = queryService.search(query);
    return ResponseEntity.ok(res);
  }
// ...
}

QueryResult.java

public QueryResult(Stream<Record> result) {
    this.result = result;
  }

//  public List<Map<String, Object>> getResult() { return result; }
  @JsonProperty("result")
  public Stream<Record> getResult() { return result; }


//  public void setResult(List<Map<String, Object>> result) { this.result = result; }
  public void setResult(Stream<Record> result) { this.result = result; }

}

QueryService.java

@Service
public class QueryService implements SearchService{
  @Autowired DefaultDSLContext dslContext;

  public QueryResult search(Query query) {

    LinkedHashMap<DataSourceName, List<String>> selections = query.getSelections();

    // Build selected fields
    List<SelectField> selectFields = QueryUtils.getSelectionFields(selections);

    // Current support is for a single query. All others passed will be ignored
    List<Filter> filters = query.getFilters();
    Filter leadingFilter = QueryUtils.getLeadingFilter(filters);

    // Build "where" conditions
    Condition conditionClause = QueryUtils.getConditionClause(leadingFilter);

    // Get "from" statement
    Table<Record> fromClause = QueryUtils.getFromStatement(fromDataSource,query.getJoins());

    /*
    // Works fine, but is not lazy fetching
    List<Map<String, Object>> results =
        dslContext
            .select(selectFields)
            .from(fromClause)
            .where(conditionClause)
            .limit(query.getOffset(), query.getLimit())
            .fetchMaps();
    */

      // Appears to work only once. 
      // Cannot see any results returned, but the number of records is correct. 
      // Everything in the records is null / undefined in the frontend
      Supplier<Stream<Record>> results = () ->
              dslContext
                      .select(selectFields)
                      .from(fromClause)
                      .where(conditionClause)
                      .limit(query.getOffset(), query.getLimit())
                      .fetchStream();

      // "stream has already been operated upon or closed" is returned when using a Supplier
      results.get().forEach(s -> logger.debug("Streamed record: \n" + String.valueOf(s)));

      return new QueryResult(results.get());

  }
}

Query.java

public class Query {
  @NotNull(message = "Query must contain selection(s)")
  private LinkedHashMap<DataSourceName, List<String>> selections;
  private List<Filter> filters;
  private List<Join> joins;
  private List<Sort> sorts;
  private long offset;
  private int limit;

  private QueryOptions options;

  @JsonProperty("selections")
  public LinkedHashMap<DataSourceName, List<String>> getSelections() {
    return selections;
  }

  public void setSelections(LinkedHashMap<DataSourceName, List<String>> selections) {
    this.selections = selections;
  }

  @JsonProperty("filters")
  public List<Filter> getFilters() {
    return filters;
  }

  public void setFilters(List<Filter> filters) {
    this.filters = filters;
  }

  @JsonProperty("joins")
  public List<Join> getJoins() {
    return joins;
  }

  public void setJoins(List<Join> joins) {
    this.joins = joins;
  }

  @JsonProperty("sorts")
  public List<Sort> getSorts() {
    return sorts;
  }

  public void setSorts(List<Sort> sorts) {
    this.sorts = sorts;
  }

  @JsonProperty("options")
  public QueryOptions getOptions() {
    return options;
  }

  public void setOptions(QueryOptions options) {
    this.options = options;
  }

  @JsonProperty("offset")
  public long getOffset() {
    return offset;
  }

  public void setOffset(long offset) {
    this.offset = offset;
  }

  @JsonProperty("limit")
  public int getLimit() {
    return limit;
  }

  public void setLimit(int limit) {
    this.limit = limit;
  }

  @Override
  public String toString() {
    return "Query{"
        + "selections=" + selections
        + ", filters="  + filters
        + ", sorts="    + sorts
        + ", offSet="   + offset
        + ", limit="    + limit
        + ", options="  + options
        + '}';
  }
}

DataApi.js

// ...
const dataApi = axios.create({baseURL: `${my_data_url}`,});
// ...
export default dataApi;

Data.jsx

// ...

// This block queries Spring, and it returns the ResponseEntity with the ResultSet
// Streaming returns the right number of records, but every record is null / undefined
try {
      const response = await dataApi.post('/v3/data', query);
} catch (error) {
// ...
}
// ...

控制台返回结果

{data: {…}, status: 200, statusText: "OK", headers: {…}, config: {…}, …}
data:
result: Array(100)
0: {}
1: {}
2: {}
3: {}
...

堆栈:

  • Docker:19.03.5
  • Spring Boot:v2.1.8.RELEASE
  • 节点:v12.13.1
  • react :16.9.0
  • OpenJDK:12.0.2
  • jOOQ:3.12.3
  • postgres:10.7

最佳答案

Java Stream API 的全部意义在于这样的流最多被使用一次。它没有任何缓冲功能,也不支持像响应式(Reactive)流实现那样的基于推送的流模型。

您可以将另一个 API 添加到您的堆栈中,例如Reactor (还有其他的,但因为您已经在使用 Spring...),它支持向多个使用者缓冲和重放流,但这与 jOOQ 直接无关,并且会严重影响您的应用程序的架构。

请注意,jOOQ 的 ResultQuery 扩展了 org.reactivestreams.Publisher 和 JDK 9 的 Flow.Publisher,以便与此类响应式(Reactive)流实现更好的互操作性。

关于java - 使用 jOOQ 从 Postgres 获取的流不从类返回结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59956246/

相关文章:

java - 订阅 MailChimp 列表 : Content Type '' not supported

java - Camel 路线 : read xml into pojo and write it back into xml file

database - 为什么\dt *.向我展示 psql 中的关系列表,但\dt 不向我展示我的 postgreSQL 数据库中的表列表?

ruby-on-rails - Rails+ postgres : Loading db structure floods stdout

spring - 带 Spring 的 Tomcat

java - 谷歌 Collection ( Guava 图书馆): ImmutableSet/List/Map and Filtering

java - JSP Servlet session invalidate() 不会使 session 为空

java - Oracle 通过 JPA 排序,数字在前

sql - 在 SQL 中计算文档频率

java - 如何从多部分文件中获取文件,以便可以创建PDImageXObject?