java - Map Reduce program to merge multiple XML files into a single XML file

Tags: java hadoop mapreduce azure-hdinsight

I am new to HDInsight and the Hadoop MapReduce concept. I am trying to merge multiple XML files into a single XML file using a MapReduce program. My intent is to merge each XML file into the target XML file by wrapping its content in the file name as opening and closing tags. For example, the XML files below should be merged into a single XML file as shown:

Input XML files

<xml><a></a></xml>
<xml><b></b></xml>
<xml><c></c></xml>

Output XML file

<xml>
 <File1Name><xml><a></a></xml></File1Name>
 <File2Name><xml><b></b></xml></File2Name>
 <File3Name><xml><c></c></xml></File3Name>
</xml>

Question 1: Is it possible to map one XML file to each mapper and create a key/value pair, with the key being the file name and the value being the XML content prefixed and suffixed with the file name as opening and closing tags, and then have a reducer merge all of the XML into a single context and output the XML shown above?

Question 2: How can I get the file name as the key in the mapper code?

Best Answer

Answer 1: I would not recommend sending a single XML file to each mapper unless the files are over 1 GB. Instead, you can send the list of XML locations to your mappers, then open each location in your mapper code and extract the data into your output.

Answer 2: If you are using Azure Blob Storage, you can list all the blobs in a container and assign them to input splits.

How to create your list of InputSplits:

ArrayList<InputSplit> ret = new ArrayList<InputSplit>();

/* Do this for each path we receive. Creates a directory of splits in this order,
   where s = input path: (s1,1),(s2,1)...(sN,1),(s1,2)...(sN,2),(sN,3) etc. */
for (int i = numMinNameHashSplits; i <= Math.min(numMaxNameHashSplits, numNameHashSplits - 1); i++) {
    for (Path inputPath : inputPaths) {
        ret.add(new ParseDirectoryInputSplit(inputPath.toString(), i));
        System.out.println(i + " " + inputPath.toString());
    }
}
return ret;
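
The ParseDirectoryInputSplit referenced above is not shown in the answer. As a rough illustration, here is a minimal sketch of such a custom split, assuming it only needs to carry the directory path and the assigned hash slot; the field names and accessors are guesses, not the original implementation:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.InputSplit;

// Hypothetical sketch: a split that only carries a directory path and the hash slot
// the corresponding record reader is responsible for. Names and details are assumptions.
public class ParseDirectoryInputSplit extends InputSplit implements Writable {

    private String directoryPath;
    private int nameHashSlot;

    public ParseDirectoryInputSplit() {
        // required no-arg constructor for deserialization
    }

    public ParseDirectoryInputSplit(String directoryPath, int nameHashSlot) {
        this.directoryPath = directoryPath;
        this.nameHashSlot = nameHashSlot;
    }

    public Path getDirectoryPath() {
        return new Path(directoryPath);
    }

    public int getNameHashSlot() {
        return nameHashSlot;
    }

    @Override
    public long getLength() {
        return 0; // total size is not known up front
    }

    @Override
    public String[] getLocations() {
        return new String[0]; // no data locality for blob storage
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(directoryPath);
        out.writeInt(nameHashSlot);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        directoryPath = in.readUTF();
        nameHashSlot = in.readInt();
    }
}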

Once the List<InputSplit> is assembled, each InputSplit is handed to a RecordReader class, where each key/value pair is read and then passed to the map task. The initialization of the RecordReader class uses the InputSplit, a string representing the location of a "folder" of invoices in blob storage, to return a list of all blobs within that folder (the blobs variable below). The Java code below demonstrates the creation of the record reader for each hash slot and the resulting list of blobs in that location.

public class ParseDirectoryFileNameRecordReader
        extends RecordReader<IntWritable, Text> {

    private int nameHashSlot;
    private int numNameHashSlots;
    private Path myDir;
    private Path currentPath;
    private Iterator<ListBlobItem> blobs;
    private int currentLocation;

    @Override
    public void initialize(InputSplit split, TaskAttemptContext context)
            throws IOException, InterruptedException {
        myDir = ((ParseDirectoryInputSplit) split).getDirectoryPath();

        // getNameHashSlot tells us which slot this record reader is responsible for
        nameHashSlot = ((ParseDirectoryInputSplit) split).getNameHashSlot();

        // gets the total number of hash slots
        numNameHashSlots = getNumNameHashSplits(context.getConfiguration());

        // gets the input credentials for the storage account assigned to this record reader
        String inputCreds = getInputCreds(context.getConfiguration());

        // break the directory path apart to get the account name
        String[] authComponents = myDir.toUri().getAuthority().split("@");
        String accountName = authComponents[1].split("\\.")[0];
        String containerName = authComponents[0];
        String accountKey = Utils.returnInputkey(inputCreds, accountName);
        System.out.println("This mapper is assigned the following account: " + accountName);

        StorageCredentials creds = new StorageCredentialsAccountAndKey(accountName, accountKey);
        CloudStorageAccount account = new CloudStorageAccount(creds);
        CloudBlobClient client = account.createCloudBlobClient();
        CloudBlobContainer container = client.getContainerReference(containerName);
        blobs = container.listBlobs(myDir.toUri().getPath().substring(1) + "/",
                true, EnumSet.noneOf(BlobListingDetails.class), null, null).iterator();
        currentLocation = -1;
    }
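
The getNumNameHashSplits and getInputCreds helpers used in initialize are not shown in the answer; presumably they just read values that the driver stored in the job Configuration. A minimal sketch, with assumed property names:

// Hypothetical helpers; the property names are assumptions and would be set by the driver.
private static int getNumNameHashSplits(Configuration conf) {
    return conf.getInt("num.name.hash.splits", 1);
}

private static String getInputCreds(Configuration conf) {
    return conf.get("input.storage.creds");
}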

Once initialized, the record reader is used to pass the next key to the map task. This is controlled by the nextKeyValue method, which is called each time the map task asks for the next record. The Java code below demonstrates this.

// This checks whether the next key/value is assigned to this task or to another mapper.
// If it is assigned to this task, the location is passed to the mapper; otherwise return false.
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
    while (blobs.hasNext()) {
        ListBlobItem currentBlob = blobs.next();

        // doesBlobMatchNameHash hashes the blob name to a slot between 1 and the number of
        // hash slots. If it matches the slot assigned to this mapper and the blob's length
        // is greater than 0, return the path to the map function.
        if (doesBlobMatchNameHash(currentBlob) && getBlobLength(currentBlob) > 0) {
            String[] pathComponents = currentBlob.getUri().getPath().split("/");

            String pathWithoutContainer =
                    currentBlob.getUri().getPath().substring(pathComponents[1].length() + 1);

            currentPath = new Path(myDir.toUri().getScheme(),
                    myDir.toUri().getAuthority(), pathWithoutContainer);

            currentLocation++;
            return true;
        }
    }
    return false;
}
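
doesBlobMatchNameHash and getBlobLength are not shown in the answer either. Based on the comment above, a plausible sketch hashes the blob path into a slot number and compares it to the slot assigned to this reader; the remaining RecordReader methods then simply expose currentLocation and currentPath. All of the following is an assumption, not the original code:

// Hypothetical sketch of the missing helpers and the remaining RecordReader methods.
private boolean doesBlobMatchNameHash(ListBlobItem blob) {
    // Hash the blob path into one of the name hash slots and check whether it is ours.
    int slot = Math.abs(blob.getUri().getPath().hashCode()) % numNameHashSlots;
    return slot == nameHashSlot;
}

private long getBlobLength(ListBlobItem blob) {
    // Only actual blobs carry a length; virtual directories report 0.
    return (blob instanceof CloudBlob) ? ((CloudBlob) blob).getProperties().getLength() : 0;
}

@Override
public IntWritable getCurrentKey() {
    return new IntWritable(currentLocation);
}

@Override
public Text getCurrentValue() {
    return new Text(currentPath.toString());
}

@Override
public float getProgress() {
    return 0; // the number of matching blobs is not known up front
}

@Override
public void close() throws IOException {
}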

The logic in the map function is then simply as follows, with inputStream containing the entire XML string:

Path inputFile = new Path(value.toString());
FileSystem fs = inputFile.getFileSystem(context.getConfiguration());

//Input stream contains all data from the blob in the location provided by Text
FSDataInputStream inputStream = fs.open(inputFile);
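
To tie this back to the original question, here is a rough sketch of how the rest of the map function and a single reducer could wrap each file's content in its file name and emit everything under one root element. This continuation is not part of the answer; the class names, key choice, and tag handling are assumptions.

import java.io.ByteArrayOutputStream;
import java.io.IOException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Hypothetical mapper/reducer pair: the mapper receives the blob path as its value
// (matching the record reader above), wraps the file's content in its file name as
// opening and closing tags, and emits everything under a single key so one reducer
// can write the merged document.
public class MergeXmlJob {

    public static class MergeMapper extends Mapper<IntWritable, Text, Text, Text> {
        @Override
        protected void map(IntWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            Path inputFile = new Path(value.toString());
            FileSystem fs = inputFile.getFileSystem(context.getConfiguration());

            // Read the whole blob into memory (the files are assumed to be small).
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (FSDataInputStream inputStream = fs.open(inputFile)) {
                IOUtils.copyBytes(inputStream, buffer, 4096, false);
            }
            String xml = buffer.toString("UTF-8");

            // Wrap the content in the file name as start and end tags.
            String tag = inputFile.getName().replaceAll("\\.xml$", "");
            String wrapped = "<" + tag + ">" + xml + "</" + tag + ">";

            // A single constant key sends every file to the same reducer.
            context.write(new Text("merged"), new Text(wrapped));
        }
    }

    public static class MergeReducer extends Reducer<Text, Text, NullWritable, Text> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            context.write(NullWritable.get(), new Text("<xml>"));
            for (Text value : values) {
                context.write(NullWritable.get(), value);
            }
            context.write(NullWritable.get(), new Text("</xml>"));
        }
    }
}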

Resources:

http://www.andrewsmoll.com/3-hacks-for-hadoop-and-hdinsight-clusters/ ("Hack 3")

http://blogs.msdn.com/b/mostlytrue/archive/2014/04/10/merging-small-files-on-hdinsight.aspx

Regarding "java - Map Reduce program to merge multiple XML files into a single XML file", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/34884505/
