go - 如何在 golang 中处理 goroutines 并获得响应

标签 go grpc server-sent-events goroutine

我在 getPosition、getStatus 中调用了 go 例程,并为我的无人机重新路由,目前我正在我的主要函数中调用 go GetStatus,该函数具有 go func(),它处理流式传输 grpc 和 sse 的事件。

目前这是我的代码,我试过了

func GetPositionContext(ctx context.Context, uav pb.UAVControllerClient, uavID *pb.UAVID, projectID string) {
    log.Printf("getPosition start")

    stream, err := uav.GetPosition(ctx)

    if err != nil {
        fmt.Printf("ERROR getPosition:%s", err.Error())
    }

    streamID, eventName := EventsSubscribe(projectID, uavID.Aircraft, "get_position")

    position := make(chan models.DronePosition)

    // 受信ループ開始
    go func() {
        fmt.Print("start getPosition loop")
        for {
            msg, err := stream.Recv() // msg UAVPosition
            if err == io.EOF {
                // read done.
                fmt.Print("start getPosition loop closed")
                close(position)
                return
            }
            if err != nil {
                log.Fatalf("Failed to receive getPosition : %v", err)
                close(position)
                return
            }
            // log.Printf("Position point[%s](%f, %f, %f) H:%f", uavID.Aircraft, msg.Latitude, msg.Longitude, msg.Altitude, msg.Heading)

            wayPoint := models.WaypointItem{
                Latitude:  msg.Latitude,
                Longitude: msg.Longitude,
                Altitude:  msg.Altitude,
                Heading:   msg.Heading,
            }

            dronePosition := models.DronePosition{
                Name:          uavID.Aircraft,
                ItemParameter: wayPoint,
            }

            // publish to eventgo
            publishNotif(dronePosition, streamID, eventName)
            return
        }
    }()

    startMsg := pb.UAVControllerPositionRequest{
        UavID:       uavID,
        Instruction: true,
        Interval:    2,
    }

    fmt.Print("send getPosition start")

    if err := stream.Send(&startMsg); err != nil {
        log.Fatalf("Failed to send getPosition: %v", err)
    }

    <-position

    stream.CloseSend()
    fmt.Print("end of getPosition")
}

这是我调用这个函数的部分

go utils.GetPosition(ctx, uavService, &uavID, projectID)

我想从 go func 获取返回值,就像在 grpc 服务器中一切正常一样,没问题我应该返回 200 成功和 500 如果失败。

for {
        time.Sleep(time.Duration(60) * time.Second)
    }

调用后我有这段代码,它应该每 10 秒返回一次成功

return c.JSON(500, failed or pass)

如果 go routines 成功或失败,当流式处理部分正在为响应不挂起工作时,我希望返回一些东西到 ui,否则其他 api 调用将不起作用。

最佳答案

如果您想知道在处理 GetPositionContext 中的 go func() 时是否有错误,那么您可以执行如下操作。我将所有错误修改为冒泡而不是调用 log.Fatal。附带说明一下,推迟 stream.CloseSend() 可能是明智的,但我缺少上下文并且它超出了问题的范围。

我已尽力理解问题,但如果我遗漏了什么,请告诉我!

代码已修改为直接在 GetPositionContext 中冒出错误,而不是调用 log.Fatal。此外,如果 go func() 中出现错误,它将通过 channel 发送回 GetPositionContext,然后返回给调用者。

func GetPositionContext(ctx context.Context, uav pb.UAVControllerClient, uavID *pb.UAVID, projectID string) error {
    log.Printf("getPosition start")

    stream, err := uav.GetPosition(ctx)

    if err != nil {
        return fmt.Errorf("ERROR getPosition: %v", err.Error())
    }

    streamID, eventName := EventsSubscribe(projectID, uavID.Aircraft, "get_position")

    errC := make(chan error)

    // 受信ループ開始
    go func() {
        fmt.Print("start getPosition loop")
        for {
            msg, err := stream.Recv() // msg UAVPosition
            if err == io.EOF {
                // read done.
                fmt.Print("start getPosition loop closed")
                close(errC)
                return
            }
            if err != nil {
                errC <- fmt.Errorf("Failed to receive getPosition : %v", err)
                return
            }
            // log.Printf("Position point[%s](%f, %f, %f) H:%f", uavID.Aircraft, msg.Latitude, msg.Longitude, msg.Altitude, msg.Heading)

            wayPoint := models.WaypointItem{
                Latitude:  msg.Latitude,
                Longitude: msg.Longitude,
                Altitude:  msg.Altitude,
                Heading:   msg.Heading,
            }

            dronePosition := models.DronePosition{
                Name:          uavID.Aircraft,
                ItemParameter: wayPoint,
            }

            // publish to eventgo
            publishNotif(dronePosition, streamID, eventName)

            // Did you mean to return here? The for loop will only ever execute one time.
            // If you didn't mean to return, then remove this close
            close(errC)
            return
        }
    }()

    startMsg := pb.UAVControllerPositionRequest{
        UavID:       uavID,
        Instruction: true,
        Interval:    2,
    }

    fmt.Print("send getPosition start")

    if err := stream.Send(&startMsg); err != nil {
        return fmt.Errorf("Failed to send getPosition: %v", err)
    }

    err = <- errC

    stream.CloseSend()
    fmt.Print("end of getPosition")
    return err
}

如果您想异步调用此函数,但仍想知道是否有错误,那么您可以执行以下操作。

errC := make(chan error)
go func() {
  errC <- GetPositionContext(..........)
}()
// Do some other stuff here
// Until you need the result
err := <- errC


// ... and Eventually when you want to return the success or failure as JSON
if err != nil {
  return c.JSON(500, failed)
}
return c.JSON(500, pass)

关于go - 如何在 golang 中处理 goroutines 并获得响应,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56453730/

相关文章:

go - 如何在 proto3 中复制未知字段功能?

go - go中的双向gRPC流实现

python - 如何使用 Python gRPC 发送自定义 header (元数据)?

c# - 使用 C# 监听文本/事件流

javascript - 所有事件的 HTML5 EventSource 监听器?

javascript - 如何区分不同服务器发送的事件监听器?

go - 在 Go Lang 中处理 Aerospike 库错误

xml - golang XML 不解码

go - Redigo:如何使用 Golang 从 Redis 获取键值映射?

go - goLang仅在同一文件中查找事件