java - 在netty中池化客户端 channel 的正确方法?

标签 java netty

我在以下代码中收到 java.nio.channels.NotYetConnectedException,因为我正在尝试写入尚未打开的 channel 。

本质上,我拥有的是一个 channel 池,如果 channel 空闲,我会在其中获取一个 channel 进行写入,如果 channel 不可用,我会创建一个新 channel 。我的问题是,当我创建一个新 channel 时,当我调用 connect 时该 channel 尚未准备好写入,并且我不想在返回之前等待连接打开,因为我不想阻塞线程。最好的方法是什么?另外,我检索/返回 channel 的逻辑是否有效?请参阅下面的代码。

我有一个简单的连接池,如下所示:

private static class ChannelPool {
    private final ClientBootstrap cb;
    private Set<Channel> activeChannels = new HashSet<Channel>();
    private Deque<Channel> freeChannels = new ArrayDeque<Channel>();
    public ChannelPool() {
        ChannelFactory clientFactory =
                new NioClientSocketChannelFactory(
                        Executors.newCachedThreadPool(),
                        Executors.newCachedThreadPool());
        cb = new ClientBootstrap(clientFactory);
        cb.setPipelineFactory(new ChannelPipelineFactory() {
            public ChannelPipeline getPipeline() {
                return Channels.pipeline(
                        new HttpRequestEncoder(),
                        new HttpResponseDecoder(),
                        new ResponseHandler());
            }
        });
    }

    private Channel newChannel() {
        ChannelFuture cf;
        synchronized (cb) {
            cf = cb.connect(new InetSocketAddress("localhost", 18080));
        }
        final Channel ret = cf.getChannel();
        ret.getCloseFuture().addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture arg0) throws Exception {
                System.out.println("channel closed?");
                synchronized (activeChannels) {
                    activeChannels.remove(ret);
                }
            }
        });
        synchronized (activeChannels) {
            activeChannels.add(ret);
        }
        System.out.println("returning new channel");
        return ret;
    }

    public Channel getFreeChannel() {
        synchronized (freeChannels) {
            while (!freeChannels.isEmpty()) {
                Channel ch = freeChannels.pollFirst();
                if (ch.isOpen()) {
                    return ch;
                }
            }
        }
        return newChannel();
    }

    public void returnChannel(Channel ch) {
        synchronized (freeChannels) {
            freeChannels.addLast(ch);
        }
    }
}

我尝试在处理程序中使用它,如下所示:

private static class RequestHandler extends SimpleChannelHandler {

    @Override
    public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e) {
        final HttpRequest request = (HttpRequest) e.getMessage();
        Channel proxyChannel = pool.getFreeChannel();
        proxyToClient.put(proxyChannel, e.getChannel());
        proxyChannel.write(request);
    }
}

最佳答案

您不必在 bootstrap.connect(..) 之后立即将新 channel 添加到 activeChannels,而是必须向 ChannelFuture 添加监听器> 由 bootstrap.connect(..) 返回,并将 channel 添加到添加的监听器中的 activeChannels 中。这样,getFreeChannel() 将永远不会获取尚未连接的 channel 。

因为即使您调用 newChannel()activeChannels 也可能为空(newChannel() 即使在建立连接之前也会返回),您必须决定在这种情况下该怎么做。如果我是您,我会将 getFreeChannel() 的返回类型从 Channel 更改为 ChannelFuture,以便在空闲 channel 时调用者收到通知已准备就绪。

关于java - 在netty中池化客户端 channel 的正确方法?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15799820/

相关文章:

java - Android 设备上的 GCM 演示

java - YouTube 播放器在选项卡式 Activity 中导致此错误

java - 是否有一种设计模式可以帮助在通用显示器上配置命令?

java - Netty:未使用 DefaultFileRegion 调用 ChannelProgressiveFutureListener

java - Netty编码器未执行

java - HttpObjectAggregator 处的 Netty 4 泄漏异常

java - 每个 UDP 数据报的 Netty 不同管道

Java -Xmx1028m 在 Mac OS X 中不起作用

java - 无法计算两位数

java - Netty大POJO传输错误: TooLongFrameException