I am trying to read and write files in Azure Storage. Here is what I have tried so far.
Creating the Spark session:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext

sparkOptions = {"executor_memory": "1G", "driver_memory": "1G", "max_results_size": "1G"}

# app, id, secret and endpoint are assumed to be defined earlier
conf = pyspark.SparkConf().setAppName(app)
conf = (conf.setMaster("local[*]")
        .set("spark.executor.memory", sparkOptions["executor_memory"])
        .set("spark.driver.memory", sparkOptions["driver_memory"])
        .set("spark.driver.maxResultSize", sparkOptions["max_results_size"])
        .set("spark.sql.crossJoin.enabled", "true")
        .set("spark.jars.packages", "com.microsoft.azure.kusto:spark-kusto-connector:1.0.0")
        .set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")
        .set("fs.azure.account.auth.type", "OAuth")
        .set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
        .set("fs.azure.account.oauth2.client.id", id)
        .set("fs.azure.account.oauth2.client.secret", secret)
        .set("fs.azure.account.oauth2.client.endpoint", endpoint)
        .set("fs.azure.createRemoteFileSystemDuringInitialization", "true"))

sparkContext = pyspark.SparkContext(conf=conf)
sparkSession = SparkSession(sparkContext)
sqlContext = SQLContext(sparkContext)
Trying to read a CSV from Azure Storage:
df = sparkSession.read.option("header", "true").csv("wasbs://container@account.blob.core.windows.net/archive.csv")
df.show()
Error:
Py4JJavaError Traceback (most recent call last)
<ipython-input-3-975f978e0f66> in <module>()
----> 1 df = sparkSession.read.option("header", "true").csv("wasbs://container@account.blob.core.windows.net/archive.csv")
2 df.show()
~/anaconda3/lib/python3.6/site-packages/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue)
474 path = [path]
475 if type(path) == list:
--> 476 return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
477 elif isinstance(path, RDD):
478 def func(iterator):
~/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
~/anaconda3/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
~/anaconda3/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o68.csv.
: java.io.IOException: No FileSystem for scheme: wasbs
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:547)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:355)
at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:618)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
Trying with abfss:
df = sparkSession.read.option("header", "true").csv("abfss://container@account.blob.core.windows.net/archive.csv")
df.show()
Error:
Py4JJavaError Traceback (most recent call last)
<ipython-input-4-02abec06890e> in <module>()
----> 1 df = sparkSession.read.option("header", "true").csv("abfss://container@account.blob.core.windows.net/archive.csv")
2 df.show()
~/anaconda3/lib/python3.6/site-packages/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue)
474 path = [path]
475 if type(path) == list:
--> 476 return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
477 elif isinstance(path, RDD):
478 def func(iterator):
~/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
~/anaconda3/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
~/anaconda3/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o104.csv.
: java.io.IOException: No FileSystem for scheme: abfss
at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:547)
at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.immutable.List.flatMap(List.scala:355)
at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:618)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.base/java.lang.Thread.run(Thread.java:834)
Searching for usage examples of the kusto-spark connector, I have only found Databricks examples that use dbutils. I would like to know whether the connector can be used outside of Databricks, and what I am doing wrong in my code. Thanks.
Best answer
This has nothing to do with Kusto. Are you using Azure Databricks? If so, just refer to their docs. If not, try importing:
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-azure</artifactId>
<version>2.7.0</version>
</dependency>
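Outside Databricks, the same artifact can be pulled in from PySpark through spark.jars.packages instead of a Maven pom. Below is a minimal sketch, assuming plain local PySpark and a Hadoop 2.7.x build (the versions are an assumption; match them to your Spark distribution). Note that .set() overwrites, so the Kusto connector and hadoop-azure have to share one comma-separated list, set before the SparkContext is created:
# A minimal sketch: spark.jars.packages resolves Maven coordinates (and their
# transitive dependencies, e.g. azure-storage) when the SparkContext starts.
conf = conf.set(
    "spark.jars.packages",
    "com.microsoft.azure.kusto:spark-kusto-connector:1.0.0,"
    "org.apache.hadoop:hadoop-azure:2.7.0",
)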
If that does not help, download the connector code from GitHub and change this dependency to 2.7 (the connector uses 3.2). By the way, I don't know whether you already do this, but you have to set the key or SAS for this container through the Spark conf, as sketched below.
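For the wasbs:// path that could look like the following sketch. The account name ("account"), the container name ("container"), and the credential variables are placeholders, and the spark.hadoop. prefix is what copies a SparkConf entry into the Hadoop configuration that the FileSystem layer reads:
# A minimal sketch, assuming a storage account named "account" and a
# container named "container". Hadoop filesystem options set through
# SparkConf need the "spark.hadoop." prefix to reach the Hadoop Configuration.
conf = conf.set(
    "spark.hadoop.fs.azure.account.key.account.blob.core.windows.net",
    storage_account_key,  # hypothetical variable holding the account key
)

# Or grant access per container with a SAS token instead of the account key:
# conf = conf.set(
#     "spark.hadoop.fs.azure.sas.container.account.blob.core.windows.net",
#     sas_token,  # hypothetical variable holding the SAS token
# )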
Source question on Stack Overflow: "python-3.x - Microsoft Azure Spark Kusto connector - is it possible to get files from Azure Storage out of Databricks?" https://stackoverflow.com/questions/60011225/