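I am developing a chat application using gRPC, in which the server receives messages from a client and sends them back to all clients connected to it. I used Saturnism's chat-example as a reference. I copied the code, and it compiles and runs, but the server never seems to receive any request from the client.
My questions are:
- Is there a way to enable verbose server-side and client-side logging in gRPC, to see the requests and responses going in and out and what might be failing?
- I am using the following code for the server and the client. What might be missing or wrong in it that results in no communication between client and server?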
WingokuServer.java
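import io.grpc.Server;
import io.grpc.ServerBuilder;
import java.io.IOException;

public class WingokuServer {
    public static void main(String[] args) throws IOException, InterruptedException {
        Server server = ServerBuilder.forPort(8091)
                // recordRequestHeadersInterceptor() is a helper whose definition is not shown in the question.
                .intercept(recordRequestHeadersInterceptor())
                .addService(new WingokuServiceImpl())
                .build();
        System.out.println("Starting server...");
        server.start();
        System.out.println("Server started!");
        // Block the main thread until the server shuts down.
        server.awaitTermination();
    }
}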
WingokuServerSideServiceImplementation:
import io.grpc.stub.StreamObserver;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class WingokuServiceImpl extends WingokuServiceGrpc.WingokuServiceImplBase {
    // Response observers of all currently connected clients (thread-safe set).
    private static Set<StreamObserver<Response>> observers =
            Collections.newSetFromMap(new ConcurrentHashMap<>());

    public WingokuServiceImpl() {
        System.out.println("WingokuServiceImpl");
    }

    @Override
    public StreamObserver<Request> messages(StreamObserver<Response> responseObserver) {
        System.out.println("messages");
        observers.add(responseObserver);
        return new StreamObserver<Request>() {
            @Override
            public void onNext(Request request) {
                System.out.println("Server onNext: ");
                System.out.println("request from client is: " + request.getRequestMessage());
                Response response = Response.newBuilder()
                        .setResponseMessage("new Message From server at time: " + System.nanoTime())
                        .build();
                // Broadcast the response to every connected client.
                for (StreamObserver<Response> observer : observers) {
                    observer.onNext(response);
                }
            }

            @Override
            public void onError(Throwable throwable) {
                // Drop the failed stream so later broadcasts do not write to it.
                observers.remove(responseObserver);
                System.out.println("Server onError: ");
                throwable.printStackTrace();
            }

            @Override
            public void onCompleted() {
                observers.remove(responseObserver);
                System.out.println("Server onCompleted ");
            }
        };
    }
}
WingokuClient:
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;

public class WingokuClient {
    public static void main(String[] args) {
        // usePlaintext(true) is the older form; newer grpc-java versions use usePlaintext().
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8091).usePlaintext(true).build();
        WingokuServiceGrpc.WingokuServiceStub asyncStub = WingokuServiceGrpc.newStub(channel);
        StreamObserver<Request> requestStreamObserver = asyncStub.messages(new StreamObserver<Response>() {
            @Override
            public void onNext(Response response) {
                System.out.println("Client onNext");
                System.out.println("Response from server is: " + response.getResponseMessage());
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("Client onError");
                throwable.printStackTrace();
            }

            @Override
            public void onCompleted() {
                System.out.println("Client onCompleted");
            }
        });
        requestStreamObserver.onNext(Request.newBuilder().setRequestMessage("Message From Client").build());
        requestStreamObserver.onCompleted();
        // As posted: the channel is shut down without waiting for in-flight calls to finish.
        channel.shutdown();
        System.out.println("exiting client");
    }
}
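Edit:
There is nothing wrong with the code. It works. I just needed to add awaitTermination to the client's channel, because without it the client closes the connection to the server immediately, possibly even before the request has left the client and gone onto the network. That is why the server never received any request.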
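On the client this amounts to replacing channel.shutdown(); with something like channel.shutdown().awaitTermination(5, TimeUnit.SECONDS); (the 5-second timeout is an arbitrary choice, and main then has to handle InterruptedException). ManagedChannel's lifecycle methods mirror those of java.util.concurrent.ExecutorService, so the effect can be sketched with the JDK alone. The example below is only an analogy, not gRPC code: the class name ShutdownThenAwait, the daemon worker thread, and the sleep that stands in for the request travelling to the server are all assumptions made for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownThenAwait {
    // Runs one slow task on a daemon thread, then shuts down and waits.
    // Returns true only if the task finished before the timeout elapsed.
    static boolean runAndAwait(long taskMillis, long timeoutMillis) {
        AtomicBoolean delivered = new AtomicBoolean(false);
        // A daemon thread does not keep the JVM alive once main returns.
        ExecutorService worker = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "demo-worker");
            t.setDaemon(true);
            return t;
        });
        worker.submit(() -> {
            try {
                Thread.sleep(taskMillis); // stands in for the request being sent
                delivered.set(true);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.shutdown(); // no new work is accepted; submitted work continues
        try {
            // Without this wait, the caller could exit while the task is still in flight.
            boolean terminated = worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            return terminated && delivered.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("delivered before exit: " + runAndAwait(200, 5000));
    }
}
```

If the timeout is shorter than the work in flight, awaitTermination returns false and the caller can decide whether to wait longer or force a shutdown.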
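However, my question about enabling verbose logging and/or adding some kind of interceptor on the server side remains unanswered, so I would appreciate some guidance from the experts here.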
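Best answer
I found a way to log requests and responses on both the server and the client side using interceptors, which keeps the code cleaner. It is also possible to use Sleuth for tracing.
With Spring, add this dependency: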
implementation 'io.github.lognet:grpc-spring-boot-starter'
Server side
You can then use the GRpcGlobalInterceptor annotation:
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import org.lognet.springboot.grpc.GRpcGlobalInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

@GRpcGlobalInterceptor
public class GrpcInterceptor implements ServerInterceptor {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    public static final Metadata.Key<String> TRACE_ID_KEY = Metadata.Key.of("traceId", ASCII_STRING_MARSHALLER);

    @Override
    public <M, R> ServerCall.Listener<M> interceptCall(
            ServerCall<M, R> call, Metadata headers, ServerCallHandler<M, R> next) {
        String traceId = headers.get(TRACE_ID_KEY);
        // TODO: Add traceId to sleuth
        logger.warn("traceId from client: {}. TODO: Add traceId to sleuth", traceId);
        // Wrap the call so outgoing responses are logged.
        GrpcServerCall<M, R> grpcServerCall = new GrpcServerCall<>(call);
        ServerCall.Listener<M> listener = next.startCall(grpcServerCall, headers);
        // Wrap the listener so incoming requests are logged.
        return new GrpcForwardingServerCallListener<M>(call.getMethodDescriptor(), listener) {
            @Override
            public void onMessage(M message) {
                logger.info("Method: {}, Message: {}", methodName, message);
                super.onMessage(message);
            }
        };
    }

    // Delegates every operation to the wrapped call and logs each response message.
    private class GrpcServerCall<M, R> extends ServerCall<M, R> {
        ServerCall<M, R> serverCall;

        protected GrpcServerCall(ServerCall<M, R> serverCall) {
            this.serverCall = serverCall;
        }

        @Override
        public void request(int numMessages) {
            serverCall.request(numMessages);
        }

        @Override
        public void sendHeaders(Metadata headers) {
            serverCall.sendHeaders(headers);
        }

        @Override
        public void sendMessage(R message) {
            logger.info("Method: {}, Response: {}", serverCall.getMethodDescriptor().getFullMethodName(), message);
            serverCall.sendMessage(message);
        }

        @Override
        public void close(Status status, Metadata trailers) {
            serverCall.close(status, trailers);
        }

        @Override
        public boolean isCancelled() {
            return serverCall.isCancelled();
        }

        @Override
        public MethodDescriptor<M, R> getMethodDescriptor() {
            return serverCall.getMethodDescriptor();
        }
    }

    // Forwarding listener that remembers the full method name for log output.
    private class GrpcForwardingServerCallListener<M> extends io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener<M> {
        String methodName;

        protected GrpcForwardingServerCallListener(MethodDescriptor<?, ?> method, ServerCall.Listener<M> listener) {
            super(listener);
            methodName = method.getFullMethodName();
        }
    }
}
Client side
Interceptor:
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

@Component
public class BackendInterceptor implements ClientInterceptor {
    private Logger logger = LoggerFactory.getLogger(this.getClass());
    public static final Metadata.Key<String> TRACE_ID_KEY = Metadata.Key.of("traceId", ASCII_STRING_MARSHALLER);

    @Override
    public <M, R> ClientCall<M, R> interceptCall(
            final MethodDescriptor<M, R> method, CallOptions callOptions, Channel next) {
        // Every call made through this interceptor gets a 10-second deadline.
        return new BackendForwardingClientCall<M, R>(method,
                next.newCall(method, callOptions.withDeadlineAfter(10000, TimeUnit.MILLISECONDS))) {
            @Override
            public void sendMessage(M message) {
                logger.info("Method: {}, Message: {}", methodName, message);
                super.sendMessage(message);
            }

            @Override
            public void start(Listener<R> responseListener, Metadata headers) {
                // TODO: Use the sleuth traceId instead of 999
                headers.put(TRACE_ID_KEY, "999");
                // Wrap the listener so incoming responses are logged.
                BackendListener<R> backendListener = new BackendListener<>(methodName, responseListener);
                super.start(backendListener, headers);
            }
        };
    }

    // Delegates every callback to the wrapped listener and logs each response message.
    private class BackendListener<R> extends ClientCall.Listener<R> {
        String methodName;
        ClientCall.Listener<R> responseListener;

        protected BackendListener(String methodName, ClientCall.Listener<R> responseListener) {
            super();
            this.methodName = methodName;
            this.responseListener = responseListener;
        }

        @Override
        public void onMessage(R message) {
            logger.info("Method: {}, Response: {}", methodName, message);
            responseListener.onMessage(message);
        }

        @Override
        public void onHeaders(Metadata headers) {
            responseListener.onHeaders(headers);
        }

        @Override
        public void onClose(Status status, Metadata trailers) {
            responseListener.onClose(status, trailers);
        }

        @Override
        public void onReady() {
            responseListener.onReady();
        }
    }

    // Forwarding call that remembers the full method name for log output.
    private class BackendForwardingClientCall<M, R> extends io.grpc.ForwardingClientCall.SimpleForwardingClientCall<M, R> {
        String methodName;

        protected BackendForwardingClientCall(MethodDescriptor<M, R> method, ClientCall<M, R> delegate) {
            super(delegate);
            methodName = method.getFullMethodName();
        }
    }
}
Add the interceptor to the channel:
ManagedChannel managedChannel = ManagedChannelBuilder
        .forAddress(_URL_, _PORT_).usePlaintext().intercept(backendInterceptor).build();
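On the verbose-logging part of the question: grpc-java writes its own log output through java.util.logging, under logger names that start with io.grpc, so raising that logger's level is one way to see more of what the library is doing. The sketch below uses only the JDK. The class name GrpcVerboseLogging and the choice of Level.FINE are assumptions for illustration, and how much detail actually appears at a given level depends on the grpc-java version and transport in use, so treat this as a starting point rather than a guaranteed recipe.

```java
import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;

public class GrpcVerboseLogging {
    // Keep a strong reference: java.util.logging holds loggers weakly,
    // so the configuration of an unreferenced logger can be garbage collected.
    private static final Logger GRPC_LOGGER = Logger.getLogger("io.grpc");

    // Raises the level of the "io.grpc" logger and attaches a console handler for it.
    static Logger enable(Level level) {
        GRPC_LOGGER.setLevel(level);
        // Handlers have their own threshold (INFO by default), so raise it as well.
        ConsoleHandler handler = new ConsoleHandler();
        handler.setLevel(level);
        GRPC_LOGGER.addHandler(handler);
        // Avoid printing each record a second time through the root logger's handler.
        GRPC_LOGGER.setUseParentHandlers(false);
        return GRPC_LOGGER;
    }

    public static void main(String[] args) {
        Logger logger = enable(Level.FINE);
        logger.fine("verbose gRPC logging enabled");
    }
}
```

Calling GrpcVerboseLogging.enable(Level.FINE) once at the start of both the server's and the client's main method, before the server or channel is built, would apply the setting to each process. The interceptors above remain the way to log the request and response messages themselves.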
Source: the Stack Overflow question on intercepting/logging requests and responses in gRPC (Java): https://stackoverflow.com/questions/47155084/