java - Apache Spark 数据集。 foreach 与 Aerospike 客户端

标签 java apache-spark serialization hive aerospike

我想通过 Apache Spark 从 Apache Hive 检索行并将每一行放入 Aerospike 缓存。

这是一个简单的例子。

var dataset = session.sql("select * from employee");
final var aerospikeClient = aerospike;  // to remove binding between lambda and the service class itself
dataset.foreach(row -> {
    var key = new Key("namespace", "set", randomUUID().toString());
    aerospikeClient.add(
        key,
        new Bin(
            "json-repr",
            row.json()
        )
    );
});

我得到一个错误:

Caused by: java.io.NotSerializableException: com.aerospike.client.reactor.AerospikeReactorClient

显然我无法使 AerospikeReactorClient 可序列化。我尝试添加 dataset.collectAsList() 并且确实有效。但据了解,此方法将所有内容加载到一个节点中。可能有大量数据。所以,这不是选择。

处理此类问题的最佳做法是什么?

最佳答案

你可以写directly from a data frame.无需遍历数据集。

Launch the spark shell and import the com.aerospike.spark.sql._ package:

$ spark-shell
scala> import com.aerospike.spark.sql._
import com.aerospike.spark.sql._

Example of writing data into Aerospike

val TEST_COUNT= 100
val simpleSchema: StructType = new StructType(
    Array(
    StructField("one", IntegerType, nullable = false),
    StructField("two", StringType, nullable = false),
    StructField("three", DoubleType, nullable = false)
  ))

val simpleDF = {
    val inputBuf=  new ArrayBuffer[Row]()
    for ( i <- 1 to num_records){
        val one = i
        val two = "two:"+i
        val three = i.toDouble
        val r = Row(one, two, three)
        inputBuf.append(r)
    }
    val inputRDD = spark.sparkContext.parallelize(inputBuf.toSeq)
    spark.createDataFrame(inputRDD,simpleSchema)
}

//Write the Sample Data to Aerospike
simpleDF.write
.format("aerospike") //aerospike specific format
.option("aerospike.writeset", "spark-test") //write to this set
.option("aerospike.updateByKey", "one")//indicates which columns should be used for construction of primary key
.option("aerospike.write.mode","update")
.save()

关于java - Apache Spark 数据集。 foreach 与 Aerospike 客户端,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70941000/

相关文章:

java - 奥普?适配器?反射(reflection)?如何泛化setter方法(不抽象目标对象)

java - Spring Batch - 根据记录计数写入多个文件

java - 类循环引用的原因?

Java 执行器 : wait for task termination.

apache-spark - 在 Pyspark 中使用 maxBytesPerTrigger 的正确方法是什么?

apache-spark - 什么是随机分区?

hadoop - 写入 hdfs 时出现 Spark Socket 超时问题

java - 序列化 PHP => 反序列化 JAVA/Serialize for php in string format

java - ObjectOutputStream 导致内存泄漏,reset() 抛出错误

c# - 用两个相等的值序列化标志枚举