我正在尝试使用 PyMongo 连接器保存 Spark-DataFrame。 以下是我的代码,但每次运行代码时都会出现错误:
java.io.IOException: No FileSystem for scheme: mongodb
以下是我的代码:
import pymongo
import pymongo_spark
pymongo_spark.activate()
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
conf = SparkConf()
sc = SparkContext(conf=conf)
sqlContext=SQLContext(sc)
from pyspark.sql import SparkSession
from pyspark.sql import SparkSession
path = "hdfs://localhost:9000/home/hadoop/h_data/sales_ord_univ.csv"
df=sqlContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").load(path)
collections=df.collect()
df.write.format('mongodb://localhost:27017/test.sales_order_2').save()
因为我是新手,所以我有一个非常天真的代码,但是对此的任何帮助将不胜感激。我正在使用 Spark- 2.0.0、Python 2.7.6、MongoDB:3.2.9
最佳答案
I'm trying to save a Spark-DataFrame using PyMongo connector
你可以尝试使用MongoDB Connector for Spark .使用 Apache Spark v2.0.x 的设置环境、Python v2.7.x 和 MongoDB v3.2.x,你可以做如下的事情:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Application Name").getOrCreate()
dataframe = spark.read.csv("path/to/file.csv", header=True, mode="DROPMALFORMED")
dataframe.write.format("com.mongodb.spark.sql.DefaultSource")\
.option("spark.mongodb.output.uri", "mongodb://localhost:27017/database.collection")\
.save()
Python 示例文件的完整版本可以在 MongoDB PySpark Docker: examples.py 上找到.其中包括一个使用示例 MongoDB Aggregation在 Spark 中,和 Spark SQL .
如果您熟悉 docker ,你可以执行git项目MongoDB PySpark Docker使用 docker-compose并运行一些 PySpark 示例。
您可能会发现以下资源很有用:
关于python - 使用 Spark-DataFrame 将 HDFS 保存到 MongoDB,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39965271/