javascript - flatMap 到streamToRx 永远不会完成

标签 javascript node.js rxjs

我有这个测试

const Rx = require('rx')
const fs = require('fs')
const {streamToRx} = require('rxjs-stream')

it('should not be infinite', done => {
  let streamObservable = streamToRx(fs.createReadStream('/some/file.txt'));
  Rx.Observable.of(1).flatMap(any => streamObservable)
  // streamObservable
    .map(any => 'file processed')
    .subscribe(x => console.log('next', x), err => {
        console.error(err)
        done(err)
      },
      () => {
        console.log('complete!')
        done()
      }
    )
})

此测试超时 - 意味着流永远不会完成。但是,当我不像这样链接 flatMap 时:

const Rx = require('rx')
const fs = require('fs')
const {streamToRx} = require('rxjs-stream')

it('should not be infinite', done => {
  let streamObservable = streamToRx(fs.createReadStream('/some/file.txt'));
  // Rx.Observable.of(1).flatMap(any => streamObservable)
  streamObservable
    .map(any => 'file processed')
    .subscribe(x => console.log('next', x), err => {
        console.error(err)
        done(err)
      },
      () => {
        console.log('complete!')
        done()
      }
    )
})

那么输出是:

next file processed
complete!

链接这些可观察量时我做错了什么?这似乎只有当第二个是使用 rxjs-stream 从流转换时才会发生。

最佳答案

我查看了 RxNode.fromStreamrxjs-stream.streamToRx 代码,RxNode.fromStream 创建并返回一个冷的 Observable而streamToRx错误地返回了一个热门的Observable(在本例中是一个Subject)。

在您的示例中,createReadStream 因此在调用点执行并解析,因此后续订阅会订阅已解析的 fs.readStream。简而言之,这是 streamToRx 中的一个错误。

关于javascript - flatMap 到streamToRx 永远不会完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49771969/

相关文章:

javascript - 如何在 Node.js Nginx 中允许 http 请求?

javascript - 替换字符串中的 HTML 实体,避免使用 <img> 标签

php - Heroku 上的基本 Node.js 应用程序 - 免费发送电子邮件的方式?

typescript - 如何从字符串输入中使用 RxJs 5 的主题和去抖动

javascript - 如何将 Observable 转换为 FirebaseObjectObservable

javascript - 如何在 JavaScript 中创建配分函数。使用以下指南

javascript - 为什么我的文字不闪烁?

node.js - 如何从 Node 应用程序打开 ec2 上的 http 流量端口?

javascript - 从 busboy 管道流到请求帖子

angular - 使用Rxjs映射函数时如何保留对象属性大小写