我有一台服务器通过 TCP 向我发送消息,其中前 4 个字节决定消息其余部分的长度。所以我需要
1) 将 4 个字节读入 UInt32(有效)并将其存储到 bytes_expected
2) 将 bytes_expected 字节读入消息
现在我的代码是这样的:
private let inputStreamAccessQueue = DispatchQueue(label: "SynchronizedInputStreamAccess")
func inputStreamHandler(_ event: Stream.Event) {
switch event {
case Stream.Event.hasBytesAvailable:
self.handleInput()
...
}
}
func handleInput() {
// **QUESTION: Do I use this barrier wrong?**
self.inputStreamAccessQueue.sync(flags: .barrier) {
guard let istr = self.inputStream else {
log.error(self.buildLogMessage("InputStream is nil"))
return
}
guard istr.hasBytesAvailable else {
log.error(self.buildLogMessage("handleInput() called when inputstream has no bytes available"))
return
}
let lengthbuffer = UnsafeMutablePointer<UInt8>.allocate(capacity: 4)
defer { lengthbuffer.deallocate(capacity: 4) }
let lenbytes_read = istr.read(lengthbuffer, maxLength: 4)
guard lenbytes_read == 4 else {
self.errorHandler(NetworkingError.InputError("Input Stream received \(lenbytes_read) (!=4) bytes"))
return
}
let bytes_expected = Int(UnsafeRawPointer(lengthbuffer).load(as: UInt32.self).bigEndian)
log.info(self.buildLogMessage("expect \(bytes_expected) bytes"))
print("::DEBUG", call, "bytes_expected", bytes_expected)
var message = ""
var bytes_missing = bytes_expected
while bytes_missing > 0 {
//print("::DEBUG", call, "bytes_missing", bytes_missing)
let buffer = UnsafeMutablePointer<UInt8>.allocate(capacity: bytes_missing)
let bytes_read = istr.read(buffer, maxLength: bytes_missing)
print("::DEBUG", call, "bytes_read", bytes_read)
guard bytes_read > 0 else {
print("bytes_read not > 0: \(bytes_read)")
return
}
guard bytes_read <= bytes_missing else {
print("Read more bytes than expected. missing=\(bytes_missing), read=\(bytes_read)")
return
}
guard let partial_message = String(bytesNoCopy: buffer, length: bytes_read, encoding: .utf8, freeWhenDone: true) else {
log.error("ERROR WHEN READING")
return
}
message = message + partial_message
bytes_missing -= bytes_read
}
self.handleMessage(message)
}
}
我的问题是 istr.read(buffer, maxLength: bytes_missing) 有时不会一次读取所有消息,所以我循环直到我读完所有我想要的。但我仍然看到我的应用程序崩溃(很少),因为 handleInput() 被再次调用,而对该方法的另一个调用仍在运行。在这种情况下,bytes_expected 包含随机值,应用程序由于非法内存分配而崩溃。
我想我可以通过使用屏障来避免这种情况。但这似乎行不通……我是不是用错了屏障?
最佳答案
我的建议是不要反对网络 I/O 的异步性质。
每当 Stream.Event.hasBytesAvailable
事件发生时,读取并收集缓冲区中的数据
发出信号。如果缓冲区包含足够的数据(4 个长度字节加上
预期的消息长度)然后处理数据并将其删除。否则什么也不做
并等待更多数据。
以下(未经测试的)代码仅作为演示。 它仅显示与该特定问题相关的部分。 为简洁起见,省略了初始化、事件处理程序等。
class MessageReader {
var buffer = Data(count: 1024) // Must be large enough for largest message + 4
var bytesRead = 0 // Number of bytes read so far
// Called from `handleInput` with a complete message.
func processMessage(message: Data) {
// ...
}
// Called from event handler if `Stream.Event.hasBytesAvailable` is signalled.
func handleInput(istr: InputStream) {
assert(bytesRead < buffer.count)
// Read from input stream, appending to previously read data:
let maxRead = buffer.count - bytesRead
let amount = buffer.withUnsafeMutableBytes { (p: UnsafeMutablePointer<UInt8>) in
istr.read(p + bytesRead, maxLength: maxRead)
}
guard amount > 0 else {
// handle EOF or read error ...
fatalError()
}
bytesRead += amount
while bytesRead >= 4 {
// Read message size:
let messageSize = buffer.withUnsafeBytes { (p: UnsafePointer<UInt32>) in
Int(UInt32(bigEndian: p.pointee))
}
let totalSize = 4 + messageSize
guard totalSize <= buffer.count else {
// Handle buffer too small for message situation ...
fatalError()
}
if bytesRead < totalSize {
break // Not enough data to read message.
}
// Buffer contains complete message now. Process it ...
processMessage(message: buffer[4 ..< totalSize])
// ... and remove it from the buffer:
if totalSize < bytesRead {
// Move remaining data to the front:
buffer.withUnsafeMutableBytes { (p: UnsafeMutablePointer<UInt8>) in
_ = memmove(p, p + totalSize, bytesRead - totalSize)
}
}
bytesRead -= totalSize
}
}
}
关于ios - 在 Swift 4 中从 InputStream 中准确读取 n 个字节,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48340728/