java - 弗林克 IOException : Insufficient number of network buffers

标签 java apache-flink

我正在使用 Flink v1.4.0。我正在使用 DataSet API(尽管如此,我认为这并不重要)。

我正在 12 核虚拟机上运行一些繁重的转换。我正在为一个 Flink 作业 使用 2 个内核,其中我将一些数据存储到 Flink 可查询状态中,并正在运行另一个 Flink 作业剩下的10个核心。

当我用 10 个内核运行第二个作业时,我似乎遇到了以下错误:

java.io.IOException: Insufficient number of network buffers: required 10, but only 9 available. The total number of network buffers is currently set to 4096 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.network.memory.fraction', 'taskmanager.network.memory.min', and 'taskmanager.network.memory.max'.
            at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:257)
            at org.apache.flink.runtime.io.network.NetworkEnvironment.registerTask(NetworkEnvironment.java:199)
            at org.apache.flink.runtime.taskmanager.Task.run(Task.java:618)
            at java.lang.Thread.run(Thread.java:745)

如果我用 8 个核心运行它,它就能顺利通过。这是什么原因造成的?为什么我不能使用其他 2 --> 8+2 = 10 个核心?

最佳答案

引用 Apache Flink 常见问题解答:

If you run Flink with a very high parallelism, you may need to increase the number of network buffers.

By default, Flink takes 10% of the JVM heap size for network buffers, with a minimum of 64MB and a maximum of 1GB. You can adjust all these values via taskmanager.network.memory.fraction, taskmanager.network.memory.min, and taskmanager.network.memory.max.

Please refer to the Configuration Reference for details.

有一个dedicated section in the docs for how to configure the network buffers .

综上所述,可以在./conf/flink-conf.yaml文件中通过设置taskmanager.network.numberOfBuffers参数来配置网络缓冲区的数量。

该参数应设置为#slots-per-TM^2 * #TMs * 4,其中#slots per TM 是每个TaskManager 的槽数, #TMs 是任务管理器的总数。

例如,要支持 20 台 8 槽机器的集群,您应该使用大约 5000 个网络缓冲区以获得最佳吞吐量。默认情况下,每个网络缓冲区的大小为 32 KiBytes。在上面的示例中,系统将因此分配大约 300 MiBytes 用于网络缓冲区。

详情请引用文档。

关于java - 弗林克 IOException : Insufficient number of network buffers,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49283934/

相关文章:

apache-flink - Apache Beam 是否像 Apache Flink 一样支持迭代算法?

java - 无法解析对 AWS Lambda 中值 : ${akka. stream.materializer} 的替换

java - 如何知道何时调用了 Parse.initialize()?

java - JEdi​​tor 没有像 Firefox 的 IE 那样显示 HTML 页面..?

java - 任何使用 ActiveMQ 多播的人

java - 从并行数组中搜索和显示信息

java - 使用InputStream读取器从/res/xml读取XML文件会产生无效字符/编码数据

apache-flink - 更喜欢增加任务管理器的数量而不是每个任务管理器的任务槽的原因是什么?

apache-flink - 使用 REST API 的 Apache Flink 即服务

apache-flink - 从 IDE 运行时的 Flink webui