pyspark - Dataproc 上的 Spark 流数据管道遇到突然频繁的套接字超时

标签 pyspark spark-streaming google-cloud-dataproc

我在 Google Cloud Dataproc 上使用 Spark streaming 来执行一个框架(用 Python 编写),该框架由几个连续的管道组成,每个管道代表 Dataproc 上的一个作业,基本上从 Kafka 队列读取并将转换后的输出写入 Bigtable。所有管道每天通过 2 个集群处理数 GB 的数据,一个集群有 3 个工作节点,一个集群有 4 个。

在 Dataproc 上运行这个 Spark 流框架一直相当稳定,直到 5 月初(准确地说是 5 月 3 日):我们开始遇到频繁的套接字超时异常,这会终止我们的管道。它似乎与集群上的负载无关,因为它没有显着增加。它也全天随机发生,我检查了可能相关的代码更改,但我找不到任何。此外,这似乎只发生在具有 4 个工作节点的集群上,而具有 3 个节点的集群上的管道非常相似并且根本没有超时。我已经两次重新创建集群,但问题仍然存在,它会影响在此 dataproc 集群上运行的所有管道。 3 个节点的集群是 n1-standard-4 机器类型,而麻烦的 4 个节点集群是 n1-standard-8 机器类型,除了它们的配置是相同的。

出现问题和作业终止时管道作业执行的示例输出:

java.net.SocketTimeoutException: Accept timed out
    at java.net.PlainSocketImpl.socketAccept(Native Method)
    at java.net.AbstractPlainSocketImpl.accept(AbstractPlainSocketImpl.java:409)
    at java.net.ServerSocket.implAccept(ServerSocket.java:545)
    at java.net.ServerSocket.accept(ServerSocket.java:513)
    at org.apache.spark.api.python.PythonRDD$$anon$2.run(PythonRDD.scala:645)
16/05/23 14:45:45 ERROR org.apache.spark.streaming.scheduler.JobScheduler: Error running job streaming job 1464014740000 ms.0
org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/streaming/util.py", line 65, in call
    r = self.func(t, *rdds)
  File "/tmp/b85990ba-e152-4d5b-8977-fb38915e78c4/transformfwpythonfiles.zip/transformationsframework/StreamManager.py", line 138, in process_kafka_rdd
    .foreach(lambda *args: None)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 747, in foreach
    self.mapPartitions(processPartition).count()  # Force evaluation
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 1004, in count
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 995, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 869, in fold
    vals = self.mapPartitions(func).collect()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 772, in collect
    return list(_load_from_socket(port, self._jrdd_deserializer))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 142, in _load_from_socket
    for item in serializer.load_stream(rf):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
    yield self._read_with_length(stream)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 156, in _read_with_length
    length = read_int(stream)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 543, in read_int
    length = stream.read(4)
  File "/usr/lib/python2.7/socket.py", line 380, in read
    data = self._sock.recv(left)
timeout: timed out

堆栈跟踪的开始是在我们的 StreamManager 模块中,方法是 process_kafka_rdd:它在 Kafka 消息的直接流中处理单个离散的 RDD。我们对 Kafka 与 Spark 流的集成基于 http://spark.apache.org/docs/latest/streaming-kafka-integration.html 中描述的“直接方法”

最佳答案

我对 Spark 和 socket 错误的经验是某些执行程序突然死亡。当时与它通信的其他一些执行程序引发了套接字错误。

根据我的经验,执行程序意外死亡的原因是资源不足,通常是内存不足。

(调整执行程序可以使用的内存量很重要。默认值通常太低。但我怀疑您已经意识到这一点。)

我假设 Spark 在 yarn 之上运行?不幸的是,根据我的经验,当问题发生在 yarn 的内部时,Spark 在报告问题原因方面做得很差。不幸的是,必须深入研究 yarn 日志才能找出真正导致执行者突然死亡的原因。每个执行者都在一个 yarn “容器”中运行;在 yarn 日志中的某处应该有一个容器掉落的记录。

关于pyspark - Dataproc 上的 Spark 流数据管道遇到突然频繁的套接字超时,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37412713/

相关文章:

python - Spark 2.0 DataFrame 初始化可能存在的错误

python - Pyspark:扩展 pyspark 数据框添加缺失的周期

apache-spark - 通过 Apache Spark Streaming 从 RabbitMq 读取消息

pyspark - 数据处理 : Jupyter pyspark notebook unable to import graphframes package

google-cloud-platform - 如何在流式查询中使用 Google Cloud Storage 作为检查点位置?

apache-spark - 通过 python 脚本关闭 pyspark 日志记录

apache-spark - 如何计算 RDD 中列表中的项目数

java - 使用 Spark Streaming 连接到 Cassandra 时出错

apache-spark - 如何在控制台中创建多个 SparkContext

google-cloud-platform - 如何在 GCP 中执行数据沿袭?