首先,我将解释一下我正在尝试实现的情况和逻辑:
我有多个线程,每个线程将其工作结果,一些名为
Result
的对象放入队列QueueToSend
我的
NettyClient
在线程中运行,每1毫秒从QueueToSend
获取Result
,并且应该连接到服务器并发送消息,这是从Result
创建的。 我还需要这个连接是异步的。因此,我需要 NettyHandler 知道Result
列表,以发送正确的消息并处理正确的结果,然后再次发送响应。
所以我初始化 NettyClient
bootstrap
bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
并在应用程序启动时设置管道一次。
然后,每毫秒我都会从 QueueToSend
获取 Result
对象并连接到服务器
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host,port);
ResultConcurrentHashMap.put(future.getChannel().getId(), result);
我决定使用静态ConcurrentHashMap
来保存从与 channel 关联的QueueToSend
获取的每个结果对象。
当我尝试从 ResultConcurrentHashMap
获取与 channel 关联的 Result
对象时,第一个问题发生在方法 ChannelConnected 中的 NettyHandler
中。
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
Channel channel = ctx.getPipeline.getChannel();
Result result = ResultConcurrentHashMap.get(channel.getId());
}
但有时 result
为 null(50 中的 1),即使它应该位于 ResultConcurrentHashMap
中。我认为发生这种情况是因为 channelConnected
事件在 NettyClient
运行此代码之前发生:
ResultConcurrentHashMap.put(future.getChannel().getId(), result);
如果我不在本地主机上运行 NettyServer
和 NettyClient
,它可能不会出现,但在远程,建立连接将需要更多时间。但我需要解决这个问题。
另一个问题是我每 1 毫秒异步发送一次消息,我认为消息可能是混合的,服务器无法正确读取它们。如果我一一运行它们就可以了:
future.getChannel().getCloseFuture().awaitUninterruptibly();
但我需要异步发送,并处理正确的结果,与 channel 相关联并发送响应。 我应该实现什么?
最佳答案
ChannelFutures 在事件触发之前异步执行。例如, channel 连接 future 将在触发 channel 连接事件之前完成。
因此,您必须在调用 bootstrap.connect() 后注册一个 channel future 监听器,并在监听器中编写代码来初始化 HashMap,然后它将对处理程序可见。
ChannelFuture channelFuture = bootstrap.connect(remoteAddress, localAddress);
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
resultConcurrentHashMap.put(future.getChannel().getId(), result);
}
});
关于java - Netty客户端多个请求,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9403197/