我怀疑如果数据是倾斜的,数据是如何划分成部分文件的。如果可能,请帮我澄清一下。
假设这是我的 department
表,其中 department_id
作为主键。
mysql> select * from departments;
2 Fitness
3 Footwear
4 Apparel
5 Golf
6 Outdoors
7 Fan Shop
如果我通过在导入命令中提及 -m 1
使用 sqoop import
,我知道我只会生成一个包含所有记录的部分文件。
现在我在不指定任何映射器的情况下运行命令。所以默认情况下它应该使用 4 个映射器并在 HDFS 中创建 4 个部分文件。以下是记录如何按零件文件分发。
[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00000
2,Fitness
3,Footwear
[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00001
4,Apparel
[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00002
5,Golf
[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00003
6,Outdoors
7,Fan Shop
根据 BoundingValsQuery,默认使用 Min(department_id)=2、Max(department_id)=8 和 4 个映射器。
经计算,每个mapper应该得到(8-2)/4=1.5条记录。
这里我不知道如何分发数据。我不明白 part-m-00000 中有 2 条记录,而 part-m-00001 和 part-m-00002 中只有一条,而 part-m-00003 中又有两条记录。
最佳答案
如果你有机会去图书馆看看。其中涉及一系列步骤。
Sqoop job Read records. via DBRecordReader
org.apache.sqoop.mapreduce.db.DBRecordReader
这里有两种方法。
方法一
protected ResultSet executeQuery(String query) throws SQLException {
Integer fetchSize = dbConf.getFetchSize();
/*get fetchSize according to split which is calculated via getSplits() method of
org.apache.sqoop.mapreduce.db.DBInputFormat.And no. of splits are calculated
via no. of (count from table/no. of mappers). */
}
拆分计算:-
org.apache.sqoop.mapreduce.db.DBInputFormat
public List<InputSplit> getSplits(JobContext job) throws IOException {
.......//here splits are calculated accroding to count of source table
.......query.append("SELECT COUNT(*) FROM " + tableName);
}
方法二。
protected String getSelectQuery() {
if (dbConf.getInputQuery() == null) {
query.append("SELECT ");
for (int i = 0; i < fieldNames.length; i++) {
query.append(fieldNames[i]);
if (i != fieldNames.length -1) {
query.append(", ");
}
}
query.append(" FROM ").append(tableName);
query.append(" AS ").append(tableName);
if (conditions != null && conditions.length() > 0) {
query.append(" WHERE (").append(conditions).append(")");
}
String orderBy = dbConf.getInputOrderBy();
if (orderBy != null && orderBy.length() > 0) {
query.append(" ORDER BY ").append(orderBy);
}
} else {
//PREBUILT QUERY
query.append(dbConf.getInputQuery());
}
try {// main logic to decide division of records between mappers.
query.append(" LIMIT ").append(split.getLength());
query.append(" OFFSET ").append(split.getStart());
} catch (IOException ex) {
// Ignore, will not throw.
}
return query.toString();
}
检查注释下的代码部分主要逻辑......
这里记录按照LIMIT和OFFSET划分。这个逻辑对于每个 RDBMS 的实现都是不同的。只需查找 org.apache.sqoop.mapreduce.db.OracleDBRecordReader
它与 getSelectQuery() 方法的实现几乎没有什么不同。
希望这能让您快速了解如何将记录划分为不同的映射器。
关于hadoop - sqoop中数据是如何拆分成part文件的,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45100487/