hadoop - Querying from Teradata into PySpark

Tags: hadoop jdbc pyspark teradata apache-spark-2.0

I am trying to run a query against Teradata from PySpark. With the function below I can pull an entire table, but when I try to run a query instead I get an error.
Could someone take a look and tell me where I am going wrong?

def from_rdbms(spark, user, password, driver, jdbc_url, p_type, p_table, p_query,
               p_partition, p_numpartitions, p_lower=1, p_upper=1, p_fetchsize=1000):
    # Connection properties handed to the JDBC driver.
    dbProperties = {
            "user": user,
            "password": password,
            "driver": driver,
            "fetchsize": str(p_fetchsize)
            }
    # Read a whole table or a free-form query, depending on p_type.
    if p_type == "Table":
        query = p_table
    else:
        query = p_query
    if p_partition == '':
        df_ret = spark.read.jdbc(url=jdbc_url, table=query, properties=dbProperties)
    else:
        # Copy, so the partition options do not leak back into dbProperties.
        dbPropertiesExtended = dict(dbProperties)
        # Spark expects exactly these option names for a partitioned read,
        # and all four must be supplied together.
        dbPropertiesExtended["partitionColumn"] = str(p_partition)
        dbPropertiesExtended["lowerBound"] = str(p_lower)
        dbPropertiesExtended["upperBound"] = str(p_upper)
        dbPropertiesExtended["numPartitions"] = str(p_numpartitions)
        df_ret = spark.read.jdbc(url=jdbc_url, table=query, properties=dbPropertiesExtended)
    return df_ret
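
One caveat on the partitioned branch: Spark requires partitionColumn, lowerBound, upperBound and numPartitions to be set together, and in Spark 2.x the partition column has to be numeric. A minimal sketch of such a call (user_name, source_url, db_name and table_name are the question's own placeholders, and "id" is a hypothetical numeric column with illustrative bounds):

# Spark issues numPartitions parallel queries, each with a WHERE clause
# derived from the lower/upper bounds on the partition column.
df_parallel = from_rdbms(spark, user_name, password,
                         driver="com.teradata.jdbc.TeraDriver",
                         jdbc_url=source_url,
                         p_type="Table", p_table=db_name + "." + table_name,
                         p_query="",
                         p_partition="id",    # hypothetical numeric column
                         p_numpartitions=8,
                         p_lower=1, p_upper=1000000)

Equivalently, spark.read.jdbc accepts explicit column, lowerBound, upperBound and numPartitions arguments, which avoids encoding these options as strings in the properties dict.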

Running the function:
query1="select count(*) as c from "+db_name+"."+table_name + " t1"

count_td = from_rdbms(spark,user_name, password,driver="com.teradata.jdbc.TeraDriver" , jdbc_url= source_url, p_type="Query", p_table="", p_query=query1, p_partition="", p_numpartitions="", p_lower=1, p_upper=1, p_fetchsize=1000)

The error I get is:
 java.sql.SQLException: [Teradata Database] [TeraJDBC 16.20.00.08] [Error 3707] [SQLState 42000] Syntax error, expected something like a name or a Unicode delimited identifier or an 'UDFCALLNAME' keyword or '(' between the 'FROM' keyword and the 'select' keyword.
        at com.teradata.jdbc.jdbc_4.util.ErrorFactory.makeDatabaseSQLException(ErrorFactory.java:309)
        at com.teradata.jdbc.jdbc_4.statemachine.ReceiveInitSubState.action(ReceiveInitSubState.java:103)
        at com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.subStateMachine(StatementReceiveState.java:311)
        at com.teradata.jdbc.jdbc_4.statemachine.StatementReceiveState.action(StatementReceiveState.java:200)
        at com.teradata.jdbc.jdbc_4.statemachine.StatementController.runBody(StatementController.java:137)
        at com.teradata.jdbc.jdbc_4.statemachine.StatementController.run(StatementController.java:128)
        at com.teradata.jdbc.jdbc_4.TDStatement.executeStatement(TDStatement.java:389)
        at com.teradata.jdbc.jdbc_4.TDStatement.prepareRequest(TDStatement.java:576)
        at com.teradata.jdbc.jdbc_4.TDPreparedStatement.<init>(TDPreparedStatement.java:131)
        at com.teradata.jdbc.jdk6.JDK6_SQL_PreparedStatement.<init>(JDK6_SQL_PreparedStatement.java:30)
        at com.teradata.jdbc.jdk6.JDK6_SQL_Connection.constructPreparedStatement(JDK6_SQL_Connection.java:82)
        at com.teradata.jdbc.jdbc_4.TDSession.prepareStatement(TDSession.java:1337)
        at com.teradata.jdbc.jdbc_4.TDSession.prepareStatement(TDSession.java:1381)
        at com.teradata.jdbc.jdbc_4.TDSession.prepareStatement(TDSession.java:1367)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:60)
        at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:114)
        at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:52)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:309)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:178)
        at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:146)
        at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:193)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:280)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:214)
        at java.lang.Thread.run(Thread.java:745)

Best answer

It worked once I wrote the query as a parenthesized derived table with an alias:

query1 = "(select count(*) as c from " + db_name + "." + table_name + ") as t1"

The reason: Spark's JDBC reader splices the table argument into a generated statement (the JDBCRDD.resolveTable call visible in the stack trace runs roughly SELECT * FROM <table argument> WHERE 1=0 to resolve the schema). With a bare SELECT the generated SQL reads "... FROM select count(*) ...", which is exactly the syntax Teradata's Error 3707 rejects. Wrapping the query in parentheses and aliasing it makes it a valid derived table.

Original question on Stack Overflow: https://stackoverflow.com/questions/55227892/
