hadoop - sqoop中数据是如何拆分成part文件的

标签 hadoop sqoop hadoop-partitioning

我怀疑如果数据是倾斜的,数据是如何划分成部分文件的。如果可能,请帮我澄清一下。

假设这是我的 department 表,其中 department_id 作为主键。

mysql> select * from departments;
2 Fitness
3 Footwear
4 Apparel
5 Golf
6 Outdoors
7 Fan Shop

如果我通过在导入命令中提及 -m 1 使用 sqoop import,我知道我只会生成一个包含所有记录的部分文件。

现在我在不指定任何映射器的情况下运行命令。所以默认情况下它应该使用 4 个映射器并在 HDFS 中创建 4 个部分文件。以下是记录如何按零件文件分发。

[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00000
2,Fitness
3,Footwear
[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00001
4,Apparel
[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00002
5,Golf
[cloudera@centsosdemo ~]$ hadoop fs -cat /user/cloudera/departments/part-m-00003
6,Outdoors
7,Fan Shop

根据 BoundingValsQuery,默认使用 Min(department_id)=2、Max(department_id)=8 和 4 个映射器。

经计算,每个mapper应该得到(8-2)/4=1.5条记录。

这里我不知道如何分发数据。我不明白 part-m-00000 中有 2 条记录,而 part-m-00001 和 part-m-00002 中只有一条,而 part-m-00003 中又有两条记录。

最佳答案

如果你有机会去图书馆看看。其中涉及一系列步骤。

Sqoop job Read records. via DBRecordReader

 org.apache.sqoop.mapreduce.db.DBRecordReader

这里有两种方法。

方法一

protected ResultSet executeQuery(String query) throws SQLException {
Integer fetchSize = dbConf.getFetchSize();
/*get fetchSize according to split which is calculated via getSplits() method of 
org.apache.sqoop.mapreduce.db.DBInputFormat.And no. of splits are calculated
via no. of (count from table/no. of mappers). */
 }

拆分计算:-

org.apache.sqoop.mapreduce.db.DBInputFormat
 public List<InputSplit> getSplits(JobContext job) throws IOException {
 .......//here splits are calculated accroding to count of source table
 .......query.append("SELECT COUNT(*) FROM " + tableName);
}   

方法二。

 protected String getSelectQuery() {
    if (dbConf.getInputQuery() == null) {
      query.append("SELECT ");

      for (int i = 0; i < fieldNames.length; i++) {
        query.append(fieldNames[i]);
        if (i != fieldNames.length -1) {
          query.append(", ");
        }
      }

      query.append(" FROM ").append(tableName);
      query.append(" AS ").append(tableName); 
      if (conditions != null && conditions.length() > 0) {
        query.append(" WHERE (").append(conditions).append(")");
      }

      String orderBy = dbConf.getInputOrderBy();
      if (orderBy != null && orderBy.length() > 0) {
        query.append(" ORDER BY ").append(orderBy);
      }
    } else {
      //PREBUILT QUERY
      query.append(dbConf.getInputQuery());
    }

    try {// main logic to decide division of records between mappers.
      query.append(" LIMIT ").append(split.getLength());
      query.append(" OFFSET ").append(split.getStart());
    } catch (IOException ex) {
      // Ignore, will not throw.
    }

    return query.toString();
  }

检查注释下的代码部分主要逻辑...... 这里记录按照LIMIT和OFFSET划分。这个逻辑对于每个 RDBMS 的实现都是不同的。只需查找 org.apache.sqoop.mapreduce.db.OracleDBRecordReader 它与 getSelectQuery() 方法的实现几乎没有什么不同。

希望这能让您快速了解如何将记录划分为不同的映射器。

关于hadoop - sqoop中数据是如何拆分成part文件的,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45100487/

相关文章:

hadoop - Sqoop 导入 : composite primary key and textual primary key

hadoop - 将文件/ block 从 HDFS 复制到从节点的本地文件系统

bash - 在bash脚本中执行时捕获Yarn作业ID

Hadoop 2.x——如何配置辅助名称节点?

java - 使用 -libjars 的 MapReduce 流作业,自定义分区程序失败 : "class not found"

hadoop - Flume 和 Sqoop 有什么区别?

maven v3.0.5 找不到sqoop神器

hadoop - 处理 hive 中的 Blob

hadoop - fs.rename(newPath(raw FileName), in Path(process FileName)) 不工作

hadoop - 如何确定映射器总数