apache-spark - Spark 结构化流式传输蓝/绿部署

标签 apache-spark hadoop deployment spark-structured-streaming blue-green-deployment

我们希望能够部署我们的 Spark 作业,以便在部署期间处理数据时没有任何停机时间(目前大约有 2-3 分钟的窗口)。在我看来,最简单的方法是模拟“蓝/绿部署”理念,即启动新版本的 Spark 作业,让它预热,然后关闭旧作业。但是,对于结构化流和检查点,我们不能这样做,因为新的 Spark 作业发现最新的检查点文件已经存在(来自旧作业)。我在下面附上了一个示例错误。有人对潜在的解决方法有任何想法吗?

我考虑过将现有的检查点目录复制到新创建的作业的另一个检查点目录 - 虽然这应该作为一种解决方法(一些数据可能会重新处理,但我们的数据库应该重复数据删除),这看起来 super hacky 和我的东西宁愿不追求。

Caused by: org.apache.hadoop.fs.FileAlreadyExistsException: rename destination /user/checkpoint/job/offsets/3472939 already exists
    at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.validateOverwrite(FSDirRenameOp.java:520)
    at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.unprotectedRenameTo(FSDirRenameOp.java:364)
    at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameTo(FSDirRenameOp.java:282)
    at org.apache.hadoop.hdfs.server.namenode.FSDirRenameOp.renameToInt(FSDirRenameOp.java:247)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.renameTo(FSNamesystem.java:3677)
    at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.rename2(NameNodeRpcServer.java:914)
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.rename2(ClientNamenodeProtocolServerSideTranslatorPB.java:587)
    at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
    at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
    at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
    at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2045)

    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
    at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
    at org.apache.hadoop.hdfs.DFSClient.rename(DFSClient.java:1991)
    at org.apache.hadoop.fs.Hdfs.renameInternal(Hdfs.java:335)
    at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:678)
    at org.apache.hadoop.fs.FileContext.rename(FileContext.java:958)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog$FileContextManager.rename(HDFSMetadataLog.scala:356)
    at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.org$apache$spark$sql$execution$streaming$HDFSMetadataLog$$writeBatch(HDFSMetadataLog.scala:160)
    ... 20 more
Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.fs.FileAlreadyExistsException): rename destination /user/checkpoint/job/offsets/3472939 already exists

最佳答案

这是可能的,但它会给您的应用程序增加一些复杂性。启动流通常很快,因此可以公平地假设延迟是由静态对象和依赖项的初始化引起的。在那种情况下,您将只需要 SparkContext/SparkSession,并且不需要流依赖项,因此过程可以描述为:

  • 启动新的 Spark 应用程序。
  • 初始化面向批处理的对象。
  • 将消息传递给之前的应用程序以让其下台。
  • 等待确认。
  • 开始直播。

在非常高的层次上,幸福之路可以形象化为:

enter image description here

由于它是非常通用的模式,因此可以以不同的方式实现,具体取决于语言和基础设施:

  • 像 ØMQ 这样的轻量级消息队列。
  • 通过分布式文件系统传递消息。
  • 将应用程序置于交互式上下文(Apache Toree、Apache Livy)中并使用外部客户端进行编排。

关于apache-spark - Spark 结构化流式传输蓝/绿部署,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49660079/

相关文章:

apache-spark - Spark 错误和hadoop错误

hadoop - 配置单元:存储桶表完整性检查

Kubernetes 等效于 Terraform 模块和变量

deployment - 如何部署play框架应用程序并排除 "public"目录下的特定资源?

Heroku:虽然在 requirements.txt 中没有名为 flask 的模块

apache-spark - 在 Spark 中保存时在文本文件之前添加 header

machine-learning - 我可以使用逻辑回归算法根据历史数据预测给定任务的预计到达时间吗?

java - 使用spark-submit部署程序时出现java.lang.NoSuchMethodError

sorting - Spark 按键排序,然后按分组以获得有序可迭代?

java - Hadoop如何计算不同单词的数量