scala - How to serialize an elastic4s ElasticSearch client to run with Spark RDDs?

Tags: scala serialization elasticsearch machine-learning apache-spark

Currently I am running Spark MLlib ALS over millions of users and products. Because of heavy disk shuffling, the collect step in the code below takes far more time than the recommendProductsForUsers step. So if I could somehow drop the collect step and push the data to Elasticsearch directly from the executors, it would save a lot of time and compute resources.

import com.sksamuel.elastic4s.ElasticClient
import com.sksamuel.elastic4s.ElasticDsl._
import org.elasticsearch.common.settings.ImmutableSettings


val settings = ImmutableSettings.settingsBuilder().put("cluster.name", "MYCLUSTER").build()
val client = ElasticClient.remote(settings, "11.11.11.11", 9300)
var ESMap = Map[String, List[String]]()
val topKReco = bestModel.get
  // below step takes 3 hours
  .recommendProductsForUsers(30)
  // below step takes 6 hours
  .collect()
  .foreach { r =>
    var i = 1
    val curr_user = r._1
    r._2.foreach { r2 =>
      ESMap += i.toString -> List(r2.product.toString, item_ids(r2.product))
      i += 1
    }
    client.execute {
      index into "recommendations1" / "items" id curr_user fields ESMap
    }.await
  }

So now, when I run this code without the collect step, I get the following error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:315)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:305)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:132)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1893)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:869)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1.apply(RDD.scala:868)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:286)
    at org.apache.spark.rdd.RDD.foreach(RDD.scala:868)
    at CatalogALS2$.main(CatalogALS2.scala:157)
    at CatalogALS2.main(CatalogALS2.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:665)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:170)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:193)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.io.NotSerializableException: com.sksamuel.elastic4s.ElasticClient
Serialization stack:
    - object not serializable (class: com.sksamuel.elastic4s.ElasticClient, value: com.sksamuel.elastic4s.ElasticClient@e4c4af)
    - field (class: CatalogALS2$$anonfun$2, name: client$1, type: class com.sksamuel.elastic4s.ElasticClient)
    - object (class CatalogALS2$$anonfun$2, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:81)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:312)

What I understand from this is that if I could somehow serialize the com.sksamuel.elastic4s.ElasticClient class, then I could run this task in parallel without collecting the data to the driver. Generalizing the question: how do I serialize any class or function in Scala so it can be used in operations on an RDD?

Best Answer

Found the answer to the same problem by using serialization, specifically by moving the client into a serializable object, for example:

object ESConnection extends Serializable {

  // Elasticsearch client initialization; lazy, so the client is only created when first used
  val settings = ImmutableSettings.settingsBuilder().put("cluster.name", "MyCluster").build()
  lazy val client = ElasticClient.remote(settings, "11.11.11.11", 9300)

}
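This works because the Scala object itself is never shipped with the task closure: each executor JVM initializes its own copy of ESConnection and, thanks to the lazy val, builds the underlying Elasticsearch client only when it is first used there. The non-serializable client therefore never has to cross the wire; extending Serializable is mostly a safety net for cases where the object does end up captured inside a serialized closure.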

You can then use it from the RDD on the executors, without actually collecting the data to the driver, like this:

val topKReco = bestModel.get
  .recommendProductsForUsers(30)
  // no collect required now
  .foreach { r =>
    val curr_user = r._1
    // build the per-user document on the executor
    var ESMap = Map[String, List[String]]()
    var i = 1
    r._2.foreach { r2 =>
      ESMap += i.toString -> List(r2.product.toString, item_ids(r2.product))
      i += 1
    }
    ESConnection.client.execute {
      index into "recommendation1" / "items" id curr_user fields ESMap
    }.await
  }
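If one index request per user is still too chatty, a further refinement is to group the work per partition and send a single bulk request for all users in a partition. The sketch below is only an outline: it assumes the same elastic4s 1.x DSL as above (including its bulk helper) and reuses the item_ids lookup from the question.

bestModel.get
  .recommendProductsForUsers(30)
  .foreachPartition { userRecs =>
    // one index request per user in this partition;
    // ESConnection.client is created at most once per executor JVM
    val requests = userRecs.map { case (userId, recs) =>
      val doc = recs.zipWithIndex.map { case (rec, idx) =>
        (idx + 1).toString -> List(rec.product.toString, item_ids(rec.product))
      }.toMap
      index into "recommendation1" / "items" id userId fields doc
    }.toSeq

    // a single bulk round trip per partition instead of one per user
    if (requests.nonEmpty) {
      ESConnection.client.execute {
        bulk(requests: _*)
      }.await
    }
  }

Batching this way trades a little executor memory for far fewer round trips to Elasticsearch.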

Regarding "scala - How to serialize an elastic4s ElasticSearch client to run with Spark RDDs?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/31933671/
