go - 如何使用 gracefulStop 关闭所有 grpc 服务器流?

标签 go grpc grpc-go

我正在尝试停止从服务器端连接到流服务器的所有客户端。 其实我使用 GracefulStop 方法来优雅地处理它。

我正在等待 channel 上的 os.Interrupt 信号为 gRPC 执行正常停止。但是当客户端连接时它会卡在 server.GracefulStop() 上。

func (s *Service) Subscribe(_ *empty.Empty, srv clientapi.ClientApi_SubscribeServer) error {
    ctx := srv.Context()

    updateCh := make(chan *clientapi.Update, 100)
    stopCh := make(chan bool)
    defer func() {
        stopCh<-true
        close(updateCh)
    }

    go func() {
        ticker := time.NewTicker(1 * time.Second)
        defer func() {
            ticker.Stop()
            close(stopCh)
        }
        for {
            select {
            case <-stopCh:
                return
            case <-ticker.C:
                updateCh<- &clientapi.Update{Name: "notification": Payload: "sample notification every 1 second"}
            }
        }
    }()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()

        case notif := <-updateCh:
            err := srv.Send(notif)
            if err == io.EOF {
                return nil
            }

            if err != nil {
                s.logger.Named("Subscribe").Error("error", zap.Error(err))
                continue
            }
        }
    }
}

我希望 ctx.Done() 方法中的 context 可以处理它并打破 for 循环。 如何关闭所有像这样的响应流?

最佳答案

为您的 gRPC 服务创建一个全局 context。因此,浏览各个部分:

  • 每个 gRPC 服务请求都将使用此上下文(连同客户端上下文)来完成该请求
  • os.Interrupt 处理程序将取消全局上下文;从而取消任何当前正在运行的请求
  • 最后发出 server.GracefulStop() - 它应该等待所有事件的 gRPC 调用完成(如果他们没有立即看到取消)

例如,在设置 gRPC 服务时:

pctx := context.Background()
globalCtx, globalCancel := context.WithCancel(pctx)

mysrv := MyService{
    gctx: globalCtx
}

s := grpc.NewServer()
pb.RegisterMyService(s, mysrv)

os.Interrupt 处理程序启动并等待关闭:

globalCancel()
server.GracefulStop()

gRPC 方法:

func(s *MyService) SomeRpcMethod(ctx context.Context, req *pb.Request) error {

    // merge client and server contexts into one `mctx`
    // (client context will cancel if client disconnects)
    // (server context will cancel if service Ctrl-C'ed)

    mctx, mcancel := mergeContext(ctx, s.gctx)

    defer mcancel() // so we don't leak, if neither client or server context cancels

    // RPC WORK GOES HERE
    // RPC WORK GOES HERE
    // RPC WORK GOES HERE

    // pass mctx to any blocking calls:
    // - http REST calls
    // - SQL queries etc.
    // - or if running a long loop; status check the context occasionally like so:

    // Example long request (10s)
    for i:=0; i<10*1000; i++ {
        time.Sleep(1*time.Milliscond)

        // poll merged context
        select {
            case <-mctx.Done():
                return fmt.Errorf("request canceled: %s", mctx.Err())
            default:
        }
    }
}

还有:

func mergeContext(a, b context.Context) (context.Context, context.CancelFunc) {
    mctx, mcancel := context.WithCancel(a) // will cancel if `a` cancels

    go func() {
        select {
        case <-mctx.Done(): // don't leak go-routine on clean gRPC run
        case <-b.Done():
            mcancel() // b canceled, so cancel mctx 
        }
    }()

    return mctx, mcancel
}

关于go - 如何使用 gracefulStop 关闭所有 grpc 服务器流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58480775/

相关文章:

使用 GRPC 流请求进行代理负载平衡

go - Go中的高性能gRPC丰富错误处理

grpc - grpc-go 中的 session 和远程 IP 地址

docker - 我尝试在 docker 中部署 gRPC (go) 服务器并在本地端口中公开端口,但端口绑定(bind)不起作用

go - golang中的等效盐和哈希

go - 在 Golang 中遇到 gzip.Reader 问题

Go模块替换为本地模块的特定版本

c++ - 我的项目不会编译出现 undefined reference 错误,如 "undefined reference to ` grpc::g_core_codegen_interface'"

go - 如何强制 Protobuf 3 字段?

syntax - 关键字 var 后的下划线和接口(interface)名称是什么意思?