apache-spark - Structured Streaming with watermark - TypeError: 'module' object is not callable

Tags: apache-spark google-cloud-platform spark-structured-streaming google-cloud-dataproc windowing

I have a Structured Streaming PySpark program running on GCP Dataproc that reads data from Kafka and does some data processing and aggregation. I'm trying to use withWatermark(), and I'm getting an error.

Here is the code:

df_stream = spark.readStream.format('kafka') \
    .option("kafka.security.protocol", "SSL") \
    .option("kafka.ssl.truststore.location", ssl_truststore_location) \
    .option("kafka.ssl.truststore.password", ssl_truststore_password) \
    .option("kafka.ssl.keystore.location", ssl_keystore_location) \
    .option("kafka.ssl.keystore.password", ssl_keystore_password) \
    .option("kafka.bootstrap.servers",kafkaBrokers)\
    .option("subscribe", topic) \
    .option("kafka.group.id", consumerGroupId)\
    .option("startingOffsets", "earliest") \
    .option("failOnDataLoss", "false") \
    .option("maxOffsetsPerTrigger", 10) \
    .load()

# readStream calls foreachBatch(convertToDictForEachBatch)
query = df_stream.selectExpr("CAST(value AS STRING)", "timestamp").writeStream \
    .outputMode("append") \
    .trigger(processingTime='10 seconds') \
    .option("numRows",10)\
    .option("truncate", "false") \
    .option("checkpointLocation", checkpoint) \
    .foreachBatch(convertToDictForEachBatch) \
    .start()

convertToDictForEachBatch - contains the code that does the data massaging and aggregation:

def convertToDictForEachBatch(df, batchId):
    # d = df_stream.rdd.collect()
    print(" IN CONVERT TO DICT ", batchId, " currentTime ", datetime.datetime.now(), " df -> ", df)
    ll = df.rdd.map(lambda x: x[0])
    res = []
    # each row is parsed, and finally converted to an RDD (i.e. tdict)
    tdict = ll.map(convertToDict)
    # converting the tdict to DF, which is passed to Alarm class, where the data massaging & aggregation is done
    dfnew = tdict.toDF()
    ap = Alarm(tdict, spark)


# Aggregation code in the Alarm class, which uses withWatermark
def computeCount(df_processedAlarm, df_totalAlarm):
    processedAlarmCnt = None
    if df_processedAlarm.count() > 0:
        processedAlarmCnt = df_processedAlarm.withWatermark("timestamp", "10 seconds") \
            .groupBy(
                window(col("timestamp"), "1 minutes").alias("window")
            ).count()

The intent of the above code is to compute the count of processed alarms over a 1-minute window, with a watermark of 10 seconds.
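For illustration, here is a tiny self-contained version of that aggregation on toy data (the sample rows and app name are made up for this sketch; it uses the pyspark.sql.functions import that the accepted answer below arrives at):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window

spark = SparkSession.builder.appName("window_count_demo").getOrCreate()

# toy batch data; in the real job the timestamp column comes from Kafka
df = spark.createDataFrame(
    [("2022-02-16 10:00:05", "alarm1"),
     ("2022-02-16 10:00:40", "alarm2"),
     ("2022-02-16 10:01:10", "alarm3")],
    ["timestamp", "alarm"],
).withColumn("timestamp", col("timestamp").cast("timestamp"))

# one output row per 1-minute window, with the alarm count for that window
df.groupBy(window(col("timestamp"), "1 minutes").alias("window")).count().show(truncate=False)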

Error:

py4j.Py4JException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
    return_value = getattr(self.pool[obj_id], method)(*params)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 196, in call
    raise e
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 193, in call
    self.func(DataFrame(jdf, self.sql_ctx), batch_id)
  File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 444, in convertToDictForEachBatch
    ap = Alarm(tdict, spark)
  File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 356, in __init__
    computeCount(l_alarm_df, l_alarm1_df)
  File "/tmp/178d0ac9c82e42a09942f7f9cdc76bb7/StructuredStreaming_GCP_Versa_Sase_gcloud.py", line 262, in computeCount
    window(col("timestamp"), "10 minutes").alias("window")
TypeError: 'module' object is not callable

    at py4j.Protocol.getReturnValue(Protocol.java:476)
    at py4j.reflection.PythonProxyHandler.invoke(PythonProxyHandler.java:108)
    at com.sun.proxy.$Proxy33.call(Unknown Source)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1(ForeachBatchSink.scala:55)
    at org.apache.spark.sql.execution.streaming.sources.PythonForeachBatchHelper$.$anonfun$callForeachBatch$1$adapted(ForeachBatchSink.scala:55)
    at org.apache.spark.sql.execution.streaming.sources.ForeachBatchSink.addBatch(ForeachBatchSink.scala:35)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:586)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:584)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:584)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:226)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:357)
    at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:355)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:68)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:194)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:57)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:188)
    at org.apache.spark.sql.execution.streaming.StreamExecution.$anonfun$runStream$1(StreamExecution.scala:334)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:317)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:244)
Traceback (most recent call last):

What needs to be done to debug/fix this issue? TIA!

Best Answer

As @ewertonvsilva mentioned, this was related to an import error. Specifically ->

from pyspark.sql.functions import window

After correcting the import, the issue was resolved.
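For reference, a minimal sketch of the corrected function from the question, with the function import in place (column names and durations taken from the code above):

# window() must be the function from pyspark.sql.functions;
# if the name `window` is instead bound to a module (e.g. pyspark.sql.window),
# then window(...) raises "TypeError: 'module' object is not callable"
from pyspark.sql.functions import col, window

def computeCount(df_processedAlarm, df_totalAlarm):
    processedAlarmCnt = None
    if df_processedAlarm.count() > 0:
        processedAlarmCnt = df_processedAlarm.withWatermark("timestamp", "10 seconds") \
            .groupBy(
                window(col("timestamp"), "1 minutes").alias("window")
            ).count()
    return processedAlarmCnt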

Regarding apache-spark - Structured Streaming with watermark - TypeError: 'module' object is not callable, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/71137296/
