javascript - 找不到超时错误的 mongodb Node 游标

标签 javascript node.js mongodb node-mongodb-native

我有一个 nodejs/express 服务器,我正在尝试合并和排序来自多个 mongodb 集合的排序结果,以创建一个排序的 CSV 文件。我实现此目的的方法要求我让 mongodb 游标保持事件状态(无超时),直到我读取/耗尽所有数据,或者直到发生错误,在这种情况下我必须手动关闭它们。当数据点不多时,它似乎可以工作。但是,例如,当 mongo 查询请求一年的数据时,在将近半小时后的某个时间点,我收到以下 mongo 错误:找不到游标:游标 ID:59427962835

Promisebluebird promise 。用 Typescript 编写。

import * as _ from 'lodash';
import * as moment from 'moment-timezone';

function findNative(db, collection, spec={}) {
    const {query, fields, sort, limit, skip, hint, timeout=true} = spec;

    // internal function that gets a connection from the connection pool
    // returns promise with connection
    return ensureConnection(db)
        .then(connection => {
            const cursor = connection.collection(collection).find(
                query || {},
                {fields, sort, limit, skip, hint, timeout});
            // For sorted queries we have to limit batchSize
            // see https://jira.mongodb.org/browse/SERVER-14228
            if (connection.serverConfig.capabilities().maxWireVersion == 0 && sort && !limit) {
                cursor.batchSize(0);
            }

            return cursor;
        });
}

function getMongoStream(col, startdate, enddate) {
    return findNative('testDb', col, {
        query: { t: { $gte: startdate, $lte: enddate }},
        sort: { t: 1 },
        fields: { i: 0, _id: 0 },
        timeout: false
    });
}

async function fetchNextCursorData(cursor) {
    const hasMore = await cursor.hasNext();
    console.log(hasMore, cursor.cursorState.cursorId.toString());
    return hasMore ? cursor.next() : Promise.resolve(null);
}

function findEarliestDate(buffer: any[]): [string, number[]] {
    let earliestDateMS;
    const indices = _(buffer)
        .map(x => x && x.t.getTime())
        .forEach(t => {
            // make sure timestamp is defined
            // buffer also contains null values
            if(t && (!earliestDateMS || (earliestDateMS && t < earliestDateMS))) {
                earliestDateMS = t;
            }
        })
        .reduce((acc, t, i) => {
            if(t === earliestDateMS) {
                acc.push(i);
            }
            return acc;
        }, []);

    return [moment(earliestDateMS).utc().format('YYYY-MM-DD HH:mm:ss.SSS'), indices];
}

function closeAllCursors(cursors: any[]) {
    const openCursors = cursors
        .filter(c => !c.isClosed());
    openCursors.forEach(c => c.close());
}

async function csvData(req, res) {
    const collections: string[] = req.swagger.params.collections.value.split(',').sort(),
          sources: string[] = req.swagger.params.sources.value.split(',').sort(),
          startdate = new Date(Number(req.swagger.params.startdate.value)),
          enddate = new Date(Number(req.swagger.params.enddate.value));

    const filename = `${moment.utc().format('YYYY-MM-DD_HH:mm')}.csv`;

    res.set({
        'Content-Type': 'text/csv',
        'Content-Disposition': `attachment; filename="${filename}"`
    });
    res.write('Date UTC,' + sources.join(',') + '\n');

    const colPromises = collections.map(col => getMongoStream(col, startdate, enddate));
    let cursorsMap: { [rec: string]: any; };

    try {
        let buffer = [], dateCSVBuffer: any[] = _.fill(Array(sources.length), '');
        // fetch first doc from all cursors
        const cursors = await Promise.all(colPromises);
        cursorsMap = _.zipObject<any>(collections, cursors);
        let docs = await Promise.all(cursors.map(fetchNextCursorData));
        // initial request made for all collections
        let requestedIdx = _.range(0, collections.length);

        while(true) {
            docs.forEach((doc, i) => {
                buffer[requestedIdx[i]] = doc;
            });
            // null indicates that cursor won't return more data =>
            // all cursors are exhausted
            if(buffer.every(d => d === null)) {
                break;
            }

            const [date, indices] = findEarliestDate(buffer);
            requestedIdx = indices;

            indices.forEach(idx => {
                // update csv buffer
                const {data} = buffer[idx];
                Object.keys(data)
                    .forEach(ch => {
                        const sourceIndex = sources.indexOf(ch);
                        if(sourceIndex > -1) {
                            dateCSVBuffer[sourceIndex] = data[ch];
                        }
                    });
                // remove doc from buffer
                buffer[idx] = null;
            });
            // send csv string
            dateCSVBuffer.unshift(date);
            res.write(dateCSVBuffer.join(',') + '\n');
            // empty buffer
            dateCSVBuffer = dateCSVBuffer.map(() => '');

            // request new entry from cursors
            const nextDocPromises = indices
                .map(idx => cursorsMap[collections[idx]])
                .map(fetchNextCursorData);

            docs = await Promise.all(nextDocPromises);
        }
        // end data stream
        res.end();
    } catch(err) {
        // make sure to close all cursors
        // will catch all nested promise errors
        closeAllCursors(_.values(cursorsMap));
        console.error(err);
        res.status(500).json(err);
    }
}

使用以下选项创建的 Mongodb 连接:

{
    auto_reconnect: true,
    poolSize: 30,
    connectTimeoutMS: 90000
}

问题可能是我将光标引用保留在 map 中,因此它们没有更新吗?当我执行 cursor.hasNext() 时,游标已经死了?我还尝试检查是否 cursor.isClosed() 但它总是返回 false

Mongodb 驱动程序是 "mongodb": "2.2.15",查询是针对 v3.0 数据库进行测试的。

编辑:我做了一个小的计数测试,看看程序崩溃时已经处理了多少文档。 3 个游标(测试用例仅请求来自 3 个集合的数据)具有以下计数和 ID:

3097531 '59427962835'
31190333 '53750510295'
32007475 '101213786015'

最后处理的 id 为 '59427962835' 的文档游标是编号 4101。所以还没有接近完成

最佳答案

事实证明,将timeout 添加到find 查询不起作用。我必须像这样使用 noCursorTimeout 标志:

const cursor = connection.collection(collection)
    .find(query || {}, {fields, sort, limit, skip, hint})
    .addCursorFlag('noCursorTimeout', !timeout);

关于javascript - 找不到超时错误的 mongodb Node 游标,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50102006/

相关文章:

javascript - 将 'this' 分配给另一个变量时,javascript 的 "method"绑定(bind)如何工作?

javascript - 如何最小化 DatePicker 中的 JavaScript 代码

javascript - 为什么这条路线是404?

javascript - Res.Render is not a function 错误在node.js

mongodb - 使用 MongoDB Compass 连接到 docker mongo 镜像

javascript - 如何使用 Javascript 测试用户计算机的处理能力?

javascript - Window.print() 到 Link-OS Zebra 打印机?

javascript - node.js res.json 搞砸了

javascript - MongoDB RESTful API 结构

c# - mongo.linq 与 c# linq 之间的区别