如果我有一个 Node js 流,例如来自 process.stdin
或 fs.createReadStream
,我如何将其转换为 RxJs Observable使用 RxJs5 流?
我看到了 RxJs-Node有一个 fromReadableStream
方法,但看起来它已经将近一年没有更新了。
最佳答案
对于任何正在寻找这个的人,根据 Mark 的建议,我为 rxjs5 改编了 rx-node fromStream
实现。
import { Observable } from 'rxjs';
// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') {
stream.pause();
return new Observable((observer) => {
function dataHandler(data) {
observer.next(data);
}
function errorHandler(err) {
observer.error(err);
}
function endHandler() {
observer.complete();
}
stream.addListener(dataEventName, dataHandler);
stream.addListener('error', errorHandler);
stream.addListener(finishEventName, endHandler);
stream.resume();
return () => {
stream.removeListener(dataEventName, dataHandler);
stream.removeListener('error', errorHandler);
stream.removeListener(finishEventName, endHandler);
};
}).share();
}
请注意,它本质上破坏了流的所有背压功能。 Observables 是一种推送技术。所有输入 block 都将被读取并尽快推送给观察者。根据您的情况,它可能不是最佳解决方案。
关于javascript - 如何将 Node 可读流转换为 RX 可观察流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41537504/