javascript - 如何将 Node 可读流转换为 RX 可观察流

标签 javascript node.js rxjs rxjs5

如果我有一个 Node js 流,例如来自 process.stdinfs.createReadStream,我如何将其转换为 RxJs Observable使用 RxJs5 流?

我看到了 RxJs-Node有一个 fromReadableStream 方法,但看起来它已经将近一年没有更新了。

最佳答案

对于任何正在寻找这个的人,根据 Mark 的建议,我为 rxjs5 改编了 rx-node fromStream 实现。

import { Observable } from 'rxjs';

// Adapted from https://github.com/Reactive-Extensions/rx-node/blob/87589c07be626c32c842bdafa782fca5924e749c/index.js#L52
export default function fromStream(stream, finishEventName = 'end', dataEventName = 'data') {
  stream.pause();

  return new Observable((observer) => {
    function dataHandler(data) {
      observer.next(data);
    }

    function errorHandler(err) {
      observer.error(err);
    }

    function endHandler() {
      observer.complete();
    }

    stream.addListener(dataEventName, dataHandler);
    stream.addListener('error', errorHandler);
    stream.addListener(finishEventName, endHandler);

    stream.resume();

    return () => {
      stream.removeListener(dataEventName, dataHandler);
      stream.removeListener('error', errorHandler);
      stream.removeListener(finishEventName, endHandler);
    };
  }).share();
}

请注意,它本质上破坏了流的所有背压功能。 Observables 是一种推送技术。所有输入 block 都将被读取并尽快推送给观察者。根据您的情况,它可能不是最佳解决方案。

关于javascript - 如何将 Node 可读流转换为 RX 可观察流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41537504/

相关文章:

javascript - JS : access from NEW inner object, 父容器创建者对象方法/变量

javascript - 将 Request 2.0 应用程序生成的请求发送到 "/me"之外的其他位置

javascript - Jquery Mobile 不透明度不起作用 :(

node.js - 以编程方式将文件从任何 CDN 传输到 Google Cloud 存储桶

javascript - 为什么 RxJS 中没有调用所有订阅者

javascript - scrollIntoView() 阻止其他监听器触发

node.js - Knex + SQL Server whereIn 查询 8-12 秒——原始版本不返回任何结果,但如果我直接输入 .toQuery() 结果,我会得到结果

node.js - 机器人对用户的响应迟缓

angular - 如何在 rxjs 中立即中止和 "resolve"订阅?

javascript - 如何轻松使用 API 响应,同时向响应添加一些额外的标志?