java - 如何在 Apache Spark 中使用 PathFilter?

标签 java scala hadoop apache-spark

我有一个简单的文件过滤器,基本上从特定日期选择文件。 在 Hadoop 中,我会使用 setInputPathFilterPathFilter 类设置为 InputFormat 参数。我如何在 Spark 中执行此操作?

public class FilesFilter extends Configured implements PathFilter {

    @Override
    public boolean accept(Path path) {

        try {
            if (fs.isDirectory(path))
                return true;
        } catch (IOException e1) {
            e1.printStackTrace();
            return false;
        }

        String file_date = "01.30.2015";
        SimpleDateFormat sdf = new SimpleDateFormat("MM.dd.yyyy");
        Date date = null;

        try {
            date = sdf.parse(file_date);
        } catch (ParseException e1) {
            e1.printStackTrace();
        }

        long dt = date.getTime()/(1000 * 3600 * 24);

        try {
            FileStatus file = fs.getFileStatus(path);
            long time = file.getModificationTime() / (1000 * 3600 * 24);
            return time == dt;
        } catch (IOException e) {
            e.printStackTrace();
            return false;
        }

    }
}

最佳答案

使用这个:

sc.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class", classOf[TmpFileFilter], classOf[PathFilter])

这是我的 TmpFileFilter.scala 代码,它将省略 .tmp 文件:

import org.apache.hadoop.fs.{Path, PathFilter}

class TmpFileFilter  extends PathFilter {
  override def accept(path : Path): Boolean = !path.getName.endsWith(".tmp")
}

您可以定义自己的PathFilter

关于java - 如何在 Apache Spark 中使用 PathFilter?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28330247/

相关文章:

java - 在 Scala 或 Java 中使用 Excel 语法格式化数字

java - Java 代码中出现语法错误 : Cannot cast from Object to int

scala - 如何在 Jupyter 内核中将外部 jar 添加到 Scala

java - Spark 使用编码器创建数据集,其中 row 是数组类型

scala - 对于在映射器之间共享信息的增强版 MapReduce,什么是好的应用程序?

mysql - 无法在 Mac 上使用 mysql 启动 Hive

python - 不识别所有数据类型

java - 使用 Apache Ozone FileSystem API 会导致错误

java - 如何找到远程系统的 CPU 负载

java - 仅包含可用方法和成员列表的库 jar 生成