apache-flink - Apache Flink 创建了错误的计划

标签 apache-flink gelly

我为 Apache Flink 创建了一个简单的作业,它使用 Gelly 提供的 PageRank 实现。

在本地,在 IDE 中运行,一切正常。但是,我尝试使用 JobManager Web 界面将包含作业的 JAR 提交到我的计算机中运行的 Flink 实例。但是,Flink 并没有为作业获取正确的计划并执行 PageRank,而是提出并执行一个非常奇怪的计划,该计划仅计算图的顶点数。

我做了一些研究和调试,发现 Gelly 提供的 PageRank 实现在没有作为算法参数提供时开始计算图的顶点数:

if (numberOfVertices == 0) {
    numberOfVertices = network.numberOfVertices();
}

此计算意味着嵌入式作业。由于运算符是惰性的,因此不会触发任何计算。在Flink服务器中,首先要做的就是获取作业计划。这是通过一个特殊的环境来完成的,OptimizerPlanEnvironment ,提供以下 result方法:

public JobExecutionResult execute(String jobName) throws Exception {
    Plan plan = createProgramPlan(jobName);
    this.optimizerPlan = compiler.compile(plan);

    // do not go on with anything now!
    throw new ProgramAbortException();
}

问题出自这里。尽快ProgramAbortException抛出时,程序返回到目前为止计算的计划。但只计算了内部作业计划,因此主作业计划永远不会被计算或执行。

这是我使用的代码:

public class Job {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        Graph<Long, Double, Double> graph = Graph.fromDataSet(
            PageRankData.getDefaultEdgeDataSet(env), new VertexInit(), env);
        graph.run(new PageRank<Long>(0.85, 10)).print();
    }

    private static class VertexInit implements MapFunction<Long, Double> {
        @Override
        public Double map(Long value) throws Exception { return 1.0; }
    }
}

如果提供了顶点数,则执行例如graph.run(new PageRank<Long>(0.85, 5, 10)) ,没有问题,计划计算正确,PageRank计算正确。

我的问题是:我做错了什么?或者这是 Flink 中的一些实际错误?

最佳答案

正如您所说,问题是 network.numberOfVertices 在内部调用顶点数据集的 count 。这会触发一个独立的 Flink 作业来计算计数值。该值通常由 main 方法检索。但是,在 Web 客户端提交的情况下,这将不起作用,因为 OptimizerPlanEnvironment 只允许编译单个 Flink 作业。该行为类似于分离执行模式,该模式也不支持急切计划执行。

这是目前 Flink Web 客户端的限制。出现此行为的原因是 Flink 不希望阻塞 Netty channel 处理程序线程,该线程需要等待 count 操作的结果。阻塞操作会使线程池挨饿,并使该 session 的 Web 界面无响应,直到它被解除阻塞为止。

关于apache-flink - Apache Flink 创建了错误的计划,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35977601/

相关文章:

java - 如何导入 Apache Flink SNAPSHOT Artifact ?

scala - 弗林克 : PageRank type mismatch error

apache-flink - Flink : Dataset and Datastream API in one program. 有可能吗?

apache-flink - Flink中为什么DataStream不支持聚合

amazon-s3 - 无法执行HTTP请求: Timeout waiting for connection from pool in Flink

sockets - Flink 可以接收 http 请求作为数据源吗?

scala - 将 Kafka 消费者和生产者集成到一个函数中

maven - 运行 Apache Flink 作业时链接失败