举个简单的例子,假设我想在 netty 中仅使用 2 个工作线程来处理 3 个并发 TCP 客户端连接,我该怎么做?
问题 A) 使用下面的代码,我的第三个连接不会从服务器获取任何数据 - 连接就在那里。注意 - 我的工作执行者和工作人员计数是 2。 因此,如果我有 2 个工作线程和 3 个连接,那么所有三个连接是否都应该由 2 个线程提供服务?
B) 另一个问题是-netty是否使用java.util.concurrent的CompletionService?好像没有用它。另外,我没有看到任何执行 executor.submit 或 future.get 的源代码 那么,所有这一切都增加了它如何处理数据并向比其工作线程更多的连接提供数据的困惑?
C) 我不知道 netty 如何处理 10000+ 同时 TCP 连接......它会创建 10000 个线程吗?每个连接的线程不是一个可扩展的解决方案,所以我很困惑,因为我的测试代码无法按预期工作。
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Date;
import java.util.concurrent.Executors;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.string.StringEncoder;
public class SRNGServer {
public static void main(String[] args) throws Exception {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
//Executors.newCachedThreadPool()
Executors.newFixedThreadPool(2),2
));
// Configure the pipeline factory.
bootstrap.setPipelineFactory(new SRNGServerPipelineFactoryP());
// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(8080));
}
private static class SRNGServerHandlerP extends SimpleChannelUpstreamHandler {
private static final Logger logger = Logger.getLogger(SRNGServerHandlerP.class.getName());
@Override
public void channelConnected(
ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
// Send greeting for a new connection.
Channel ch=e.getChannel();
System.out.printf("channelConnected with channel=[%s]%n", ch);
ChannelFuture writeFuture=e.getChannel().write("It is " + new Date() + " now.\r\n");
SRNGChannelFutureListener srngcfl=new SRNGChannelFutureListener();
System.out.printf("Registered listener=[%s] for future=[%s]%n", srngcfl, writeFuture);
writeFuture.addListener(srngcfl);
}
@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.getCause());
if(e.getCause() instanceof ClosedChannelException){
logger.log(Level.INFO, "****** Connection closed by client - Closing Channel");
}
e.getChannel().close();
}
}
private static class SRNGServerPipelineFactoryP implements ChannelPipelineFactory {
public ChannelPipeline getPipeline() throws Exception {
// Create a default pipeline implementation.
ChannelPipeline pipeline = Channels.pipeline();
pipeline.addLast("encoder", new StringEncoder());
pipeline.addLast("handler", new SRNGServerHandlerP());
return pipeline;
}
}
private static class SRNGChannelFutureListener implements ChannelFutureListener{
public void operationComplete(ChannelFuture future) throws InterruptedException{
Thread.sleep(1000*5);
Channel ch=future.getChannel();
if(ch!=null && ch.isConnected()){
ChannelFuture writeFuture=ch.write("It is " + new Date() + " now.\r\n");
//-- Add this instance as listener itself.
writeFuture.addListener(this);
}
}
}
}
最佳答案
我没有详分割析你的源代码,所以我不知道为什么它不能正常工作。但 SRNGChannelFutureListener
中的这一行看起来很可疑:
Thread.sleep(1000*5);
这将使执行它的线程被锁定5秒;在此期间,线程将无法执行任何其他处理。
问题C:不,它不会创建10,000个线程; Netty 的重点在于它不会这样做,因为那确实无法很好地扩展。相反,它使用线程池中有限数量的线程,每当发生事件时生成事件,并在池中的线程上运行事件处理程序。因此,线程和连接是相互解耦的(每个连接没有一个线程)。
为了使此机制正常工作,您的事件处理程序应尽快返回,以使它们运行的线程可用于尽快运行下一个事件处理程序。如果您让线程 hibernate 5 秒,那么您将保持该线程的分配状态,因此它将无法用于处理其他事件。
问题 B:如果你真的想知道,你可以获取 Netty 的源代码并找出答案。它使用选择器和其他 java.nio 类来执行 asynchronous I/O .
关于java - Jboss Netty - 如何使用 3 个工作线程服务 2 个连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/6796717/