java - Optimizing a Java server that uses data compression

Tags: java sockets stream

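For a multi-client server program, I use a wrapper around java.util.zip.Inflater and Deflater that I found online. Because I frequently transfer large amounts of data in the form of ImageIcons, these compression methods seem to speed up my program significantly.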

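However, while trying to optimize my program, I noticed that the server's CPU is under heavy load whenever data is transferred between clients. The culprit is that the server spends unnecessary CPU time decompressing the objects a client sends and then recompressing them to send to the other clients.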

This rough diagram of mine explains more clearly what is happening:

[Image: diagram of the client-to-server-to-client data flow]

My question:

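How can I forward the raw compressed data that a client sends to the server directly to the other clients, without decompressing and recompressing it on the server side?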

I'm not at all familiar with IO streams (I only code as a hobby), so I'm at a loss. Does anyone know a good resource that covers this area?


Below is the code I use on both the server and the client to send and receive compressed data.

Creating the compressor

new ObjectOutputStream(
    new BufferedOutputStream(
        new CompressedBlockOutputStream(
            socket.getOutputStream(), 1024)));

Creating the decompressor

new ObjectInputStream(
    new BufferedInputStream(
        new CompressedBlockInputStream(
            socket.getInputStream())));

The code for the compressed block (input/output) streams follows.


I copied this code from the source described in the license.

CompressedBlockInputStream.java

import java.io.EOFException;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.DataFormatException;
import java.util.zip.Inflater;

/**
 * Input stream that decompresses data.
 *
 * Copyright 2005 - Philip Isenhour - http://javatechniques.com/
 *
 * This software is provided 'as-is', without any express or
 * implied warranty. In no event will the authors be held liable
 * for any damages arising from the use of this software.
 *
 * Permission is granted to anyone to use this software for any
 * purpose, including commercial applications, and to alter it and
 * redistribute it freely, subject to the following restrictions:
 *
 *  1. The origin of this software must not be misrepresented; you
 *     must not claim that you wrote the original software. If you
 *     use this software in a product, an acknowledgment in the
 *     product documentation would be appreciated but is not required.
 *
 *  2. Altered source versions must be plainly marked as such, and
 *     must not be misrepresented as being the original software.
 *
 *  3. This notice may not be removed or altered from any source
 *     distribution.
 *
 * $Id:  1.2 2005/10/26 17:40:19 isenhour Exp $
 */
public class CompressedBlockInputStream extends FilterInputStream {
    /**
     * Buffer of compressed data read from the stream
     */
    private byte[] inBuf = null;

    /**
     * Length of data in the input data
     */
    private int inLength = 0;

    /**
     * Buffer of uncompressed data
     */
    private byte[] outBuf = null;

    /**
     * Offset and length of uncompressed data
     */
    private int outOffs = 0;
    private int outLength = 0;

    /**
     * Inflater for decompressing
     */
    private Inflater inflater = null;

    public CompressedBlockInputStream(InputStream is) {
        super(is);
        inflater = new Inflater();
    }

    private void readAndDecompress() throws IOException {
        // Read the length of the compressed block
        int ch1 = in.read();
        int ch2 = in.read();
        int ch3 = in.read();
        int ch4 = in.read();
        if ((ch1 | ch2 | ch3 | ch4) < 0)
            throw new EOFException();
        inLength = ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));

        ch1 = in.read();
        ch2 = in.read();
        ch3 = in.read();
        ch4 = in.read();
        if ((ch1 | ch2 | ch3 | ch4) < 0)
            throw new EOFException();
        outLength = ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));

        // Make sure we've got enough space to read the block
        if ((inBuf == null) || (inLength > inBuf.length)) {
            inBuf = new byte[inLength];
        }

        if ((outBuf == null) || (outLength > outBuf.length)) {
            outBuf = new byte[outLength];
        }

        // Read until we're got the entire compressed buffer.
        // read(...) will not necessarily block until all
        // requested data has been read, so we loop until
        // we're done.
        int inOffs = 0;
        while (inOffs < inLength) {
            int inCount = in.read(inBuf, inOffs, inLength - inOffs);
            if (inCount == -1) {
                throw new EOFException();
            }
            inOffs += inCount;
        }

        inflater.setInput(inBuf, 0, inLength);
        try {
            inflater.inflate(outBuf);
        } catch(DataFormatException dfe) {
            throw new IOException("Data format exception - " + dfe.getMessage());
        }

        // Reset the inflator so we can re-use it for the
        // next block
        inflater.reset();

        outOffs = 0;
    }

    @Override
    public int read() throws IOException {
        if (outOffs >= outLength) {
            try {
                readAndDecompress();
            }
            catch(EOFException eof) {
                return -1;
            }
        }

        return outBuf[outOffs++] & 0xff;
    }

    @Override
    public int read(byte[] b, int off, int len) throws IOException {
        int count = 0;
        while (count < len) {
            if (outOffs >= outLength) {
                try {
                    // If we've read at least one decompressed
                    // byte and further decompression would
                    // require blocking, return the count.
                    if ((count > 0) && (in.available() == 0))
                        return count;
                    else
                        readAndDecompress();
                } catch(EOFException eof) {
                    if (count == 0)
                        count = -1;
                    return count;
                }
            }

            int toCopy = Math.min(outLength - outOffs, len - count);
            System.arraycopy(outBuf, outOffs, b, off + count, toCopy);
            outOffs += toCopy;
            count += toCopy;
        }

        return count;
    }

    @Override
    public int available() throws IOException {
        // This isn't precise, but should be an adequate
        // lower bound on the actual amount of available data
        return (outLength - outOffs) + in.available();
    }

}




I copied this code from the source described in the license.

CompressedBlockOutputStream.java

import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.Deflater;

/**
 * Output stream that compresses data. A compressed block
 * is generated and transmitted once a given number of bytes
 * have been written, or when the flush method is invoked.
 *
 * Copyright 2005 - Philip Isenhour - http://javatechniques.com/
 *
 * This software is provided 'as-is', without any express or
 * implied warranty. In no event will the authors be held liable
 * for any damages arising from the use of this software.
 *
 * Permission is granted to anyone to use this software for any
 * purpose, including commercial applications, and to alter it and
 * redistribute it freely, subject to the following restrictions:
 *
 *  1. The origin of this software must not be misrepresented; you
 *     must not claim that you wrote the original software. If you
 *     use this software in a product, an acknowledgment in the
 *     product documentation would be appreciated but is not required.
 *
 *  2. Altered source versions must be plainly marked as such, and
 *     must not be misrepresented as being the original software.
 *
 *  3. This notice may not be removed or altered from any source
 *     distribution.
 *
 * $Id:  1.1 2005/10/26 17:19:05 isenhour Exp $
 */
public class CompressedBlockOutputStream extends FilterOutputStream {
    /**
     * Buffer for input data
     */
    private byte[] inBuf = null;

    /**
     * Buffer for compressed data to be written
     */
    private byte[] outBuf = null;

    /**
     * Number of bytes in the buffer
     */
    private int len = 0;

    /**
     * Deflater for compressing data
     */
    private Deflater deflater = null;

    /**
     * Constructs a CompressedBlockOutputStream that writes to
     * the given underlying output stream 'os' and sends a compressed
     * block once 'size' byte have been written. The default
     * compression strategy and level are used.
     */
    public CompressedBlockOutputStream(OutputStream os, int size) {
        this(os, size, Deflater.DEFAULT_COMPRESSION, Deflater.DEFAULT_STRATEGY);
    }

    /**
     * Constructs a CompressedBlockOutputStream that writes to the
     * given underlying output stream 'os' and sends a compressed
     * block once 'size' byte have been written. The compression
     * level and strategy should be specified using the constants
     * defined in java.util.zip.Deflator.
     */
    public CompressedBlockOutputStream(OutputStream os, int size, int level, int strategy) {
        super(os);
        this.inBuf = new byte[size];
        this.outBuf = new byte[size + 64];
        this.deflater = new Deflater(level);
        this.deflater.setStrategy(strategy);
    }

    protected void compressAndSend() throws IOException {
        if (len > 0) {
            deflater.setInput(inBuf, 0, len);
            deflater.finish();
            int size = deflater.deflate(outBuf);

            // Write the size of the compressed data, followed
            // by the size of the uncompressed data
            out.write((size >> 24) & 0xFF);
            out.write((size >> 16) & 0xFF);
            out.write((size >>  8) & 0xFF);
            out.write((size >>  0) & 0xFF);

            out.write((len >> 24) & 0xFF);
            out.write((len >> 16) & 0xFF);
            out.write((len >>  8) & 0xFF);
            out.write((len >>  0) & 0xFF);

            out.write(outBuf, 0, size);
            out.flush();

            len = 0;
            deflater.reset();
        }
    }

    @Override
    public void write(int b) throws IOException {
        inBuf[len++] = (byte) b;
        if (len == inBuf.length) {
            compressAndSend();
        }
    }

    @Override
    public void write(byte[] b, int boff, int blen) throws IOException {
        while ((len + blen) > inBuf.length) {
            int toCopy = inBuf.length - len;
            System.arraycopy(b, boff, inBuf, len, toCopy);
            len += toCopy;
            compressAndSend();
            boff += toCopy;
            blen -= toCopy;
        }
        System.arraycopy(b, boff, inBuf, len, blen);
        len += blen;
    }

    @Override
    public void flush() throws IOException {
        compressAndSend();
        out.flush();
    }

    @Override
    public void close() throws IOException {
        compressAndSend();
        out.close();
    }
}
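For reference, the block layout these two classes exchange can be reproduced in isolation: a 4-byte compressed length, a 4-byte uncompressed length (both big-endian), then the deflated payload. The sketch below is only an illustration of that layout under my reading of the code above; `BlockFormatDemo`, `encodeBlock`, and `decodeBlock` are hypothetical names, not part of the original classes.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

/** Hypothetical demo of the layout: [compressed length][uncompressed length][payload]. */
public class BlockFormatDemo {

    /** Compresses 'data' into a single framed block. */
    public static byte[] encodeBlock(byte[] data) throws IOException {
        Deflater deflater = new Deflater();
        deflater.setInput(data);
        deflater.finish();
        ByteArrayOutputStream payload = new ByteArrayOutputStream();
        byte[] buf = new byte[256];
        while (!deflater.finished()) {
            int n = deflater.deflate(buf);
            payload.write(buf, 0, n);
        }
        deflater.end();

        ByteArrayOutputStream block = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(block);
        out.writeInt(payload.size()); // compressed length, big-endian
        out.writeInt(data.length);    // uncompressed length, big-endian
        payload.writeTo(out);
        out.flush();
        return block.toByteArray();
    }

    /** Decodes a single framed block back into the original bytes. */
    public static byte[] decodeBlock(byte[] block) throws DataFormatException {
        ByteBuffer header = ByteBuffer.wrap(block); // ByteBuffer is big-endian by default
        int compressedLen = header.getInt();
        int uncompressedLen = header.getInt();

        Inflater inflater = new Inflater();
        inflater.setInput(block, 8, compressedLen); // payload starts after the 8-byte header
        byte[] result = new byte[uncompressedLen];
        int off = 0;
        while (off < uncompressedLen && !inflater.finished()) {
            int n = inflater.inflate(result, off, uncompressedLen - off);
            if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                break; // truncated or unexpected input: stop instead of looping forever
            }
            off += n;
        }
        inflater.end();
        return result;
    }
}
```

Because each block carries its own length header and is compressed independently (the Deflater is reset after every block), a block can be copied byte for byte without being inflated.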

Best answer

On the server, you can replace the ObjectOutputStream/ObjectInputStream with a plain InputStream/OutputStream, or even a BufferedInputStream/BufferedOutputStream.

Here is an example:

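// Assumes `socket` is the sending client's Socket and `clients` is a
// collection of the other clients' OutputStreams (e.g. a List<OutputStream>).
try (InputStream is = socket.getInputStream()) {
    byte[] b = new byte[2048]; // you can change the buffer's size
    int r;
    while ((r = is.read(b)) != -1) {
        for (OutputStream client : clients) {
            client.write(b, 0, r); // forward the raw, still-compressed bytes
            client.flush();
        }
    }
} catch (IOException e) {
    e.printStackTrace();
}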

This sends the raw bytes the server receives to all clients, without decompressing and recompressing them.
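The loop above forwards whatever chunk `read` happens to return, so a chunk can end in the middle of a compressed block. If more than one sender is relayed to the same receiver, it may be safer to forward whole blocks instead. The following is a minimal sketch of that idea, assuming the 8-byte header written by `compressAndSend()` in the question (compressed length, then uncompressed length, both big-endian); `BlockRelay`, `relayBlock`, and `relayAll` are hypothetical names introduced here.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;

/** Hypothetical helper: relays compressed blocks without inflating them. */
public class BlockRelay {

    /**
     * Reads one block (4-byte compressed length, 4-byte uncompressed length,
     * compressed payload) from 'in' and writes it unchanged to every stream
     * in 'clients'. Returns false if the stream ends while reading the first
     * header field.
     */
    public static boolean relayBlock(DataInputStream in, List<? extends OutputStream> clients)
            throws IOException {
        int compressedLen;
        try {
            compressedLen = in.readInt(); // big-endian, matching compressAndSend()
        } catch (EOFException eof) {
            return false;                 // sender closed the connection
        }
        int uncompressedLen = in.readInt();
        if (compressedLen < 0 || uncompressedLen < 0) {
            throw new IOException("Corrupt block header");
        }
        byte[] payload = new byte[compressedLen];
        in.readFully(payload);            // blocks until the whole payload has arrived

        for (OutputStream client : clients) {
            DataOutputStream out = new DataOutputStream(client);
            out.writeInt(compressedLen);
            out.writeInt(uncompressedLen);
            out.write(payload);           // payload is never decompressed on the server
            out.flush();
        }
        return true;
    }

    /** Relays blocks until the sender's stream ends. */
    public static void relayAll(InputStream sender, List<? extends OutputStream> clients)
            throws IOException {
        DataInputStream in = new DataInputStream(sender);
        while (relayBlock(in, clients)) {
            // keep forwarding block by block
        }
    }
}
```

A server thread per sender could call `BlockRelay.relayAll(socket.getInputStream(), clients)`, synchronizing on each client stream if several threads write to it. One caveat this sketch does not address: ObjectOutputStream writes its own stream header and uses back-references within a stream, so a receiver's single ObjectInputStream may not be able to read the merged bytes of several senders without additional framing.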

Regarding "java - Optimizing a Java server that uses data compression", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/38583402/
