我正在尝试向一台服务器异步发送大量 http posts 请求。我的目标是将每个响应与其原始请求进行比较。
为此,我关注 Netty Snoop example .
但是,这个示例(以及其他 http 示例)没有介绍如何异步发送多个请求,也没有介绍如何将它们随后链接到相应的请求。
所有类似的问题(例如 this one 、 this one 或 this one )都实现 SimpleChannelUpstreamHandler 类,该类来自 netty 3,在 4.0 中不再存在( documentation netty 4.0 )
有人知道如何在 netty 4.0 中解决这个问题吗?
编辑:
我的问题是,虽然我向 channel 写入了大量消息,但我只收到非常慢的响应(1 个响应/秒,而希望收到几千个/秒)。为了澄清这一点,让我发布到目前为止我得到的信息。我确信我发送请求的服务器也可以处理大量流量。
到目前为止我得到了什么:
import java.net.URI
import java.nio.charset.StandardCharsets
import java.io.File
import io.netty.bootstrap.Bootstrap
import io.netty.buffer.{Unpooled, ByteBuf}
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler, ChannelInitializer}
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.http._
import io.netty.handler.timeout.IdleStateHandler
import io.netty.util.{ReferenceCountUtil, CharsetUtil}
import io.netty.channel.nio.NioEventLoopGroup
import scala.io.Source
object ClientTest {
val URL = System.getProperty("url", MY_URL)
val configuration = new Configuration
def main(args: Array[String]) {
println("Starting client")
start()
}
def start(): Unit = {
val group = new NioEventLoopGroup()
try {
val uri: URI = new URI(URL)
val host: String= {val h = uri.getHost(); if (h != null) h else "127.0.0.1"}
val port: Int = {val p = uri.getPort; if (p != -1) p else 80}
val b = new Bootstrap()
b.group(group)
.channel(classOf[NioSocketChannel])
.handler(new HttpClientInitializer())
val ch = b.connect(host, port).sync().channel()
val logFolder: File = new File(configuration.LOG_FOLDER)
val fileToProcess: Array[File] = logFolder.listFiles()
for (file <- fileToProcess){
val name: String = file.getName()
val source = Source.fromFile(configuration.LOG_FOLDER + "/" + name)
val lineIterator: Iterator[String] = source.getLines()
while (lineIterator.hasNext) {
val line = lineIterator.next()
val jsonString = parseLine(line)
val request = createRequest(jsonString, uri, host)
ch.writeAndFlush(request)
}
println("closing")
ch.closeFuture().sync()
}
} finally {
group.shutdownGracefully()
}
}
private def parseLine(line: String) = {
//do some parsing to get the json string I want
}
def createRequest(jsonString: String, uri: URI, host: String): FullHttpRequest = {
val bytebuf: ByteBuf = Unpooled.copiedBuffer(jsonString, StandardCharsets.UTF_8)
val request: FullHttpRequest = new DefaultFullHttpRequest(
HttpVersion.HTTP_1_1, HttpMethod.POST, uri.getRawPath())
request.headers().set(HttpHeaders.Names.HOST, host)
request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE)
request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP)
request.headers().add(HttpHeaders.Names.CONTENT_TYPE, "application/json")
request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, bytebuf.readableBytes())
request.content().clear().writeBytes(bytebuf)
request
}
}
class HttpClientInitializer() extends ChannelInitializer[SocketChannel] {
override def initChannel(ch: SocketChannel) = {
val pipeline = ch.pipeline()
pipeline.addLast(new HttpClientCodec())
//aggregates all http messages into one if content is chunked
pipeline.addLast(new HttpObjectAggregator(1048576))
pipeline.addLast(new IdleStateHandler(0, 0, 600))
pipeline.addLast(new HttpClientHandler())
}
}
class HttpClientHandler extends SimpleChannelInboundHandler[HttpObject] {
override def channelRead0(ctx: ChannelHandlerContext, msg: HttpObject) {
try {
msg match {
case res: FullHttpResponse =>
println("response is: " + res.content().toString(CharsetUtil.US_ASCII))
ReferenceCountUtil.retain(msg)
}
} finally {
ReferenceCountUtil.release(msg)
}
}
override def exceptionCaught(ctx: ChannelHandlerContext, e: Throwable) = {
println("HttpHandler caught exception", e)
ctx.close()
}
}
最佳答案
ChannelFuture cf = channel.writeAndFlush(createRequest());
nor how to link them subsequently to the corresponding requests.
Can netty assign multiple IO threads to the same Channel?
一旦分配给 channel 的工作线程在 channel 的生命周期内就不会改变。所以我们没有从线程中受益。这是因为您保持连接处于 Activity 状态,因此 channel 也保持 Activity 状态。
要解决此问题,您可以考虑使用 channel 池(例如 30 个)。然后使用 channel 池来发出您的请求。
int concurrent = 30;
// Start the client.
ChannelFuture[] channels = new ChannelFuture[concurrent];
for (int i = 0; i < channels.length; i++) {
channels[i] = b.connect(host, port).sync();
}
for (int i = 0; i < 1000; i++) {
ChannelFuture requestHandle = process(channels[(i+1)%concurrent]);
// do something with the request handle
}
for (int i = 0; i < channels.length; i++) {
channels[i].channel().closeFuture().sync();
}
HTH
关于java - 如何使用 Netty 异步发送多个 http 请求?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37117345/