spark-streaming - Spark Streaming UpdateStateByKey

标签 spark-streaming

我正在运行一个 24X7 的 Spark 流并使用 updateStateByKey 函数来保存计算的历史数据,就像 NetworkWordCount Example 的情况一样..

我试图流式传输一个包含 3lac 记录的文件,每 1500 条记录有 1 秒的 sleep 时间。
我正在使用 3 名 worker

  • 经过一段时间 updateStateByKey 不断增长,然后程序抛出以下异常

  • 错误执行程序:任务 ID 1635 中的异常
    java.lang.ArrayIndexOutOfBoundsException: 3
    14/10/23 21:20:43 ERROR TaskSetManager: Task 29170.0:2 failed 1 times; aborting job
    14/10/23 21:20:43 ERROR DiskBlockManager: Exception while deleting local spark dir: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141023204346-b232
    java.io.IOException: Failed to delete: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141023204346-b232/24
    
    14/10/23 21:20:43 ERROR Executor: Exception in task ID 8037
    java.io.FileNotFoundException: /var/folders/3j/9hjkw0890sx_qg9yvzlvg64cf5626b/T/spark-local-20141023204346-b232/22/shuffle_81_0_1 (No such file or directory)
        at java.io.FileOutputStream.open(Native Method)
    

    如何处理?
    我猜 updateStateByKey 应该随着它的快速增长而定期重置,请分享一些关于何时以及如何重置 updateStateByKey 的例子......或者我还有其他问题吗?一些启发。

    任何帮助深表感谢。谢谢你的时间

    最佳答案

    您是否设置了检查点
    ssc.checkpoint("检查点的路径")

    关于spark-streaming - Spark Streaming UpdateStateByKey,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26541873/

    相关文章:

    hadoop - Apache Spark S3 错误

    hadoop - 无法在Spark Streaming作业中获得广播_1的广播_1_piece0

    postgresql - Spark 流式传输多个源,重新加载数据帧

    java - 在 Spark Streaming Java 中提取嵌套 JSON 值

    hadoop - Spark Streaming 和 Spark 应用程序可以在同一个 YARN 集群中运行吗?

    scala - Spark mapWithState 更新状态输出

    apache-spark - 如何从 Apache Spark 中定期附加的日志文件中获取数据?

    java - 流启动后 Spark Stream 新作业

    java - 如何仅在处理完 RDD 中的所有分区后才在 Spark Streaming 中接收输入?

    java - 为什么使用单例来包装广播变量?