我有这个测试
const Rx = require('rx')
const fs = require('fs')
const {streamToRx} = require('rxjs-stream')
it('should not be infinite', done => {
let streamObservable = streamToRx(fs.createReadStream('/some/file.txt'));
Rx.Observable.of(1).flatMap(any => streamObservable)
// streamObservable
.map(any => 'file processed')
.subscribe(x => console.log('next', x), err => {
console.error(err)
done(err)
},
() => {
console.log('complete!')
done()
}
)
})
此测试超时 - 意味着流永远不会完成。但是,当我不像这样链接 flatMap 时:
const Rx = require('rx')
const fs = require('fs')
const {streamToRx} = require('rxjs-stream')
it('should not be infinite', done => {
let streamObservable = streamToRx(fs.createReadStream('/some/file.txt'));
// Rx.Observable.of(1).flatMap(any => streamObservable)
streamObservable
.map(any => 'file processed')
.subscribe(x => console.log('next', x), err => {
console.error(err)
done(err)
},
() => {
console.log('complete!')
done()
}
)
})
那么输出是:
next file processed
complete!
链接这些可观察量时我做错了什么?这似乎只有当第二个是使用 rxjs-stream 从流转换时才会发生。
最佳答案
我查看了 RxNode.fromStream
和 rxjs-stream.streamToRx
代码,RxNode.fromStream
创建并返回一个冷的 Observable而streamToRx
错误地返回了一个热门的Observable(在本例中是一个Subject)。
在您的示例中,createReadStream
因此在调用点执行并解析,因此后续订阅会订阅已解析的 fs.readStream
。简而言之,这是 streamToRx
中的一个错误。
关于javascript - flatMap 到streamToRx 永远不会完成,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49771969/