java - 在使用 Apache 'HttpClient' 的 PUT 操作期间更快地检测中断的连接

标签 java linux sockets timeout apache-httpclient-4.x

我正在使用 Apache HttpClient 4 与 REST 通信API 和大多数时间我做冗长的 PUT 操作。由于这些可能发生在不稳定的互联网连接上,我需要检测连接是否中断并且可能需要重试(使用恢复请求)。

为了在现实世界中尝试我的例程,我开始了 PUT 操作,然后我打开了笔记本电脑的 Wi-Fi 开关,导致所有数据流立即完全中断。然而,最终抛出 SocketException 需要很长时间(可能需要 5 分钟左右)。

我怎样才能加快处理速度?我想将超时设置为 30 秒左右。

更新:

澄清一下,我的请求是 PUT 操作。所以很长一段时间(可能是几个小时)唯一的操作是 write() 操作并且没有读取操作。有一个timeout setting for read() operations , 但我找不到用于写操作的。

我正在使用我自己的实体实现,因此我直接写入一个 OutputStream,一旦 Internet 连接中断,它几乎会立即阻塞。如果 OutputStreams 有一个超时参数,那么我可以编写 out.write(nextChunk, 30000); 我可以自己检测到这样的问题。实际上我试过了:

public class TimeoutHttpEntity extends HttpEntityWrapper {

  public TimeoutHttpEntity(HttpEntity wrappedEntity) {
    super(wrappedEntity);
  }

  @Override
  public void writeTo(OutputStream outstream) throws IOException {
    try(TimeoutOutputStreamWrapper wrapper = new TimeoutOutputStreamWrapper(outstream, 30000)) {
      super.writeTo(wrapper);
    }
  }
}


public class TimeoutOutputStreamWrapper extends OutputStream {
  private final OutputStream delegate;
  private final long timeout;
  private final ExecutorService executorService = Executors.newSingleThreadExecutor();

  public TimeoutOutputStreamWrapper(OutputStream delegate, long timeout) {
    this.delegate = delegate;
    this.timeout = timeout;
  }

  @Override
  public void write(int b) throws IOException {
    executeWithTimeout(() -> {
      delegate.write(b);
      return null;
    });
  }

  @Override
  public void write(byte[] b) throws IOException {
    executeWithTimeout(() -> {
      delegate.write(b);
      return null;
    });
  }

  @Override
  public void write(byte[] b, int off, int len) throws IOException {
    executeWithTimeout(() -> {
      delegate.write(b, off, len);
      return null;
    });
  }

  @Override
  public void close() throws IOException {
    try {
      executeWithTimeout(() -> {
        delegate.close();
        return null;
      });
    } finally {
      executorService.shutdown();
    }
  }

  private void executeWithTimeout(final Callable<?> task) throws IOException {
    try {
      executorService.submit(task).get(timeout, TimeUnit.MILLISECONDS);
    } catch (TimeoutException e) {
      throw new IOException(e);
    } catch (ExecutionException e) {
      final Throwable cause = e.getCause();
      if (cause instanceof IOException) {
        throw (IOException)cause;
      }
      throw new Error(cause);
    } catch (InterruptedException e) {
      throw new Error(e);
    }
  }
}

public class TimeoutOutputStreamWrapperTest {
  private static final byte[] DEMO_ARRAY = new byte[]{1,2,3};
  private TimeoutOutputStreamWrapper streamWrapper;
  private OutputStream delegateOutput;

  public void setUp(long timeout) {
    delegateOutput = mock(OutputStream.class);
    streamWrapper = new TimeoutOutputStreamWrapper(delegateOutput, timeout);
  }

  @AfterMethod
  public void teardown() throws Exception {
    streamWrapper.close();
  }

  @Test
  public void write_writesByte() throws Exception {
    // Setup
    setUp(Long.MAX_VALUE);

    // Execution
    streamWrapper.write(DEMO_ARRAY);

    // Evaluation
    verify(delegateOutput).write(DEMO_ARRAY);
  }

  @Test(expectedExceptions = DemoIOException.class)
  public void write_passesThruException() throws Exception {
    // Setup
    setUp(Long.MAX_VALUE);
    doThrow(DemoIOException.class).when(delegateOutput).write(DEMO_ARRAY);

    // Execution
    streamWrapper.write(DEMO_ARRAY);

    // Evaluation performed by expected exception
  }

  @Test(expectedExceptions = IOException.class)
  public void write_throwsIOException_onTimeout() throws Exception {
    // Setup
    final CountDownLatch executionDone = new CountDownLatch(1);
    setUp(100);
    doAnswer(new Answer<Void>() {
      @Override
      public Void answer(InvocationOnMock invocation) throws Throwable {
        executionDone.await();
        return null;
      }
    }).when(delegateOutput).write(DEMO_ARRAY);

    // Execution
    try {
      streamWrapper.write(DEMO_ARRAY);
    } finally {
      executionDone.countDown();
    }

    // Evaluation performed by expected exception
  }

  public static class DemoIOException extends IOException {

  }
}

这有点复杂,但在我的单元测试中效果很好。它在现实生活中也有效,除了 HttpRequestExecutor 在第 127 行捕获异常并尝试关闭连接。然而,当尝试关闭连接时,它首先尝试刷新再次阻塞的连接。

我或许能够深入挖掘 HttpClient 并弄清楚如何防止这种刷新操作,但这已经不是一个很好的解决方案,而且它只会变得更糟。

更新:

这在 Java 级别上似乎无法完成。我可以在另一个层面上做吗? (我正在使用 Linux)。

最佳答案

Java 阻塞 I/O 不支持写入操作的套接字超时。您完全受 OS/JRE 的摆布,无法解锁被写操作阻塞的线程。此外,这种行为往往是特定于操作系统/JRE 的。

这可能是考虑使用基于非阻塞 I/O (NIO) 的 HTTP 客户端的合理案例,例如 Apache HttpAsyncClient .

关于java - 在使用 Apache 'HttpClient' 的 PUT 操作期间更快地检测中断的连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25168586/

相关文章:

java - 我想存储从 3 到 100 的素数并将它们存储在数组中而不使用构造函数

java - 短信发送错误

linux - 使用 "dpkg-buildpackage -b"为本地构建禁用 CDBS 许可证检查步骤

c++ - SSL_Read() 返回 SSL_ERROR_ZERO_RETURN 但 ERR_get_error() 为 0

java - Confluence 中的 WikiToXhtmlMigrator

java - NoNodeAvailableException : No node was available to execute the query

regex - Linux 将多个文本文件解析成单独的文件

linux - ionic Cordova 构建

java - 什么是多路复用套接字通信?

java - groovy和java程序之间通过socket进行通信