python - 使用 Spark-DataFrame 将 HDFS 保存到 MongoDB

标签 python mongodb csv hadoop apache-spark

我正在尝试使用 PyMongo 连接器保存 Spark-DataFrame。 以下是我的代码,但每次运行代码时都会出现错误:

java.io.IOException: No FileSystem for scheme: mongodb

以下是我的代码:

import pymongo
import pymongo_spark
pymongo_spark.activate()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext=SQLContext(sc)
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
path = "hdfs://localhost:9000/home/hadoop/h_data/sales_ord_univ.csv"
df=sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load(path)
collections=df.collect()
df.write.format('mongodb://localhost:27017/test.sales_order_2').save()

因为我是新手,所以我有一个非常天真的代码,但是对此的任何帮助将不胜感激。我正在使用 Spark- 2.0.0、Python 2.7.6、MongoDB:3.2.9

最佳答案

I'm trying to save a Spark-DataFrame using PyMongo connector

你可以尝试使用MongoDB Connector for Spark .使用 Apache Spark v2.0.x 的设置环境、Python v2.7.x 和 MongoDB v3.2.x,你可以做如下的事情:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Application Name").getOrCreate()
dataframe = spark.read.csv("path/to/file.csv", header=True, mode="DROPMALFORMED")
dataframe.write.format("com.mongodb.spark.sql.DefaultSource")\
               .option("spark.mongodb.output.uri", "mongodb://localhost:27017/database.collection")\
               .save()

Python 示例文件的完整版本可以在 MongoDB PySpark Docker: examples.py 上找到.其中包括一个使用示例 MongoDB Aggregation在 Spark 中,和 Spark SQL .

如果您熟悉 docker ,你可以执行git项目MongoDB PySpark Docker使用 docker-compose并运行一些 PySpark 示例。

您可能会发现以下资源很有用:

关于python - 使用 Spark-DataFrame 将 HDFS 保存到 MongoDB,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39965271/

相关文章:

python - 从 csv.dictreader 排序和过滤数据

python - djangoproject.com 如何部署到产品?我应该打包我的 django 项目来部署它吗?

python - 带有空字符的 numpy.genfromtxt csv 文件

mongodb - MongoDB block 迁移到底是如何工作的?

java - Morphia Complex Mongodb 聚合($substr、$project、$sort 等...)

java - 如何在JAVA中使用rtrim PHP函数

parsing - 如何使用PARSE方言从CSV中读取行?

python - 由于 .h 文件出现错误,我无法下载 py4ak lib。 C++ 构建工具存在一些问题,我需要帮助解决

python - 构建一个包含冒号的元组以用于索引 numpy 数组

mongodb - 使用 Ansible 安装特定于版本的 MongoDB