azure - 使用 PySpark 从 Azure HDInsight 检索数据

标签 azure apache-spark pyspark azure-sql-database azure-dsvm

我有用于访问 Azure 数据库的凭据和 URL。

我想使用 pyspark 读取数据,但我不知道该怎么做。

是否有连接到 Azure 数据库的特定语法?

编辑

使用共享代码后,我收到了这种错误,有什么建议吗?

我在机器上的示例中看到他们使用 ODBC 驱动程序,也许涉及到这一点?

2018-07-14 11:22:00 WARN  SQLServerConnection:2141 - ConnectionID:1 ClientConnectionId: 7561d3ba-71ac-43b3-a35f-26ababef90cc Prelogin error: host servername.azurehdinsight.net port 443 Error reading prelogin response: An existing connection was forcibly closed by the remote host ClientConnectionId:7561d3ba-71ac-43b3-a35f-26ababef90cc

Traceback (most recent call last):
  File "C:/Users/team2/PycharmProjects/Bridgestone/spark_driver_style.py", line 46, in <module>
    .option("password", "**********")\
  File "C:\dsvm\tools\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\readwriter.py", line 172, in load
    return self._df(self._jreader.load())
  File "C:\Users\team2\PycharmProjects\Bridgestone\venv\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "C:\dsvm\tools\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\utils.py", line 63, in deco
    return f(*a, **kw)
  File "C:\Users\team2\PycharmProjects\Bridgestone\venv\lib\site-packages\py4j\protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o29.load.
: com.microsoft.sqlserver.jdbc.SQLServerException: An existing connection was forcibly closed by the remote host ClientConnectionId:7561d3ba-71ac-43b3-a35f-26ababef90cc
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:2400)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.terminate(SQLServerConnection.java:2384)
    at com.microsoft.sqlserver.jdbc.TDSChannel.read(IOBuffer.java:1884)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.Prelogin(SQLServerConnection.java:2137)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.connectHelper(SQLServerConnection.java:1973)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.login(SQLServerConnection.java:1628)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.connectInternal(SQLServerConnection.java:1459)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.connect(SQLServerConnection.java:773)
    at com.microsoft.sqlserver.jdbc.SQLServerDriver.connect(SQLServerDriver.java:1168)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:56)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:115)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:52)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at 

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)

最佳答案

如果要从数据科学 VM 中的 pyspark 笔记本访问 HDInsight 群集,可以按照 Tutorial 中描述的步骤操作。在步骤 7 下。

导入需要的包:

#Import required Packages
import pyodbc
import time as time
import json
import os
import urllib
import warnings
import re
import pandas as pd

设置 Hive Metastore 连接(需要集群中的用户和密码):

#Create the connection to Hive using ODBC
SERVER_NAME='xxx.azurehdinsight.net'
DATABASE_NAME='default'
USERID='xxx'
PASSWORD='xxxx'
DB_DRIVER='Microsoft Hive ODBC Driver'
driver = 'DRIVER={' + DB_DRIVER + '}'
server = 'Host=' + SERVER_NAME + ';Port=443'
database = 'Schema=' + DATABASE_NAME
hiveserv = 'HiveServerType=2'
auth = 'AuthMech=6'
uid = 'UID=' + USERID
pwd = 'PWD=' + PASSWORD
CONNECTION_STRING = ';'.join([driver,server,database,hiveserv,auth,uid,pwd])
connection = pyodbc.connect(CONNECTION_STRING, autocommit=True)
cursor=connection.cursor()

查询数据:

queryString = """
    show tables in default;
"""
pd.read_sql(queryString,connection)

关于azure - 使用 PySpark 从 Azure HDInsight 检索数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51328497/

相关文章:

amazon-web-services - 无法在新的 AWS EMR 集群中获取 SparkContext

postgresql - Azure Postgresql 数据库诊断设置中看不到事件中心命名空间

model-view-controller - Windows Azure CDN::Javascript、样式表和图像无法找到(404 错误)

Azure Function - 挂起队列触发器

hadoop - 我是否必须安装 Hadoop 才能使用 Elasticsearch ES-Hadoop 连接器

apache-spark - Spark 词汇操作顺序

apache-spark - 使用 PySpark 将模型导出为 PMML

java - 在 AWS、Azure 之间切换。设计模式

java - spark - 如何减少 JavaPairRDD<Integer, Integer[]> 的洗牌大小?

hadoop - 失败 : Execution Error, 从 org.apache.hadoop.hive.ql.exec.spark.SparkTask 返回代码 2