java - 如何使用 Jersey Client 2.2.x 取消挂起的异步请求并取消注册调用回调?

标签 java rest asynchronous jersey jersey-client

我有一个简单的 REST 服务,它有一个 sleep 方法,它除了在指定的时间内以毫秒为单位 hibernate ,然后返回一个 No Content 响应。我的 RESTTest 类尝试调用 http://localhost:8080/myapp/rest/sleep/7500首先( sleep 7.5 秒)但只等待 5 秒。 5 秒后,它取消接收到的 Future(试图取消挂起的请求)并调用 http://localhost:8080/myapp/rest/sleep/5000 ( sleep 5 秒)并等待 5 秒。

public class RESTTest {
    private final Client client = ClientBuilder.newClient();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition responseReceived = lock.newCondition();

    public static void main(final String... arguments) {
        new RESTTest().listen(10000);
    }

    public void listen(final long time) {
        System.out.println("Listen for " + time + " ms.");
        Future<Response> _response =
            client.
                target("http://localhost:8080/myapp/rest/sleep/" + time)).
                request().
                async().
                    get(
                        new InvocationCallback<Response>() {
                            public void completed(final Response response) {
                                System.out.println("COMPLETED");
                                lock.lock();
                                try {
                                    responseReceived.signalAll();
                                } finally {
                                    lock.unlock();
                                }
                            }

                            public void failed(final Throwable throwable) {
                                lock.lock();
                                try {
                                    responseReceived.signalAll();
                                } finally {
                                    lock.unlock();
                                }
                            }
                        });
        lock.lock();
        try {
            System.out.println("Waiting for 5000 ms.");
            if (!responseReceived.await(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out!");
                _response.cancel(true);
                listen(5000);
            } else {
                System.out.println("Response received.");
            }
        } catch (final InterruptedException exception) {
            // Do nothing.
        } finally {
            lock.unlock();
        }
    }
}

现在我希望看到“COMPLETED”字符串只打印一次,并且“收到响应”。字符串也只打印一次。但是,“COMPLETED”字符串会打印两次!
Listen for 7500 ms.
Waiting for 5000 ms.
Timed out!
Listen for 5000 ms.
Waiting for 5000 ms.
COMPLETED
Response received.
COMPLETED

我在这里想念什么?

谢谢,

最佳答案

我相信你已经明白了,但这里有一个非常模块化的解决方案,你可以将它与简单的 Guava ListenableFuture 一起使用。当然,您不必像我在 Futures.allAsList 中所做的那样汇总响应,但您可以在最后执行类似的操作并删除您的 CountDownLatch。

顺便说一句,我很确定您的问题是线程问题。您看到 COMPLETED 是因为在您下次调用 listen(5000) 之后调用了回调。请记住,异步将被线程化,因此到控制台的输出可能会延迟到下一次上下文切换。服务器可能会在您的 7500 信号灯解锁后立即响应。

private Client client;

@Before
public void setup() {
    final ClientConfig clientConfig = new ClientConfig();
    clientConfig.register(OrtbBidRequestBodyReader.class);
    clientConfig.register(OrtbBidRequestBodyWriter.class);
    clientConfig.connectorProvider(new CachingConnectorProvider(new HttpUrlConnectorProvider()));
    clientConfig.property(ClientProperties.ASYNC_THREADPOOL_SIZE, 3);
    client = ClientBuilder.newClient(clientConfig);
}

@Test
public void testAsync() throws InterruptedException, ExecutionException, JsonProcessingException {

    final WebTarget target = client
            .target("http://localhost:8081/dsp-receiver-0.0.1-SNAPSHOT/ortb/bid/123123?testbid=bid");

    final AtomicInteger successcount = new AtomicInteger();
    final AtomicInteger noBid = new AtomicInteger();
    final AtomicInteger clientError = new AtomicInteger();

    final InvocationCallback<Response> callback = new InvocationCallback<Response>() {
        @Override
        public void completed(final Response response) {
            if (response.getStatus() == 200) {
                successcount.incrementAndGet();
            } else if (response.getStatus() == 204) {
                noBid.incrementAndGet();
            } else {
                clientError.incrementAndGet();
            }
        }

        @Override
        public void failed(final Throwable e) {
            clientError.incrementAndGet();
            logger.info("Client Error", e);
        }
    };

    final Entity<OrtbBidRequest> entity = Entity.entity(testBidRequest, MediaType.APPLICATION_JSON);
    final List<ListenableFuture<Response>> allFutures = Lists.newArrayList();
    final Stopwatch stopwatch = Stopwatch.createStarted();
    for (int i = 0; i < 100000; i++) {
        logger.info("Running request {}", i);
        final Future<Response> future = target.request().accept(MediaType.APPLICATION_JSON).async().post(entity,
                callback);
        final ListenableFuture<Response> response = JdkFutureAdapters.listenInPoolThread(future);
        allFutures.add(response);

        // For each 100 of requests we will wait on them, otherwise we
        // may run out of memory. This is really just to test the stamina
        // of the dsp
        if (i % 200 == 0) {
            Futures.allAsList(allFutures).get();
            allFutures.clear();
        }
    }

    logger.info("success count {}  nobid {} client error {} ", successcount, noBid, clientError);
    logger.info("Total time {} ms ", stopwatch.stop());
}

关于java - 如何使用 Jersey Client 2.2.x 取消挂起的异步请求并取消注册调用回调?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19523449/

相关文章:

javascript - 发送 POST 数据时阻止 Safari 规范化 Unicode?

ios - 从 Swift 函数中的异步调用返回数据

java - 为什么数组被初始化为默认值而不是 java 中的 arraylist?

java - Cause=slf4j 依赖冲突 - NoSuchMethodError : org. slf4j.helpers.MessageFormatter.format

java - SFTP使用java从Windows服务器检索文件到Linux服务器

django - 如何处理 Django Rest Framework 中的模型更改?

javascript - 无法在 setTimeout 内正确访问数组元素

JavaScript OOP 和 Chrome 扩展 api 中的异步方法

C# 单元测试异步错误

JAVA解析特殊字符