java - 在 GRPC 中拦截/记录请求和响应

标签 java android server grpc grpc-java

我正在使用 GRPC 开发一个聊天应用程序,其中服务器从客户端接收信息并将其发送回与其连接的所有客户端。为此,我使用了Saturnism 的 chat-example作为引用。我已经复制了代码,代码编译并运行但服务器应该从未收到来自客户端的任何请求。

我的问题是:

  1. 有没有办法在 GRPC 中启用 verbos 服务器端和客户端登录,以查看进出的请求和响应以及可能失败的内容?
  2. 我正在为服务器和客户端使用以下代码。以下代码中可能缺少/错误的是什么导致客户端和服务器之间没有通信。

WingokuServer.java

public class WingokuServer {
    public static void main(String[] args) throws IOException, InterruptedException {
        Server server = ServerBuilder.forPort(8091)
                .intercept(recordRequestHeadersInterceptor())
                .addService(new WingokuServiceImpl())
                .build();

        System.out.println("Starting server...");
        server.start();
        System.out.println("Server started!");
        server.awaitTermination();
    }

WingokuServerSideServiceImplementation:

public class WingokuServiceImpl extends WingokuServiceGrpc.WingokuServiceImplBase {
    private static Set<StreamObserver<Response>> observers =
            Collections.newSetFromMap(new ConcurrentHashMap<>());

    public WingokuServiceImpl() {
        System.out.println("WingokuServiceImp");
    }

    @Override
    public StreamObserver<Request> messages(StreamObserver<Response> responseObserver) {
        System.out.println("messages");
        observers.add(responseObserver);
        return new StreamObserver<Request>() {
            @Override
            public void onNext(Request request) {
                System.out.println("Server onNext: ");
                System.out.println("request from client is: "+ request.getRequestMessage());
                Response response = Response.newBuilder().setResponseMessage("new Message From server at time: "+ System.nanoTime()).build();
                for (StreamObserver<Response> observer : observers) {
                    observer.onNext(response);
                }
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("Server onError: ");
                throwable.printStackTrace();
            }

            @Override
            public void onCompleted() {
                observers.remove(responseObserver);
                System.out.println("Server onCompleted ");
            }
        };
    }
}

WingokuClient:

public class WingokuClient {
    public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8091).usePlaintext(true).build();
        WingokuServiceGrpc.WingokuServiceStub asyncStub = WingokuServiceGrpc.newStub(channel);
        StreamObserver<Request> requestStreamObserver = asyncStub.messages(new StreamObserver<Response>() {
            @Override
            public void onNext(Response response) {
                System.out.println("Client onNext");
                System.out.println("REsponse from server is: "+ response.getResponseMessage());
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("Client onError");
                throwable.printStackTrace();
            }

            @Override
            public void onCompleted() {
                System.out.println("Client OnComplete");
            }
        });

        requestStreamObserver.onNext(Request.newBuilder().setRequestMessage("Message From Client").build());
        requestStreamObserver.onCompleted();
        channel.shutdown();
        System.out.println("exiting client");
    }
}

编辑:

代码没有问题。有用。我只需要将 awaitTermination 添加到客户端的 channel ,因为如果没有它,它只会立即关闭客户端和服务器之间的连接,甚至可能在请求离开客户端进入网络之前。这就是服务器从未收到任何请求的原因。

然而,我关于启用详细日志记录和/或向服务器端添加某种拦截器的问题仍然没有得到解答。因此,我期待着从这里的专家那里得到一些指导。

最佳答案

我找到了一种使用拦截器在服务器端和客户端记录请求和响应的方法,它使代码更清晰。 也可以使用 sleuth 进行跟踪。

请使用 Spring :

implementation 'io.github.lognet:grpc-spring-boot-starter'

服务器部分

然后您可以使用 GRpcGlobalInterceptor 注释

import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import org.lognet.springboot.grpc.GRpcGlobalInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

@GRpcGlobalInterceptor
public class GrpcInterceptor implements ServerInterceptor {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public static final Metadata.Key<String> TRACE_ID_KEY = Metadata.Key.of("traceId", ASCII_STRING_MARSHALLER);

    @Override
    public <M, R> ServerCall.Listener<M> interceptCall(
            ServerCall<M, R> call, Metadata headers, ServerCallHandler<M, R> next) {
        String traceId = headers.get(TRACE_ID_KEY);
        // TODO: Add traceId to sleuth
        logger.warn("traceId from client: {}. TODO: Add traceId to sleuth", traceId);

        GrpcServerCall grpcServerCall = new GrpcServerCall(call);

        ServerCall.Listener listener = next.startCall(grpcServerCall, headers);

        return new GrpcForwardingServerCallListener<M>(call.getMethodDescriptor(), listener) {
            @Override
            public void onMessage(M message) {
                logger.info("Method: {}, Message: {}", methodName, message);
                super.onMessage(message);
            }
        };
    }

    private class GrpcServerCall<M, R> extends ServerCall<M, R> {

        ServerCall<M, R> serverCall;

        protected GrpcServerCall(ServerCall<M, R> serverCall) {
            this.serverCall = serverCall;
        }

        @Override
        public void request(int numMessages) {
            serverCall.request(numMessages);
        }

        @Override
        public void sendHeaders(Metadata headers) {
            serverCall.sendHeaders(headers);
        }

        @Override
        public void sendMessage(R message) {
            logger.info("Method: {}, Response: {}", serverCall.getMethodDescriptor().getFullMethodName(), message);
            serverCall.sendMessage(message);
        }

        @Override
        public void close(Status status, Metadata trailers) {
            serverCall.close(status, trailers);
        }

        @Override
        public boolean isCancelled() {
            return serverCall.isCancelled();
        }

        @Override
        public MethodDescriptor<M, R> getMethodDescriptor() {
            return serverCall.getMethodDescriptor();
        }
    }

    private class GrpcForwardingServerCallListener<M> extends io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener<M> {

        String methodName;

        protected GrpcForwardingServerCallListener(MethodDescriptor method, ServerCall.Listener<M> listener) {
            super(listener);
            methodName = method.getFullMethodName();
        }
    }
}

客户端

拦截器:

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

@Component
public class BackendInterceptor implements ClientInterceptor {
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public static final Metadata.Key<String> TRACE_ID_KEY = Metadata.Key.of("traceId", ASCII_STRING_MARSHALLER);

    @Override
    public <M, R> ClientCall<M, R> interceptCall(
            final MethodDescriptor<M, R> method, CallOptions callOptions, Channel next) {
        return new BackendForwardingClientCall<M, R>(method,
                next.newCall(method, callOptions.withDeadlineAfter(10000, TimeUnit.MILLISECONDS))) {

            @Override
            public void sendMessage(M message) {
                logger.info("Method: {}, Message: {}", methodName, message);
                super.sendMessage(message);
            }

            @Override
            public void start(Listener<R> responseListener, Metadata headers) {
                // TODO: Use the sleuth traceId instead of 999
                headers.put(TRACE_ID_KEY, "999");

                BackendListener<R> backendListener = new BackendListener<>(methodName, responseListener);
                super.start(backendListener, headers);
            }
        };
    }

    private class BackendListener<R> extends ClientCall.Listener<R> {

        String methodName;
        ClientCall.Listener<R> responseListener;

        protected BackendListener(String methodName, ClientCall.Listener<R> responseListener) {
            super();
            this.methodName = methodName;
            this.responseListener = responseListener;
        }

        @Override
        public void onMessage(R message) {
            logger.info("Method: {}, Response: {}", methodName, message);
            responseListener.onMessage(message);
        }

        @Override
        public void onHeaders(Metadata headers) {
            responseListener.onHeaders(headers);
        }

        @Override
        public void onClose(Status status, Metadata trailers) {
            responseListener.onClose(status, trailers);
        }

        @Override
        public void onReady() {
            responseListener.onReady();
        }
    }

    private class BackendForwardingClientCall<M, R> extends io.grpc.ForwardingClientCall.SimpleForwardingClientCall<M, R> {

        String methodName;

        protected BackendForwardingClientCall(MethodDescriptor<M, R> method, ClientCall delegate) {
            super(delegate);
            methodName = method.getFullMethodName();
        }
    }
}

将拦截器添加到 channel 中:

ManagedChannel managedChannel = ManagedChannelBuilder
                .forAddress(_URL_, _PORT_).usePlaintext().intercept(backendInterceptor).build();

关于java - 在 GRPC 中拦截/记录请求和响应,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47155084/

相关文章:

java - Anylogic microsoft sql错误索引1超出范围

java - Android Studio 中未定义NotificationCompat Builder setGroup()

node.js - PM2 - 语法错误 : Block-scoped declarations not yet supported outside strict mode

java - org.testng.TestNGException : An error occurred while instantiating class. 检查以确保它可以被实例化/访问

java - 如何将 build.xml 添加到 Eclipse 中的现有 GWT 项目?

php - 使用 cordova 3.4 和 Pushwoosh 构建 Android 应用程序时出错

node.js - Nodejs 接收错误 "options.hostname"属性必须是字符串、未定义或 null 类型之一。在 validateHost 接收到类型对象

C TCP 客户端无法连接到服务器

java - 使用 loopj : "Trust anchor for certification path not found."` 时出错

java - 为什么我的服务的 AsyncTask 会阻止来自主 Activity 的 AsyncTasks?