python - 让 Spark、Python 和 MongoDB 协同工作

标签 python mongodb apache-spark pyspark pymongo

我很难将这些组件正确地结合在一起。我已成功安装 Spark 并运行,我可以在本地、独立以及通过 YARN 运行作业。我已按照建议的步骤进行操作(据我所知)herehere

我正在开发 Ubuntu,我拥有的各种组件版本是

我在执行各个步骤时遇到了一些困难,例如将哪些 jar 添加到哪个路径,所以我添加的是

  • /usr/local/share/hadoop-2.6.1/share/hadoop/mapreduce 我已经添加 mongo-hadoop-core-1.5。 0-SNAPSHOT.jar
  • 以下环境变量
    • export HADOOP_HOME="/usr/local/share/hadoop-2.6.1"
    • 导出 PATH=$PATH:$HADOOP_HOME/bin
    • export SPARK_HOME="/usr/local/share/spark-1.5.1-bin-hadoop2.6"
    • export PYTHONPATH="/usr/local/share/mongo-hadoop/spark/src/main/python"
    • 导出 PATH=$PATH:$SPARK_HOME/bin

我的 Python 程序是基本的

from pyspark import SparkContext, SparkConf
import pymongo_spark
pymongo_spark.activate()

def main():
    conf = SparkConf().setAppName("pyspark test")
    sc = SparkContext(conf=conf)
    rdd = sc.mongoRDD(
        'mongodb://username:password@localhost:27017/mydb.mycollection')

if __name__ == '__main__':
    main()

我正在使用命令运行它

$SPARK_HOME/bin/spark-submit --driver-class-path /usr/local/share/mongo-hadoop/spark/build/libs/ --master local[4] ~/sparkPythonExample/SparkPythonExample.py

结果我得到以下输出

Traceback (most recent call last):
  File "/home/me/sparkPythonExample/SparkPythonExample.py", line 24, in <module>
    main()
  File "/home/me/sparkPythonExample/SparkPythonExample.py", line 17, in main
    rdd = sc.mongoRDD('mongodb://username:password@localhost:27017/mydb.mycollection')
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 161, in mongoRDD
    return self.mongoPairRDD(connection_string, config).values()
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 143, in mongoPairRDD
    _ensure_pickles(self)
  File "/usr/local/share/mongo-hadoop/spark/src/main/python/pymongo_spark.py", line 80, in _ensure_pickles
    orig_tb)
py4j.protocol.Py4JError

根据here

This exception is raised when an exception occurs in the Java client code. For example, if you try to pop an element from an empty stack. The instance of the Java exception thrown is stored in the java_exception member.

查看 pymongo_spark.py 的源代码和抛出错误的行,它说

"Error while communicating with the JVM. Is the MongoDB Spark jar on Spark's CLASSPATH? : "

因此,作为回应,我试图确保传递了正确的 jar ,但我可能做错了,见下文

$SPARK_HOME/bin/spark-submit --jars /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar --driver-class-path /usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-java-driver-3.0.4.jar,/usr/local/share/spark-1.5.1-bin-hadoop2.6/lib/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar --master local[4] ~/sparkPythonExample/SparkPythonExample.py

我已经将 pymongo 导入到同一个 python 程序中,以验证我至少可以使用它访问 MongoDB,而且我可以。

我知道这里有很多事件部件,所以如果我能提供更多有用的信息,请告诉我。

最佳答案

更新:

2016-07-04

自上次更新 MongoDB Spark Connector成熟了很多。它提供up-to-date binaries和基于数据源的 API,但它使用 SparkConf 配置,因此主观上不如 Stratio/Spark-MongoDB 灵活。

2016-03-30

从最初的答案开始,我发现了两种从 Spark 连接到 MongoDB 的不同方法:

虽然前者似乎相对不成熟,但后者看起来比 Mongo-Hadoop 连接器更好,并提供 Spark SQL API。

# Adjust Scala and package version according to your setup
# although officially 0.11 supports only Spark 1.5
# I haven't encountered any issues on 1.6.1
bin/pyspark --packages com.stratio.datasource:spark-mongodb_2.11:0.11.0
df = (sqlContext.read
  .format("com.stratio.datasource.mongodb")
  .options(host="mongo:27017", database="foo", collection="bar")
  .load())

df.show()

## +---+----+--------------------+
## |  x|   y|                 _id|
## +---+----+--------------------+
## |1.0|-1.0|56fbe6f6e4120712c...|
## |0.0| 4.0|56fbe701e4120712c...|
## +---+----+--------------------+

似乎比mongo-hadoop-spark稳定很多,支持predicate pushdown,无需静态配置,简单易用。

原答案:

确实,这里有很多事件部件。我试图通过构建一个与描述的配置大致匹配的简单 Docker 镜像来使其更易于管理(但为了简洁起见,我省略了 Hadoop 库)。您可以找到complete source on GitHub ( DOI 10.5281/zenodo.47882 ) 并从头开始构建它:

git clone https://github.com/zero323/docker-mongo-spark.git
cd docker-mongo-spark
docker build -t zero323/mongo-spark .

或下载我拥有的图片pushed to Docker Hub所以你可以简单地 docker pull zero323/mongo-spark):

开始图片:

docker run -d --name mongo mongo:2.6
docker run -i -t --link mongo:mongo zero323/mongo-spark /bin/bash

通过 --jars--driver-class-path 启动 PySpark shell:

pyspark --jars ${JARS} --driver-class-path ${SPARK_DRIVER_EXTRA_CLASSPATH}

最后看看它是如何工作的:

import pymongo
import pymongo_spark

mongo_url = 'mongodb://mongo:27017/'

client = pymongo.MongoClient(mongo_url)
client.foo.bar.insert_many([
    {"x": 1.0, "y": -1.0}, {"x": 0.0, "y": 4.0}])
client.close()

pymongo_spark.activate()
rdd = (sc.mongoRDD('{0}foo.bar'.format(mongo_url))
    .map(lambda doc: (doc.get('x'), doc.get('y'))))
rdd.collect()

## [(1.0, -1.0), (0.0, 4.0)]

请注意,mongo-hadoop 似乎在第一次操作后关闭了连接。所以在collect之后调用例如rdd.count()会抛出异常。

基于我在创建此图像时遇到的不同问题,我倾向于相信 passing mongo-hadoop-1.5.0-SNAPSHOT.jarmongo -hadoop-spark-1.5.0-SNAPSHOT.jar 对两个 --jars--driver-class-path 是唯一的硬性要求

注意事项:

关于python - 让 Spark、Python 和 MongoDB 协同工作,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/33391840/

相关文章:

json - 如何在 JSON : SPARK Scala 中使用 read.schema 仅指定特定字段

python - 奇怪的 If、Elif、Else 行为

mongodb - 防止 Lithium/mongoDB 中的表单操作

javascript - 如何使用 Passport.js 循环遍历用户并将用户列表打印到 DOM 中

python + pymongo : how to insert a new field on an existing document in mongo from a for loop

python - 读取分布式制表符分隔的 CSV

python - 如何使用 Python 对 Spark 中的 LIBSVM 文件进行特征选择和缩减?

python - 如何在 Pandas DF 中将具有特定值的特定列设置为新值?

python - 这是在 Python 中使用 pandas 的好例子吗?

python - 在 Python 线程化过程中更新 Tkinter 标签