我有一个小程序,每秒将 10 条记录写入 HDFS 上的 block 压缩 SequenceFile,然后每 5 分钟运行一次 sync() 以确保超过 5 分钟的所有内容都可用于处理。
由于我的代码比较多,所以我只提取了重要的部分:
// initialize
Configuration hdfsConfig = new Configuration();
CompressionCodecFactory codecFactory = new CompressionCodecFactory(hdfsConfig);
CompressionCodec compressionCodec = codecFactory.getCodecByName("default");
SequenceFile.Writer writer = SequenceFile.createWriter(
hdfsConfig,
SequenceFile.Writer.file(path),
SequenceFile.Writer.keyClass(LongWritable.class),
SequenceFile.Writer.valueClass(Text.class),
SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK;, compressionCodec)
);
// ...
// append
LongWritable key = new LongWritable((new Date).getTime());
Text val = new Text("Some value");
writer.append(key, val);
// ...
// then every 5 minutes...
logger.info("about to sync...");
writer.hsync();
logger.info("synced!");
仅从日志来看,同步操作似乎按预期工作,但是,HDFS 上的文件仍然很小。一段时间后,可能会添加一些 header 和一些事件,但甚至接近于我 hsync() 的频率。文件关闭后,所有内容都会立即刷新。
在每次预期的同步之后也尝试手动检查文件的内容以查看数据是否存在,但是,文件在这里也显示为空: hdfs dfs -文本文件名
是否有任何已知原因导致 writer.hsync() 不起作用?如果是,是否有任何解决方法?
此问题的进一步测试用例:
import java.util.HashMap;
import java.util.Map;
import java.util.Date;
import java.util.Calendar;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.text.DateFormat;
import java.text.ParseException;
import java.util.Calendar;
import java.util.Date;
import java.util.Locale;
public class WriteTest {
private static final Logger LOG = LoggerFactory.getLogger(WriteTest.class);
public static void main(String[] args) throws Exception {
SequenceFile.CompressionType compressionType = SequenceFile.CompressionType.RECORD;
CompressionCodec compressionCodec;
String compressionCodecStr = "default";
CompressionCodecFactory codecFactory;
Configuration hdfsConfig = new Configuration();
codecFactory = new CompressionCodecFactory(hdfsConfig);
compressionCodec = codecFactory.getCodecByName(compressionCodecStr);
String hdfsURL = "hdfs://10.0.0.1/writetest/";
Date date = new Date();
Path path = new Path(
hdfsURL,
"testfile" + date.getTime()
);
SequenceFile.Writer writer = SequenceFile.createWriter(
hdfsConfig,
SequenceFile.Writer.keyClass(LongWritable.class),
SequenceFile.Writer.valueClass(Text.class),
SequenceFile.Writer.compression(compressionType, compressionCodec),
SequenceFile.Writer.file(path)
);
for(int i=0;i<10000000;i++) {
Text value = new Text("New value!");
LongWritable key = new LongWritable(date.getTime());
writer.append(key, value);
writer.hsync();
Thread.sleep(1000);
}
writer.close();
}
}
结果是在开始写入序列文件头时有一个fsync,然后没有更多的fsync。文件关闭后,内容将写入光盘。
最佳答案
这里有多个问题。
- block 压缩
当您对序列文件使用 block 压缩时,这意味着许多条目将被缓冲在内存中,然后在达到限制或同步
时以 block 压缩形式写入> 手动调用。
当您在编写器上调用 hsync
时,它会在其底层 FSDataOutputStream
上调用 hsync
。但是,这不会将位于内存中压缩缓冲区中的数据写入。因此,要将该数据可靠地传送到 Datanode,您必须先调用 sync
,然后再调用 hsync
。
请注意,这样做意味着发送到 Datanode 的 block 压缩部分包含的条目比通常少。这会对压缩质量产生负面影响,并可能导致更多的光盘使用。 (我想这就是 hsync
不在内部调用 sync
的原因。)
- 向 Namenode 报告文件大小
调用fsync
将数据发送到数据节点,但不会将新文件大小报告给名称节点。可以找到关于此的技术讨论 here和 here .显然,每次都更新长度对性能不利。 hsync
有一个特殊版本,它允许更新 Namenode 信息,但它不会被 SequenceFile.Writer
公开。
* @param syncFlags
* Indicate the semantic of the sync. Currently used to specify
* whether or not to update the block length in NameNode.
*/
public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
flushOrSync(true, syncFlags);
}
一方面,大小问题意味着即使一些工具报告文件大小没有变化,数据仍然安全地到达数据节点,并且可以在打开它们上的 InputStream 时读取。另一方面,SequenceFile.Reader 中存在压缩类型 Record
和 None
的错误。对于这些压缩类型,Reader 使用长度信息来确定阅读的距离。由于此长度信息未由 hsync
更新,因此即使数据实际可用,它也会错误地停止读取。 Block
压缩读取显然不使用长度信息,因此不会出现此错误。
关于hadoop - hsync() 不适用于 SequenceFile Writer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28946308/