apache-spark - Spark Redshift 连接器 : combine saving to redshift with a delete query

标签 apache-spark amazon-redshift

我想使用 spark-redshift-connector (scala) 定期更新 Redshift。 每次更新都以删除操作开始。 (我正在对 Redshift 执行更新插入)

有没有办法可以用库来执行它? 可以通过交易吗?

任何建议将不胜感激。

谢谢, 埃兰。

最佳答案

请引用下面的例子:

val min_date=mydf.select(min("actual_ship_date")).rdd.map(line=>line(0)).take(1)
val max_date=mydf.select(max("actual_ship_date")).rdd.map(line=>line(0)).take(1)
val query="delete from semi_sdt.kgd_tsb_shippment where 
actual_ship_date>='"+min_date(0).toString+"' and 
actual_ship_date<='"+max_date(0).toString+"'"
//Write data to RedShift
mydf.coalesce(1).write.
format("com.databricks.spark.redshift").
option("url",redShiftUrl).
option("dbtable","semi_sdt.kgd_tsb_shippment").
option("tempdir",s3dir).
option("forward_spark_s3_credentials",true).
option("preactions",query).
mode("append").
save()

关于apache-spark - Spark Redshift 连接器 : combine saving to redshift with a delete query,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38422523/

相关文章:

python - 通过 PySpark 连接到 Redshift,我们如何让驱动程序工作?

sql - 如何从亚马逊 Redshift 中的字符串中删除非数字字符(句号 "."除外)

mysql - 选择 id(在本例中为 dupersid),其中第二列(在本例中为 icd9codx)的值仅在 (296, 311) 和 250 中

scala - 如何在 Scala 中将 Array[(Double, Double)] 转换为 Array[Double]?

performance - DataFrame/Dataset groupBy 行为/优化

apache-spark - 如何根据基于 Pyspark 中另一列的表达式的评估有条件地替换列中的值?

sql - 从 Redshift 中的时间戳中提取时间

sql - 如何使用移动窗口/分区或任何其他方法获得不同的每周活跃用户/不同的每月活跃用户?

sql - 如何将 String 值转换(或强制转换)为 Integer 值?

python - 在Spark中,RDD是不可变的,那么Accumulators是如何实现的呢?