java - 使用 spring-data-mongodb 流式传输聚合操作的结果

标签 java mongodb spring-data-mongodb

我正在使用 spring-data-mongodb,我想使用游标进行聚合操作。

MongoTemplate.stream() 获取查询,因此我尝试创建聚合实例,使用 Aggregation.toDbObject()DbObject em>,使用 DbObject 创建了一个 BasicQuery,然后调用 stream() 方法。
这将返回一个空游标。

调试 spring-data-mongodb 代码显示 MongoTemplate.stream() 使用 FindOperation,这让我觉得 spring-data-mongodb 不支持流聚合操作。
有没有人能够使用 spring-data-mongodb 流式传输聚合查询的结果?

郑重声明,我可以使用 Java mongodb 驱动程序来完成,但我更喜欢使用 spring-data。

编辑 11 月 10 日 - 添加示例代码:

    MatchOperation match = Aggregation.match(Criteria.where("type").ne("AType"));
    GroupOperation group = Aggregation.group("name", "type");
    group = group.push("color").as("colors");
    group = group.push("size").as("sizes");
    TypedAggregation<MyClass> agg = Aggregation.newAggregation(MyClass.class, Arrays.asList(match, group));

    MongoConverter converter = mongoTemplate.getConverter();
    MappingContext<? extends MongoPersistentEntity<?>, MongoPersistentProperty> mappingContext = converter.getMappingContext();
    QueryMapper queryMapper = new QueryMapper(converter);
    AggregationOperationContext context = new TypeBasedAggregationOperationContext(MyClass.class, mappingContext, queryMapper);
    // create a BasicQuery to be used in the stream() method by converting the Aggregation to a DbObject
    BasicQuery query = new BasicQuery(agg.toDbObject("myClass", context));

    // spring-mongo attributes the stream() method to find() operationsm not to aggregate() operations so the stream returns an empty cursor
    CloseableIterator<MyClass> iter = mongoTemplate.stream(query, MyClass.class);

    // this is an empty cursor
    while(iter.hasNext()) {
        System.out.println(iter.next().getName());
    }

以下代码,不使用 stream() 方法,返回聚合的预期非空结果:

    AggregationResults<HashMap> result = mongoTemplate.aggregate(agg, "myClass", HashMap.class);

最佳答案

对于那些仍在努力寻找答案的人:

从 spring-data-mongo 版本 2.0.0.M4 开始 (AFAIK) MongoTemplate 获得了一个 aggregateStream 方法。

因此您可以执行以下操作:

 AggregationOptions aggregationOptions = Aggregation.newAggregationOptions()
        // this is very important: if you do not set the batch size, you'll get all the objects at once and you might run out of memory if the returning data set is too large
        .cursorBatchSize(mongoCursorBatchSize)
        .build();

    data = mongoTemplate.aggregateStream(Aggregation.newAggregation(
            Aggregation.group("person_id").count().as("count")).withOptions(aggregationOptions), collectionName, YourClazz.class);

关于java - 使用 spring-data-mongodb 流式传输聚合操作的结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40510855/

相关文章:

java - 将 KeyListener 添加到自定义对象

Java 客户端/服务器 SSL 套接字聊天

java - XML自动编辑器(基于XSD方案)

蒙戈数据库 : How to insert additional object into object collection in golang?

python - pymongo:如何对作为数组的列使用 $or 运算符?

java - 请求处理失败;嵌套异常是 java.lang.NumberFormatException : For input string: ""

JAVA - Try/Catch 遇到困难

javascript - 为什么这个 MongoDB 循环执行的次数比预期多?

MongoDB 4.x 实时同步到 ElasticSearch 6.x +

java - Spring Data MongoDB : How to Dump Raw Query/Commands generated through QueryDsl?