scala - 将 Spark Streaming RDD 推送到 Neo4j -Scala

标签 scala neo4j apache-spark spark-streaming anormcypher

我需要建立从 Spark Streaming 到 Neo4j 图形数据库的连接。RDD 的类型为((is,I),(am,Hello)(sam,happy....)。我需要在 Neo4j 中的每对单词之间建立一条边。

在我发现的 Spark Streaming 文档中

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

to the push to the data to an external database.

我正在 Scala 中执行此操作。我对如何去做有点困惑?我找到了 AnormCypher 和 Neo4jScala 包装器。我可以使用这些来完成工作吗?如果是这样,我该怎么做?如果没有,还有更好的选择吗?

谢谢你们....

最佳答案

我用 AnormCypher 做了一个实验

像这样:

implicit val connection = Neo4jREST.setServer("localhost", 7474, "/db/data/")
val conf = new SparkConf().setAppName("Simple Application")
val sc = new SparkContext(conf)
val logData = sc.textFile(FILE, 4).cache()
val count = logData
  .flatMap( _.split(" "))
  .map( w =>
    Cypher("CREATE(:Word {text:{text}})")
      .on( "text" -> w ).execute()
   ).filter( _ ).count()

Neo4j 2.2.x 具有出色的并发写入性能,您可以在 Spark 中使用。因此,您可以向 Neo4j 写入的并发线程越多越好。如果每个请求可以批量处理 100 到 1000 个语句,那就更好了。

关于scala - 将 Spark Streaming RDD 推送到 Neo4j -Scala,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31059930/

相关文章:

json - 无法在 Neo4j 3.2.6 上调用 apoc.load.json

neo4j - Spring Data Neo4j 5 和动态@Properties - InvalidDataAccessApiUsageException

mysql - 用于检索电影关系的图形引擎

python - 获取元组的第一项以在 pyspark 的列表中教授一行

scala - MapPartitions 上的垃圾收集问题

scala - 错误: Invalid or corrupt jarfile sbt/sbt-launch-0. 13.5.jar

scala - Apache Spark 抛出 java.lang.IllegalStateException : unread block data

python - 错误 "AttributeError: ' Py4JError'对象没有属性 'message'构建DecisionTreeModel

java - "Malformed data length is negative",当尝试使用带有 Avro 数据源的 kafka 的 Spark 结构化流时

hadoop - YARN 阈值错误