node.js - Nodejs 中组合多个可读流的策略

标签 node.js nodejs-stream

我正在尝试解决 Nodejs 流挑战。我已多次阅读有关流的 Node 文档,并实现了不同的尝试来解决该挑战。尝试使用双工、转换、可读和可写:)

我有多个 HTTP 可读流,目标是将数据发送到单个管道,并使用背压工作。我认为这张图有助于解释这一挑战:

enter image description here

更新(2017 年 9 月 13 日)。再次阅读文档后,我正在实现自定义写入的双工流。

最佳答案

这代表了双工流的一个很好的用例,与 HTTP 流的手动流控制相结合。

我编写了一个自定义双工流,其中可读和可写部分的结构如下:

enter image description here

如果您对双工流的具体代码感兴趣,请给我发私信。

代码可能看起来像这样(但它相当旧,并且可能会进一步简化):

import 'rxjs/add/operator/skip';
import 'rxjs/add/operator/take';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import * as stream from 'stream';
import { logger, streamInspector } from '../shared';

export class DuplexStreamLinker extends stream.Duplex {
    public readCount: number = 0;
    public acceptDataCount: number = 0;
    public acceptData$: BehaviorSubject<boolean>;
    public streamName: string;

    constructor(options) {
        super(options);
        this.streamName = this.constructor.name;
        this.acceptData$ = new BehaviorSubject(false);
        streamInspector(this, this.constructor.name);
    }

    public _read(size) {
        this.readCount++;
        this.acceptData$.next(true);
    }

    public _write(chunk, encoding, cb) {
        const acceptData = this.acceptData$.getValue();
        if (acceptData) {
            cb(this.pushData(chunk));
        } else {
            this.acceptData$.skip(1).take(1).subscribe(() => {
                logger.silly('I dont fire...');
                this.acceptDataCount++;
                cb(this.pushData(chunk));
            });
        }
    }

    public endReadableStream() {
        logger.debug('DuplexStreamLinker@endReadableStream was called!');
        this.end();
        this.push(null);
    }

    public _final(cb) {
        logger.debug('DuplexStreamLinker@_final was called!');
        cb(null);
    }

    private pushData(chunk): null | Error {
        const ok = this.push(chunk);
        if (ok === false) { this.acceptData$.next(false); }
        return null;
    }

}

关于node.js - Nodejs 中组合多个可读流的策略,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46182203/

相关文章:

node.js - 我可以在 Jenkins 上运行多个 Node 版本吗?

node.js - AWS Lambda 数据库连接问题[更新的问题]

javascript - 如何读取 javascript 数组并将其重新格式化为 JSON 对象?

node.js - 在http中发送和接收一个大的json对象

node.js - 使用 nodejs 将直播流数据包发送到 youtube?

javascript - 如何将查询翻译为 Sequelize ?

node.js - firebase deploy 没有找到与模式 "' src/**/*'"匹配的文件

javascript - 如何获取 Node JS Stream 中数据的大小

node.js - 如何为 MongoDB 集合中的文档选择单个字段?

node.js - 如何使用 Node 强大的 fileWriteStreamHandler 组装流管道?