scala - 玩mongo枚举器意外停止

标签 scala akka reactive-programming reactivemongo

设置 Scala 2.11.4、Playframework 2.3.7、Reacivemongo(0.10.5.0.akka23/0.11.0-SNAPSHOT 都尝试过)。

我们有一个包含 18,000 个实体的集合,使用 Enumerator/Iteratee 方法以异步方式处理这个集合。

案例 1。 处理很简单(将实体提取为 CSV 格式并将它们作为 REST 响应以 block 的形式发送)一切正常,所有记录都被提取和处理。

案例 2。 处理涉及最多需要 10 秒的计算,以及计算后更新记录,计算是使用 foreach Iteratee 完成的,它会更新内部任务跟踪器中已处理实体的数量。处理过程可能需要一段时间,但没关系

        Patient.findByClient(clientName) &>
            Enumeratee.mapM(patient => {
                val evaluatedAndSaveTask = patient.
                    evaluate(parser).
                    flatMap(patientOpt =>
                        patientOpt.
                            map(evaluatedPatient => evaluatedPatient.saveAndGet().map(Some(_))).
                            getOrElse(Future.successful(None))
                    )
                evaluatedAndSaveTask.recover({
                    case t =>
                        t.printStackTrace()
                        None
                })
            })
        // Step 2.1. Running evaluation process through Iteratee
        val evaluationTask = evaluation run Iteratee.foreach(patientOpt => {
            collection.update(Json.obj("clientName" -> clientName), Json.obj("$inc" -> Json.obj("processedPatients" -> 1))))
        )
        // Step 2.3. Log errors
        evaluationTask.onSuccess({ case _ => Patient.LOG.info("PatientEvaluation DONE") })
        evaluationTask.onFailure({ case t => {
            t.printStackTrace();
            Patient.LOG.info("PatientEvaluation FAILED");
        }})

在这种情况下,只有 575 个实体得到处理,Iteratee 结束时打印出“Patient evaluation DONE”。

我从等式中删除了保存,但没有帮助。

为什么会这样?

最佳答案

我终于找到了问题的罪魁祸首 - Mongo 在超时后自动过期评估,您可以指定 noCursorTimeout 标志,以防止出现这种情况:

        collection.
            find(findQ).
            sort(if(sortQF.values.isEmpty) sortQ else sortQF).
            options(QueryOpts(skipN = offset + page._1 * page._2).noCursorTimeout).
            cursor[T].

出于某种原因,ReactiveMongo 在这种情况下不会抛出 Exception,而只是关闭 Iterator。我在 ReactiveMongo 中创建了一个问题 https://github.com/ReactiveMongo/ReactiveMongo/issues/250 , 紧随其后。

目前,对我来说,使游标过期并以偏移量重新启动可能更安全。

关于scala - 玩mongo枚举器意外停止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28031374/

相关文章:

c# - 在前一个元素匹配条件后获取流的第一个元素

scala - 忽略自定义任务和范围中的 SBT 工件?

scala - 嵌套 Future.sequence 按顺序执行包含的 Futures

scala - Akka-如何检查收件箱中的邮件有多长时间?

scala - 为什么它不识别 asScala 方法?

javascript - 如何正确应用distinctUntilChanged进行DOM操作最小化

javascript - rxjs 将 Observable<List<X>> 转换为 Observable<X> 的运算符

scala - 使用 Slick 的代码生成器时,如何过滤列?

Scala trait 及其方法的参数化

Akka 聚类冲突