hadoop - 如何为 BucketingSink 函数 Flink 设置动态基本路径?

标签 hadoop hdfs bigdata apache-flink flink-streaming

我正在从文件中获取一些 json 记录。我想解析json,然后根据json中的一个字段,更新bucketing函数的基本路径。

例如:Json 记录中有一个字段名称'user-id',基于此我想将我的基本路径更新为 BucketingSink("/data/app/users/"+user-id-field-value+"/")

我该怎么做?

代码:
数据流输入 = env.readTextFile("/home/user/Desktop/jsonFile");

    DataStream<String> parsedJson = input.map((inputMsg)->{

        String json="";
        try{

            json=jsonParser.parse(inputMsg).getAsString();

        }catch (Exception e){
            e.printStackTrace();
        }
        return json;

    });

   parsedJson.addSink(new BucketingSink<>(""));

}

最佳答案

使用 BucketingSink.setBucketer()方法来设置您创建的实现 Bucketer 的类接口(interface),并使用 user-id字段值作为子存储桶路径。

关于hadoop - 如何为 BucketingSink 函数 Flink 设置动态基本路径?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54760702/

相关文章:

java - 如何将 COBOL COMP、COMP-1 和 COMP-2 转换为 Java Long 或 Int

hadoop - HDFS如何存储大于 block 大小的单个数据?

mongodb - 在 MongoDb 中存储物联网数据

hadoop - 使用Hadoop及相关项目分析不断变化的使用模式

file - 无法在具有自定义分隔符的配置单元中插入数据

performance - 如何知道 MR2 中的 HDFS 并发吞吐量

hadoop - 根据日志级别写入HDFS

hadoop - map-reduce 是否可以有多个输出文件?

python - 我们可以将 Hadoop 与 Python 集成吗?

hadoop - 为什么分区连接(随机播放)并不总是比广播连接更好?