java - Netty 分块输入流

标签 java multithreading stream netty inbound

我看到很多关于 netty 中的分块流的问题,但其中大部分是关于出站流的解决方案,而不是入站流。

我想了解如何从 channel 获取数据并将其作为 InputStream 发送到我的业务逻辑,而无需先将所有数据加载到内存中。 这是我尝试做的事情:

public class ServerRequestHandler extends MessageToMessageDecoder<HttpObject> {

  private HttpServletRequest request;
  private PipedOutputStream os;
  private PipedInputStream is;

  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    super.handlerAdded(ctx);
    this.os = new PipedOutputStream();
    this.is = new PipedInputStream(os);
  }

  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    super.handlerRemoved(ctx);
    this.os.close();
    this.is.close();
  }

  @Override
  protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
      throws Exception {
    if (msg instanceof HttpRequest) {
      this.request = new CustomHttpRequest((HttpRequest) msg, this.is);
      out.add(this.request);
    }
    if (msg instanceof HttpContent) {
      ByteBuf body = ((HttpContent) msg).content();

      if (body.readableBytes() > 0)
        body.readBytes(os, body.readableBytes());

      if (msg instanceof LastHttpContent) {
        os.close();
      }
    }

  }

}

然后我有另一个处理程序,它将获取我的 CustomHttpRequest 并发送到我称之为 ServiceHandler 的地方,我的业务逻辑将从 InputStream 中读取。

public class ServiceRouterHandler extends SimpleChannelInboundHandler<CustomHttpRequest> {
...
    @Override
    public void channelRead0(ChannelHandlerContext ctx, CustomHttpRequest request) throws IOException {
...
        future = serviceHandler.handle(request, response);
...

这是行不通的,因为当我的处理程序将 CustomHttpRequest 转发到 ServiceHandler 并尝试从 InputStream 读取时,线程处于阻塞状态,并且 HttpContent 从未在我的解码器中处理。

我知道我可以尝试为我的业务逻辑创建一个单独的线程,但我觉得我在这里让事情过于复杂了。
我查看了 ByteBufInputStream,但它是这样说的

Please note that it only reads up to the number of readable bytes determined at the moment of construction.

所以我认为它不适用于 Chunked Http 请求。另外,我看到了 ChunkedWriteHandler,这对于 Oubound block 来说似乎很好,但我找不到像 ChunkedReadHandler 这样的东西......

所以我的问题是:最好的方法是什么?我的要求是:

- 在发送 ServiceHandlers 之前不要将数据保存在内存中;
- ServiceHandlers API 应该与 netty 无关(这就是为什么我使用我的 CustomHttpRequest,而不是 Netty 的 HttpRequest);

更新 我已经在 CustomHttpRequest 上使用更具 react 性的方法来实现它。现在,请求不向 ServiceHandlers 提供 InputStream 以便它们可以读取(这是阻塞的),而是 CustomHttpRequest 现在有一个返回 Future 的 readInto(OutputStream) 方法,所有的服务处理程序将在这个输出流被填满时执行。这是它的样子

public class CustomHttpRequest {
  ...constructors and other methods hidden...
  private final SettableFuture<Void> writeCompleteFuture = SettableFuture.create();

  private final SettableFuture<OutputStream> outputStreamFuture = SettableFuture.create();

  private ListenableFuture<Void> lastWriteFuture = Futures.transform(outputStreamFuture, x-> null);

  public ListenableFuture<Void> readInto(OutputStream os) throws IOException {
    outputStreamFuture.set(os);
    return this.writeCompleteFuture;
  }

  ListenableFuture<Void> writeChunk(byte[] buf) {
    this.lastWriteFuture = Futures.transform(lastWriteFuture, (AsyncFunction<Void, Void>) (os) -> {
      outputStreamFuture.get().write(buf);
      return Futures.immediateFuture(null);
    });
    return lastWriteFuture;
  }


  void complete() {
    ListenableFuture<Void> future =
        Futures.transform(lastWriteFuture, (AsyncFunction<Void, Void>) x -> {
          outputStreamFuture.get().close();
          return Futures.immediateFuture(null);
        });
    addFinallyCallback(future, () -> {
      this.writeCompleteFuture.set(null);
    });

  }
}

我更新后的 ServletRequestHandler 看起来像这样:

public class ServerRequestHandler extends MessageToMessageDecoder<HttpObject> {

  private NettyHttpServletRequestAdaptor request;

  @Override
  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    super.handlerAdded(ctx);
  }

  @Override
  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
    super.handlerRemoved(ctx);
  }


  @Override
  protected void decode(ChannelHandlerContext ctx, HttpObject msg, List<Object> out)
      throws Exception {
    if (msg instanceof HttpRequest) {
      HttpRequest request = (HttpRequest) msg;

      this.request = new CustomHttpRequest(request, ctx.channel());

      out.add(this.request);
    }
    if (msg instanceof HttpContent) {
      ByteBuf buf = ((HttpContent) msg).content();
      byte[] bytes = new byte[buf.readableBytes()];
      buf.readBytes(bytes);

      this.request.writeChunk(bytes);

      if (msg instanceof LastHttpContent) {
        this.request.complete();
      }
    }
  }
}

这工作得很好,但仍然请注意,这里的所有内容都是在单个线程中完成的,也许对于大数据,我可能想要生成一个新线程来为其他 channel 释放该线程。

最佳答案

您走在正确的轨道上 - 如果您的 serviceHandler.handle(request, response); 调用正在进行阻塞读取,您需要为其创建一个新线程。请记住,应该只有少量的 Netty 工作线程,因此您不应该在工作线程中进行任何阻塞调用。

另一个要问的问题是,您的服务处理程序需要阻塞吗?它有什么作用?如果它无论如何都在通过网络铲除数据,你能以非阻塞的方式将它合并到 Netty 管道中吗?这样,一切都是异步的,不需要阻塞调用和额外线程。

关于java - Netty 分块输入流,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48668959/

相关文章:

java - 编译 Android App 时出现错误

Java JDBC 选择记录

c - 在 OpenMP 中启动多少个线程?

java - 为什么在 Windows 上运行 jstack 会出现 'unable to attach to 64 bit process' 错误?

Android - 从 Azure 媒体服务播放自适应比特率视频

android - libstreaming - 如何旋转流媒体视频?

java - 从 FXML 文件中分离文本

java - 在二维数组上打印特定值 - Java

java - CopyOnWriteArrayList 可以与可变对象一起使用吗?

php - 如何在将无限文本流写入文件之前对其进行预处理?