我正在从文件中获取一些 json 记录。我想解析json,然后根据json中的一个字段,更新bucketing函数的基本路径。
例如:Json 记录中有一个字段名称'user-id',基于此我想将我的基本路径更新为 BucketingSink("/data/app/users/"+user-id-field-value+"/")
我该怎么做?
代码:
数据流输入 = env.readTextFile("/home/user/Desktop/jsonFile");
DataStream<String> parsedJson = input.map((inputMsg)->{
String json="";
try{
json=jsonParser.parse(inputMsg).getAsString();
}catch (Exception e){
e.printStackTrace();
}
return json;
});
parsedJson.addSink(new BucketingSink<>(""));
}
最佳答案
使用 BucketingSink.setBucketer()方法来设置您创建的实现 Bucketer 的类接口(interface),并使用 user-id
字段值作为子存储桶路径。
关于hadoop - 如何为 BucketingSink 函数 Flink 设置动态基本路径?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54760702/