java - Unable to append to HDFS

Tags: java multithreading hadoop client hdfs

I have a class that receives batches of data from a data source and writes the serialized content of that data to a file (always the same file). To do this, the first thing I do when the instance is created is check whether the file exists and, if it does not, create it. That part seems to create the file without any problems; the trouble starts when the onOperationsBatchSynchronization method tries to append the serialized objects to the file.

Here is the code of the class in question:

public class HDFSSpaceSynchronizationEndpoint extends SpaceSynchronizationEndpoint {

    private final static Logger LOG = LoggerFactory.getLogger(HDFSSpaceSynchronizationEndpoint.class);
    private final String uriToFileToWrite;
    private final HDFSFileUtil hdfsFileUtil;

    public HDFSSpaceSynchronizationEndpoint(HDFSFileUtil hdfsFileUtil) {
        Validate.notNull(hdfsFileUtil);
        this.hdfsFileUtil = hdfsFileUtil;
        uriToFileToWrite = hdfsFileUtil.getUriToHdfs() + "/object-container";
        createFileIfNeeded();
    }

    private void createFileIfNeeded() {
        final String methodName = "createFileIfNeeded";
        synchronized (this) {
            try {
                if (!hdfsFileUtil.fileExistsInCluster(uriToFileToWrite)) {
                    hdfsFileUtil.createFileInCluster(uriToFileToWrite);
                }
            } catch (IOException e) {
                LOG.error(methodName, "", "Error creating the file in the cluster: {}", e);
            }
        }
    }

    @Override
    public void onOperationsBatchSynchronization(OperationsBatchData batchData) {
        final String methodName = "onOperationsBatchSynchronization";
        LOG.error(methodName, "", "Batch operation received: {}", batchData.getSourceDetails().getName());
        DataSyncOperation[] operations = batchData.getBatchDataItems();
        synchronized (this) {
            for (DataSyncOperation operation : operations) {
                try {
                    hdfsFileUtil.writeObjectToAFile((Serializable) operation.getDataAsObject(), uriToFileToWrite);
                } catch (IOException e) {
                    LOG.error(methodName, "", "Error writing the object to a file in the cluster: {}", e);
                }
            }
        }
    }
}

And here is the code of the class responsible for interacting with Hadoop:

public class HDFSFileUtilImpl implements HDFSFileUtil {

    private final static Logger LOG = LoggerFactory.getLogger(HDFSFileUtilImpl.class);
    private final static boolean DELETE_RECURSIVELY = true;
    private final String uriToHdfs;
    private final FileSystem fileSystem;

    public HDFSFileUtilImpl(HDFSConfiguration config, String uriToHdfs, String user) {
        Validate.notNull(config);
        Validate.notEmpty(uriToHdfs);
        Validate.notEmpty(user);
        this.uriToHdfs = uriToHdfs;
        try {
            fileSystem = FileSystem.get(new URI(uriToHdfs), config.getConfiguration(), user);
        } catch (IOException | URISyntaxException | InterruptedException e) {
            LOG.error("constructor", "", "HDFSFileUtilImpl constructor failed: {}", e);
            throw new IllegalStateException(e);
        }
    }

    @Override
    public String getUriToHdfs() {
        return uriToHdfs;
    }

    @Override
    public void writeObjectToAFile(Serializable obj, String fileUri) throws IOException {
        Validate.notNull(obj);
        Validate.notEmpty(fileUri);
        FSDataOutputStream out;
        if (!fileExistsInCluster(fileUri)) {
            throw new IllegalArgumentException("File with URI: " + fileUri + " does not exist in the cluster");
        }
        out = fileSystem.append(new Path(fileUri));
        byte[] objByteArray = getBytesFromObject(obj);
        out.write(objByteArray);
        out.close();
    }

    private byte[] getBytesFromObject(Object obj) throws IOException {
        // try-with-resources closes both streams; no explicit catch is needed for that
        try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
                ObjectOutput out = new ObjectOutputStream(bos)) {
            out.writeObject(obj);
            return bos.toByteArray();
        }
    }

    @Override
    public void createFileInCluster(String uriOfFile) throws IOException {
        Validate.notEmpty(uriOfFile);
        fileSystem.create(new Path(uriOfFile));
    }

    @Override
    public boolean fileExistsInCluster(String uri) throws IOException {
        Validate.notEmpty(uri);
        boolean result = false;
        result = fileSystem.exists(new Path(uri));
        return result;
    }

    ...
} 

The data source opens three connections to my component, so onOperationsBatchSynchronization is called concurrently. That is why the synchronized blocks are there, but even with them I get the following exception in the logs:

   10:09:23.727  ERROR  - onOperationsBatchSynchronization
org.apache.hadoop.ipc.RemoteException: Failed to create file [/object-container] for [DFSClient_NONMAPREDUCE_1587728611_73] for client [127.0.0.1], because this file is already being created by [DFSClient_NONMAPREDUCE_1972611521_106] on [127.0.0.1]
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.recoverLeaseInternal(FSNamesystem.java:2636)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInternal(FSNamesystem.java:2462)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFileInt(FSNamesystem.java:2700)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.appendFile(FSNamesystem.java:2663)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.append(NameNodeRpcServer.java:559)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.append(ClientNamenodeProtocolServerSideTranslatorPB.java:388)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:585)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:928)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2013)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2009)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2007)

So what could be the problem here? I have some unit tests (more like integration tests, since they depend on a running Hadoop setup), and all the methods of HDFSFileUtilImpl work and return the expected results.

Edit: I just tried writing a separate file to the cluster instead of appending to the same one, and that works fine. So I would rule out any permission problem.

Best answer

Finally got rid of the error. It turns out that when you call create on the FileSystem, the returned FSDataOutputStream must be closed.
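
For illustration, here is a minimal standalone sketch of that failure mode (hypothetical, assuming an HDFS instance reachable at hdfs://localhost:9000): the first client creates the file but never closes the stream, so the NameNode keeps that client's lease on the file, and an append from a second client is rejected exactly as in the stack trace above.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class LeaseRepro {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        URI hdfs = new URI("hdfs://localhost:9000"); // hypothetical cluster URI
        Path file = new Path("/object-container");

        // First DFSClient creates the file but never closes the stream,
        // so the NameNode keeps the lease on the file for this client.
        FileSystem fs1 = FileSystem.newInstance(hdfs, conf);
        fs1.create(file);

        // A second DFSClient (like the concurrent connections in the question)
        // now fails with "already being created by [DFSClient...]".
        FileSystem fs2 = FileSystem.newInstance(hdfs, conf);
        try (FSDataOutputStream out = fs2.append(file)) {
            out.writeBytes("payload");
        }
    }
}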

With that in mind, this is how the createFileInCluster method in HDFSFileUtilImpl is implemented now:

@Override
public void createFileInCluster(String uriOfFile) throws IOException {
    Validate.notEmpty(uriOfFile);
    FSDataOutputStream out = fileSystem.create(new Path(uriOfFile));
    out.close();
}

Regarding java - Unable to append to HDFS, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/27314412/
