postgresql - Spark Streaming jdbc 在数据到来时读取流 - 数据源 jdbc 不支持流式读取

标签 postgresql apache-spark apache-kafka spark-streaming

我使用 PostGre 作为数据库。我想为每个批次捕获一个表数据并将其转换为 Parquet 文件并存储到 s3。我尝试使用spark和readStream的JDBC选项进行连接,如下所示......

val jdbcDF = spark.readStream
    .format("jdbc")
    .option("url", "jdbc:postgresql://myserver:5432/mydatabase")
    .option("dbtable", "database.schema.table")
    .option("user", "xxxxx")
    .option("password", "xxxxx")
    .load()

但它抛出了不受支持的异常
Exception in thread "main" java.lang.UnsupportedOperationException: Data source jdbc does not support streamed reading
    at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:234)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87)
    at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87)
    at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30)
    at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150)
    at examples.SparkJDBCStreaming$.delayedEndpoint$examples$SparkJDBCStreaming$1(SparkJDBCStreaming.scala:16)
    at examples.SparkJDBCStreaming$delayedInit$body.apply(SparkJDBCStreaming.scala:5)
    at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
    at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.App$$anonfun$main$1.apply(App.scala:76)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
    at scala.App$class.main(App.scala:76)

我在正确的轨道上吗?真的不支持数据库作为 Spark 流的数据源吗?

AFAIK 另一种这样做的方法是编写一个 kafka 生产者将数据发布到 kafka 主题,然后使用 Spark 流...

Note : I dont want to use kafka connect for this since I need to do some auxiliary transformations.



这是唯一的方法吗?

这样做的正确方法是什么?有没有这样的例子?
请协助!

最佳答案

Spark 结构化流没有标准的 JDBC 源,但您可以编写自定义,但您应该明白您的表必须有一个唯一的键,您可以通过它来跟踪更改。
例如,您可以取 my implementation ,不要忘记在依赖中添加必要的JDBC驱动

关于postgresql - Spark Streaming jdbc 在数据到来时读取流 - 数据源 jdbc 不支持流式读取,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56857171/

相关文章:

scala - Spark Send DataFrame 作为 HTTP Post 请求的主体

postgresql - 如何将 Spark DataFrame 写入 Postgres DB

postgresql - flask 无法将 postgresql 与 docker-compose 连接

sql - PostgreSQL 查询按两个分组 "parameters"

regex - 在转换为数字之前使用正则表达式过滤字符串

csv - Spark读取多个CSV文件,每个文件一个分区

C++ with Kafka - 消费者刚刚收到一些生产者消息

java - 当 Broker 不可用时,消息不会出现在 Spring Integration (Kafka) ErrorChannel 中

spring-boot - Spring Kafka为主题配置分区数

sql - 如何仅在连续行上应用聚合函数?