tcp - 在运行Apache Spark Job时由对等方重置连接

标签 tcp hdfs rpc hortonworks-data-platform namenode

我们有两个HDP集群的设置,我们称它们为A和B。

群集点:


它总共包含20台商用机器。
有20个数据节点。
配置了名称节点HA后,将有一个活动的名称节点和一个备用的名称节点。


群集B点:


它总共包含5台商用机器。
有5个数据节点。
没有配置高可用性,并且该群集具有一个主要名称节点和一个次要名称节点。


我们的应用程序中有三个主要组件,它们对传入的文件执行ETL(提取,转换和加载)操作。我将这些组件分别称为E,T和L。

组件E特性:


该组件是一个Apache Spark作业,仅在集群B上运行。
它的工作是从NAS存储中拾取文件并将它们放入群集B中的HDFS中。


组件T特性:


该组件也是Apache Spark Job,它在集群B上运行。
任务是拾取组件E编写的HDFS中的文件,对其进行转换,然后将转换后的文件写入群集A中的HDFS。


组件L特性:


该组件也是Apache Spark作业,它仅在集群A上运行。
它的工作是拾取由组件T编写的文件,并将数据加载到群集A中存在的Hive表中。


组件L是所有三个组件中的宝石,我们没有遇到任何故障。组件E中有一些无法解释的小故障,但是组件T是最麻烦的故障。

组件E和T都使用DFS客户端与名称节点进行通信。

以下是我们在运行组件T时间歇观察到的异常的摘录:

clusterA.namenode.com/10.141.160.141:8020. Trying to fail over immediately.
java.io.IOException: Failed on local exception: java.io.IOException: Connection reset by peer; Host Details : local host is: "clusterB.datanode.com"; destination host is: "clusterA.namenode.com":8020;
            at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:782)
            at org.apache.hadoop.ipc.Client.call(Client.java:1459)
            at org.apache.hadoop.ipc.Client.call(Client.java:1392)
            at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:229)
            at com.sun.proxy.$Proxy15.complete(Unknown Source)
            at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.complete(ClientNamenodeProtocolTranslatorPB.java:464)
            at sun.reflect.GeneratedMethodAccessor1240.invoke(Unknown Source)
            at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
            at java.lang.reflect.Method.invoke(Method.java:498)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:258)
            at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:104)
            at com.sun.proxy.$Proxy16.complete(Unknown Source)
            at org.apache.hadoop.hdfs.DFSOutputStream.completeFile(DFSOutputStream.java:2361)
            at org.apache.hadoop.hdfs.DFSOutputStream.closeImpl(DFSOutputStream.java:2338)
            at org.apache.hadoop.hdfs.DFSOutputStream.close(DFSOutputStream.java:2303)
            at org.apache.hadoop.fs.FSDataOutputStream$PositionCache.close(FSDataOutputStream.java:72)
            at org.apache.hadoop.fs.FSDataOutputStream.close(FSDataOutputStream.java:106)
            at org.apache.hadoop.io.compress.CompressorStream.close(CompressorStream.java:109)
            at sun.nio.cs.StreamEncoder.implClose(StreamEncoder.java:320)
            at sun.nio.cs.StreamEncoder.close(StreamEncoder.java:149)
            at java.io.OutputStreamWriter.close(OutputStreamWriter.java:233)
            at com.abc.xyz.io.CounterWriter.close(CounterWriter.java:34)
            at com.abc.xyz.common.io.PathDataSink.close(PathDataSink.java:47)
            at com.abc.xyz.diamond.parse.map.node.AbstractOutputNode.finalise(AbstractOutputNode.java:142)
            at com.abc.xyz.diamond.parse.map.application.spark.node.SparkOutputNode.finalise(SparkOutputNode.java:239)
            at com.abc.xyz.diamond.parse.map.DiamondMapper.onParseComplete(DiamondMapper.java:1072)
            at com.abc.xyz.diamond.parse.decode.decoder.DiamondDecoder.parse(DiamondDecoder.java:956)
            at com.abc.xyz.parsing.functions.ProcessorWrapper.process(ProcessorWrapper.java:96)
            at com.abc.xyz.parser.FlumeEvent2AvroBytes.call(FlumeEvent2AvroBytes.java:131)
            at com.abc.xyz.parser.FlumeEvent2AvroBytes.call(FlumeEvent2AvroBytes.java:45)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$1$1.apply(JavaRDDLike.scala:129)
            at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
            at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
            at scala.collection.convert.Wrappers$IteratorWrapper.hasNext(Wrappers.scala:29)
            at com.abc.xyz.zzz.ParseFrameHolder$ToKafkaStream.call(ParseFrameHolder.java:123)
            at com.abc.xyz.zzz.ParseFrameHolder$ToKafkaStream.call(ParseFrameHolder.java:82)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
            at org.apache.spark.api.java.JavaRDDLike$$anonfun$foreachPartition$1.apply(JavaRDDLike.scala:225)
            at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
            at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$35.apply(RDD.scala:927)
            at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
            at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1882)
            at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
            at org.apache.spark.scheduler.Task.run(Task.scala:89)
            at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
            at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Connection reset by peer
            at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
            at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
            at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
            at sun.nio.ch.IOUtil.read(IOUtil.java:197)
            at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:380)
            at org.apache.hadoop.net.SocketInputStream$Reader.performIO(SocketInputStream.java:57)
            at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:142)
            at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
            at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
            at java.io.FilterInputStream.read(FilterInputStream.java:133)
            at java.io.FilterInputStream.read(FilterInputStream.java:133)
            at org.apache.hadoop.ipc.Client$Connection$PingInputStream.read(Client.java:554)
            at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
            at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
            at java.io.DataInputStream.readInt(DataInputStream.java:387)
            at org.apache.hadoop.ipc.Client$Connection.receiveRpcResponse(Client.java:1116)
            at org.apache.hadoop.ipc.Client$Connection.run(Client.java:1011)   


如前所述,我们非常间歇地遇到此异常,当它确实发生时,我们的应用程序被卡住,导致我们重新启动它。

我们尝试过的解决方案:


我们的第一个怀疑是,由于组件T确实并行打开了许多DFS客户端并在不同文件上执行文件操作(在同一文件上没有争用问题),我们正在使群集A中的活动名称节点超载。为了解决这个问题,我们查看了namenode dfs.namenode.handler.count和ipc.server.listen.queue.size的两个关键参数,并将后者从128(默认值)提高到1024。
不幸的是,问题仍然存在于组件T中。我们开始对问题采取不同的方法。我们仅专注于查找发生“对等连接重置”的原因。根据许多文章和堆栈交换讨论,问题描述如下,对等体设置了RST标志,导致立即终止连接。在我们的案例中,我们确定对等点是集群A的名称节点。
牢记RST标志,我深入了解了TCP通信的内部结构。 RST标志的原因。
Linux发行版(不是BSD)中的每个套接字都有两个与之关联的队列,即accept和backlog队列。
在TCP握手过程中,所有请求都保留在积压队列中,直到从开始建立连接的节点接收到ACK数据包为止。接收到请求后,请求将被传输到接受队列,打开套接字的应用程序可以开始接收来自远程客户端的数据包。
待办事项队列的大小由两个内核级别参数控制,即net.ipv4.tcp_max_syn_backlog和net.core.somaxconn,而应用程序(在本例中为namenode)可以向内核请求其希望受上限限制的队列大小(我们认为接受队列大小是ipc.server.listen.queue.size定义的队列大小。
另外,这里要注意的另一件事是,如果net.ipv4.tcp_max_syn_backlog的大小大于net.core.somaxconn,则前者的值将被截断为后者。该声明基于Linux文档,可以在https://linux.die.net/man/2/listen上找到。
回到这一点,当待办事项完全填满时,TCP会以两种方式起作用,并且该行为也可以由称为net.ipv4.tcp_abort_on_overflow的内核参数来控制。默认情况下,此值设置为0,并导致当积压已满时内核丢弃所有新的SYN数据包,这又使发送方重新发送SYN数据包。设置为1时,内核将标记数据包中的RST标志并将其发送给发送方,从而突然终止连接。
我们检查了上面提到的内核参数的值,发现net.core.somaxconn设置为1024,net.ipv4.tcp_abort_on_overflow设置为0,net.ipv4.tcp_max_syn_backlog在两个服务器上的所有机器上都设置为4096。集群。
我们现在剩下的唯一怀疑是将群集A连接到群集B的交换机,因为任何群集中的任何机器都不会将RST标志设置为参数net.ipv4.tcp_abort_on_overflow设置为0。


我的问题


从HDFS文档中可以明显看出DFS客户端使用RPC与namenode进行通信以执行文件操作。是否每个RPC调用都涉及到namenode的TCP连接的建立?
参数ipc.server.listen.queue.size是否定义namenode接受RPC请求的套接字的接受队列长度?
即使在内核参数net.ipv4.tcp_abort_on_overflow设置为0的情况下,重负载时,namenode能否隐式关闭与DFS客户端的连接,从而使内核发送带有RST标志的数据包?
L2或L3交换机(用于连接我们两个集群中的机器)是否能够设置RST标志,因为它们不能处理突发流量?


解决此问题的下一个方法是使用tcpdump或wireshark分析数据包,从而确定哪个机器或交换机(不涉及路由器)正在设置RST标志。我们还将将上述所有队列的大小增加到4096,以有效处理突发流量。

名称节点日志没有显示任何异常的迹象,除了在某些时间点(如在Ambari中窥见的)名称节点连接负载(不一定是在发生“对等重置连接”异常时)。

总而言之,我想知道我们是否正朝正确的方向解决这个问题,或者我们只是走到了尽头?

附言对于问题的内容,我深表歉意。在寻求任何帮助或建议之前,我想向读者介绍整个上下文。感谢您的耐心等待。

最佳答案

首先,您的网络中确实确实有些奇怪,也许您将设法通过提及的步骤对其进行跟踪。

话虽这么说,但在观察步骤时,我个人发现有悖常理的事情。

当前,您需要执行步骤T,以及最脆弱的集群内传输。也许您看到的可靠性要比普通人差,但是我会认真考虑将复杂部分和易碎部分分开。

如果执行此操作(或将工作拆分成较小的块),则设计一个解决方案应该很简单,该解决方案可能会发现其脆弱的步骤有时会失败,但是在这种情况下只需重试即可。当然,重试将以最低的成本进行,因为只需要重试一小部分工作。



结论:解决连接问题可能会有所帮助,但是,如果可能的话,您可能想设计间歇性的故障诱饵。

关于tcp - 在运行Apache Spark Job时由对等方重置连接,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44058613/

相关文章:

hadoop - 如何在pyspark中更改DataFrame的HDFS block 大小

go - 名称未注册接口(interface)

performance - 处理大量网络流量的应用程序服务器上的防病毒软件。是还是不是?

gsm - TCP/IP 模型 : Interface Layer for mobile broadband

javascript - 使用nodejs通过tcp套接字传输数据

xml - 如何附加 xml 数据并在配置单元中添加 xml 版本和编码

hadoop - 当节点数少于复制因子时HDFS如何进行复制?

java - gwt javax.servlet.ServletContext 日志 : Exception while dispatching incoming RPC call creating mapdb db

websocket - 如何通过 websockets 连接到 RSK 公共(public)节点?

sockets - 文件流遇到缓冲区欠载/下溢?