我的 Web 服务器(基于 Netty 定制构建)使用 Web 客户端(也是使用 Netty 定制构建)向 S3 发出代理请求。
Client -> Webserver|Webclient -> S3
该系统的目的是通过一些逻辑将文件上传直接通过管道传输到 S3:
Webserver
接受客户端请求 (POST);- 将
Client
channel 的可读性设置为 false 并验证一堆东西; - 当一切验证成功后,它使用
Webclient
连接到S3
; - 当
Webclient
连接到S3
时:- 它向客户端发送回 100-Continue
- 它将
Client
channel 可读性设置为true
- 从那时起,
Webserver
收到的所有 block 都交给Webclient
转发。
在(极不可能)Client
和 Webserver
之间的连接比 Webclient
和 S3 之间的连接更快的情况下
,我需要限制 Client
和 Webserver
之间的连接。
我采用的方法是简单地保留一个由 Webserver
接收的字节计数器(每次 Client
发送数据时它都会递增)并且每次写入时递减Webclient
完成。每当此缓冲区上的数据量超过给定阈值时,Client
的 channel 可读性将设置为 false
。
在我将 OrderedMemoryAwareThreadPoolExecutor
添加到服务器的管道之前,这一直很好用。
一个简单的解决方案是在Webclient
上使用OioClientSocketChannelFactory
。这会导致对 Channel.write
的调用被阻塞,因此当 messageReceived()
在 Webserver
的处理程序上被调用时——因此Channel.write
在 Webclient
上调用——节流“自然”发生。
但是,如果我在 Webclient
上使用 NioClientSocketChannelFactory
,那么对 Channel.write
的调用将变为异步,并且节流停止工作。
基本上,我在这里注意到的是,当 OrderedMemoryAwareThreadPoolExecutor
插入管道时,Channel.setReadability(false)
似乎没有任何效果。
如何在管道中使用 OMATPE 执行节流?
最佳答案
1) OrderedMemoryAwareThreadPoolExecutor 还监视 channel 内存(在您的情况下接收到的数据大小)并在它高于/低于配置的最大大小时暂停/启用读取(通过 OrderedMemoryAwareThreadPoolExecutor 构造函数)。
2) 当它与 ExecutionHandler 一起使用时,如果在上下文中发现某些附件,处理程序可能会丢弃 channel 状态事件(但该上下文附件通常由 OrderedMemoryAwareThreadPoolExecutor 设置,以不允许上游处理程序更改 channel 状态和导致 OutofMemoryException )。
boolean readSuspended = ctx.getAttachment() != null;
if (readSuspended) {
// Drop the request silently if MemoryAwareThreadPool has
// set the flag.
e.getFuture().setSuccess();
return;
}
我认为,您必须配置 OMATPE 的最小、最大 channel 内存大小,或者您可能有上下文附件导致这种情况?
关于java - 使用 OMATPE 进行带宽限制,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8375533/