java - 在本地执行示例 Flink 程序

标签 java apache-flink

我正在尝试以本地模式在 Apache Flink 中执行示例程序。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;


public class WordCountExample {
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<String> text = env.fromElements(
            "Who's there?",
            "I think I hear them. Stand, ho! Who's there?");
        //DataSet<String> text1 = env.readTextFile(args[0]);

        DataSet<Tuple2<String, Integer>> wordCounts = text
            .flatMap(new LineSplitter())
            .groupBy(0)
            .sum(1);

        wordCounts.print();
        env.execute();

        env.execute("Word Count Example");
    }

    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}

它给了我异常(exception):

    Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/mapreduce/InputFormat
    at WordCountExample.main(WordCountExample.java:10)
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.mapreduce.InputFormat
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    ... 1 more

我做错了什么?

我也使用了正确的 jar 。 flink-java-0.9.0-milestone-1.jar flink-clients-0.9.0-milestone-1.jar flink-core-0.9.0-milestone-1.jar

最佳答案

将三个 Flink Jar 文件作为依赖项添加到项目中是不够的,因为它们还有其他传递依赖项,例如 Hadoop。

获得工作设置以开发(和本地执行)Flink 程序的最简单方法是遵循 quickstart guide它使用 Maven 原型(prototype)来配置 Maven 项目。这个 Maven 项目可以导入到您的 IDE 中。

关于java - 在本地执行示例 Flink 程序,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30571618/

相关文章:

java - 密码验证,一次一个正则表达式模式

apache-flink - 如何为同一独立集群上运行的不同 flink 作业指定不同的 log4j.properties 文件

apache-flink - 重置可查询状态

java - Java 是否使用 SSH 配置文件?

java - 如何增加 Flink 内存大小

java - Eclipse类型删除

java - 在 Gradle 中管理传递依赖版本

Java程序对n个数字的基数^指数求和,而下一个基数和指数是(n-1)

docker - Flink 1.5.4 没有在 Kubernetes 中注册 Google Cloud Storage (GCS) 文件系统,尽管它可以在 docker 容器中运行

java - Apache 弗林克 : Ordered timestamps with parallelism