java - 在第二次计算中重用第一次计算的结果

标签 java apache-flink

我正在尝试在 Flink 中编写一个需要两个阶段的计算。

在第一阶段,我从一个文本文件开始,执行一些参数估计,作为结果获得一个表示数据统计模型的 Java 对象。

在第二阶段,我想使用这个对象来生成模拟数据。

我不确定该怎么做。我尝试使用 LocalCollectionOutputFormat,它在本地工作,但是当我在集群上部署作业时,我得到了一个 NullPointerException - 这并不奇怪。

Flink 的做法是什么?

这是我的代码:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
GlobalConfiguration.includeConfiguration(configuration);

// Phase 1: read file and estimate model
DataSource<Tuple4<String, String, String, String>> source = env
        .readCsvFile(args[0])
        .types(String.class, String.class, String.class, String.class);

List<Tuple4<Bayes, Bayes, Bayes, Bayes>> bayesResult = new ArrayList<>();
// Processing here...
....output(new LocalCollectionOutputFormat<>(bayesResult));

env.execute("Bayes");

DataSet<BTP> btp = env
        .createInput(new BayesInputFormat(bayesResult.get(0)))
// Phase 2: BayesInputFormat generates data for further calculations
// ....

这是我得到的异常:

Error: The program execution failed: java.lang.NullPointerException
    at org.apache.flink.api.java.io.LocalCollectionOutputFormat.close(LocalCollectionOutputFormat.java:86)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
    at java.lang.Thread.run(Thread.java:745)

org.apache.flink.client.program.ProgramInvocationException: The program execution failed: java.lang.NullPointerException
    at org.apache.flink.api.java.io.LocalCollectionOutputFormat.close(LocalCollectionOutputFormat.java:86)
    at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:176)
    at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
    at java.lang.Thread.run(Thread.java:745)

    at org.apache.flink.client.program.Client.run(Client.java:328)
    at org.apache.flink.client.program.Client.run(Client.java:294)
    at org.apache.flink.client.program.Client.run(Client.java:288)
    at org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:55)
    at it.list.flink.test.Test01.main(Test01.java:62)
    ...

最佳答案

最新版本 (0.9-milestone-1) a collect()方法已添加到 Flink

public List<T> collect()

它获取一个 DataSet<T>作为List<T>到驱动程序。 collect()还将触发程序的立即执行(不需要调用 ExecutionEnvironment.execute() )。目前,大约 10 MB 的数据集有大小限制。

如果您不评估驱动程序中的模型,您也可以将两个程序链接在一起,并通过附加数据接收器将模型发送到一边。这会更有效率,因为数据不会在客户端计算机上进行往返。

关于java - 在第二次计算中重用第一次计算的结果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29970164/

相关文章:

hadoop - flink-sql 如何处理 'count(distinct )' 这样的场景

apache-kafka - Kafka 的 OPC-da 连接器 - 可用的解决方案

java - Flink JDBC 连接 Multi-Tenancy

java - 我如何模拟 void 方法并使用 mockito 保留其他所有内容?

JavaFX "toolkit not initialized"在一个测试类中而不是另外两个;区别在哪里?

java - Flink 从集群 GUI 向作业提交 args 的正确方法是什么?

apache-kafka - Flink 与 Confluence Kafka 模式注册表

java - GCDAsyncsocket广播到所有IP

Java - 读取具有特定jvm大小的文本文件

java - Hadoop “Unable to load native-hadoop library for your platform”警告