python-3.x - Microsoft Azure Spark kusto 连接器 - 是否可以从 databricks 中获取 azure 存储的文件?

标签 python-3.x azure apache-spark pyspark azure-data-explorer

我正在尝试在 azure 存储中读取和写入文件,到目前为止我的尝试:

创建 Spark session :

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext


sparkOptions = {"executor_memory" : "1G","driver_memory": "1G", "max_results_size": "1G"}
conf = pyspark.SparkConf().setAppName(app)
conf = (conf.setMaster("local[*]")
    .set('spark.executor.memory', sparkOptions["executor_memory"])\
    .set('spark.driver.memory', sparkOptions["driver_memory"])\
    .set('spark.driver.maxResultSize', sparkOptions["max_results_size"])\
    .set('spark.sql.crossJoin.enabled', "true")\
    .set('spark.jars.packages', 'com.microsoft.azure.kusto:spark-kusto-connector:1.0.0')\
    .set("fs.azure", "org.apache.hadoop.fs.azure.NativeAzureFileSystem")\
    .set("fs.azure.account.auth.type", "OAuth")\
    .set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")\
    .set("fs.azure.account.oauth2.client.id", id)\
    .set("fs.azure.account.oauth2.client.secret", secret)\
    .set("fs.azure.account.oauth2.client.endpoint", endpoint)\
    .set("fs.azure.createRemoteFileSystemDuringInitialization", "true")
    )

sparkContext = pyspark.SparkContext(conf=conf)
sparkSession = SparkSession(sparkContext)
sqlContext = SQLContext(sparkContext)

尝试读取 Azure 存储中的 CSV:

df = sparkSession.read.option("header", "true").csv("wasbs://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="eb8884859f8a82858e99ab8a8888849e859fc589878489c58884998ec59c82858f849c98c5858e9f" rel="noreferrer noopener nofollow">[email protected]</a>/archive.csv")
df.show()

错误:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-3-975f978e0f66> in <module>()
----> 1 df = sparkSession.read.option("header", "true").csv("wasbs://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="d7b4b8b9a3b6beb9b2a597b6b4b4b8a2b9a3f9b5bbb8b5f9b4b8a5b2f9a0beb9b3b8a0a4f9b9b2a3" rel="noreferrer noopener nofollow">[email protected]</a>/archive.csv")
      2 df.show()

~/anaconda3/lib/python3.6/site-packages/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue)
    474             path = [path]
    475         if type(path) == list:
--> 476             return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    477         elif isinstance(path, RDD):
    478             def func(iterator):

~/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

~/anaconda3/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

~/anaconda3/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o68.csv.
: java.io.IOException: No FileSystem for scheme: wasbs
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:547)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:618)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)

尝试使用 abfss:

df = sparkSession.read.option("header", "true").csv("abfss://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="dbb8b4b5afbab2b5bea99bbab8b8b4aeb5aff5b9b7b4b9f5b8b4a9bef5acb2b5bfb4aca8f5b5beaf" rel="noreferrer noopener nofollow">[email protected]</a>/archive.csv")
df.show()

错误:

y4JJavaError                             Traceback (most recent call last)
<ipython-input-4-02abec06890e> in <module>()
----> 1 df = sparkSession.read.option("header", "true").csv("abfss://<a href="https://stackoverflow.com/cdn-cgi/l/email-protection" class="__cf_email__" data-cfemail="eb8884859f8a82858e99ab8a8888849e859fc589878489c58884998ec59c82858f849c98c5858e9f" rel="noreferrer noopener nofollow">[email protected]</a>/archive.csv")
      2 df.show()

~/anaconda3/lib/python3.6/site-packages/pyspark/sql/readwriter.py in csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue)
    474             path = [path]
    475         if type(path) == list:
--> 476             return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    477         elif isinstance(path, RDD):
    478             def func(iterator):

~/anaconda3/lib/python3.6/site-packages/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

~/anaconda3/lib/python3.6/site-packages/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

~/anaconda3/lib/python3.6/site-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o104.csv.
: java.io.IOException: No FileSystem for scheme: abfss
    at org.apache.hadoop.fs.FileSystem.getFileSystemClass(FileSystem.java:2660)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2667)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:94)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2703)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2685)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:373)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:295)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:547)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary$1.apply(DataSource.scala:545)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at scala.collection.immutable.List.flatMap(List.scala:355)
    at org.apache.spark.sql.execution.datasources.DataSource.org$apache$spark$sql$execution$datasources$DataSource$$checkAndGlobPathIfNecessary(DataSource.scala:545)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:359)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.csv(DataFrameReader.scala:618)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)

搜索 kusto-spark 连接器的使用示例,我只在 databricks 中找到了使用 dbutils 的示例,我想知道是否可以在 databricks 之外使用连接器以及我在代码中做错了什么,谢谢。

最佳答案

这与 kusto 无关 您使用的是 Azure 数据 block 吗?如果是这样,只需引用他们的 docs 。 如果没有尝试导入

         <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-azure</artifactId>
            <version>2.7.0</version>
        </dependency>

如果没有帮助 - 从 GitHub 下载连接器代码并更改此依赖项 至2.7(连接器使用3.2) 顺便说一句,不知道您是否这样做,但您必须通过 Spark conf 设置此容器的 key 或 sas

关于python-3.x - Microsoft Azure Spark kusto 连接器 - 是否可以从 databricks 中获取 azure 存储的文件?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60011225/

相关文章:

python-3.x - Numpy.linalg.norm 性能显然不随维数变化

java - Jmeter - 无法每秒运行 1000 个并发用户

scala - 如何从 Scala 的可迭代列表创建 DataFrame?

scala - 如何从spark将文件写入cassandra

mysql - Python3 MySQL 驱动程序

python-3.x - Bokeh - 为 DataTable 添加标题

c# - Azure 事件中心 : Send Async performance

c# - PopReceipt 属性是否保证 CloudQueueMessage 已成功添加到 Azure 存储队列?

java - 如何使用 Java 在 Apache spark 中将一行数组平面映射为多行?

python - 什么是python中的 '<U20' dtype?