java - Flink + Kafka + JSON - Java example

Tags: java, json, apache-kafka, apache-flink

I am trying to consume JSON from a Kafka topic with the following code:

public class FlinkMain {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // parse user parameters
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        DataStream messageStream = env.addSource(
                new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic")
                , new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));

        messageStream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Kafka and Flink says: " + value;
            }
        });

        env.execute();
    }
}

The problems are:

1) The program does not run; it fails with:

Exception in thread "main" org.apache.flink.api.common.functions.InvalidTypesException: The return type of function 'main(FlinkMain.java:23)' could not be determined automatically, due to type erasure. You can give type information hints by using the returns(...) method on the result of the transformation call, or by letting your function implement the 'ResultTypeQueryable' interface.

The problem is at the line `messageStream.map(...)`.
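For reference, the returns(...) hint that the exception mentions goes on the result of the transformation. A minimal sketch (assuming the otherwise unchanged code above) would be:

// Sketch only: explicit type hint on the result of map(), as the exception suggests.
// With the raw (untyped) stream the records are really ObjectNode at runtime,
// so a String-typed map is still not the full fix.
messageStream.map(new MapFunction<String, String>() {
    @Override
    public String map(String value) throws Exception {
        return "Kafka and Flink says: " + value;
    }
}).returns(String.class);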

2) The problem above is probably related to the DataStream having no type parameter. But if I try:

DataStream<String> messageStream = env.addSource(...

the code does not compile, failing with cannot resolve constructor FlinkKafkaConsumer09 ...
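That constructor error is a type mismatch rather than a missing constructor: JSONKeyValueDeserializationSchema produces Jackson ObjectNode records, not String, so the stream has to be typed accordingly. A minimal sketch of the typed source (essentially what the answer below ends up doing):

// Sketch: type the stream with the record type the schema actually produces.
DataStream<ObjectNode> messageStream = env.addSource(
        new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"),
                new JSONKeyValueDeserializationSchema(false),
                parameterTool.getProperties()));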

pom.xml (relevant part):

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>1.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.1.1</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.9_2.11</artifactId>
        <version>1.1.1</version>
    </dependency>
</dependencies>

I have been looking for Flink code that uses a JSON DeserializationSchema, without success. All I found was a unit test for JSONKeyValueDeserializationSchema at this link.

Does anyone know the right way to do this?

Thanks

Best answer

I followed Vishnu viswanath's answer, but JSONKeyValueDeserializationSchema throws an exception in the JSON parsing step, even for a simple JSON such as {"name":"John Doe"}.

The code that throws is:

DataStream<ObjectNode> messageStream = env.addSource(
    new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic")
    , new JSONKeyValueDeserializationSchema(false), parameterTool.getProperties()));

messageStream.rebalance().map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode node) throws Exception {
        return "Kafka and Flink says: " + node.get(0);
    }
}).print();

Output:

09/05/2016 11:16:02 Job execution switched to status FAILED.
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply$mcV$sp(JobManager.scala:822)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1$$anonfun$applyOrElse$7.apply(JobManager.scala:768)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
    at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
    at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:401)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.NullPointerException
    at com.fasterxml.jackson.core.JsonFactory.createParser(JsonFactory.java:790)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2215)
    at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:52)
    at org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java:38)
    at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.run(Kafka09Fetcher.java:227)
    at java.lang.Thread.run(Thread.java:745)
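
The NullPointerException is raised inside Jackson's createParser, which receives the raw key bytes. A plausible explanation (an assumption, not something confirmed in this thread) is that the Kafka records were produced without a key, and JSONKeyValueDeserializationSchema hands that null key straight to the JSON parser. A null-tolerant variant along the following lines could sidestep that; the class name and the key/value field layout are illustrative only:

import java.io.IOException;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

// Hypothetical schema that skips the key when the Kafka record has none.
public class NullKeyTolerantJsonSchema implements KeyedDeserializationSchema<ObjectNode> {
    private static final long serialVersionUID = 1L;
    private transient ObjectMapper mapper;

    @Override
    public ObjectNode deserialize(byte[] messageKey, byte[] message,
                                  String topic, int partition, long offset) throws IOException {
        if (mapper == null) {
            mapper = new ObjectMapper();
        }
        ObjectNode node = mapper.createObjectNode();
        if (messageKey != null) {
            // Only parse the key when the record actually has one.
            node.set("key", mapper.readValue(messageKey, JsonNode.class));
        }
        node.set("value", mapper.readValue(message, JsonNode.class));
        return node;
    }

    @Override
    public boolean isEndOfStream(ObjectNode nextElement) {
        return false;
    }

    @Override
    public TypeInformation<ObjectNode> getProducedType() {
        return TypeExtractor.getForClass(ObjectNode.class);
    }
}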

I had success using another deserialization schema, JSONDeserializationSchema:

DataStream<ObjectNode> messageStream = env.addSource(
        new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"),
                new JSONDeserializationSchema(), parameterTool.getProperties()));

messageStream.rebalance().map(new MapFunction<ObjectNode, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(ObjectNode value) throws Exception {
        return "Kafka and Flink says: " + value.get("key").asText();
    }
}).print();
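
Note that JSONDeserializationSchema only parses the message value into an ObjectNode, so whatever is published to the Kafka topic must itself be a JSON object, and the map above additionally assumes that object has a top-level field named key (for example, a message such as {"key": "hello"}).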

Regarding java - Flink + Kafka + JSON - Java example, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/39300183/
