hadoop - 如何设置hdfs中文件的行组大小?

标签 hadoop hdfs parquet parquet-mr

我正在对 hdfs 中的 block 大小 (dfs.block.size) 和行组大小 (parquet.block.size) 进行一些实验。

我在hdfs中有大量数据,我想复制各种 block 大小和行组大小的数据进行测试。我能够使用不同的 block 大小复制数据:

hdfs dfs -D dfs.block.size=67108864 -D parquet.block.size=67108864 -cp /new_sample_parquet /new_sample_parquet_64M

但是只有 dfs.block.size 被改变了。我正在使用 hdfs dfs -stat 验证 block 大小,并使用 parquet-tools meta 验证行组大小。事实上,如果我将 parquet.block.size 替换为 blah.blah.blah,它具有相同的效果。我什至进入 spark-shell 并使用手动设置 parquet.block.size 属性

sc.hadoopConfiguration.setInt("parquet.block.size", 67108864).

我正在使用 hadoop 3.1.0。我从 here 得到了 parquet.block.size 的属性名称.

这是我尝试输出的前 10 行

row group 1:                    RC:4140100 TS:150147503 OFFSET:4
row group 2:                    RC:3520100 TS:158294646 OFFSET:59176084
row group 3:                    RC:880100 TS:80122359 OFFSET:119985867
row group 4:                    RC:583579 TS:197303521 OFFSET:149394540
row group 5:                    RC:585594 TS:194850776 OFFSET:213638039
row group 6:                    RC:2620100 TS:130170698 OFFSET:277223867
row group 7:                    RC:2750100 TS:136761819 OFFSET:332088066
row group 8:                    RC:1790100 TS:86766854 OFFSET:389772650
row group 9:                    RC:2620100 TS:125876377 OFFSET:428147454
row group 10:                   RC:1700100 TS:83791047 OFFSET:483600973

如您所见,TS(总大小)远大于 64MB(67108864 字节)

我目前的理论:

我在 spark-shell 中这样做:

sc.hadoopConfiguration.setInt("parquet.block.size", 67108864)
val a = spark.read.parquet("my_sample_data")
a.rdd.getNumPartitions // 1034
val s = a.coalesce(27)
s.write.format("parquet").mode("Overwrite").options(Map("dfs.block.size" -> "67108864")).save("/my_new_sample_data")

所以可能是因为我的输入数据已经有 1034 个分区。我真的不确定。我的数据每行大约有 118 列。

最佳答案

parquet.block.size 属性只影响 Parquet 编写器。另一方面,hdfs dfs -cp 命令复制文件而不考虑它们的内容。 parquet.block.size 属性因此被 hdfs dfs -cp 忽略。

假设您有一个应用程序可以根据配置文件截取 JPG 或 PNG 格式的屏幕截图。您可以使用 cp 命令复制这些屏幕截图。当然,即使您在配置文件中更改了所需的图像格式,cp 命令也将始终以原始文件的图像格式创建输出文件,而不管配置文件如何。配置文件仅供截图应用程序使用,cp 不使用。这也是 parquet.block.size 属性的工作方式。

要更改 block 大小,您可以做的是重写文件。您提到您有 spark-shell。使用它通过发出重写 Parquet 文件

sc.hadoopConfiguration.setInt("parquet.block.size", 67108864)
var df = spark.read.parquet("/path/to/input.parquet")
df.write.parquet("/path/to/output")

更新:由于您在下面的评论中提到它对您不起作用,我做了一个实验并在下面发布了 session 记录:

$ spark-shell
scala> sc.hadoopConfiguration.setInt("parquet.block.size", 200000)
scala> var df = spark.read.parquet("/tmp/infile.parquet")
df: org.apache.spark.sql.DataFrame = [field0000: binary, field0001: binary ... 78 more fields]
scala> df.write.parquet("/tmp/200K")
scala> df.write.format("parquet").mode("Overwrite").options(Map("parquet.block.size" -> "300000")).save("/tmp/300K")
scala> :quit
$ hadoop fs -copyToLocal /tmp/{200K,300K} /tmp
$ parquet-tools meta /tmp/infile.parquet | grep "row group" | head -n 3
row group 1:  RC:4291 TS:5004800 OFFSET:4
row group 2:  RC:3854 TS:4499360 OFFSET:5004804
row group 3:  RC:4293 TS:5004640 OFFSET:10000000
$ parquet-tools meta /tmp/200K/part-00000-* | grep "row group" | head -n 3
row group 1:   RC:169 TS:202080 OFFSET:4
row group 2:   RC:168 TS:201760 OFFSET:190164
row group 3:   RC:169 TS:203680 OFFSET:380324
$ parquet-tools meta /tmp/300K/part-00000-* | grep "row group" | head -n 3
row group 1:   RC:254 TS:302720 OFFSET:4
row group 2:   RC:255 TS:303280 OFFSET:284004
row group 3:   RC:263 TS:303200 OFFSET:568884

通过查看 TS 值,您可以看到输入文件的行组大小为 4.5-5M,输出文件的行组大小分别为 200K 和 300K。这表明使用 sc.hadoopConfiguration 设置的值成为“默认值”,而您在下面涉及 df.options 的评论中提到的其他方法会覆盖此默认值。

更新 2:现在您已经发布了输出,我可以看到发生了什么。在您的情况下,正在进行压缩,从而增加了适合行组的数据量。行组大小适用于压缩数据,但 TS 显示未压缩数据的大小。但是,您可以通过减去行组的起始偏移量来推断行组的大小。例如,第一个行组的压缩大小为 59176084 - 4 = 59176080 字节或更少(因为也可以进行填充)。我将您的结果复制到我计算机上的/tmp/rowgroups.dat 中,并通过发出以下命令计算了您的行组大小:

$ cat /tmp/rowgroups.dat | sed 's/.*OFFSET://' | numinterval
59176080
60809783
29408673
64243499
63585828
54864199
57684584
38374804
55453519

(numinterval 命令在 Ubuntu 上的 num-utils 包中。)如您所见,您所有的行组都小于您设置的行组大小指定的。 (它们不完全是指定大小的原因是 PARQUET-1337 。)

关于hadoop - 如何设置hdfs中文件的行组大小?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53325155/

相关文章:

hadoop - 从hdfs中删除文件是否还会从复制的datanode中删除文件?

hadoop - 使用 org.apache.hadoop.mapred.mapper 接口(interface)实现 "in mapper"设计模式

apache-spark - 从本地二进制文件保存并加载 Spark RDD - 最小工作示例

hadoop - 在 Hadoop 的上下文中,压缩编解码器的可拆分性是什么意思?

hadoop - 拥有一个大的 parquet 文件还是许多较小的 parquet 文件更好?

python - 将带有二维数组列的 pandas 数据框保存为 python 中的 Parquet 文件

java - Parquet writer : org. apache.parquet.io.ParquetEncodingException:写入空页

hadoop - 配置单元:没有位置的外部分区表

hadoop - 如何增加 Mahout MatrixMultiplicationJob 中的映射器数量?

hadoop - 如何将存储在包含行的HDFS中的文本文件转换为Pyspark中的数据框?