当我尝试在 Netty 中重用客户端连接时,我收到 java.io.IOException: Connection Reset by Peer
(如果我发送一个请求,则不会发生这种情况,但每次如果我发送两个请求,即使是从单个线程发送)。我当前的方法涉及以下实现一个简单的 ChannelPool ,其代码如下。请注意,key 方法从 freeChannels
成员获取一个空闲 channel ,或者在没有可用 channel 的情况下创建一个新 channel 。 returnChannel()
方法是负责在完成请求后释放 channel 的方法。在我们处理响应之后,它会在管道内部被调用(请参阅下面代码中的 ResponseHandler
的 messageReceived()
方法)。有谁知道我做错了什么,以及为什么我会得到异常(exception)?
channel 池代码(请注意使用 freeChannels.pollFirst()
来获取通过调用 returnChannel()
返回的空闲 channel ):
public class ChannelPool {
private final ClientBootstrap cb;
private Deque<Channel> freeChannels = new ArrayDeque<Channel>();
private static Map<Channel, Channel> proxyToClient = new ConcurrentHashMap<Channel, Channel>();
public ChannelPool(InetSocketAddress address, ChannelPipelineFactory pipelineFactory) {
ChannelFactory clientFactory =
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
cb = new ClientBootstrap(clientFactory);
cb.setPipelineFactory(pipelineFactory);
}
private void writeToNewChannel(final Object writable, Channel clientChannel) {
ChannelFuture cf;
synchronized (cb) {
cf = cb.connect(new InetSocketAddress("localhost", 18080));
}
final Channel ch = cf.getChannel();
proxyToClient.put(ch, clientChannel);
cf.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture arg0) throws Exception {
System.out.println("channel open, writing: " + ch);
ch.write(writable);
}
});
}
public void executeWrite(Object writable, Channel clientChannel) {
synchronized (freeChannels) {
while (!freeChannels.isEmpty()) {
Channel ch = freeChannels.pollFirst();
System.out.println("trying to reuse channel: " + ch + " " + ch.isOpen());
if (ch.isOpen()) {
proxyToClient.put(ch, clientChannel);
ch.write(writable).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture cf) throws Exception {
System.out.println("write from reused channel complete, success? " + cf.isSuccess());
}
});
// EDIT: I needed a return here
}
}
}
writeToNewChannel(writable, clientChannel);
}
public void returnChannel(Channel ch) {
synchronized (freeChannels) {
freeChannels.addLast(ch);
}
}
public Channel getClientChannel(Channel proxyChannel) {
return proxyToClient.get(proxyChannel);
}
}
Netty 管道代码(请注意,RequestHandler
调用使用新 channel 或旧 channel 的 executeWrite()
,并且 ResponseHandler
调用 returnChannel()
收到响应后,将内容设置在对客户端的响应中):
public class NettyExample {
private static ChannelPool pool;
public static void main(String[] args) throws Exception {
pool = new ChannelPool(
new InetSocketAddress("localhost", 18080),
new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
return Channels.pipeline(
new HttpRequestEncoder(),
new HttpResponseDecoder(),
new ResponseHandler());
}
});
ChannelFactory factory =
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
ServerBootstrap sb = new ServerBootstrap(factory);
sb.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
return Channels.pipeline(
new HttpRequestDecoder(),
new HttpResponseEncoder(),
new RequestHandler());
}
});
sb.setOption("child.tcpNoDelay", true);
sb.setOption("child.keepAlive", true);
sb.bind(new InetSocketAddress(2080));
}
private static class ResponseHandler extends SimpleChannelHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
final HttpResponse proxyResponse = (HttpResponse) e.getMessage();
final Channel proxyChannel = e.getChannel();
Channel clientChannel = pool.getClientChannel(proxyChannel);
HttpResponse clientResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
clientResponse.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
HttpHeaders.setContentLength(clientResponse, proxyResponse.getContent().readableBytes());
clientResponse.setContent(proxyResponse.getContent());
pool.returnChannel(proxyChannel);
clientChannel.write(clientResponse);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
e.getCause().printStackTrace();
Channel ch = e.getChannel();
ch.close();
}
}
private static class RequestHandler extends SimpleChannelHandler {
@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
final HttpRequest request = (HttpRequest) e.getMessage();
pool.executeWrite(request, e.getChannel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
e.getCause().printStackTrace();
Channel ch = e.getChannel();
ch.close();
}
}
}
编辑:为了提供更多详细信息,我编写了代理连接上发生的情况的跟踪。请注意,以下涉及同步 apache commons 客户端执行的两个串行请求。第一个请求使用新 channel 并正常完成,第二个请求尝试重用相同的 channel ,该 channel 是打开的且可写的,但莫名其妙地失败了(除了注意到从工作线程)。显然,重试后第二个请求成功完成。两个请求完成后很多秒,两个连接最终关闭(即,即使连接被对等方关闭,这也不会反射(reflect)在我拦截的任何事件中):
channel open: [id: 0x6e6fbedf]
channel connect requested: [id: 0x6e6fbedf]
channel open, writing: [id: 0x6e6fbedf, /127.0.0.1:47031 => localhost/127.0.0.1:18080]
channel connected: [id: 0x6e6fbedf, /127.0.0.1:47031 => localhost/127.0.0.1:18080]
trying to reuse channel: [id: 0x6e6fbedf, /127.0.0.1:47031 => localhost/127.0.0.1:18080] true
channel open: [id: 0x3999abd1]
channel connect requested: [id: 0x3999abd1]
channel open, writing: [id: 0x3999abd1, /127.0.0.1:47032 => localhost/127.0.0.1:18080]
channel connected: [id: 0x3999abd1, /127.0.0.1:47032 => localhost/127.0.0.1:18080]
java.io.IOException: Connection reset by peer
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:218)
at sun.nio.ch.IOUtil.read(IOUtil.java:186)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:359)
at org.jboss.netty.channel.socket.nio.NioWorker.read(NioWorker.java:63)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.processSelectedKeys(AbstractNioWorker.java:373)
at org.jboss.netty.channel.socket.nio.AbstractNioWorker.run(AbstractNioWorker.java:247)
at org.jboss.netty.channel.socket.nio.NioWorker.run(NioWorker.java:35)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
at java.lang.Thread.run(Thread.java:722)
最佳答案
终于明白了。有两个问题导致连接重置。首先,我没有从向代理发送请求的 apache commons HttpClient
调用 releaseConnection()
(请参阅 follow up question )。其次,在重用连接的情况下,executeWrite 两次向代理服务器发出相同的调用。我需要在第一次写入后返回,而不是继续 while 循环。这个双重代理调用的结果是我向原始客户端发出重复的响应,破坏了与客户端的连接。
关于java - 为了在 Netty 中重用客户端连接(获取 "Connection reset by peer"),我需要监听哪些事件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15818767/