我在 getPosition、getStatus 中调用了 go 例程,并为我的无人机重新路由,目前我正在我的主要函数中调用 go GetStatus,该函数具有 go func(),它处理流式传输 grpc 和 sse 的事件。
目前这是我的代码,我试过了
func GetPositionContext(ctx context.Context, uav pb.UAVControllerClient, uavID *pb.UAVID, projectID string) {
log.Printf("getPosition start")
stream, err := uav.GetPosition(ctx)
if err != nil {
fmt.Printf("ERROR getPosition:%s", err.Error())
}
streamID, eventName := EventsSubscribe(projectID, uavID.Aircraft, "get_position")
position := make(chan models.DronePosition)
// 受信ループ開始
go func() {
fmt.Print("start getPosition loop")
for {
msg, err := stream.Recv() // msg UAVPosition
if err == io.EOF {
// read done.
fmt.Print("start getPosition loop closed")
close(position)
return
}
if err != nil {
log.Fatalf("Failed to receive getPosition : %v", err)
close(position)
return
}
// log.Printf("Position point[%s](%f, %f, %f) H:%f", uavID.Aircraft, msg.Latitude, msg.Longitude, msg.Altitude, msg.Heading)
wayPoint := models.WaypointItem{
Latitude: msg.Latitude,
Longitude: msg.Longitude,
Altitude: msg.Altitude,
Heading: msg.Heading,
}
dronePosition := models.DronePosition{
Name: uavID.Aircraft,
ItemParameter: wayPoint,
}
// publish to eventgo
publishNotif(dronePosition, streamID, eventName)
return
}
}()
startMsg := pb.UAVControllerPositionRequest{
UavID: uavID,
Instruction: true,
Interval: 2,
}
fmt.Print("send getPosition start")
if err := stream.Send(&startMsg); err != nil {
log.Fatalf("Failed to send getPosition: %v", err)
}
<-position
stream.CloseSend()
fmt.Print("end of getPosition")
}
这是我调用这个函数的部分
go utils.GetPosition(ctx, uavService, &uavID, projectID)
我想从 go func 获取返回值,就像在 grpc 服务器中一切正常一样,没问题我应该返回 200 成功和 500 如果失败。
for {
time.Sleep(time.Duration(60) * time.Second)
}
调用后我有这段代码,它应该每 10 秒返回一次成功
return c.JSON(500, failed or pass)
如果 go routines 成功或失败,当流式处理部分正在为响应不挂起工作时,我希望返回一些东西到 ui,否则其他 api 调用将不起作用。
最佳答案
如果您想知道在处理 GetPositionContext 中的 go func() 时是否有错误,那么您可以执行如下操作。我将所有错误修改为冒泡而不是调用 log.Fatal。附带说明一下,推迟 stream.CloseSend()
可能是明智的,但我缺少上下文并且它超出了问题的范围。
我已尽力理解问题,但如果我遗漏了什么,请告诉我!
代码已修改为直接在 GetPositionContext 中冒出错误,而不是调用 log.Fatal。此外,如果 go func() 中出现错误,它将通过 channel 发送回 GetPositionContext,然后返回给调用者。
func GetPositionContext(ctx context.Context, uav pb.UAVControllerClient, uavID *pb.UAVID, projectID string) error {
log.Printf("getPosition start")
stream, err := uav.GetPosition(ctx)
if err != nil {
return fmt.Errorf("ERROR getPosition: %v", err.Error())
}
streamID, eventName := EventsSubscribe(projectID, uavID.Aircraft, "get_position")
errC := make(chan error)
// 受信ループ開始
go func() {
fmt.Print("start getPosition loop")
for {
msg, err := stream.Recv() // msg UAVPosition
if err == io.EOF {
// read done.
fmt.Print("start getPosition loop closed")
close(errC)
return
}
if err != nil {
errC <- fmt.Errorf("Failed to receive getPosition : %v", err)
return
}
// log.Printf("Position point[%s](%f, %f, %f) H:%f", uavID.Aircraft, msg.Latitude, msg.Longitude, msg.Altitude, msg.Heading)
wayPoint := models.WaypointItem{
Latitude: msg.Latitude,
Longitude: msg.Longitude,
Altitude: msg.Altitude,
Heading: msg.Heading,
}
dronePosition := models.DronePosition{
Name: uavID.Aircraft,
ItemParameter: wayPoint,
}
// publish to eventgo
publishNotif(dronePosition, streamID, eventName)
// Did you mean to return here? The for loop will only ever execute one time.
// If you didn't mean to return, then remove this close
close(errC)
return
}
}()
startMsg := pb.UAVControllerPositionRequest{
UavID: uavID,
Instruction: true,
Interval: 2,
}
fmt.Print("send getPosition start")
if err := stream.Send(&startMsg); err != nil {
return fmt.Errorf("Failed to send getPosition: %v", err)
}
err = <- errC
stream.CloseSend()
fmt.Print("end of getPosition")
return err
}
如果您想异步调用此函数,但仍想知道是否有错误,那么您可以执行以下操作。
errC := make(chan error)
go func() {
errC <- GetPositionContext(..........)
}()
// Do some other stuff here
// Until you need the result
err := <- errC
// ... and Eventually when you want to return the success or failure as JSON
if err != nil {
return c.JSON(500, failed)
}
return c.JSON(500, pass)
关于go - 如何在 golang 中处理 goroutines 并获得响应,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56453730/