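I am trying to aggregate data from files in HDFS. I need to enrich that data with details from a specific HBase table, for the entries that have a value there.
But I get this exception: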
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
at org.apache.spark.SparkContext.clean(SparkContext.scala:1623)
at org.apache.spark.rdd.RDD.map(RDD.scala:286)
at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:113)
at org.apache.spark.api.java.AbstractJavaRDDLike.mapToPair(JavaRDDLike.scala:46)
at ......
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:577)
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:174)
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:197)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation
Serialization stack:
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
I know the problem occurs when we try to access HBase inside the map function.
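The same failure can be reproduced without Spark or HBase, which may help show what the stack trace is saying. This is a minimal sketch using only the JDK: `FakeConnection`, `LeakyMapFunction`, `SafeMapFunction` and `SerializationCheck` are hypothetical names made up for illustration, with `FakeConnection` standing in for the non-serializable `HConnectionImplementation` named in the trace.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for an HBase connection: deliberately NOT Serializable. */
final class FakeConnection {
}

/** Mimics a map function that holds a live connection in a field. */
final class LeakyMapFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    // Java serialization walks every non-transient field, so this field breaks it.
    private final FakeConnection connection = new FakeConnection();
}

/** Same idea, but the function only carries plain settings. */
final class SafeMapFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String quorum;

    SafeMapFunction(String quorum) {
        this.quorum = quorum;
    }

    String getQuorum() {
        return quorum;
    }
}

final class SerializationCheck {
    /** Returns true if Java serialization accepts the object, false if it meets a non-serializable member. */
    static boolean isSerializable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Spark performs a comparable check on the closure before shipping it to the executors, so any function object that reaches a live connection through its fields is rejected on the driver.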
My question is: how can I complete my RDDs with the values contained in the HBase table?
For example, the file in HDFS is a CSV:
Name;Number1;Number2
toto;1;2
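In HBase, we have data associated with the name toto.
I need to compute the sum of Number1 and Number2 (that is the easy part) and aggregate it with the data from the table. For example:
the key for the reducer will be tata, which is retrieved by getting the row key toto from the HBase table.
Any suggestions?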
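To make the aggregation described in the question concrete, here is a small sketch in plain Java. It is not Spark code: the `lookup` map is an assumption standing in for the HBase table (name to reducer key, for example toto to tata), and `CsvAggregationSketch`, `parseAndSum` and `aggregate` are hypothetical names. The header line `Name;Number1;Number2` is assumed to have been filtered out beforehand.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.HashMap;
import java.util.Map;

final class CsvAggregationSketch {

    /** Parses one "Name;Number1;Number2" data line into (name, Number1 + Number2). */
    static Map.Entry<String, Long> parseAndSum(String line) {
        String[] fields = line.split(";");
        if (fields.length != 3) {
            throw new IllegalArgumentException("Expected 3 fields: " + line);
        }
        long sum = Long.parseLong(fields[1].trim()) + Long.parseLong(fields[2].trim());
        return new SimpleEntry<String, Long>(fields[0].trim(), sum);
    }

    /** Re-keys each sum by the value looked up for its name and adds up sums that share a key. */
    static Map<String, Long> aggregate(Iterable<String> lines, Map<String, String> lookup) {
        Map<String, Long> totals = new HashMap<>();
        for (String line : lines) {
            Map.Entry<String, Long> parsed = parseAndSum(line);
            String reducerKey = lookup.get(parsed.getKey());
            if (reducerKey == null) {
                continue; // no row for this name in the (simulated) table
            }
            totals.merge(reducerKey, parsed.getValue(), Long::sum);
        }
        return totals;
    }
}
```

In a Spark job the `parseAndSum` step would roughly correspond to the `mapToPair` call seen in the stack trace, and the merge to a reduce by key; the open question is only where the lookup may safely run.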
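Best answer
In the end a colleague got it working, thanks to your suggestions.
Here is the map function code that aggregates the file with data from the HBase table.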
private final Logger LOGGER = LoggerFactory.getLogger(AbtractGetSDMapFunction.class);
/**
* Namespace name
*/
public static final String NAMESPACE = "NameSpace";
private static final String ID = "id";
// Not serializable: created on the executor and never shipped with the function object
private transient Connection connection = null;
private static final String LINEID = "l";
private static final String CHANGE_LINE_ID = "clid";
private static final String CHANGE_LINE_DATE = "cld";
private String constClientPortHBase;
private String constQuorumHBase;
private int constTimeOutHBase;
private String constZnodeHBase;
public void initConnection() {
    Configuration conf = HBaseConfiguration.create();
    conf.setInt("timeout", constTimeOutHBase);
    conf.set("hbase.zookeeper.quorum", constQuorumHBase);
    conf.set("hbase.zookeeper.property.clientPort", constClientPortHBase);
    conf.set("zookeeper.znode.parent", constZnodeHBase);
    try {
        connection = HConnectionManager.createConnection(conf);
    } catch (Exception e) {
        LOGGER.error("Error in the configuration of the connection with HBase.", e);
    }
}
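public Tuple2<String, myInput> call(String row) throws Exception {
    // Open the connection here, on the executor, rather than in the constructor:
    // the function object is serialized on the driver, and the connection cannot be.
    // The null check reuses one connection instead of opening a new one for every row.
    if (connection == null) {
        initConnection();
    }
    ....do your work
    State state = getCurrentState(myInput.getKey());
    ....do your work
}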
// Only plain, serializable settings are passed in; no HBase object is created here
public AbtractGetSDMapFunction(String constClientPortHBase, String constQuorumHBase, String constZnodeHBase, int constTimeOutHBase) {
    this.constClientPortHBase = constClientPortHBase;
    this.constQuorumHBase = constQuorumHBase;
    this.constZnodeHBase = constZnodeHBase;
    this.constTimeOutHBase = constTimeOutHBase;
}
/***************************************************************************/
/**
* Table Name
*/
public static final String TABLE_NAME = "Table";
public State getCurrentState(String key) throws TechnicalException {
    LOGGER.debug("start key {}", key);
    String buildRowKey = buildRowKey(key);
    State currentState = new State();
    String columnFamily = State.getColumnFamily();
    if (!StringUtils.isEmpty(buildRowKey) && null != columnFamily) {
        // Table handles are lightweight, but they should be closed after use
        try (Table table = getTable()) {
            Get scan = new Get(Bytes.toBytes(buildRowKey));
            scan.addFamily(Bytes.toBytes(columnFamily));
            addColumnsToScan(scan, columnFamily, ID);
            Result result = table.get(scan);
            currentState.setCurrentId(getLong(result, columnFamily, ID));
        } catch (IOException ex) {
            throw new TechnicalException(ex);
        }
        LOGGER.debug("end ");
    }
    return currentState;
}
/***********************************************************/
private Table getTable() throws IOException, TechnicalException {
    Connection connection = getConnection();
    // Table retrieve
    if (connection != null) {
        Table table = connection.getTable(TableName.valueOf(NAMESPACE, TABLE_NAME));
        return table;
    } else {
        throw new TechnicalException("Connection to Hbase not available");
    }
}
/****************************************************************/
private Long getLong(Result result, String columnFamily, String qualifier) {
    Long toLong = null;
    if (null != columnFamily && null != qualifier) {
        byte[] value = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
        toLong = (value != null ? Bytes.toLong(value) : null);
    }
    return toLong;
}

private String getString(Result result, String columnFamily, String qualifier) {
    String toString = null;
    if (null != columnFamily && null != qualifier) {
        byte[] value = result.getValue(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier));
        toString = (value != null ? Bytes.toString(value) : null);
    }
    return toString;
}
public Connection getConnection() {
    return connection;
}

public void setConnection(Connection connection) {
    this.connection = connection;
}

private void addColumnsToScan(Get scan, String family, String qualifier) {
    if (org.apache.commons.lang.StringUtils.isNotEmpty(family) && org.apache.commons.lang.StringUtils.isNotEmpty(qualifier)) {
        scan.addColumn(Bytes.toBytes(family), Bytes.toBytes(qualifier));
    }
}
private String buildRowKey(String key) throws TechnicalException {
    StringBuilder rowKeyBuilder = new StringBuilder();
    rowKeyBuilder.append(HashFunction.makeSHA1Hash(key));
    return rowKeyBuilder.toString();
}
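The reason this works is that the function object only carries plain settings when it is serialized, and the connection is created afterwards, inside `call`. Below is a hedged, JDK-only sketch of that pattern. `StubConnection` and `LazyLookupFunction` are hypothetical stand-ins, not HBase or Spark classes, and `roundTrip` merely imitates shipping the function to an executor.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for an HBase Connection; not Serializable on purpose. */
final class StubConnection {
    static int opened = 0; // counts connections created in this JVM
    private final String quorum;

    StubConnection(String quorum) {
        this.quorum = quorum;
        opened++;
    }

    String lookup(String rowKey) {
        return quorum + "/" + rowKey;
    }
}

final class LazyLookupFunction implements Serializable {
    private static final long serialVersionUID = 1L;
    private final String quorum;                 // plain setting, travels with the function
    private transient StubConnection connection; // skipped by serialization, null after deserialization

    LazyLookupFunction(String quorum) {
        this.quorum = quorum;
    }

    /** Creates the connection on first use and reuses it for later rows. */
    private StubConnection connection() {
        if (connection == null) {
            connection = new StubConnection(quorum);
        }
        return connection;
    }

    String call(String rowKey) {
        return connection().lookup(rowKey);
    }

    /** Serializes and deserializes this function, roughly as shipping it to an executor would. */
    LazyLookupFunction roundTrip() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(this);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazyLookupFunction) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

With this shape, each deserialized copy of the function opens at most one connection no matter how many rows it processes. Closing that connection when the task ends is left out of the sketch.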
Regarding "java - Get a row on Spark in a map call", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/41985552/