netty - 在同一台开发机器上运行多个 Netty 客户端和一个服务器

标签 netty

我正在编写一个应用程序,其中客户端和服务器都是使用 Netty 编写的,并且服务器应该(显然)同时支持多个客户端。我试图通过创建 1000 个客户端共享一个 EventLoopGroup 并在一台机器上运行所有内容来测试它。

最初,我有多个客户端有时会因超时而无法连接。在客户端增加 SO_TIMEOUT_MILLIS 并将服务器上的 SO_BACKLOG 设置为 numberOfClients 解决了这个问题。但是,我仍然得到 connection reset by peer

io.netty.channel.AbstractChannel$AnnotatedConnectException: syscall:getsockopt(..) failed: Connection refused: localhost/127.0.0.1:8080
    at io.netty.channel.unix.Socket.finishConnect(..)(Unknown Source)
Caused by: io.netty.channel.unix.Errors$NativeConnectException: syscall:getsockopt(..) failed: Connection refused
    ... 1 more

有时在客户端(尤其是当我增加客户端数量时)。服务器端 LoggingHandler 的输出似乎没有显示任何尝试从这些 channel 绑定(bind)到客户端的端口进行连接。尝试使用 Nio* 而不是 Epoll* 类型也无济于事。

是否需要设置其他选项以允许更多连接(可能在服务器端,如果它真的是拒绝/重置连接的那个)?

为了简化情况,我删除了自己的逻辑,因此客户端只需通过 websocket 连接并在握手成功后关闭 channel 。 据我了解,Netty 在处理 10000 个并发的 websocket 连接时通常不会有问题,这些连接不会做太多事情。

ulimit -n 是 1000000,ulimit -u 是 772794,所以两者都应该不是问题。

这是代码(在 Kotlin 中,但 Java 翻译应该很清楚):

package netty

import io.netty.bootstrap.Bootstrap
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.*
import io.netty.handler.codec.http.HttpClientCodec
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory
import io.netty.handler.codec.http.websocketx.WebSocketClientProtocolHandler
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler
import io.netty.handler.codec.http.websocketx.WebSocketVersion
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketClientCompressionHandler
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import org.junit.Test
import java.net.URI

@Suppress("OverridingDeprecatedMember")
class NettyTest {
    private fun channelInitializer(f: (Channel) -> Unit) = object : ChannelInitializer<Channel>() {
        override fun initChannel(ch: Channel) {
            f(ch)
        }
    }

    private val numberOfClients = 10000
    private val maxHttpContentLength = 65536

    @Test
    fun manyClients() {
        // set up server
        val bossLoopGroup = EpollEventLoopGroup(1)
        val workerLoopGroup = EpollEventLoopGroup()
        val serverChannelFactory = ChannelFactory { EpollServerSocketChannel() }
        val clientLoopGroup = EpollEventLoopGroup()
        val clientChannelFactory = ChannelFactory { EpollSocketChannel() }
        val serverChannel = ServerBootstrap().channelFactory(serverChannelFactory).group(bossLoopGroup, workerLoopGroup).handler(LoggingHandler(LogLevel.DEBUG)).childHandler(channelInitializer {
            it.pipeline().addLast(
                    HttpServerCodec(),
                    HttpObjectAggregator(maxHttpContentLength),
                    WebSocketServerCompressionHandler(),
                    WebSocketServerProtocolHandler("/", null, true, maxHttpContentLength)/*,
                    myServerHandler*/
            )
        }).option(ChannelOption.SO_BACKLOG, numberOfClients).bind("localhost", 8080).sync().channel()
        println("Server started")

        try {
            // set up clients    
            val url = URI("ws://localhost")
            val futures = List(numberOfClients) { clientIndex ->
                val handshaker = WebSocketClientHandshakerFactory.newHandshaker(url, WebSocketVersion.V13, null, true, null)
                val promise = clientLoopGroup.next().newPromise<Channel>()

                val connectFuture = Bootstrap().channelFactory(clientChannelFactory).group(clientLoopGroup).handler(channelInitializer {
                    it.pipeline().addLast(
                            HttpClientCodec(),
                            HttpObjectAggregator(maxHttpContentLength),
                            WebSocketClientCompressionHandler.INSTANCE,
                            WebSocketClientProtocolHandler(handshaker, true),
                            object : ChannelInboundHandlerAdapter() {
                                override fun userEventTriggered(ctx: ChannelHandlerContext, evt: Any) {
                                    if (evt == WebSocketClientProtocolHandler.ClientHandshakeStateEvent.HANDSHAKE_COMPLETE) {
                                        promise.setSuccess(ctx.channel())
                                        println("Client $clientIndex handshake successful")
                                    }
                                }

                                override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
                                    promise.setFailure(cause)
                                }
                            })
                }).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 120000).connect("localhost", 8080)
                Pair(promise, connectFuture)
            }
            for ((promise, connectFuture) in futures) {
                connectFuture.sync()
                try {
                    promise.sync()
                } finally { connectFuture.channel().close().sync() }
            }
        } finally {
            try { serverChannel.close().sync() } finally {
                workerLoopGroup.shutdownGracefully()
                bossLoopGroup.shutdownGracefully()
                clientLoopGroup.shutdownGracefully()
            }
        }
    }
}

最佳答案

只有 1 个线程用于接受传入连接:bossLoopGroup = EpollEventLoopGroup(1)。也许这不足以接受客户端连接群。

我建议共享一个 EventLoopGroup 作为 boss、worker 和 client,使用默认的线程数(Netty 会考虑核心数)。因此您不会有未充分使用/过度使用的线程池。

如果您想使用不同的线程池运行您的测试,请创建具有明确大小的线程池,并为您的 bossLoopGroup 使用 1 个以上的线程。

关于netty - 在同一台开发机器上运行多个 Netty 客户端和一个服务器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47788698/

相关文章:

java - Netty UDP 服务器可以有多个 eventloop 线程吗?

java - 一个端口用于读取,一个端口用于写入是套接字应用程序的好主意吗?

java - Netty 和 MongoDB 异步回调不能一起工作

java - TLS Android 的问题 - Netty

c# - 使用 TLS 1.2 将客户端连接到 TCP 服务器

java - Java 客户端尝试连接到 Elastic Search 时出错

java - 找不到 TLS ALPN 提供商;没有可用的 netty-tcnative、Conscrypt 或 Jetty NPN/ALPN

client-server - 网络本地主机

java - 使用 WriteTimeoutHandler 在 Netty 中实现保活消息

websocket - Netty websocket服务器高可用性