我想将网络协议(protocol)实现对象设计为完全与套接字无关,并纯粹充当双向转换器。因此,协议(protocol)对象应该从“控制”端输入对象或命令,并从“网络”端发出字节,并接受来自“网络”端的字节以转换为对象/响应并从“控制”端发出。
我无法选择优雅的设计模式来在 Node.js 中执行此操作。我希望它完全兼容 Stream
,到目前为止我最终采用了这种方法:
socket = getSocketSomehow();
proto = new Protocol();
socket.pipe(proto.aux);
proto.aux.pipe(socket);
proto.write({ foo: 'this', bar: ['is', 'command'] });
proto.once('data', function(response) {
console.log('this is response: ' + response.quux);
});
proto
是两个交叉连接的双工流的集合,它本身是 stream.Duplex
以及 aux
。传入的网络数据进入 proto.aux
,然后从 proto
解析并作为对象发出。传入对象转到 proto
并组合成字节并从 proto.aux
发出。
有更好的方法来做同样的事情吗?
最佳答案
我以以下方法结束。代码示例采用 CoffeeScript 格式,以提高可读性。
<小时/>Bond
类实现 Duplex
流接口(interface),但将两个不相关的流绑定(bind)在一起,因此读取和写入被代理到单独的流。
'use strict'
{ EventEmitter } = require 'events'
class Bond extends EventEmitter
proxyReadableMethod = (method) =>
@::[method] = -> @_bondState.readable[method] arguments...
proxyWritableMethod = (method) =>
@::[method] = -> @_bondState.writable[method] arguments...
proxyReadableMethod 'read'
proxyReadableMethod 'setEncoding'
proxyReadableMethod 'resume'
proxyReadableMethod 'pause'
proxyReadableMethod 'pipe'
proxyReadableMethod 'unpipe'
proxyReadableMethod 'unshift'
proxyReadableMethod 'wrap'
proxyWritableMethod 'write'
proxyWritableMethod 'end'
constructor: (readable, writable) ->
super
@_bondState = {}
@_bondState.readable = readable
@_bondState.writable = writable
proxyEvent = (obj, event) =>
obj.on event, => @emit event, arguments...
proxyEvent readable, 'readable'
proxyEvent readable, 'data'
proxyEvent readable, 'end'
proxyEvent readable, 'close'
# proxyEvent readable, 'error'
proxyEvent writable, 'drain'
proxyEvent writable, 'finish'
proxyEvent writable, 'pipe'
proxyEvent writable, 'unpipe'
# proxyEvent writable, 'error'
module.exports = Bond
<小时/>
Protocol
聚合了两个内部 Transform
流 — Parser
和 Composer
。 Parser
获取来自 aux
端的数据,并将其转换为来自 ctl
端的数据,而 Composer
则执行相反的操作。 aux
和 ctl
都是解析器和编译器的纽带,但方向不同 - 因此 aux
仅处理传入和传出的“组合”数据,而 ctl
端发出并接受“已解析”数据。我的设计决定是通过 Protocol
本身公开 ctl
,并且 aux
作为实例变量可见。
协议(protocol)
公开:
_parse
、_compose
作为_transform
类方法_parseEnd
、_composeEnd
作为_flush
类方法解析
、组合
为类似push
的方法unparse
、uncompose
作为类似于unshift
的方法
'use strict'
Bond = require './bond'
BacklogTransform = require './backlog-transform'
class Protocol extends Bond
constructor: (options) ->
@_protocolState = {}
@_protocolState.options = options
parser = @_protocolState.parser = new ParserTransform @
composer = @_protocolState.composer = new ComposerTransform @
parser.__name = 'parser'
composer.__name = 'composer'
proxyEvent = (source, event) =>
source.on event, =>
@emit event, arguments...
proxyParserEvent = (event) =>
proxyEvent @_protocolState.parser, event
proxyComposerEvent = (event) =>
proxyEvent @_protocolState.composer, event
proxyParserEvent 'error'
proxyComposerEvent 'error'
super @_protocolState.parser, @_protocolState.composer
@aux = @_protocolState.aux = new Bond @_protocolState.composer, @_protocolState.parser
# @_protocolState.main = @main = new Bond @_protocolState.parser, @_protocolState.composer
parsed: (chunk, encoding) ->
@_protocolState.parser.push chunk, encoding
composed: (chunk, encoding) ->
@_protocolState.composer.push chunk, encoding
unparse: (chunk, encoding) ->
@_protocolState.parser.unshift chunk, encoding
uncompose: (chunk, encoding) ->
@_protocolState.composer.unshift chunk, encoding
#
_parse: (chunk, encoding, callback) ->
throw new TypeError 'not implemented'
_compose: (chunk, encoding, callback) ->
throw new TypeError 'not implemented'
_parseEnd: (callback) ->
callback()
_composeEnd: (callback) ->
callback()
class ParserTransform extends BacklogTransform
constructor: (@protocol) ->
options = @protocol._protocolState.options
super options, options.auxObjectMode, options.mainObjectMode
__transform: (chunk, encoding, callback) ->
@protocol._parse chunk, encoding, callback
__flush: (callback) ->
@protocol._parseEnd callback
class ComposerTransform extends BacklogTransform
constructor: (@protocol) ->
options = @protocol._protocolState.options
super options, options.mainObjectMode, options.auxObjectMode
__transform: (chunk, encoding, callback) ->
@protocol._compose chunk, encoding, callback
__flush: (callback) ->
@protocol._composeEnd callback
module.exports = Protocol
<小时/>
BacklogTransform
是实用程序类,扩展了 Transform
流,能够通过在 _transform
期间调用 unshift
方法将未转换的 block 移回到队列中,因此未移位的数据将出现在下一个 _transform
上,并添加到新 block 之前。不幸的是,实现并不像我希望的那么理想......
'use strict'
async = require 'async'
stream = require 'stream'
class BacklogTransform extends stream.Transform
constructor: (options, writableObjectMode, readableObjectMode) ->
options ?= {}
super options
@_writableState.objectMode = writableObjectMode ? options.writableObjectMode
@_readableState.objectMode = readableObjectMode ? options.readableObjectMode
@_backlogTransformState = {}
@_backlogTransformState.backlog = []
unshift: (chunk, encoding = null) ->
if @_writableState.decodeStrings
chunk = new Buffer chunk, encoding ? @_writableState.defaultEncoding
@_backlogTransformState.backlog.unshift { chunk, encoding }
_flushBacklog: (callback) ->
backlog = @_backlogTransformState.backlog
if backlog.length
if @_writableState.objectMode
async.forever(
(next) =>
return next {} if not backlog.length
{ chunk, encoding } = backlog.shift()
@__transform chunk, encoding, (err) ->
return next { err } if err?
next null
({ err }) ->
return callback err if err?
callback()
)
else
chunks = (chunk for { chunk, encoding } in backlog)
if @_writableState.decodeStrings
encoding = 'buffer'
chunk = Buffer.concat chunks
else
encoding = backlog[0].encoding
for item in backlog[1..]
if encoding != item.encoding
encoding = null
break
chunk = chunks.join ''
@_backlogTransformState.backlog = []
@__transform chunk, encoding, callback
else
callback()
_transform: (chunk, encoding, callback) ->
backlog = @_backlogTransformState.backlog
if backlog.length
backlog.push { chunk, encoding }
@_flushBacklog callback
else
@__transform chunk, encoding, callback
_flush: (callback) ->
@_flushBacklog =>
@__flush callback
__transform: (chunk, encoding, callback) ->
throw new TypeError 'not implemented'
__flush: (callback) ->
callback()
module.exports = BacklogTransform
关于javascript - 双向流转换器的设计模式,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24460963/