我正在使用 Apache HttpClient 4 与 REST 通信API 和大多数时间我做冗长的 PUT 操作。由于这些可能发生在不稳定的互联网连接上,我需要检测连接是否中断并且可能需要重试(使用恢复请求)。
为了在现实世界中尝试我的例程,我开始了 PUT 操作,然后我打开了笔记本电脑的 Wi-Fi 开关,导致所有数据流立即完全中断。然而,最终抛出 SocketException 需要很长时间(可能需要 5 分钟左右)。
我怎样才能加快处理速度?我想将超时设置为 30 秒左右。
更新:
澄清一下,我的请求是 PUT 操作。所以很长一段时间(可能是几个小时)唯一的操作是 write() 操作并且没有读取操作。有一个timeout setting for read() operations , 但我找不到用于写操作的。
我正在使用我自己的实体实现,因此我直接写入一个 OutputStream,一旦 Internet 连接中断,它几乎会立即阻塞。如果 OutputStreams 有一个超时参数,那么我可以编写 out.write(nextChunk, 30000);
我可以自己检测到这样的问题。实际上我试过了:
public class TimeoutHttpEntity extends HttpEntityWrapper {
public TimeoutHttpEntity(HttpEntity wrappedEntity) {
super(wrappedEntity);
}
@Override
public void writeTo(OutputStream outstream) throws IOException {
try(TimeoutOutputStreamWrapper wrapper = new TimeoutOutputStreamWrapper(outstream, 30000)) {
super.writeTo(wrapper);
}
}
}
public class TimeoutOutputStreamWrapper extends OutputStream {
private final OutputStream delegate;
private final long timeout;
private final ExecutorService executorService = Executors.newSingleThreadExecutor();
public TimeoutOutputStreamWrapper(OutputStream delegate, long timeout) {
this.delegate = delegate;
this.timeout = timeout;
}
@Override
public void write(int b) throws IOException {
executeWithTimeout(() -> {
delegate.write(b);
return null;
});
}
@Override
public void write(byte[] b) throws IOException {
executeWithTimeout(() -> {
delegate.write(b);
return null;
});
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
executeWithTimeout(() -> {
delegate.write(b, off, len);
return null;
});
}
@Override
public void close() throws IOException {
try {
executeWithTimeout(() -> {
delegate.close();
return null;
});
} finally {
executorService.shutdown();
}
}
private void executeWithTimeout(final Callable<?> task) throws IOException {
try {
executorService.submit(task).get(timeout, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
throw new IOException(e);
} catch (ExecutionException e) {
final Throwable cause = e.getCause();
if (cause instanceof IOException) {
throw (IOException)cause;
}
throw new Error(cause);
} catch (InterruptedException e) {
throw new Error(e);
}
}
}
public class TimeoutOutputStreamWrapperTest {
private static final byte[] DEMO_ARRAY = new byte[]{1,2,3};
private TimeoutOutputStreamWrapper streamWrapper;
private OutputStream delegateOutput;
public void setUp(long timeout) {
delegateOutput = mock(OutputStream.class);
streamWrapper = new TimeoutOutputStreamWrapper(delegateOutput, timeout);
}
@AfterMethod
public void teardown() throws Exception {
streamWrapper.close();
}
@Test
public void write_writesByte() throws Exception {
// Setup
setUp(Long.MAX_VALUE);
// Execution
streamWrapper.write(DEMO_ARRAY);
// Evaluation
verify(delegateOutput).write(DEMO_ARRAY);
}
@Test(expectedExceptions = DemoIOException.class)
public void write_passesThruException() throws Exception {
// Setup
setUp(Long.MAX_VALUE);
doThrow(DemoIOException.class).when(delegateOutput).write(DEMO_ARRAY);
// Execution
streamWrapper.write(DEMO_ARRAY);
// Evaluation performed by expected exception
}
@Test(expectedExceptions = IOException.class)
public void write_throwsIOException_onTimeout() throws Exception {
// Setup
final CountDownLatch executionDone = new CountDownLatch(1);
setUp(100);
doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
executionDone.await();
return null;
}
}).when(delegateOutput).write(DEMO_ARRAY);
// Execution
try {
streamWrapper.write(DEMO_ARRAY);
} finally {
executionDone.countDown();
}
// Evaluation performed by expected exception
}
public static class DemoIOException extends IOException {
}
}
这有点复杂,但在我的单元测试中效果很好。它在现实生活中也有效,除了 HttpRequestExecutor
在第 127 行捕获异常并尝试关闭连接。然而,当尝试关闭连接时,它首先尝试刷新再次阻塞的连接。
我或许能够深入挖掘 HttpClient 并弄清楚如何防止这种刷新操作,但这已经不是一个很好的解决方案,而且它只会变得更糟。
更新:
这在 Java 级别上似乎无法完成。我可以在另一个层面上做吗? (我正在使用 Linux)。
最佳答案
Java 阻塞 I/O 不支持写入操作的套接字超时。您完全受 OS/JRE 的摆布,无法解锁被写操作阻塞的线程。此外,这种行为往往是特定于操作系统/JRE 的。
这可能是考虑使用基于非阻塞 I/O (NIO) 的 HTTP 客户端的合理案例,例如 Apache HttpAsyncClient .
关于java - 在使用 Apache 'HttpClient' 的 PUT 操作期间更快地检测中断的连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/25168586/