Scala SQL query remote access error from GCP to local

Tags: scala dataframe jdbc jooq type-mismatch

I have the following code:

     import org.jooq._ 
     import org.jooq.impl._ 
     import org.jooq.impl.DSL._ 
     import java.sql.DriverManager
     import org.jooq.SQLDialect
     import org.apache.spark.sql.SparkSession

     val session = SparkSession.builder().getOrCreate()
     var df1 = session.emptyDataFrame
     var df2 = session.emptyDataFrame

     val userName = "user"
     val password = "pass"

     val c = DriverManager.getConnection("jdbc:mysql://blah_blah.com", userName, password)

     df1 = sql(s"select * from $db1_name.$tb1_name")
     df2 = c.prepareStatement(s"select * from $db2_name.$tb2_name")

Then I get the following errors:

    found : org.jooq.SQL
    required: org.apache.spark.sql.DataFrame
    (which expands to) 
    org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
    df1 = sql(s"select * from $db1_name.$tb1_name")
             ^

    found : java.sql.PreparedStatement
    required: org.apache.spark.sql.DataFrame
    (which expands to) 
    org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
    df2 = c.prepareStatement(s"select * from $db2_name.$tb2_name")

Then, based on suggestions in the comments, I changed my Scala code to:

    val userName = "user"
    val password = "pass"

    val session = SparkSession.builder().getOrCreate()
    var df1 = session.emptyDataFrame
    var df2 = session.emptyDataFrame

    ....
    ....
    df1 = sql(s"select * from $db1_name.$tb1_name")
    df2 = session.read.format("jdbc").
    option("url", "jdbc:mysql://blah_blah.com").
    option("driver", "com.mysql.jdbc.Driver").
    option("useUnicode", "true").
    option("continueBatchOnError","true").
    option("useSSL", "false").
    option("user", userName).
    option("password", password).
    option("dbtable",s"select * from $db2_name.$tb2_name").load()

And I get an error like this:

    The last packet sent successfully to the server was 0 milliseconds 
    ago. The driver has not received any packets from the server.
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
    at com.mysql.jdbc.SQLError.createCommunicationsException(SQLError.java:989)
    at com.mysql.jdbc.MysqlIO.readPacket(MysqlIO.java:632)
    at com.mysql.jdbc.MysqlIO.doHandshake(MysqlIO.java:1016)
    at com.mysql.jdbc.ConnectionImpl.coreConnect(ConnectionImpl.java:2194)
    at com.mysql.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:2225)
    at com.mysql.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:2024)
    at com.mysql.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:779)
    at com.mysql.jdbc.JDBC4Connection.<init>(JDBC4Connection.java:47)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
    at com.mysql.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:389)
    at com.mysql.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:330)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:63)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:56)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:115)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:52)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:341)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
    ... 78 elided
    Caused by: java.io.EOFException: Can not read response from server. 
    Expected to read 4 bytes, read 0 bytes before connection was unexpectedly lost.
    at com.mysql.jdbc.MysqlIO.readFully(MysqlIO.java:3011)
    at com.mysql.jdbc.MysqlIO.readPacket(MysqlIO.java:567)
    ... 100 more

Is there any solution or suggestion for these two errors?

I have also tried the postgresql and h2 drivers => org.postgresql.Driver

But I ran into similar errors (possibly not exactly the same).

Best Answer

Your problem is that the Scala compiler has already inferred the type of var df1 and var df2 from their empty-DataFrame initializers, so only a DataFrame can be assigned to them. jOOQ's sql(...) returns an org.jooq.SQL, and c.prepareStatement(...) returns a java.sql.PreparedStatement; neither is a Spark DataFrame. You have to read directly through Spark instead:

    spark.read.format("jdbc")
      .option("url", jdbcUrl)
      .option("query", "select c1, c2 from t1")
      .load()
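
Note that in the second attempt the dbtable option was given a full SELECT statement. In Spark's JDBC source, dbtable expects a table name or a parenthesized subquery with an alias, while query takes a bare SELECT, and the two options are mutually exclusive. A minimal sketch of both forms, assuming the placeholder host, credentials, and database/table names carried over from the question:

```scala
// Sketch only: the URL, credentials, and db/table names are placeholders
// from the question, not a verified configuration.

// Form 1 - "query" takes a bare SELECT statement:
val viaQuery = session.read.format("jdbc")
  .option("url", "jdbc:mysql://blah_blah.com:3306/db2_name")
  .option("user", userName)
  .option("password", password)
  .option("query", "select * from tb2_name")
  .load()

// Form 2 - "dbtable" takes a table name or an aliased subquery:
val viaDbtable = session.read.format("jdbc")
  .option("url", "jdbc:mysql://blah_blah.com:3306/db2_name")
  .option("user", userName)
  .option("password", password)
  .option("dbtable", "(select * from tb2_name) as t")
  .load()
```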

More information is available directly on the Apache Spark documentation page:

https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
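
Separately, the communications link failure in the second attempt is not a type error: the EOFException during the handshake means the driver never received a response from the MySQL server, which usually points at the host, port, firewall rules, or SSL settings rather than the Spark code. A self-contained sketch of the full read, assuming the question's placeholder host and names and that port 3306 is reachable:

```scala
// Sketch under assumptions: blah_blah.com:3306 is reachable, and
// db2_name/tb2_name are the placeholder names from the question.
import org.apache.spark.sql.{DataFrame, SparkSession}

object JdbcReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("jdbc-read").getOrCreate()

    val df2: DataFrame = spark.read
      .format("jdbc")
      // Include the port and database in the URL; the question used a
      // bare host, which can contribute to handshake failures.
      .option("url", "jdbc:mysql://blah_blah.com:3306/db2_name")
      .option("driver", "com.mysql.jdbc.Driver")
      .option("user", "user")
      .option("password", "pass")
      .option("useSSL", "false")
      .option("query", "select * from tb2_name")
      .load()

    df2.show()
    spark.stop()
  }
}
```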

This question was found as a similar question on Stack Overflow: https://stackoverflow.com/questions/58069865/
