go - 客户端断开连接时如何正确使用 ctx.Done()?

标签 go grpc channel

如果客户端因网络错误而断开连接,在我的情况下服务器必须关闭 pub/sub 连接。我知道 ctx.Done() 函数,但不知道如何在我的案例中正确使用它。有人可以解释一下吗?

grpc-go: 1.7.0


func (a *API) Notifications(in *empty.Empty, stream pb.Service_NotificationsServer) error {
    ctx := stream.Context()
    _, ok := user.FromContext(ctx)
    if !ok {
        return grpc.Errorf(codes.Unauthenticated, "user not found")

    pubsub := a.redisClient.Subscribe("notifications")
    defer pubsub.Close()

    for {
        msg, err := pubsub.ReceiveMessage()
        if err != nil {
            grpclog.Warningf("Notifications: pubsub error: %v", err)
            return grpc.Errorf(codes.Internal, "pubsub error %v", err)

        notification := &pb.Notification{}
        err = json.Unmarshal([]byte(msg.Payload), notification)
        if err != nil {
            grpclog.Warningf("Notifications: parse error: %v", err)
        if err := stream.Send(notification); err != nil {
            grpclog.Warningf("Notifications: %v", err)
            return err
        grpclog.Infof("Notifications: send msg %v", notification)


您可以使用选择。不是从函数中正常获取数据,而是使用 channel 获取数据并使用 go 例程来处理数据。 像这样的事情:

func (a *API) Notifications(in *empty.Empty, stream 
    pb.Service_NotificationsServer) error {
    ctx := stream.Context()
    _, ok := user.FromContext(ctx)
    if !ok {
        return grpc.Errorf(codes.Unauthenticated, "user not found")

    pubsub := a.redisClient.Subscribe("notifications")
    defer pubsub.Close()

    // I can not build the code, so I assume the msg in your code Message struct
    c := make(chan Message)
    go func() {
        for {
            msg, err := pubsub.ReceiveMessage()
            if err != nil {
                grpclog.Warningf("Notifications: pubsub error: %v", err)
                return grpc.Errorf(codes.Internal, "pubsub error %v", err)
            c<- msg

    for {
        select {
            case msg, ok  := <-c:
                if !ok {
                    // channel is closed handle it
                notification := &pb.Notification{}
                err = json.Unmarshal([]byte(msg.Payload), notification)
                if err != nil {
                    grpclog.Warningf("Notifications: parse error: %v", err)
                if err := stream.Send(notification); err != nil {
                    grpclog.Warningf("Notifications: %v", err)
                    return err
                grpclog.Infof("Notifications: send msg %v", notification)
            case <- ctx.Done():
                // do exit logic. some how close the pubsub, so next 
                // ReceiveMessage() return an error
                // if forget to do that the go routine runs for ever 
                // until the end of main(), which I think its not what you wanted
                pubsub.Close() // Its just pseudo code

从 channel 读取消息(我假设类型是消息),并使用 select 的力量。


  1. 确保 go 例程在完成此函数后结束。我无法猜测,因为我不知道代码,但我假设有一个 Close() 方法用于关闭 pubsub 所以下一个 ReceiveMessage 返回一个错误。 (我看到延迟完成了我希望的工作)

  2. 如果在 ctx.Done 之前 ReceiveMessage 中有错误,您可以关闭 channel 然后中断循环。

关于go - 客户端断开连接时如何正确使用 ctx.Done()?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46842887/


go - 等待多个协程的结果

memory - 哪种 channel 类型在 Go 中使用最少的内存?

go - 尝试提供 Gin Gonic 应用程序时出现紧急错误

google-app-engine - golang 数据存储投影查询从填充实体返回空字符串

Golang - slice 追加(需要时)不起作用

asp.net-core - 如何使用bloomRPC 调用.NET Core 3.0 gRPC 服务器应用程序?

sorting - 一个用于对通用集合进行排序的排序器函数

java - gRPC + Bazel + Envoy Json Proxy - 如何导入 google/api/annotations.proto

android - GRPC 错误日志(Android Studio)

time - 为什么这个 Golang 代码不能在多个时间之间进行选择。 channel 工作后?