java - 如何在 Apache Beam 中设置 PCollection<List<String>> 的编码器?

标签 java json apache-beam jackson-databind

我正在自学 Apache Beam,专门用于解析 JSON。我能够创建一个简单的示例,将 JSON 解析为 POJO,将 POJO 解析为 CSV。它要求我使用 .setCoder() 对于我的简单 POJO 类。

        pipeline
            .apply("Read source JSON file.", TextIO.read().from(options.getInput()))
            .apply("Parse to POJO matching schema", ParseJsons.of(Person.class))
            .setCoder(SerializableCoder.of(Person.class))
            .apply("Create comma delimited string", new PersonToCsvRow())
            .apply("Write out to file", TextIO.write().to(options.getOutput())
                .withoutSharding());

问题

现在我尝试跳过使用一些自定义转换进行解析的 POJO 步骤。我的管道如下所示:

        pipeline
            .apply("Read Json", TextIO.read().from("src/main/resources/family_tree.json"))
            .apply("Traverse Json tree", new JSONTreeToPaths())
            .apply("Format tree paths", new PathsToCSV())
            .apply("Write to CSV", TextIO.write().to("src/main/resources/paths.csv")
                .withoutSharding());

该管道应该采用高度嵌套的 JSON 结构并打印树中的每个单独路径。我遇到了与上面 POJO 示例中相同的错误:

Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Traverse Json tree/MapElements/Map/ParMultiDo(Anonymous).output [PCollection@331122245]. Correct one of the following root causes:
  No Coder has been manually specified;  you may do so using .setCoder().

我尝试过的

所以我尝试通过几种不同的方式添加编码器:

.setCoder(SerializableCoder.of(List<String>.class))

导致“无法从参数化类型中选择”。我发现了由不同用例生成的此错误的另一个实例 here ,但接受的答案似乎只适用于该用例。

然后我开始仔细阅读 Beam 文档并发现 ListCoder.of()它(字面上)没有描述。但它看起来很有希望,所以我尝试了一下:

.setCoder(ListCoder.of(SerializableCoder.of(String.class)))

但这让我回到了最初没有手动设置编码器的错误。

问题

如何满足为 List<String> 设置编码器的要求对象?

代码

导致 setCoder 的转换错误是这个:

package transforms;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;

import java.util.ArrayList;
import java.util.List;

public class JSONTreeToPaths extends PTransform<PCollection<String>, PCollection<List<String>>> {

    public static class ExtractPathsFromTree extends SimpleFunction<JsonNode, List<String>> {
        public List<String> apply(JsonNode root) {
            List<String> pathContainer = new ArrayList<>();
            getPaths(root, "", pathContainer);
            return pathContainer;
        }
    }

    public static class GetRootNode extends SimpleFunction<String, JsonNode> {
        public JsonNode apply(String jsonString) {
            try {
                return getRoot(jsonString);
            } catch (JsonProcessingException e) {
               e.printStackTrace();
               return null;
            }
        }
    }

    @Override
    public PCollection<List<String>> expand(PCollection<String> input) {
        return input
            .apply(MapElements.via(new GetRootNode()))
            .apply(MapElements.via(new ExtractPathsFromTree()));
    }

    private static JsonNode getRoot(String jsonString) throws JsonProcessingException {
        ObjectMapper mapper = new ObjectMapper();
        return mapper.readTree(jsonString);
    }

    private static void getPaths(JsonNode node, String currentPath, List<String> paths) {
        //check if leaf:
        if (node.path("children").isMissingNode()) {
            currentPath += node.get("Id");
            paths.add(currentPath);
            System.out.println(currentPath);
            return;
        }

        // recursively iterate over children
        currentPath += (node.get("Id") + ",");
        for (JsonNode child : node.get("children")) {
            getPaths(child, currentPath, paths);
        }
    }
}



最佳答案

虽然错误消息似乎暗示字符串列表是需要编码的内容,但它实际上是 JsonNode。我只需要在错误消息中进一步阅读一下,因为开头语句对于问题所在有点具有欺骗性:

Exception in thread "main" java.lang.IllegalStateException: Unable to return a default Coder for Traverse Json tree/MapElements/Map/ParMultiDo(Anonymous).output [PCollection@1324829744]. 
...
...
Inferring a Coder from the CoderRegistry failed: Unable to provide a Coder 
for com.fasterxml.jackson.databind.JsonNode.
Building a Coder using a registered CoderProvider failed.

发现这一点后,我通过扩展 Beam 的 CustomCoder 类解决了这个问题。这个抽象类很好,因为您只需编写代码来序列化和反序列化对象:

public class JsonNodeCoder extends CustomCoder<JsonNode> {

    @Override
    public void encode(JsonNode node, OutputStream outStream) throws IOException {
        ObjectMapper mapper = new ObjectMapper();
        String nodeString = mapper.writeValueAsString(node);
        outStream.write(nodeString.getBytes());
    }

    @Override
    public JsonNode decode(InputStream inStream) throws IOException {
        byte[] bytes = IOUtils.toByteArray(inStream);
        ObjectMapper mapper = new ObjectMapper();
        String json = new String(bytes);
        return mapper.readTree(json);
    }
}

希望这对其他 Beam 新手有所帮助。

关于java - 如何在 Apache Beam 中设置 PCollection<List<String>> 的编码器?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/69789702/

相关文章:

javascript - eval 的最大字符串长度

python - 为可移植运行者构建 apache beam sdk 线束 - 名称问题

java - Tinkerpop3 自定义边缘 id 生成器

java - 如何删除字符串中的最后 2 个单词?

java - Drools 8.x Java 8 兼容性

python - 连接由主键链接的 2 个 JSON 输入

google-bigquery - 从 Cloud Dataflow 代码调用存储过程

java - 如何在android中将数据库写入文本文件

json - 解析 loadbalancer.yaml 时出错 : error converting YAML to JSON: yaml: line 4: found character that cannot start any token

python - 使用 Python 代码将原始 JSON 数据上传到 Google Cloud Storage