apache-spark - spark 2.2 struct Streaming foreach writer jdbc sink 滞后

标签 apache-spark jdbc foreach spark-streaming sink

我在一个使用 spark 2.2 结构流将 kafka msg 读入 oracle 数据库的项目中。进入 kafka 的消息流约为每秒 4000-6000 条消息。

当使用 hdfs 文件系统作为接收器目标时,它工作正常。使用 foreach jdbc writer 时,随着时间的推移会有很大的延迟。我认为滞后是由 foreach 循环引起的。

jdbc sink 类(独立类文件):

class JDBCSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
  val driver = "oracle.jdbc.driver.OracleDriver"
  var connection: java.sql.Connection = _
  var statement: java.sql.PreparedStatement = _
  val v_sql = "insert INTO sparkdb.t_cf(EntityId,clientmac,stime,flag,id) values(?,?,to_date(?,'YYYY-MM-DD HH24:MI:SS'),?,stream_seq.nextval)"

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = java.sql.DriverManager.getConnection(url, user, pwd)
    connection.setAutoCommit(false)
    statement = connection.prepareStatement(v_sql)
    true
  }

  def process(value: org.apache.spark.sql.Row): Unit = {
    statement.setString(1, value(0).toString)
    statement.setString(2, value(1).toString)
    statement.setString(3, value(2).toString)
    statement.setString(4, value(3).toString)
    statement.executeUpdate()        
  }

  def close(errorOrNull: Throwable): Unit = {
    connection.commit()
    connection.close
  }
}

水槽部分:
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "namenode:9092").option("fetch.message.max.bytes", "50000000").option("kafka.max.partition.fetch.bytes", "50000000")
  .option("subscribe", "rawdb.raw_data")
  .option("startingOffsets", "latest")
  .load()
  .select($"value".as[Array[Byte]])
  .map(avroDeserialize(_))
  .filter(some logic).select(some logic) 
  .writeStream.format("csv").option("checkpointLocation", "/user/root/chk").option("path", "/user/root/testdir").start()

如果我改变最后一行

.writeStream.format("csv")...



进入 jdbc foreach sink 如下:
val url = "jdbc:oracle:thin:@(DESCRIPTION=(ADDRESS_LIST=(ADDRESS=(PROTOCOL=TCP)(HOST=x.x.x.x)(PORT=1521)))(CONNECT_DATA=(SERVICE_NAME=fastdb)))"
val user = "user";
val pwd = "password";

val writer = new JDBCSink(url, user, pwd)
.writeStream.foreach(writer).outputMode("append").start()


滞后出现。

我猜这个问题最有可能是由 foreach 循环机制引起的——它不是在批处理模式下处理像批处理中的几千行,作为 oracle DBA,我已经微调了 oracle 数据库端,主要是数据库正在等待空闲事件。试图通过设置 connection.setAutoCommit(false) 来避免过度提交已经,任何建议将不胜感激。

最佳答案

尽管我没有您的应用程序中花费最长时间的实际配置文件,但我认为这是由于使用 ForeachWriter 的事实。将在每次运行时有效地关闭并重新打开您的 JDBC 连接,因为这就是 ForeachWriter作品。

我建议不要使用它,而是编写一个自定义 Sink对于 JDBC,您可以在其中控制连接的打开或关闭方式。

有一个开放pull request to add a JDBC driver to Spark你可以看一看,看看可能的实现方法。

关于apache-spark - spark 2.2 struct Streaming foreach writer jdbc sink 滞后,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47130229/

相关文章:

excel - 如何使我的 VBA 错误处理更有效率

c# - 是否可以在迭代 ICollection 时确定当前位置?

apache-spark - 如何根据基于 Pyspark 中另一列的表达式的评估有条件地替换列中的值?

maven - 无法使用 spark-1.6.1-bin-hadoop1 运行 spark-terasort

scala - 我可以在 where 或过滤器中有条件吗?

java - JDBC API 和 PostgreSQL 驱动程序有什么区别?

Eclipse 中的 java.lang.ClassNotFoundException : com. mysql.jdbc.Driver

java - MySQL 不允许 Java 程序使用 JDBC 插入

c# - 插入 SQL 数据库,在带有复选框列表的 foreach 中

scala - Spark `reduceGroups`错误重载方法与替代方案