java - spark java api 有像 hadoop MultipleOutputs/FSDataOutputStream 这样的类吗?

标签 java hadoop apache-spark multipleoutputs

我试图在减少部分输出一些特定的记录,这取决于键值记录的值。在hadoop mapreduce中可以使用类似的代码

public void setup(Context context) throws IOException, InterruptedException {
  super.setup(context);
  Configuration conf = context.getConfiguration ();
  FileSystem fs = FileSystem.get (conf);
  int taskID = context.getTaskAttemptID().getTaskID().getId();
  hdfsOutWriter = fs.create (new Path (fileName + taskID), true); // FSDataOutputStream
}
public void reduce(Text key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
  boolean isSpecificRecord = false;
  ArrayList <String> valueList = new ArrayList <String> ();
  for (Text val : value) {
    String element = val.toString ();
    if (filterFunction (element)) return;
    if (specificFunction (element)) isSpecificRecord = true;
    valueList.add (element);
  }
  String returnValue = anyFunction (valueList);
  String specificInfo = anyFunction2 (valueList);
  if (isSpecificRecord) hdfsOutWriter.writeBytes (key.toString () + "\t" + specificInfo);
  context.write (key, new Text (returnValue));
}

我想在 spark 集群上运行这个过程,spark java api 可以像上面的代码那样做吗?

最佳答案

只是一个如何模拟的想法:

yoursRDD.mapPartitions(iter => {
   val fs = FileSystem.get(new Configuration())
   val ds = fs.create(new Path("outfileName_" + TaskContext.get.partitionId))
   ds.writeBytes("Put yours results")
   ds.close()
   iter
})

关于java - spark java api 有像 hadoop MultipleOutputs/FSDataOutputStream 这样的类吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43654407/

相关文章:

java - 如何在现有 Spring Hibernate 应用程序中包含 Spring JPA

hadoop - 将矩阵发送给udf pig 拉丁

scala - SPARK 数据帧错误 : cannot be cast to scala. Function2,同时使用 UDF 拆分列中的字符串

hadoop - 将大量 Spark 数据帧合并为一个

java - 如何在 Apache Spark Java 中将 JavaRDD<Row> 转换为 JavaRDD<String>

java - 如何使用SWIG包装std::function对象?

java - RelativeLayout 中的 ImageView 与父级不匹配

java - 包含 Map<Entity, Entity> 字段的正确方法是什么

hadoop - Hbase memstore 手动刷新

Hadoop 摄取自动化技术