python - (Py)Spark中如何使用JDBC源读写数据?

标签 python scala apache-spark apache-spark-sql pyspark

这个问题的目的是记录:

  • 在 PySpark 中使用 JDBC 连接读取和写入数据所需的步骤
  • JDBC 源的可能问题和了解解决方案

  • 只需稍加改动,这些方法就可以与其他受支持的语言(包括 Scala 和 R)一起使用。

    最佳答案

    写入数据

  • 提交应用程序或启动 shell 时包括适用的 JDBC 驱动程序。您可以使用例如 --packages :
     bin/pyspark --packages group:name:version  
    

  • 或合并 driver-class-pathjars
        bin/pyspark --driver-class-path $PATH_TO_DRIVER_JAR --jars $PATH_TO_DRIVER_JAR
    
    这些属性也可以使用 PYSPARK_SUBMIT_ARGS 设置JVM 实例启动或使用之前的环境变量 conf/spark-defaults.conf设置 spark.jars.packagesspark.jars/spark.driver.extraClassPath .
  • 选择所需的模式。 Spark JDBC writer 支持以下模式:
    • append: Append contents of this :class:DataFrame to existing data.
    • overwrite: Overwrite existing data.
    • ignore: Silently ignore this operation if data already exists.
    • error (default case): Throw an exception if data already exists.

    Upserts 或其他细粒度的修改 are not supported
     mode = ...
    
  • 准备 JDBC URI,例如:
     # You can encode credentials in URI or pass
     # separately using properties argument
     # of jdbc method or options
    
     url = "jdbc:postgresql://localhost/foobar"
    
  • (可选)创建 JDBC 参数字典。
     properties = {
         "user": "foo",
         "password": "bar"
     }
    
    properties/options也可用于设置supported JDBC connection properties .
  • 使用 DataFrame.write.jdbc
     df.write.jdbc(url=url, table="baz", mode=mode, properties=properties)
    

  • 保存数据(详情见 pyspark.sql.DataFrameWriter )。
    已知问题 :
  • 使用 --packages 包含驱动程序后,找不到合适的驱动程序( java.sql.SQLException: No suitable driver found for jdbc: ... )
    假设没有驱动程序版本不匹配来解决这个问题,您可以添加 driver类到 properties .例如:
      properties = {
          ...
          "driver": "org.postgresql.Driver"
      }
    
  • 使用 df.write.format("jdbc").options(...).save()可能导致:

    java.lang.RuntimeException: org.apache.spark.sql.execution.datasources.jdbc.DefaultSource does not allow create table as select.


    解决方案未知。
  • 在 Pyspark 1.3 中,您可以尝试直接调用 Java 方法:
      df._jdf.insertIntoJDBC(url, "baz", True)
    

  • 读取数据
  • 按照写入数据中的步骤 1-4
  • 使用 sqlContext.read.jdbc :
     sqlContext.read.jdbc(url=url, table="baz", properties=properties)
    

  • sqlContext.read.format("jdbc") :
        (sqlContext.read.format("jdbc")
            .options(url=url, dbtable="baz", **properties)
            .load())
    
    已知问题和陷阱 :
  • 找不到合适的驱动程序 - 请参阅:写入数据
  • Spark SQL 支持使用 JDBC 源进行谓词下推,尽管并非所有谓词都可以下推。它也不委托(delegate)限制或聚合。可能的解决方法是更换 dbtable/table带有有效子查询的参数。见例如:
  • Does spark predicate pushdown work with JDBC?
  • More than one hour to execute pyspark.sql.DataFrame.take(4)
  • How to use SQL query to define table in dbtable?

  • 默认情况下,JDBC 数据源使用单个执行程序线程按顺序加载数据。为确保分布式数据加载,您可以:
  • 提供分区column (必须是 IntegerType )、lowerBound , upperBound , numPartitions .
  • 提供互斥谓词列表 predicates , 每个所需分区一个。

  • 看:
  • Partitioning in spark while reading from RDBMS via JDBC ,
  • How to optimize partitioning when migrating data from JDBC source? ,
  • How to improve performance for slow Spark jobs using DataFrame and JDBC connection?
  • How to partition Spark RDD when importing Postgres using JDBC?

  • 在分布式模式下(带有分区列或谓词),每个执行器都在自己的事务中运行。如果同时修改源数据库,则无法保证最终 View 保持一致。

  • 在哪里可以找到合适的驱动程序:
  • Maven Repository (要获得 --packages 所需的坐标,选择所需的版本并从 Gradle 选项卡中复制数据,格式为 compile-group:name:version,替换相应的字段)或 Maven Central Repository :
  • PostgreSQL
  • MySQL


  • 其他选项
    根据可能存在的数据库专用源,并且在某些情况下是首选:
  • Greenplum - Pivotal Greenplum-Spark Connector
  • Apache 凤凰 - Apache Spark Plugin
  • Microsoft SQL Server - Spark connector for Azure SQL Databases and SQL Server
  • 亚马逊红移 - Databricks Redshift connector (当前版本仅在专有 Databricks 运行时可用。Discontinued open source version, available on GitHub)。
  • 关于python - (Py)Spark中如何使用JDBC源读写数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30983982/

    相关文章:

    python - 如何使用 numpy 广播按条件组合多索引列值

    python - 在列名python中的特定位置保留字符串

    python - Int 对象不可下标?

    java - Scala (java) grpc 异步拦截器状态传播

    scala - 使 Scala Remote Actors 更加稳定

    python - 为什么 numpy 数组不能从 datetime 隐式转换为 np.datetime64?

    scala - 如何在Spark SQL中使用连字符对列名进行转义

    apache-spark - 没有足够的副本可用于一致性 LOCAL_ONE 的查询(需要 1 个,但只有 0 个存活)

    sql - 如何使用 Hive 查询 3 个大表的相交值?

    apache-spark - Apache Spark DAG 行为联合分组操作