java - 使用 Java 和 Spark 将本地镜像的序列文件写入 HDFS

标签 java hadoop apache-spark hdfs spark-streaming

正如标题所说,这就是我现在的目标。

  • 我需要从目录加载一堆非文本文件
  • 从中提取通常的文件信息(创建日期、作者、类型……那些)
  • 创建一个类型的序列文件
  • 将新提取的信息放入.seq文件的Key中
  • 将它们全部存储在一个 hdfs 目录中。

我使用 spark 的原因是为了可伸缩性(要处理数以千计的文件,我将有一个工作集群可用)并且因为我正在考虑在图像目录上实现一个 SParkStreaming 接收器,以便文件将被自动处理。 这是我的初始代码:

JavaPairRDD<String, String> imageRDD = jsc.wholeTextFiles("file:///home/cloudera/Pictures/");

    imageRDD.mapToPair(new PairFunction<Tuple2<String,String>, Text, Text>() {

        @Override
        public Tuple2<Text, Text> call(Tuple2<String, String> arg0)
                throws Exception {
            return new Tuple2<Text, Text>(new Text(arg0._1),new Text(arg0._2));
        }

    }).saveAsNewAPIHadoopFile("hdfs://localhost:8020/user/hdfs/sparkling/try.seq", Text.class, Text.class, SequenceFileOutputFormat.class);

在这里,我将图像作为文本文件加载,并使用 hadoop 库中的文本类型创建一个元组。这可行,但是:

  1. 文件没有保存为单个文件,而是保存为包含分区的文件夹。
  2. 它不是字节数组,而是文件的文本表示。我们都知道从文本重新转换为图像(或任何图像)是多么烦人
  3. 如果我像这样加载文件,是否有提取所需信息的方法?

我尝试将文件加载为 aa sparkContext.binaryFiles(<directory>) ,但我总是不知道如何提取信息以及如何保存信息。
我似乎无法在互联网上找到答案:你们中有人知道这件事吗?

最佳答案

这是我的做法:

JavaPairRDD<String, PortableDataStream> imageByteRDD = jsc.binaryFiles(SOURCE_PATH);
        if(!imageByteRDD.isEmpty())
            imageByteRDD.foreachPartition(new VoidFunction<Iterator<Tuple2<String,PortableDataStream>>>() {

            @Override
            public void call(
                    Iterator<Tuple2<String, PortableDataStream>> arg0)
                    throws Exception {
                Configuration conf = new Configuration();
                conf.set("fs.defaultFS", HDFS_PATH);
                while(arg0.hasNext()){
                    Tuple2<String,PortableDataStream>fileTuple = arg0.next();
                    Text key = new Text(fileTuple._1());
                    String fileName = key.toString().split(SEP_PATH)[key.toString().split(SEP_PATH).length-1].split(DOT_REGEX)[0];
                    String fileExtension = fileName.split(DOT_REGEX)[fileName.split(DOT_REGEX).length-1];

                      BytesWritable value = new BytesWritable( fileTuple._2().toArray());
                         SequenceFile.Writer writer = SequenceFile.createWriter(
                                 conf, 
                                 SequenceFile.Writer.file(new Path(DEST_PATH + fileName + SEP_KEY + getCurrentTimeStamp()+DOT+fileExtension)),
                               SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD, new BZip2Codec()),
                               SequenceFile.Writer.keyClass(Text.class), SequenceFile.Writer.valueClass(BytesWritable.class));
                         key = new Text(key.toString().split(SEP_PATH)[key.toString().split(SEP_PATH).length-2] + SEP_KEY + fileName + SEP_KEY + fileExtension);
                            writer.append(key, value);
                         IOUtils.closeStream(writer);

                }
            }
        });

关于java - 使用 Java 和 Spark 将本地镜像的序列文件写入 HDFS,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38016313/

相关文章:

java - 为什么我的循环 GUI 计时器没有显示?

java - 在hadoop上解析xml文件...

hadoop - 无法在Spark Streaming作业中获得广播_1的广播_1_piece0

json - 将 JSON 对象转换为 RDD

hadoop - 在 Apache Hadoop 中的 Datanode 内重新平衡磁盘

apache-spark - 如何使用 Spark 对象获取 Hive 表的位置值?

java - 为什么在初始化 Timer 实例时会收到 java.security.AccessControlException?

java - 获取实现接口(interface)的通用枚举的值

java - 无法在单独的线程中设置 SelectionKey 的interestOps

Hadoop双节点集群环境,NameNode的web UI显示活节点数为1,死节点数为0