我正在将一些软件从较旧的 Hadoop 集群(使用用户名/密码身份验证)转移到较新的,2.6.0-cdh5.12.0,它具有>Kerberos 身份验证 已启用。
我已经能够使用 AccumuloInput/OutputFormat 类中设置的 DelegationToken 使许多使用 Accumulo 作为其输入和/或输出的现有 Map/Reduce 作业正常工作。
但是,我有 1 个工作,它使用 AccumuloInput/OutputFormat 进行输入和输出,而且在其 Mapper.setup() 方法中,它通过 Zookeeper 连接到 Accumulo,因此在 Mapper.map() 方法中,它可以将 Mapper.map() 处理的每个键/值与另一个 Accumulo 表中的条目进行比较。
我在下面包含了相关代码,它显示了连接到 Zookeeper 用户 PasswordToken 的 setup() 方法,然后创建了一个 Accumulo 表扫描器,然后在映射器方法中使用它。
所以问题是如何用 KerberosToken 替换 PasswordToken 的使用,以便在 Mapper.setup() 方法中设置 Accumulo 扫描器?我找不到任何方法来“获取”我设置的 AccumuloInput/OutputFormat 类使用的 DelegationToken。
我已尝试使用 context.getCredentials().getAllTokens() 并寻找 org.apache.accumulo.code.client.security.tokens.AuthenticationToken 类型的 token ——此处返回的所有 token 均为 org 类型。 apache.hadoop.security.token.Token.
请注意,由于代码在未连接到 Internet 的网络上运行,因此我输入的是代码片段而不是剪切/粘贴 - 即可能存在拼写错误。 :)
//****************************
// code in the M/R driver
//****************************
ClientConfiguration accumuloCfg = ClientConfiguration.loadDefault().withInstance("Accumulo1").withZkHosts("zookeeper1");
ZooKeeperInstance inst = new ZooKeeperInstance(accumuloCfg);
AuthenticationToken dt = conn.securityOperations().getDelegationToken(new DelagationTokenConfig());
AccumuloInputFormat.setConnectorInfo(job, username, dt);
AccumuloOutputFormat.setConnectorInfo(job, username, dt);
// other job setup and then
job.waitForCompletion(true)
//****************************
// this is inside the Mapper class of the M/R job
//****************************
private Scanner index_scanner;
public void setup(Context context) {
Configuration cfg = context.getConfiguration();
// properties set and passed from M/R Driver program
String username = cfg.get("UserName");
String password = cfg.get("Password");
String accumuloInstName = cfg.get("InstanceName");
String zookeepers = cfg.get("Zookeepers");
String tableName = cfg.get("TableName");
Instance inst = new ZooKeeperInstance(accumuloInstName, zookeepers);
try {
AuthenticationToken passwordToken = new PasswordToken(password);
Connector conn = inst.getConnector(username, passwordToken);
index_scanner = conn.createScanner(tableName, conn.securityOperations().getUserAuthorizations(username));
} catch(Exception e) {
e.printStackTrace();
}
}
public void map(Key key, Value value, Context context) throws IOException, InterruptedException {
String uuid = key.getRow().toString();
index_scanner.clearColumns();
index_scanner.setRange(Range.exact(uuid));
for(Entry<Key, Value> entry : index_scanner) {
// do some processing in here
}
}
最佳答案
提供的AccumuloInputFormat和 AccumuloOutputFormat有一种方法可以使用 Accumulo*putFormat.setConnectorInfo(job, principle, token)
在作业配置中设置 token 。您还可以使用 AuthenticationTokenSerializer 序列化 HDFS 文件中的 token 。并使用接受文件名的 setConnectorInfo
方法版本。
如果传入 KerberosToken,作业将创建要使用的 DelegationToken,如果传入 DelegationToken,它只会使用它。
所提供的 AccumuloInputFormat
应该处理它自己的扫描器,所以通常情况下,如果您已正确设置配置,则不必在 Mapper 中执行此操作。但是,如果您在 Mapper 中进行二次扫描(例如连接),您可以检查提供的 AccumuloInputFormat
的 RecordReader 源代码,以获取如何检索配置和构建扫描仪。
关于hadoop - 使用 Kerberos 连接到 Mapper 内的 Accumulo,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46812480/