javascript - 如何将 Node.js 异步流回调转换为异步生成器?

标签 javascript node.js typescript async-await generator

我有一个函数可以通过回调批量传输数据。

每个批处理将在获取另一个批处理之前等待回调函数,并且整个函数返回一个在所有批处理完成时解析的 promise 。

(我使用 TypeScript 注释来帮助提高可读性)

async function callbackStream(fn: (batch: Array<number>) => Promise<void>) {}

我如何将这个函数变成一个一次产生一个值的异步生成器?

async function* generatorStream(): AsyncIterableIterator<number> {}

事实证明,这是一项相当艰巨的任务。

我一直在研究这个问题,我构建了一些可行的东西,但它非常复杂,我无法证明合并这段代码并让我团队中的其他人处理它是合理的。


这是我当前的实现:

我正在使用这个创建“延迟” promise 的辅助函数,它有助于围绕回调传递 promise 。

interface DeferredPromise<T> {
    resolve: (value: T) => void
    reject: (error: any) => void
    promise: Promise<T>
}

function deferred<T>(): DeferredPromise<T> {
    let resolve
    let reject
    const promise = new Promise<T>((res, rej) => {
        resolve = res
        reject = rej
    })
    return {
        resolve: resolve as (value: T) => void,
        reject: reject as (error: any) => void,
        promise,
    }
}

接下来我有一个逻辑毛团,将 promise 回调线性化为一个链,其中每个 promise 使用 next 函数解析一个批处理,该函数将返回另一个获取下一个批处理的 promise。

type Done = { done: true }
type More = { done: false; value: Array<number>; next: () => Promise<Result> }
type Result = More | Done

async function chainedPromises() {
    let deferred = PromiseUtils.deferred<Result>()

    callbackStream(async batch => {
        const next = PromiseUtils.deferred<null>()
        deferred.resolve({
            done: false,
            value: batch,
            next: () => {
                deferred = PromiseUtils.deferred<Result>()
                next.resolve(null)
                return deferred.promise
            },
        })
        await next.promise
    }).then(() => {
        deferred.resolve({ done: true })
    })

    return deferred.promise
}

从这里开始,创建一个一次生成一个项目的生成器并不是很困难:

async function* generatorStream(): AsyncIterableIterator<number> {
    let next = chainedPromises
    while (true) {
        const result = await next()
        if (result.done) {
            return
        }
        for (const item of result.value) {
            yield item
        }
        next = result.next
    }
}

我想我们都同意中间的 chainedPromises 函数非常困惑和令人费解。 有什么方法可以将 callbackStream 转换为 generatorStream 且易于理解和遵循? 我不介意使用一个库,如果它建立得很好,但我也很欣赏从第一性原理出发的简单实现。

最佳答案

不,我不认为有一种易于理解和遵循的方式来实现这种转换。但是,我建议放弃 deferred(无论如何你永远不会 rejecting)并只使用 promise 构造函数。另外,我宁愿立即实现一个异步生成器。

function queue() {
    let resolve = () => {};
    const q = {
        put() {
            resolve();
            q.promise = new Promise(r => { resolve = r; });
        },
        promise: null,
    }
    q.put(); // generate first promise
    return q;
}
function toAsyncIterator(callbackStream) {
    const query = queue();
    const result = queue();
    const end = callbackStream(batch => {
        result.put(batch);
        return query.promise;
    }).then(value => ({value, done: true}));
    end.catch(e => void e); // prevent unhandled promise rejection warnings
    return {
        [Symbol.asyncIterator]() { return this; },
        next(x) {
            query.put(x);
            return Promise.race([
                end,
                result.promise.then(value => ({value, done:false})
            ]);
        }
    }
}
async function* batchToAsyncIterator(batchCallbackStream) {
    for await (const batch of toAsyncIterator(batchCallbackStream)) {
        // for (const val of batch) yield val;
        // or simpler:
        yield* batch;
    }
}

关于javascript - 如何将 Node.js 异步流回调转换为异步生成器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50862698/

相关文章:

typescript - 对自定义 typescript 类型的反省

typescript - TS2322 : Type 'string | 3000' is not assignable to number

javascript - 无法使用 sortable 对数据进行排序

javascript - C3.js 折线图 - 轴标签问题

javascript - Node js 的 npm 操纵杆问题

node.js - 忽略调试器;调试 session 期间 Node.js 中的语句

knockout.js - Typescript 错误上下文 this

javascript 使用 Firefox/chrome 进行 WMI 查询?

javascript - 使用此代码,我在屏幕左下角收到 IE 页面加载错误

javascript - 从 Node.js 读取 Oracle 中的 BLOB