java - 如何使用 Netty 异步发送多个 http 请求?

标签 java scala asynchronous netty nio

我正在尝试向一台服务器异步发送大量 http posts 请求。我的目标是将每个响应与其原始请求进行比较。

为此,我关注 Netty Snoop example .

但是,这个示例(以及其他 http 示例)没有介绍如何异步发送多个请求,也没有介绍如何将它们随后链接到相应的请求。

所有类似的问题(例如 this onethis onethis one )都实现 SimpleChannelUpstreamHandler 类,该类来自 netty 3,在 4.0 中不再存在( documentation netty 4.0 )

有人知道如何在 netty 4.0 中解决这个问题吗?

编辑:

我的问题是,虽然我向 channel 写入了大量消息,但我只收到非常慢的响应(1 个响应/秒,而希望收到几千个/秒)。为了澄清这一点,让我发布到目前为止我得到的信息。我确信我发送请求的服务器也可以处理大量流量。

到目前为止我得到了什么:

import java.net.URI
import java.nio.charset.StandardCharsets
import java.io.File

import io.netty.bootstrap.Bootstrap
import io.netty.buffer.{Unpooled, ByteBuf}
import io.netty.channel.{ChannelHandlerContext, SimpleChannelInboundHandler, ChannelInitializer}
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.http._
import io.netty.handler.timeout.IdleStateHandler
import io.netty.util.{ReferenceCountUtil, CharsetUtil}
import io.netty.channel.nio.NioEventLoopGroup

import scala.io.Source

object ClientTest {

  val URL = System.getProperty("url", MY_URL)     
  val configuration = new Configuration

  def main(args: Array[String]) {
    println("Starting client")
    start()
  }

  def start(): Unit = {

    val group = new NioEventLoopGroup()

    try {

      val uri: URI = new URI(URL)
      val host: String= {val h = uri.getHost(); if (h != null) h else "127.0.0.1"}
      val port: Int = {val p = uri.getPort; if (p != -1) p else 80}

      val b = new Bootstrap()

      b.group(group)
      .channel(classOf[NioSocketChannel])
      .handler(new HttpClientInitializer())

      val ch = b.connect(host, port).sync().channel()

      val logFolder: File = new File(configuration.LOG_FOLDER)
      val fileToProcess: Array[File] = logFolder.listFiles()

      for (file <- fileToProcess){
        val name: String = file.getName()
        val source = Source.fromFile(configuration.LOG_FOLDER + "/" + name)

        val lineIterator: Iterator[String] = source.getLines()

        while (lineIterator.hasNext) {
            val line = lineIterator.next()
            val jsonString = parseLine(line)
            val request = createRequest(jsonString, uri, host)
            ch.writeAndFlush(request)
        }
        println("closing")
        ch.closeFuture().sync()
      }
    } finally {
      group.shutdownGracefully()
    }
  }

  private def parseLine(line: String) = {
    //do some parsing to get the json string I want
  }

  def createRequest(jsonString: String, uri: URI, host: String): FullHttpRequest = {
    val bytebuf: ByteBuf = Unpooled.copiedBuffer(jsonString, StandardCharsets.UTF_8)

    val request: FullHttpRequest = new DefaultFullHttpRequest(
      HttpVersion.HTTP_1_1, HttpMethod.POST, uri.getRawPath())
    request.headers().set(HttpHeaders.Names.HOST, host)
    request.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE)
    request.headers().set(HttpHeaders.Names.ACCEPT_ENCODING, HttpHeaders.Values.GZIP)
    request.headers().add(HttpHeaders.Names.CONTENT_TYPE, "application/json")

    request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, bytebuf.readableBytes())
    request.content().clear().writeBytes(bytebuf)

    request
  }
}

class HttpClientInitializer() extends ChannelInitializer[SocketChannel] {

  override def initChannel(ch: SocketChannel) = {
  val pipeline = ch.pipeline()

  pipeline.addLast(new HttpClientCodec())

  //aggregates all http messages into one if content is chunked
  pipeline.addLast(new HttpObjectAggregator(1048576))

  pipeline.addLast(new IdleStateHandler(0, 0, 600))

  pipeline.addLast(new HttpClientHandler())
  }
}

class HttpClientHandler extends SimpleChannelInboundHandler[HttpObject] {

  override def channelRead0(ctx: ChannelHandlerContext, msg: HttpObject) {
    try {
      msg match {
        case res: FullHttpResponse =>
          println("response is: " + res.content().toString(CharsetUtil.US_ASCII))
          ReferenceCountUtil.retain(msg)
      }
    } finally {
      ReferenceCountUtil.release(msg)
    }
  }

  override def exceptionCaught(ctx: ChannelHandlerContext, e: Throwable) = {
    println("HttpHandler caught exception", e)
    ctx.close()
  }
}

最佳答案

ChannelFuture cf = channel.writeAndFlush(createRequest());

nor how to link them subsequently to the corresponding requests.

Can netty assign multiple IO threads to the same Channel?

一旦分配给 channel 的工作线程在 channel 的生命周期内就不会改变。所以我们没有从线程中受益。这是因为您保持连接处于 Activity 状态,因此 channel 也保持 Activity 状态。

要解决此问题,您可以考虑使用 channel 池(例如 30 个)。然后使用 channel 池来发出您的请求。

      int concurrent = 30;

  // Start the client.
  ChannelFuture[] channels = new ChannelFuture[concurrent];
  for (int i = 0; i < channels.length; i++) {
    channels[i] = b.connect(host, port).sync();
  }

  for (int i = 0; i < 1000; i++) {
      ChannelFuture requestHandle = process(channels[(i+1)%concurrent]); 
      // do something with the request handle       
  }

  for (int i = 0; i < channels.length; i++) {
    channels[i].channel().closeFuture().sync();
  }

HTH

关于java - 如何使用 Netty 异步发送多个 http 请求?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37117345/

相关文章:

java - 迭代创建 createCriteria。出现错误 org.hibernate.QueryException : duplicate association path

java - 如何增加 Vaadin 11.0.0 网格中的行高?

scala - 结构化scala案例类的自定义json序列化

c# - 当内部调用不是自然异步时,如何创建自然异步方法?

c# - SslStream 等同于 TcpClient.Available?

java - Android XmlPullParser 是如何处理漏洞的?

java - Java 中的 grep 命令等效项

java - 格式化日期时间时如何应用时区?

java - 如何使用 Intellij Idea 远程调试 .jar 文件

c++ - 2个 future 之间的依赖