java - 使用 Apache Flink 从 Web 获取 JSON 元素

标签 java json apache-flink flink-streaming

在阅读了 Apache Flink 的几个文档页面(official documentationdataartisans)以及 official repository 中提供的示例之后,我不断看到示例,其中它们用作流式传输已下载文件的数据源,始终连接到本地主机。

我正在尝试使用 Apache Flink 下载包含动态数据的 JSON 文件。我的目的是尝试建立我可以访问 JSON 文件的 url 作为 Apache Flink 的输入源,而不是使用另一个系统下载它并使用 Apache Flink 处理下载的文件。

是否可以与 Apache Flink 建立此网络连接?

最佳答案

您可以将要下载的 URL 定义为输入 DataStream,然后从 MapFunction 中下载文档。以下代码演示了这一点:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<String> inputURLs = env.fromElements("http://www.json.org/index.html");

inputURLs.map(new MapFunction<String, String>() {
    @Override
    public String map(String s) throws Exception {
        URL url = new URL(s);
        InputStream is = url.openStream();

        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is));

        StringBuilder builder = new StringBuilder();
        String line;

        try {
            while ((line = bufferedReader.readLine()) != null) {
                builder.append(line + "\n");
            }
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }

        try {
            bufferedReader.close();
        } catch (IOException ioe) {
            ioe.printStackTrace();
        }

        return builder.toString();
    }
}).print();

env.execute("URL download job");

关于java - 使用 Apache Flink 从 Web 获取 JSON 元素,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35684561/

相关文章:

java - 如何使用字符串作为参数来表示 Java 中的对象?

java - 如何实现监听器?

Java Minimal Json如何在一个对象中嵌套多个数组?

javascript - JSON.stringify() 数组的怪异与 Prototype.js

javascript - 404 未找到 JSON 文件 Liferay

scala - 我是否使用了正确的框架?

scala - 任务不可序列化 Flink

java - 为什么在我实现的插入排序中处理有序数组时,通用版比专用版慢4倍?

java - 如何遍历 hashmap 值中的 List

apache-flink - Flink : Dataset and Datastream API in one program. 有可能吗?