我有一个函数可以通过回调批量传输数据。
每个批处理将在获取另一个批处理之前等待回调函数,并且整个函数返回一个在所有批处理完成时解析的 promise 。
(我使用 TypeScript 注释来帮助提高可读性)
async function callbackStream(fn: (batch: Array<number>) => Promise<void>) {}
我如何将这个函数变成一个一次产生一个值的异步生成器?
async function* generatorStream(): AsyncIterableIterator<number> {}
事实证明,这是一项相当艰巨的任务。
我一直在研究这个问题,我构建了一些可行的东西,但它非常复杂,我无法证明合并这段代码并让我团队中的其他人处理它是合理的。
这是我当前的实现:
我正在使用这个创建“延迟” promise 的辅助函数,它有助于围绕回调传递 promise 。
interface DeferredPromise<T> {
resolve: (value: T) => void
reject: (error: any) => void
promise: Promise<T>
}
function deferred<T>(): DeferredPromise<T> {
let resolve
let reject
const promise = new Promise<T>((res, rej) => {
resolve = res
reject = rej
})
return {
resolve: resolve as (value: T) => void,
reject: reject as (error: any) => void,
promise,
}
}
接下来我有一个逻辑毛团,将 promise 回调线性化为一个链,其中每个 promise 使用 next 函数解析一个批处理,该函数将返回另一个获取下一个批处理的 promise。
type Done = { done: true }
type More = { done: false; value: Array<number>; next: () => Promise<Result> }
type Result = More | Done
async function chainedPromises() {
let deferred = PromiseUtils.deferred<Result>()
callbackStream(async batch => {
const next = PromiseUtils.deferred<null>()
deferred.resolve({
done: false,
value: batch,
next: () => {
deferred = PromiseUtils.deferred<Result>()
next.resolve(null)
return deferred.promise
},
})
await next.promise
}).then(() => {
deferred.resolve({ done: true })
})
return deferred.promise
}
从这里开始,创建一个一次生成一个项目的生成器并不是很困难:
async function* generatorStream(): AsyncIterableIterator<number> {
let next = chainedPromises
while (true) {
const result = await next()
if (result.done) {
return
}
for (const item of result.value) {
yield item
}
next = result.next
}
}
我想我们都同意中间的 chainedPromises
函数非常困惑和令人费解。 有什么方法可以将 callbackStream
转换为 generatorStream
且易于理解和遵循? 我不介意使用一个库,如果它建立得很好,但我也很欣赏从第一性原理出发的简单实现。
最佳答案
不,我不认为有一种易于理解和遵循的方式来实现这种转换。但是,我建议放弃 deferred
(无论如何你永远不会 reject
ing)并只使用 promise 构造函数。另外,我宁愿立即实现一个异步生成器。
function queue() {
let resolve = () => {};
const q = {
put() {
resolve();
q.promise = new Promise(r => { resolve = r; });
},
promise: null,
}
q.put(); // generate first promise
return q;
}
function toAsyncIterator(callbackStream) {
const query = queue();
const result = queue();
const end = callbackStream(batch => {
result.put(batch);
return query.promise;
}).then(value => ({value, done: true}));
end.catch(e => void e); // prevent unhandled promise rejection warnings
return {
[Symbol.asyncIterator]() { return this; },
next(x) {
query.put(x);
return Promise.race([
end,
result.promise.then(value => ({value, done:false})
]);
}
}
}
async function* batchToAsyncIterator(batchCallbackStream) {
for await (const batch of toAsyncIterator(batchCallbackStream)) {
// for (const val of batch) yield val;
// or simpler:
yield* batch;
}
}
关于javascript - 如何将 Node.js 异步流回调转换为异步生成器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50862698/