抱歉,如果这是一个菜鸟问题,我是grpc服务器端流媒体的新手。
我现在在服务器上的功能中拥有什么,该功能可以流式传输到客户端
req, err := http.NewRequest("GET", actualURL, nil)
//跳过一些行//
res,_:= http.DefaultClient.Do(要求)
// closing body
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
//跳过一些行//
// unmarshaling the xml data received from the GET request done above
xmlDataErr := xml.Unmarshal(body, &ArrData)
// creating a variable of a custom type and creating object of specific struct Flight
var flightArrData ArrFlights
currArrData := flightArrData.Arr.Body.Flight
log.Println(“计算要发送到客户端的总行数”,len(currArrData))
//循环响应并将其发送到客户端
对于i:= range currArrData {
farr := &pb.FlightArrivalsResponse{
ArrivalResponse: &pb.ArrivalFlightData{
Hapt: currArrData[i].HApt,
Fltnr: currArrData[i].Fltnr,
Sdt: currArrData[i].Sdt,
Acreg: currArrData[i].Acreg,
Park: currArrData[i].Park,
EstD: currArrData[i].EstD,
Gate: currArrData[i].Gate,
AblkD: currArrData[i].AblkD,
ActD: currArrData[i].ActD,
Callsign: currArrData[i].Callsign,
},
}
senderr := stream.Send(farr)
// skipping some lines //
}
//完成后返回nil
返回零
}
我面临的问题
在客户端,我具有接收此响应,休眠n分钟并再次请求响应的功能。
客户端确实获得了第一次调用中预期的响应,但是对于每个后续调用,都会发生奇怪的事情,这就是我当前的问题,我尝试以下面的每个后续调用的形式进行说明:
从客户端到服务器调用1->服务器返回200行
客户睡了n分钟
从客户端 call 2到服务器->服务器返回400行! (基本上每行是200 + 200的两倍)
客户睡了n分钟
从客户端调用3到服务器->服务器返回600行! (200 + 200 + 200)
摘要
我确实在客户端检查错误== io.EOF,并且现在停止该从服务器到客户端的响应堆叠的唯一方法是停止服务器并重新启动。
我不确定我在这里缺少什么,以确保仅发送从GET请求收到的实际和确切的响应。将不胜感激对此的任何提示。
更多信息
原始文件中来自gRPC protobuffer def的部分
rpc GetFlightArrivals (FlightArrivalsRequestURL) returns (stream FlightArrivalsResponse) {
};
上面gRPC的服务器端隐含的完整代码
func (s *Server) GetFlightArrivals(url *pb.FlightArrivalsRequestURL, stream pb.Flight_GetFlightArrivalsServer) error {
cData := ch.ConfigProcessor() // fetches the initial part of the URL from config file
if url.ArrivalURL == "" {
actualURL = cData.FURL + "/arr/all" // adding the arrival endpoint
} else {
actualURL = url.ArrivalURL
}
// build new request to get arrival data
req, err := http.NewRequest("GET", actualURL, nil)
if err != nil {
log.Fatalln("Recheck URL or connectivity, failed to make REST call")
}
req.Header.Add("Cache-Control", "no-cache")
req.Header.Add("Accept", "text/plain")
req.Header.Add("Connection", "keep-alive")
req.Header.Add("app_id", cData.AppID)
req.Header.Add("app_key", cData.AppKey)
req.Header.Add("Content-Type", "application/xml")
res, _ := http.DefaultClient.Do(req)
// closing body
defer res.Body.Close()
body, err := ioutil.ReadAll(res.Body)
if err != nil {
log.Fatalln("Failed to get any response")
return err
}
// unmarshaling the xml data
xmlDataErr := xml.Unmarshal(body, &flightArrData)
if xmlDataErr != nil {
log.Fatalln("Failed to unmarshal arrival xml data see error, ", xmlDataErr)
return xmlDataErr
}
currArrData := flightArrData.Arr.Body.Flight
log.Println("Counting total arrivals in Finland", len(currArrData))
log.Println("Starting FlightDeparturesResponse for client")
for i := range currArrData {
farr := &pb.FlightArrivalsResponse{
ArrivalResponse: &pb.ArrivalFlightData{
Hapt: currArrData[i].HApt,
Fltnr: currArrData[i].Fltnr,
Sdt: currArrData[i].Sdt,
Acreg: currArrData[i].Acreg,
Park: currArrData[i].Park,
EstD: currArrData[i].EstD,
Gate: currArrData[i].Gate,
AblkD: currArrData[i].AblkD,
ActD: currArrData[i].ActD,
Callsign: currArrData[i].Callsign,
},
}
senderr := stream.Send(farr)
if senderr != nil {
log.Fatalln("Failed to stream arrival response to the client, see error ", senderr)
return senderr
}
}
currArrData = nil
log.Println("Attempting to empty the arrival data")
return nil
}
测试用例,用于检查以上gRPC的实现
我只是启动一个测试grpc服务器,并在此测试用例中调用该grpc。客户端上的实现与接收数据相同。
func TestGetFlightArrivals(t *testing.T) {
const addr = "localhost:50051"
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
t.Fatalf("Did not connect: #{err}")
}
defer conn.Close()
f := pb.NewFlightClient(conn)
t.Run("GetFlightArrivals", func(t *testing.T) {
var res, err = f.GetFlightArrivals(context.Background(), &pb.FlightArrivalsRequestURL{ArrivalURL: ""})
if err != nil {
t.Error("Failed to make the REST call", err)
}
for {
msg, err := res.Recv()
if err == io.EOF {
t.Log("Finished reading all the message")
break
}
if err != nil {
t.Error("Failed to receive response")
}
t.Log("Message from the server", msg.GetArrivalResponse())
}
})
}
最佳答案
基于@ rubens21的评论,我添加了以下几行var emptyArrData ArrFlights
,基本上是完整的XML结构,并将此emptyArrData分配给结构flightArrData = emptyArrData
关于go - 基于Go的grpc服务器流不断堆积响应以转到go客户端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60326617/