java - Tcp 服务器 WritePendingException 尽管线程锁

标签 java multithreading sockets networking tcp

我用 nio 编写了一个简单的异步 tcp 服务器。 服务器应该能够同时为每个客户端读取和写入。 我用一个简单的数据包队列实现了这一点。

public class TcpJobHandler {

private BlockingQueue<TcpJob> _packetQueue = new LinkedBlockingQueue<TcpJob>();
private Thread _jobThread;
private final ReentrantLock _lock = new ReentrantLock();

public TcpJobHandler(){
    _jobThread = new Thread(new Runnable() {
        @Override
        public void run() {
            jobLoop();
        }       
    });

    _jobThread.start();
}

private void jobLoop(){
    while(true){
        try {
            _lock.lock();
            TcpJob job = _packetQueue.take();
            if(job == null){
                continue;
            }
            job.execute();
        } catch (Exception e) {
            AppLogger.error("Failed to dequeue packet from job queue.", e);
        }finally{
            _lock.unlock();
        }
    }
}

public void insertJob(TcpJob job){
    try{
        _packetQueue.put(job);
    }catch(InterruptedException e){
        AppLogger.error("Failed to queue packet to the tcp job queue.", e);
    }
}
}

这段代码的作用只是检查新数据包。如果有新的数据包可用,该数据包将被发送到客户端。 在类 tcp 作业中,只有要发送的数据包和将数据包写入客户端流的写入类。 正如您所看到的,只有一个线程应该能够将数据包写入客户端流。

这就是重点,为什么我不明白,为什么我会收到这个错误?如果我是对的,这个异常表明,我尝试将数据发送到流中,但已经有一个线程正在将数据写入该流中。但为什么?

//编辑: 我收到此异常:

19:18:41.468 [ERROR] - [mufisync.server.data.tcp.handler.TcpJobHandler] : Failed to dequeue packet from job queue. Exception: java.nio.channels.WritePendingException
at sun.nio.ch.AsynchronousSocketChannelImpl.write(Unknown Source)
at sun.nio.ch.AsynchronousSocketChannelImpl.write(Unknown Source)
at mufisync.server.data.tcp.stream.OutputStreamAdapter.write(OutputStreamAdapter.java:35)
at mufisync.server.data.tcp.stream.OutputStreamAdapter.write(OutputStreamAdapter.java:26)
at mufisync.server.data.tcp.stream.BinaryWriter.write(BinaryWriter.java:21)
at mufisync.server.data.tcp.TcpJob.execute(TcpJob.java:29)
at mufisync.server.data.tcp.handler.TcpJobHandler.jobLoop(TcpJobHandler.java:40)
at mufisync.server.data.tcp.handler.TcpJobHandler.access$0(TcpJobHandler.java:32)
at mufisync.server.data.tcp.handler.TcpJobHandler$1.run(TcpJobHandler.java:25)
at java.lang.Thread.run(Unknown Source)

TcpJob 如下所示:

public class TcpJob {

private BasePacket _packet;
private BinaryWriter _writer;

public TcpJob(BasePacket packet, BinaryWriter writer){
    _packet = packet;
    _writer = writer;
}

public void execute(){
    try {
        if(_packet == null){
            AppLogger.warn("Tcp job packet is null");
            return;
        }

        _writer.write(_packet.toByteArray());
    } catch (IOException e) {
        AppLogger.error("Failed to write packet into the stream.", e);
    }
}

public BasePacket get_packet() {
    return _packet;
}   
}

BinaryStream 只是耦合到 AsynchronousSocketChannel,后者从套接字 channel 调用 write(byte[]) 方法。

最佳答案

您正在使用异步 NIO2。当您使用异步 IO 时,在最后一次写入完成之前,您无法调用 write()。来自 Javadoc

 * @throws  WritePendingException
 *          If a write operation is already in progress on this channel

例如如果您使用过

public abstract Future<Integer> write(ByteBuffer src);

在 Future.get() 返回之前你不能再次写入。

如果你使用

public abstract <A> void write(ByteBuffer src,
                               long timeout,
                               TimeUnit unit,
                               A attachment,
                               CompletionHandler<Integer,? super A> handler);

在调用 CompletionHandler 之前,您无法再次写入。

注意:您也不能同时执行两次读取。

就你而言,你想要类似的东西

ByteBuffer lastBuffer = null;
Future<Integer> future = null;

public void execute(){
    try {
        if(_packet == null){
            AppLogger.warn("Tcp job packet is null");
            return;
        }
        // need to wait until the last buffer was written completely.
        while (future != null) {
           future.get();
           if (lastBuffer.remaining() > 0)
              future = _writer.write(lasBuffer);
           else
              break;
        }
        // start another write.
        future = _writer.write(lastBuffer = _packet.toByteArray());
    } catch (IOException e) {
        AppLogger.error("Failed to write packet into the stream.", e);
    }
}

关于java - Tcp 服务器 WritePendingException 尽管线程锁,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36520392/

相关文章:

java - 如何通过 Java SDK 在 Azure Key Vault 中创建 key ?

java - Thread.sleep() 保证等待吗?

java - 如何中断System.in的读取?

java - 生产者/消费者线程根本没有输出数据

Java Socket - read() 方法如何知道是否已到达流的末尾?

java - 我用什么来从应用程序服务器发送 SMS 消息?

java - simpleFormatDate的日期格式问题

c# - 多客户端服务器 - 2 路连接的常用方式

java - UDP 打洞 Java 示例

java - 在 HashSet 中存储坐标