java - Flink can't run my application due to an invalid types exception when using Java 8 lambdas

Tags: java apache-flink

I am using Flink with Java 8. When I use lambda functions together with tuples and generic types, my program terminates with the following exception:

    /Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/bin/java -Didea.launcher.port=7536 "-Didea.launcher.bin.path=/Applications/IntelliJ IDEA.app/Contents/bin" -Dfile.encoding=UTF-8 -classpath "/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/charsets.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/deploy.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/cldrdata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/dnsns.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/jaccess.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/jfxrt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/localedata.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/nashorn.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/sunec.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/sunjce_provider.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/sunpkcs11.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/ext/zipfs.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/javaws.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/jce.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/jfr.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/jfxswt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/jsse.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/management-agent.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/plugin.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/jre/lib/resources.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.
jdk/Contents/Home/jre/lib/rt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/lib/ant-javafx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/lib/dt.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/lib/javafx-mx.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/lib/jconsole.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/lib/packager.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/lib/sa-jdi.jar:/Library/Java/JavaVirtualMachines/jdk1.8.0_102.jdk/Contents/Home/lib/tools.jar:/Users/hasan.guercan/Git/flink-java-project/target/classes:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-java/1.0.3/flink-java-1.0.3.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-core/1.0.3/flink-core-1.0.3.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-annotations/1.0.3/flink-annotations-1.0.3.jar:/Users/hasan.guercan/.m2/repository/com/esotericsoftware/kryo/kryo/2.24.0/kryo-2.24.0.jar:/Users/hasan.guercan/.m2/repository/com/esotericsoftware/minlog/minlog/1.2/minlog-1.2.jar:/Users/hasan.guercan/.m2/repository/org/objenesis/objenesis/2.1/objenesis-2.1.jar:/Users/hasan.guercan/.m2/repository/org/apache/avro/avro/1.7.6/avro-1.7.6.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-shaded-hadoop2/1.0.3/flink-shaded-hadoop2-1.0.3.jar:/Users/hasan.guercan/.m2/repository/xmlenc/xmlenc/0.52/xmlenc-0.52.jar:/Users/hasan.guercan/.m2/repository/commons-codec/commons-codec/1.4/commons-codec-1.4.jar:/Users/hasan.guercan/.m2/repository/commons-io/commons-io/2.4/commons-io-2.4.jar:/Users/hasan.guercan/.m2/repository/commons-net/commons-net/3.1/commons-net-3.1.jar:/Users/hasan.guercan/.m2/repository/commons-collections/commons-collections/3.2.1/commons-collections-3.2.1.jar:/Users/hasan.guercan/.m2/repository/javax/servlet/servlet-api/2.5/servlet-api-2.5.jar:/Users/hasan.guercan/.m2/repository/org/mortbay/jetty/jetty-util/6.1.26/jetty-ut
il-6.1.26.jar:/Users/hasan.guercan/.m2/repository/com/sun/jersey/jersey-core/1.9/jersey-core-1.9.jar:/Users/hasan.guercan/.m2/repository/commons-el/commons-el/1.0/commons-el-1.0.jar:/Users/hasan.guercan/.m2/repository/commons-logging/commons-logging/1.1.3/commons-logging-1.1.3.jar:/Users/hasan.guercan/.m2/repository/com/jamesmurty/utils/java-xmlbuilder/0.4/java-xmlbuilder-0.4.jar:/Users/hasan.guercan/.m2/repository/commons-lang/commons-lang/2.6/commons-lang-2.6.jar:/Users/hasan.guercan/.m2/repository/commons-configuration/commons-configuration/1.7/commons-configuration-1.7.jar:/Users/hasan.guercan/.m2/repository/commons-digester/commons-digester/1.8.1/commons-digester-1.8.1.jar:/Users/hasan.guercan/.m2/repository/org/codehaus/jackson/jackson-core-asl/1.8.8/jackson-core-asl-1.8.8.jar:/Users/hasan.guercan/.m2/repository/org/codehaus/jackson/jackson-mapper-asl/1.8.8/jackson-mapper-asl-1.8.8.jar:/Users/hasan.guercan/.m2/repository/com/thoughtworks/paranamer/paranamer/2.3/paranamer-2.3.jar:/Users/hasan.guercan/.m2/repository/org/xerial/snappy/snappy-java/1.0.5/snappy-java-1.0.5.jar:/Users/hasan.guercan/.m2/repository/com/jcraft/jsch/0.1.42/jsch-0.1.42.jar:/Users/hasan.guercan/.m2/repository/org/apache/zookeeper/zookeeper/3.4.6/zookeeper-3.4.6.jar:/Users/hasan.guercan/.m2/repository/io/netty/netty/3.7.0.Final/netty-3.7.0.Final.jar:/Users/hasan.guercan/.m2/repository/org/apache/commons/commons-compress/1.4.1/commons-compress-1.4.1.jar:/Users/hasan.guercan/.m2/repository/org/tukaani/xz/1.0/xz-1.0.jar:/Users/hasan.guercan/.m2/repository/commons-beanutils/commons-beanutils-bean-collections/1.8.3/commons-beanutils-bean-collections-1.8.3.jar:/Users/hasan.guercan/.m2/repository/commons-daemon/commons-daemon/1.0.13/commons-daemon-1.0.13.jar:/Users/hasan.guercan/.m2/repository/javax/xml/bind/jaxb-api/2.2.2/jaxb-api-2.2.2.jar:/Users/hasan.guercan/.m2/repository/javax/xml/stream/stax-api/1.0-2/stax-api-1.0-2.jar:/Users/hasan.guercan/.m2/repository/javax/activation/activation/1.1/act
ivation-1.1.jar:/Users/hasan.guercan/.m2/repository/com/google/inject/guice/3.0/guice-3.0.jar:/Users/hasan.guercan/.m2/repository/javax/inject/javax.inject/1/javax.inject-1.jar:/Users/hasan.guercan/.m2/repository/aopalliance/aopalliance/1.0/aopalliance-1.0.jar:/Users/hasan.guercan/.m2/repository/org/apache/commons/commons-math3/3.5/commons-math3-3.5.jar:/Users/hasan.guercan/.m2/repository/org/slf4j/slf4j-api/1.7.7/slf4j-api-1.7.7.jar:/Users/hasan.guercan/.m2/repository/org/slf4j/slf4j-log4j12/1.7.7/slf4j-log4j12-1.7.7.jar:/Users/hasan.guercan/.m2/repository/log4j/log4j/1.2.17/log4j-1.2.17.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/force-shading/1.0.3/force-shading-1.0.3.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-streaming-java_2.10/1.0.3/flink-streaming-java_2.10-1.0.3.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-runtime_2.10/1.0.3/flink-runtime_2.10-1.0.3.jar:/Users/hasan.guercan/.m2/repository/io/netty/netty-all/4.0.27.Final/netty-all-4.0.27.Final.jar:/Users/hasan.guercan/.m2/repository/org/javassist/javassist/3.18.2-GA/javassist-3.18.2-GA.jar:/Users/hasan.guercan/.m2/repository/org/scala-lang/scala-library/2.10.4/scala-library-2.10.4.jar:/Users/hasan.guercan/.m2/repository/com/typesafe/akka/akka-actor_2.10/2.3.7/akka-actor_2.10-2.3.7.jar:/Users/hasan.guercan/.m2/repository/com/typesafe/config/1.2.1/config-1.2.1.jar:/Users/hasan.guercan/.m2/repository/com/typesafe/akka/akka-remote_2.10/2.3.7/akka-remote_2.10-2.3.7.jar:/Users/hasan.guercan/.m2/repository/com/google/protobuf/protobuf-java/2.5.0/protobuf-java-2.5.0.jar:/Users/hasan.guercan/.m2/repository/org/uncommons/maths/uncommons-maths/1.2.2a/uncommons-maths-1.2.2a.jar:/Users/hasan.guercan/.m2/repository/com/typesafe/akka/akka-slf4j_2.10/2.3.7/akka-slf4j_2.10-2.3.7.jar:/Users/hasan.guercan/.m2/repository/org/clapper/grizzled-slf4j_2.10/1.0.2/grizzled-slf4j_2.10-1.0.2.jar:/Users/hasan.guercan/.m2/repository/com/github/scopt/scopt_2.10/3.2.0/scopt_2.10-3.2.0.jar:/
Users/hasan.guercan/.m2/repository/io/dropwizard/metrics/metrics-core/3.1.0/metrics-core-3.1.0.jar:/Users/hasan.guercan/.m2/repository/io/dropwizard/metrics/metrics-jvm/3.1.0/metrics-jvm-3.1.0.jar:/Users/hasan.guercan/.m2/repository/io/dropwizard/metrics/metrics-json/3.1.0/metrics-json-3.1.0.jar:/Users/hasan.guercan/.m2/repository/com/fasterxml/jackson/core/jackson-databind/2.4.2/jackson-databind-2.4.2.jar:/Users/hasan.guercan/.m2/repository/com/fasterxml/jackson/core/jackson-annotations/2.4.0/jackson-annotations-2.4.0.jar:/Users/hasan.guercan/.m2/repository/com/fasterxml/jackson/core/jackson-core/2.4.2/jackson-core-2.4.2.jar:/Users/hasan.guercan/.m2/repository/com/twitter/chill_2.10/0.7.4/chill_2.10-0.7.4.jar:/Users/hasan.guercan/.m2/repository/com/twitter/chill-java/0.7.4/chill-java-0.7.4.jar:/Users/hasan.guercan/.m2/repository/org/apache/commons/commons-math/2.2/commons-math-2.2.jar:/Users/hasan.guercan/.m2/repository/org/apache/sling/org.apache.sling.commons.json/2.0.6/org.apache.sling.commons.json-2.0.6.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-clients_2.10/1.0.3/flink-clients_2.10-1.0.3.jar:/Users/hasan.guercan/.m2/repository/org/apache/flink/flink-optimizer_2.10/1.0.3/flink-optimizer_2.10-1.0.3.jar:/Users/hasan.guercan/.m2/repository/commons-cli/commons-cli/1.2/commons-cli-1.2.jar:/Users/hasan.guercan/.m2/repository/org/apache/commons/commons-lang3/3.0.1/commons-lang3-3.0.1.jar:/Applications/IntelliJ IDEA.app/Contents/lib/idea_rt.jar" com.intellij.rt.execution.application.AppMain org.apache.flink.quickstart.exercise2.ReplyGraph
Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'retrieve(ReplyGraph.java:33)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.
    at org.apache.flink.api.java.DataSet.getType(DataSet.java:178)
    at org.apache.flink.api.java.DataSet.collect(DataSet.java:407)
    at org.apache.flink.api.java.DataSet.print(DataSet.java:1605)
    at org.apache.flink.quickstart.exercise2.ReplyGraph.retrieve(ReplyGraph.java:41)
    at org.apache.flink.quickstart.exercise2.ReplyGraph.main(ReplyGraph.java:56)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:147)
Caused by: org.apache.flink.api.common.functions.InvalidTypesException: The generic type parameters of 'Tuple3' are missing. 
It seems that your compiler has not stored them into the .class file. 
Currently, only the Eclipse JDT compiler preserves the type information necessary to use the lambdas feature type-safely. 
See the documentation for more information about how to compile jobs containing lambda expressions.
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameter(TypeExtractor.java:1316)
    at org.apache.flink.api.java.typeutils.TypeExtractor.validateLambdaGenericParameters(TypeExtractor.java:1302)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:346)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:304)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getMapReturnTypes(TypeExtractor.java:119)
    at org.apache.flink.api.java.DataSet.map(DataSet.java:215)
    at org.apache.flink.quickstart.exercise2.ReplyGraph.retrieve(ReplyGraph.java:33)
    ... 6 more

So I have to create at least one anonymous class to work around this. The first snippet shows the code that causes the exception:

DataSet<MailEntry> filteredUserReplyMails = replyMails.filter(entryTuple -> {
            String sender = entryTuple.getField(1).toString();
            return !sender.contains("git@") && !sender.contains("jira@");
        }).map(entry -> {
            MailEntry mailEntry = new MailEntry();
            mailEntry.messageId = entry.f0.replaceAll("<", "").replaceAll(">", "");
            mailEntry.sender = entry.f1;
            mailEntry.replyTo = entry.f2;
            return mailEntry;
        });

The next version works, because it uses an anonymous class:

DataSet<MailEntry> filteredUserReplyMails = replyMails.filter(entryTuple -> {
            String sender = entryTuple.getField(1).toString();
            return !sender.contains("git@") && !sender.contains("jira@");
        }).map(new MapFunction<Tuple3<String, String, String>, MailEntry>() {
            @Override
            public MailEntry map(Tuple3<String, String, String> entry) throws Exception {
                MailEntry mailEntry = new MailEntry();
                mailEntry.messageId = entry.f0.replaceAll("<", "").replaceAll(">", "");
                mailEntry.sender = entry.f1;
                mailEntry.replyTo = entry.f2;
                return mailEntry;
            }
        });

Java lambda functions are very concise. How can I solve this without creating an anonymous class?

Best answer

Try calling the returns method after map, so that Flink gets the result type as an explicit hint:

DataSet<MailEntry> filteredUserReplyMails = replyMails.filter(entryTuple -> {
            String sender = entryTuple.getField(1).toString();
            return !sender.contains("git@") && !sender.contains("jira@");
        }).map(entry -> {
            MailEntry mailEntry = new MailEntry();
            mailEntry.messageId = entry.f0.replaceAll("<", "").replaceAll(">", "");
            mailEntry.sender = entry.f1;
            mailEntry.replyTo = entry.f2;
            return mailEntry;
        }).returns(MailEntry.class);
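
As background on why the hint is needed, here is a minimal sketch in plain Java (no Flink dependency) of the difference the exception message points at. An anonymous class records its type arguments in the class file, so they can be read back by reflection, while a lambda's runtime class exposes only the raw interface. The `Mapper` interface and the `ErasureDemo` class are hypothetical stand-ins made up for this illustration; they are not part of Flink, and Flink's own type extraction is more involved than this check.

```java
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;

// Hypothetical stand-in for Flink's MapFunction, so the sketch runs without Flink.
interface Mapper<I, O> {
    O map(I input) throws Exception;
}

class ErasureDemo {
    // True if the object's class records concrete type arguments
    // for at least one interface it implements.
    static boolean hasTypeArguments(Object fn) {
        for (Type t : fn.getClass().getGenericInterfaces()) {
            if (t instanceof ParameterizedType) {
                return true;
            }
        }
        return false;
    }

    // Anonymous class: the compiler stores Mapper<String, Integer> in the class file.
    static Mapper<String, Integer> anonymousMapper() {
        return new Mapper<String, Integer>() {
            @Override
            public Integer map(String s) {
                return s.length();
            }
        };
    }

    // Lambda: the class generated at runtime implements only the raw Mapper interface.
    static Mapper<String, Integer> lambdaMapper() {
        return s -> s.length();
    }

    public static void main(String[] args) {
        System.out.println(hasTypeArguments(anonymousMapper())); // prints true
        System.out.println(hasTypeArguments(lambdaMapper()));    // prints false
    }
}
```

Since the lambda carries no recoverable type arguments, a call such as `returns(MailEntry.class)` supplies the missing information as an explicit class token instead.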

For more on "java - Flink can't run my application due to an invalid types exception when using Java 8 lambdas", see the similar question on Stack Overflow: https://stackoverflow.com/questions/39371771/
