go - 在我的代码中安全完成 goroutines 的正确方法是什么?

标签 go channel goroutine

我正在写一个简单的 tcp 服务器,goroutine 模型非常简单:

一个goroutine负责接受新的连接;对于每个新连接,都会启动三个 goroutines:

  1. 一个供阅读
  2. 一个用于处理和处理应用程序逻辑
  3. 一个写

目前一台服务器将服务不超过 1000 个用户,所以我不尝试限制 goroutine 数量。

for {
    conn, err := listener.Accept()
    // ....
    connHandler := connHandler{
        conn:      conn,
        done:      make(chan struct{}),
        readChan:  make(chan string, 100),
        writeChan: make(chan string, 100),
    }
    // ....
    go connHandler.readAll()    
    go connHandler.processAll() 
    go connHandler.writeAll()   
}

我使用 done channel 通知所有三个 channel 完成,当用户注销或发生永久性网络错误时,done channel 将被关闭(使用 sync.Once 来确保关闭只发生一次):

func (connHandler *connHandler) Close() {
    connHandler.doOnce.Do(func() {
        connHandler.isClosed = true
        close(connHandler.done)
    })
}

下面是writeAll()的代码方法:

func (connHandler *connHandler) writeAll() {
    writer := bufio.NewWriter(connHandler.conn)

    for {
        select {
        case <-connHandler.done:
            connHandler.conn.Close()
            return
        case msg := <-connHandler.writeChan:
            connHandler.writeOne(msg, writer)
        }
    }
}

有一个Send通过将字符串发送到写入 channel 来向用户发送消息的方法:

func (connHandler *connHandler) Send(msg string) {
    case connHandler.writeChan <- msg:
}

Send方法将主要在 processAll() 中调用goroutine,但也存在于许多其他 goroutine 中,因为不同的用户需要相互通信。

现在的问题是:如果 userA 注销或网络出现故障,userB 向 userA 发送消息,userB 的 goroutine 可能会被永久阻塞,因为没有人会从该 channel 收到消息。

我的解决方案:

我的第一个想法是使用一个 bool 值来确保 connHanler 在发送给它时没有关闭:

func (connHandler *connHandler) Send(msg string) {
    if !connHandler.isClosed {
        connHandler.writeChan <- msg
    }
}

但我认为connHandler.writeChan <- msgclose(done)仍然可以同时发生,阻塞的可能性仍然存在。所以我必须添加一个超时:

func (connHandler *connHandler) Send(msg string) {
    if !connHandler.isClosed {
        timer := time.NewTimer(10 * time.Second)
        defer timer.Stop()
        select {
        case connHandler.writeChan <- msg:
        case <-timer.C:
            log.Warning(connHandler.Addr() + " send msg timeout:" + msg)
        }
    }

}

现在我觉得代码是安全的,但也很丑陋,并且每次发送消息时都启动一个计时器感觉像是一种不必要的开销。

然后我看了这篇文章:https://go101.org/article/channel-closing.html ,我的问题看起来像文章中的第二个例子:

One receiver, N senders, the receiver says "please stop sending more" by closing an additional signal channel

但我认为这种解决方案并不能消除在我的情况下阻塞的可能性。

也许最简单的解决方案是关闭写入 channel 并让 Send方法 panic ,然后使用 recover处理 panic ?但这看起来也是一种丑陋的方式。

那么有没有一种简单直接的方法来完成我想做的事情呢?

(本人英文不好,如有歧义,请指出,谢谢。)

最佳答案

您的示例看起来很不错,我认为您已获得所需内容的 90%。

我认为您看到的问题在于发送,而您实际上可能已“完成”。

您可以使用“完成” channel 通知所有您已完成的围棋例程。您将始终能够从关闭的 channel 中读取一个值(它将是零值)。这意味着您可以更新您的 Send(msg) 方法以考虑完成 channel 。

func (connHandler *connHandler) Send(msg string) {
    select {
    case connHandler.writeChan <- msg:
    case <- connHandler.done:
        log.Debug("connHandler is done, exiting Send without sending.")
    case <-time.After(10 * time.Second):
        log.Warning(connHandler.Addr() + " send msg timeout:" + msg)
    }
}

现在这个 select 中会发生的是以下之一:

  1. 信息发送到writeChan
  2. close(done) 已在别处调用,done chan 已关闭。您将能够从完成中读取,中断选择。
  3. time.After(...) 将达到 10 秒,您将能够使发送超时。

关于go - 在我的代码中安全完成 goroutines 的正确方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51131240/

相关文章:

c - 如何使用指向 C 数据的指针写入 Golang 中的文件?

go - 如何在 redisearch-go (Golang) 中向 redigo 客户端提供密码

go - go channels 在被阻塞时会保留顺序吗?

multithreading - 为什么竞争检测器没有检测到这种竞争条件?

Goroutine 从 main 返回后没有执行完。为什么?

postgresql - 如何使用 cgo 构建 Postgres 扩展

go - Golang 中的 echo 命令

go - channel 关闭时收到的 bool 标志与Golang中的预期不符

concurrency - 使select语句同时等待多个 channel

go - 第一个协程示例,奇怪的结果