我正在写一个简单的 tcp 服务器,goroutine 模型非常简单:
一个goroutine负责接受新的连接;对于每个新连接,都会启动三个 goroutines:
- 一个供阅读
- 一个用于处理和处理应用程序逻辑
- 一个写
目前一台服务器将服务不超过 1000 个用户,所以我不尝试限制 goroutine 数量。
for {
conn, err := listener.Accept()
// ....
connHandler := connHandler{
conn: conn,
done: make(chan struct{}),
readChan: make(chan string, 100),
writeChan: make(chan string, 100),
}
// ....
go connHandler.readAll()
go connHandler.processAll()
go connHandler.writeAll()
}
我使用 done
channel 通知所有三个 channel 完成,当用户注销或发生永久性网络错误时,done
channel 将被关闭(使用 sync.Once 来确保关闭只发生一次):
func (connHandler *connHandler) Close() {
connHandler.doOnce.Do(func() {
connHandler.isClosed = true
close(connHandler.done)
})
}
下面是writeAll()
的代码方法:
func (connHandler *connHandler) writeAll() {
writer := bufio.NewWriter(connHandler.conn)
for {
select {
case <-connHandler.done:
connHandler.conn.Close()
return
case msg := <-connHandler.writeChan:
connHandler.writeOne(msg, writer)
}
}
}
有一个Send
通过将字符串发送到写入 channel 来向用户发送消息的方法:
func (connHandler *connHandler) Send(msg string) {
case connHandler.writeChan <- msg:
}
Send
方法将主要在 processAll()
中调用goroutine,但也存在于许多其他 goroutine 中,因为不同的用户需要相互通信。
现在的问题是:如果 userA 注销或网络出现故障,userB 向 userA 发送消息,userB 的 goroutine 可能会被永久阻塞,因为没有人会从该 channel 收到消息。
我的解决方案:
我的第一个想法是使用一个 bool 值来确保 connHanler 在发送给它时没有关闭:
func (connHandler *connHandler) Send(msg string) {
if !connHandler.isClosed {
connHandler.writeChan <- msg
}
}
但我认为connHandler.writeChan <- msg
和 close(done)
仍然可以同时发生,阻塞的可能性仍然存在。所以我必须添加一个超时:
func (connHandler *connHandler) Send(msg string) {
if !connHandler.isClosed {
timer := time.NewTimer(10 * time.Second)
defer timer.Stop()
select {
case connHandler.writeChan <- msg:
case <-timer.C:
log.Warning(connHandler.Addr() + " send msg timeout:" + msg)
}
}
}
现在我觉得代码是安全的,但也很丑陋,并且每次发送消息时都启动一个计时器感觉像是一种不必要的开销。
然后我看了这篇文章:https://go101.org/article/channel-closing.html ,我的问题看起来像文章中的第二个例子:
One receiver, N senders, the receiver says "please stop sending more" by closing an additional signal channel
但我认为这种解决方案并不能消除在我的情况下阻塞的可能性。
也许最简单的解决方案是关闭写入 channel 并让 Send
方法 panic ,然后使用 recover
处理 panic ?但这看起来也是一种丑陋的方式。
那么有没有一种简单直接的方法来完成我想做的事情呢?
(本人英文不好,如有歧义,请指出,谢谢。)
最佳答案
您的示例看起来很不错,我认为您已获得所需内容的 90%。
我认为您看到的问题在于发送,而您实际上可能已“完成”。
您可以使用“完成” channel 通知所有您已完成的围棋例程。您将始终能够从关闭的 channel 中读取一个值(它将是零值)。这意味着您可以更新您的 Send(msg)
方法以考虑完成 channel 。
func (connHandler *connHandler) Send(msg string) {
select {
case connHandler.writeChan <- msg:
case <- connHandler.done:
log.Debug("connHandler is done, exiting Send without sending.")
case <-time.After(10 * time.Second):
log.Warning(connHandler.Addr() + " send msg timeout:" + msg)
}
}
现在这个 select 中会发生的是以下之一:
- 信息发送到
writeChan
close(done)
已在别处调用,done chan 已关闭。您将能够从完成中读取,中断选择。- time.After(...) 将达到 10 秒,您将能够使发送超时。
关于go - 在我的代码中安全完成 goroutines 的正确方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51131240/