javascript - Design pattern for a bidirectional stream converter

Tags: javascript node.js design-patterns bidirectional

I want to design a network protocol implementation object that is completely socket-agnostic and acts purely as a bidirectional converter. That is, the protocol object should take objects or commands on its "control" side and emit bytes on its "network" side, and accept bytes from the "network" side, convert them into objects/responses, and emit those on the "control" side.

I can't settle on an elegant design pattern for doing this in Node.js. I want it to be fully Stream-compatible, and so far I have ended up with this approach:

var socket = getSocketSomehow();
var proto = new Protocol();

// network side: raw bytes flow between the socket and proto.aux
socket.pipe(proto.aux);
proto.aux.pipe(socket);

// control side: commands go in as objects, responses come out as objects
proto.write({ foo: 'this', bar: ['is', 'command'] });
proto.once('data', function(response) {
    console.log('this is response: ' + response.quux);
});

proto is an aggregate of two cross-connected duplex streams; both proto itself and aux are stream.Duplex. Incoming network data goes into proto.aux, gets parsed, and is emitted from proto as objects. Incoming objects go into proto, get composed into bytes, and are emitted from proto.aux.

Is there a better way to do the same thing?

Best Answer

I ended up with the following approach. The code examples are in CoffeeScript for readability.


The Bond class implements the Duplex stream interface, but binds two unrelated streams together, so reads and writes are proxied to separate streams.

'use strict'

{ EventEmitter } = require 'events'

class Bond extends EventEmitter
    # Proxy a Readable method so calls on the Bond go to the underlying readable stream.
    proxyReadableMethod = (method) =>
        @::[method] = -> @_bondState.readable[method] arguments...

    # Proxy a Writable method so calls on the Bond go to the underlying writable stream.
    proxyWritableMethod = (method) =>
        @::[method] = -> @_bondState.writable[method] arguments...

    proxyReadableMethod 'read'
    proxyReadableMethod 'setEncoding'
    proxyReadableMethod 'resume'
    proxyReadableMethod 'pause'
    proxyReadableMethod 'pipe'
    proxyReadableMethod 'unpipe'
    proxyReadableMethod 'unshift'
    proxyReadableMethod 'wrap'

    proxyWritableMethod 'write'
    proxyWritableMethod 'end'

    constructor: (readable, writable) ->
        super

        @_bondState = {}
        @_bondState.readable = readable
        @_bondState.writable = writable

        # Re-emit events from the underlying streams on the Bond itself.
        proxyEvent = (obj, event) =>
            obj.on event, => @emit event, arguments...

        proxyEvent readable, 'readable'
        proxyEvent readable, 'data'
        proxyEvent readable, 'end'
        proxyEvent readable, 'close'
        # proxyEvent readable, 'error'

        proxyEvent writable, 'drain'
        proxyEvent writable, 'finish'
        proxyEvent writable, 'pipe'
        proxyEvent writable, 'unpipe'
        # proxyEvent writable, 'error'

module.exports = Bond
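
A minimal usage sketch (illustrative only, not part of the classes above; PassThrough streams stand in for the real readable/writable pair): reads on the bond come from the first stream, and writes are forwarded to the second.

'use strict'

{ PassThrough } = require 'stream'
Bond = require './bond'

readSide = new PassThrough()
writeSide = new PassThrough()
bond = new Bond readSide, writeSide

bond.on 'data', (chunk) -> console.log 'read via bond:', chunk.toString()
readSide.write 'hello'        # surfaces as a 'data' event on the bond

writeSide.on 'data', (chunk) -> console.log 'written through bond:', chunk.toString()
bond.write 'world'            # forwarded to writeSide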

Protocol aggregates two internal Transform streams, Parser and Composer. Parser takes data coming in on the aux side and transforms it into data going out on the ctl side, while Composer does the opposite. aux and ctl are both Bonds over the parser and the composer, just in opposite directions: the aux side only handles incoming and outgoing "composed" data, while the ctl side emits and accepts "parsed" data. My design decision was to expose ctl through the Protocol object itself, with aux visible as an instance variable.

Protocol exposes the following override points and helpers (a concrete subclass sketch follows the class definition below):

  • _parse and _compose as _transform-like methods
  • _parseEnd and _composeEnd as _flush-like methods
  • parsed and composed as push-like methods
  • unparse and uncompose as unshift-like methods

'use strict'

Bond = require './bond'
BacklogTransform = require './backlog-transform'

class Protocol extends Bond
    constructor: (options) ->
        @_protocolState = {}
        @_protocolState.options = options
        parser = @_protocolState.parser = new ParserTransform @
        composer = @_protocolState.composer = new ComposerTransform @

        parser.__name = 'parser'
        composer.__name = 'composer'

        proxyEvent = (source, event) =>
            source.on event, =>
                @emit event, arguments...

        proxyParserEvent = (event) =>
            proxyEvent @_protocolState.parser, event

        proxyComposerEvent = (event) =>
            proxyEvent @_protocolState.composer, event

        proxyParserEvent 'error'
        proxyComposerEvent 'error'

        # ctl side (the Protocol itself): reads come from the parser, writes go to the composer
        super @_protocolState.parser, @_protocolState.composer
        # aux side: reads come from the composer, writes go to the parser
        @aux = @_protocolState.aux = new Bond @_protocolState.composer, @_protocolState.parser
        # @_protocolState.main = @main = new Bond @_protocolState.parser, @_protocolState.composer

    parsed: (chunk, encoding) ->
        @_protocolState.parser.push chunk, encoding

    composed: (chunk, encoding) ->
        @_protocolState.composer.push chunk, encoding

    unparse: (chunk, encoding) ->
        @_protocolState.parser.unshift chunk, encoding

    uncompose: (chunk, encoding) ->
        @_protocolState.composer.unshift chunk, encoding

    #
    _parse: (chunk, encoding, callback) ->
        throw new TypeError 'not implemented'

    _compose: (chunk, encoding, callback) ->
        throw new TypeError 'not implemented'

    _parseEnd: (callback) ->
        callback()

    _composeEnd: (callback) ->
        callback()

class ParserTransform extends BacklogTransform
    constructor: (@protocol) ->
        options = @protocol._protocolState.options
        # writable (input) side carries aux/network data, readable (output) side carries main/ctl data
        super options, options.auxObjectMode, options.mainObjectMode

    __transform: (chunk, encoding, callback) ->
        @protocol._parse chunk, encoding, callback

    __flush: (callback) ->
        @protocol._parseEnd callback

class ComposerTransform extends BacklogTransform
    constructor: (@protocol) ->
        options = @protocol._protocolState.options
        # writable (input) side carries main/ctl data, readable (output) side carries aux/network data
        super options, options.mainObjectMode, options.auxObjectMode

    __transform: (chunk, encoding, callback) ->
        @protocol._compose chunk, encoding, callback

    __flush: (callback) ->
        @protocol._composeEnd callback

module.exports = Protocol
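
To make the override points concrete, here is a minimal sketch of a hypothetical newline-delimited JSON protocol built on top of these classes (illustrative only; the mainObjectMode/auxObjectMode option names are the ones read by the ParserTransform/ComposerTransform constructors above).

'use strict'

Protocol = require './protocol'

# Hypothetical subclass: one JSON document per line.
class JsonLineProtocol extends Protocol
    # bytes arriving on the aux (network) side -> objects emitted on the ctl side
    _parse: (chunk, encoding, callback) ->
        lines = chunk.toString().split '\n'
        leftover = lines.pop()                  # trailing partial frame, if any
        for line in lines when line.length
            @parsed JSON.parse line
        @unparse leftover if leftover.length    # re-queued ahead of the next chunk
        callback()

    # objects written to the ctl side -> bytes emitted on the aux (network) side
    _compose: (command, encoding, callback) ->
        @composed JSON.stringify(command) + '\n'
        callback()

# Wiring is the same as in the question:
#   socket.pipe proto.aux
#   proto.aux.pipe socket
#   proto.write { foo: 'this', bar: ['is', 'command'] }
proto = new JsonLineProtocol mainObjectMode: true, auxObjectMode: false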

BacklogTransform is a utility class extending the Transform stream with the ability to push untransformed chunks back onto the queue by calling its unshift method during _transform, so unshifted data reappears on the next _transform call, prepended to the new chunk. Unfortunately the implementation is not as clean as I would like...

'use strict'

async = require 'async'
stream = require 'stream'

class BacklogTransform extends stream.Transform
    constructor: (options, writableObjectMode, readableObjectMode) ->
        options ?= {}

        super options
        @_writableState.objectMode = writableObjectMode ? options.writableObjectMode
        @_readableState.objectMode = readableObjectMode ? options.readableObjectMode
        @_backlogTransformState = {}
        @_backlogTransformState.backlog = []

    # Queue a chunk so it will be prepended to the next incoming chunk.
    unshift: (chunk, encoding = null) ->
        if @_writableState.decodeStrings
            chunk = new Buffer chunk, encoding ? @_writableState.defaultEncoding

        @_backlogTransformState.backlog.unshift { chunk, encoding }

    # Drain the backlog through __transform; in byte mode the queued chunks are concatenated first.
    _flushBacklog: (callback) ->
        backlog = @_backlogTransformState.backlog

        if backlog.length
            if @_writableState.objectMode
                async.forever(
                    (next) =>
                        return next {} if not backlog.length

                        { chunk, encoding } = backlog.shift()
                        @__transform chunk, encoding, (err) ->
                            return next { err } if err?

                            next null

                    ({ err }) ->
                        return callback err if err?

                        callback()
                )
            else
                chunks = (chunk for { chunk, encoding } in backlog)

                if @_writableState.decodeStrings
                    encoding = 'buffer'
                    chunk = Buffer.concat chunks
                else
                    encoding = backlog[0].encoding
                    for item in backlog[1..]
                        if encoding != item.encoding
                            encoding = null
                            break

                    chunk = chunks.join ''

                @_backlogTransformState.backlog = []
                @__transform chunk, encoding, callback
        else
            callback()

    _transform: (chunk, encoding, callback) ->
        backlog = @_backlogTransformState.backlog

        if backlog.length
            backlog.push { chunk, encoding }

            @_flushBacklog callback
        else
            @__transform chunk, encoding, callback

    _flush: (callback) ->
        @_flushBacklog =>
            @__flush callback

    __transform: (chunk, encoding, callback) ->
        throw new TypeError 'not implemented'

    __flush: (callback) ->
        callback()

module.exports = BacklogTransform
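
As an isolated illustration of the unshift contract (again, just a sketch, not part of the original classes), a subclass that emits only complete fixed-size frames and pushes any remainder back for the next call could look like this:

'use strict'

BacklogTransform = require './backlog-transform'

# Hypothetical subclass: emit complete 4-byte frames, keep the remainder for later.
class FrameTransform extends BacklogTransform
    __transform: (chunk, encoding, callback) ->
        while chunk.length >= 4
            @push chunk.slice 0, 4
            chunk = chunk.slice 4
        @unshift chunk if chunk.length      # leftover bytes rejoin the next chunk
        callback()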

The original question, javascript - Design pattern for a bidirectional stream converter, can be found on Stack Overflow: https://stackoverflow.com/questions/24460963/
