我在发送大于 15GB 的文件时遇到数据传输速率问题。我有 3 台服务器和一个客户端。从客户端向服务器发送文件时,我将文件分成 block (每个 block 通常为 256MB),每个 block 在 2 个服务器上复制。复制发生在管道方法中。发送 block 时,每个 block 被切成更小的数据包(每个数据包通常为 128 KB),发送到服务器,并在服务器端合并存储在硬盘中。这里一切都很好。我以 5GB 的增量测试了 5GB 到 50GB 文件的系统。所有文件的平均写入速度约为 600MB/秒。见下表。这里我是和HDFS比较的。
从服务器读取相同文件时会出现此问题。文件分布在多个服务器上。例如,我可以从 server1 读取 block1,从 server2 读取 block2,等等。直觉上,读取必须比写入快,因为客户端从 3 个服务器并行读取。读取小于 15GB {5GB, 10GB, 15GG} 的文件时,性能约为 1.1GB/秒。读取大于 20GB {20GB, 25GB, ...., 50GB} 的文件时会出现问题。性能会随着文件大小的增加而降低。
上图为读取50GB文件的基准测试。每个黑点显示一个单独的 block 读取时间。如您所见,性能在第 60 - 70 个 block 后开始下降。有趣的是,所有大于 15GB 的文件都会发生这种情况,并在同一位置附近(大约第 65 个 block )变慢。随着文件大小的增加,慢速部分占主导地位,性能越来越差。我觉得16GB左右有一些障碍。我看到的唯一可能有帮助的提示是 3 个服务器并行随机发送 block ,直到大约第 65 个。所以 block 的传输是重叠的。之后,一台服务器以循环顺序一次发送。我可以从日志输出中看到这一点。这里仍然有一些重叠,但没有第 65 个 block 之前那么多。
我在这个项目中使用 java 1.8 和 netty 4.1.8。作为 TCP 服务器。 操作系统是 CentOS 7。 每台服务器有两个 CPU(Intel(R) Xeon(R) CPU E5-2650 v3 @ 2.30GHz)= 40 个内核 64GB 内存 10 GBit 以太网。
我花了很多时间都找不到问题的根本原因。 问题可能来自 Java VM、Netty、操作系统、操作系统 TCP 默认值或其他原因。
服务器端 BlockSenderManager
@Override
public void run(){
while(nodeManager.isRunning()){
try
{
BlockRequest br = blockSenders.take();
if(br != null){
executor.execute(new BlockSender( br, this));
}
if(wait.take())
System.out.println(br.getBlockId()+" Delivered");
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
服务器端的BlockSender:
@Override
public void run()
{
FileInputStream fis = null;
try
{
java.io.File file = new java.io.File(path+"/" + blockRequest.getBlockId());
fis = new FileInputStream(file);
fSize = file.length();
long rem = fSize;
sendBlockInfo();
int bufSize;
if (fSize < (long) packetSize)
bufSize = (int) fSize;
else
bufSize = packetSize;
int read = 0, packetOrder = 1;
byte[] data;
if(bufSize <= rem)
data = new byte[bufSize];
else
data = new byte[(int)rem];
while ((read = (fis.read(data))) > 0)
{
if (read < 1)
break;
BlockPacket bp = new BlockPacket();
bp.setRequestId(blockRequest.getRequestId());
bp.setBlockId(blockRequest.getBlockId());
bp.setData(data);
bp.setPacketSeqNo(packetOrder);
if(read < bufSize)
{
bp.setIsLastPacket(true);
}
executor.execute(new Sender(bp));
packetOrder++;
if(rem > bufSize)
rem = rem - bufSize;
if(bufSize <= rem)
data = new byte[bufSize];
else
{
data = new byte[(int)rem];
}
}
fis.close();
executor.shutdown();
}
catch (FileNotFoundException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public class Sender implements Runnable
{
private final BlockPacket bp;
private final FileBlock fb;
private DataClient dc;
public Sender(BlockPacket bp)
{
this.bp = bp;
this.fb = null;
dc = getDataClient(requestClient);
}
public Sender(FileBlock fb)
{
this.bp = null;
this.fb = fb;
dc = getDataClient(requestClient);
}
@Override
public void run()
{
if (dc != null)
{
if (bp != null)
{
dc.send(bp);
}
else if (fb != null)
{
dc.send(fb);
}
}
}
}
客户端的ReceivedPacketProcessor
public void processBlockPacket(BlockPacket bp)
{
ByteBuffer buf = ByteBuffer.wrap(bp.getData());
try
{
inChannel.write(buf);
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
@Override
public void run()
{
try
{
aFile = new RandomAccessFile(path+"/"+fileName, "rw");
inChannel = aFile.getChannel();
//java.io.File f = new java.io.File(path+"/"+fileName);
//fop = new FileOutputStream(f);
String reqId = file.getFileID();
currentBlockId = reqId + "_" + currentBlockSeq;
while (true)
{
BlockPacket bp = null;
if (numberOfBlocks > 0)
{
try
{
bp = this.blockingQueue.take();
}
catch (InterruptedException e)
{
e.printStackTrace();
}
if (bp.getBlockId().equals(currentBlockId))
{
if (currentPacket == bp.getPacketSeqNo())
{
if(fileBlocks.containsKey(currentBlockId))
{
processBlockPacket(bp);
if(currentPacket < fileBlocks.get(currentBlockId).getNoOfPackets())
currentPacket++;
else
{
if (fileBlocks.get(currentBlockId).getPackets().size() < 1)
{
removeFileBlock(currentBlockId);
currentBlockSeq++;
currentBlockId = reqId + "_" + currentBlockSeq;
currentPacket = 1;
numberOfBlocks--;
}
}
}
else
{
tempList.add(bp);
}
for(int k =numberOfBlocks; k>0; k--)
{
if(fileBlocks.containsKey(currentBlockId))
{
int pCount = fileBlocks.get(currentBlockId).getNoOfPackets();
int i;
for (i = currentPacket; i <= pCount; i++)
{
if (fileBlocks.get(currentBlockId).getPackets().containsKey(i))
{
processBlockPacket(fileBlocks.get(currentBlockId).getPackets().remove(i));
currentPacket++;
}
else
{
break;
}
}
if(i <= pCount)
{
break;
}
else
{
if (fileBlocks.get(currentBlockId).getPackets().size() < 1)
{
removeFileBlock(currentBlockId);
currentBlockSeq++;
currentBlockId = reqId + "_" + currentBlockSeq;
currentPacket = 1;
numberOfBlocks--;
}
}
}
}
}
}
else
{
if(fileBlocks.containsKey(bp.getBlockId())){
fileBlocks.get(bp.getBlockId()).getPackets().put(bp.getPacketSeqNo(), bp);
}else{
tempList.add(bp);
}
}
}
else{
break;
}
for(int i=0; i<tempList.size(); i++){
if(fileBlocks.containsKey(tempList.get(i).getBlockId())){
BlockPacket temp = tempList.remove(i);
fileBlocks.get(temp.getBlockId()).getPackets().put(temp.getPacketSeqNo(), temp);
}
}
}
System.out.println("CLOSING FILE....");
this.isCompleted.put(true);
inChannel.force(true);
inChannel.close();
}
catch (FileNotFoundException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
catch (IOException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
打开 -XX:+PrintGCDetails,here is a sample log .
任何意见/帮助表示赞赏。
最佳答案
这是因为内存中的脏页比。由于传入数据速率高于本地 IO 刷新吞吐量,因此数据会累积在内存中。一旦达到最大允许脏页比率,接收方将不再接受更多数据。因此,在这种情况下,系统受限于本地 IO,而不是网络。因此, yield 递减发生在大约 15GB 处。您可以在
中更改一些设置/etc/sysctl.conf
如:
vm.dirty_background_ratio = 2
vm.dirty_ratio = 80
vm.dirty_expire_centisecs = 3000
vm.dirty_writeback_centisecs = 500
This可能是一本有用的读物。
系统性能仍然受到本地 IO 和最大允许脏页比率的限制。您可以增加脏页比率以仅延迟返回时间的减少。如果文件/数据较大,它将再次到达这一点。新结果:
关于java - 15GB 后数据传输速率变慢,用于更大的文件传输,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43315661/