java - Using protobuf with flink

Tags: java apache-flink flink-streaming

I am using Flink to read data from Kafka and convert it to protobuf. The problem I'm facing is that when I run the Java application, I get the error below. If I rename the unknownFields variable to something else it works, but making that change on every generated protobuf class is impractical.

I also tried deserializing directly while reading from Kafka, but I'm not sure what TypeInformation the getProducedType() method should return:

    public static class ProtoDeserializer implements DeserializationSchema<byte[]> {

        @Override
        public TypeInformation<byte[]> getProducedType() {
            return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
        }

        @Override
        public byte[] deserialize(byte[] message) {
            return message; // hand the raw bytes through; protobuf parsing happens downstream
        }

        @Override
        public boolean isEndOfStream(byte[] nextElement) { return false; }
    }
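What I actually want is a schema that parses directly into the generated class, roughly like the sketch below (PBAddressDeserializer is a made-up name, and the getProducedType() body is exactly the part I'm unsure about, since extracting type information for the generated class may run into the same error):

    public static class PBAddressDeserializer implements DeserializationSchema<PBAddress> {

        @Override
        public PBAddress deserialize(byte[] message) throws IOException {
            // parse the raw Kafka record straight into the generated protobuf class
            return PBAddress.parseFrom(message);
        }

        @Override
        public boolean isEndOfStream(PBAddress nextElement) {
            return false;
        }

        @Override
        public TypeInformation<PBAddress> getProducedType() {
            // unsure whether this is right: this may trigger the same
            // TypeExtractor POJO analysis that fails on unknownFields
            return TypeExtractor.getForClass(PBAddress.class);
        }
    }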

Appreciate any help. Thanks.

    java.lang.RuntimeException: The field protected com.google.protobuf.UnknownFieldSet com.google.protobuf.GeneratedMessage.unknownFields is already contained in the hierarchy of the class com.google.protobuf.GeneratedMessage. Please use unique field names through your classes hierarchy
        at org.apache.flink.api.java.typeutils.TypeExtractor.getAllDeclaredFields(TypeExtractor.java:1594)
        at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1515)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1412)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1319)
        at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:609)
        at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:437)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:306)
        at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:133)
        at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:529)

Code:

    FlinkKafkaConsumer09<byte[]> kafkaConsumer = new FlinkKafkaConsumer09<>("testArr",new ByteDes(),p);

    DataStream<byte[]> input = env.addSource(kafkaConsumer);
    DataStream<PBAddress> protoData = input.map(new RichMapFunction<byte[], PBAddress>() {
        @Override
        public PBAddress map(byte[] value) throws Exception {
            PBAddress addr = PBAddress.parseFrom(value);
            return addr;
        }
    });

Best answer

Maybe you should try one of the following. Either register a ready-made protobuf serializer for the generated class (e.g. com.twitter.chill.protobuf.ProtobufSerializer from the chill-protobuf artifact, which the Flink documentation suggests for protobuf types):

    env.getConfig().registerTypeWithKryoSerializer(PBAddress.class, ProtobufSerializer.class);

or register your own Kryo serializer:

    env.getConfig().registerTypeWithKryoSerializer(PBAddress.class, PBAddressSerializer.class);

Either way, PBAddress is then serialized through Kryo with the registered serializer instead of being analyzed as a POJO. A possible implementation of PBAddressSerializer:

    import com.esotericsoftware.kryo.Kryo;
    import com.esotericsoftware.kryo.Serializer;
    import com.esotericsoftware.kryo.io.Input;
    import com.esotericsoftware.kryo.io.Output;
    import com.google.protobuf.Message;

    import java.lang.reflect.Method;
    import java.util.HashMap;
    import java.util.Map;

    public class PBAddressSerializer extends Serializer<Message> {
      // Cache the static parseFrom(byte[]) method per concrete message class
      // to avoid the reflection lookup on every deserialization.
      private final Map<Class<?>, Method> methodCache = new HashMap<>();

      protected Method getParse(Class<?> cls) throws NoSuchMethodException {
        Method method = methodCache.get(cls);
        if (method == null) {
          method = cls.getMethod("parseFrom", byte[].class);
          methodCache.put(cls, method);
        }
        return method;
      }

      @Override
      public void write(Kryo kryo, Output output, Message message) {
        // Length-prefix the protobuf payload so read() knows how many bytes to consume.
        byte[] ser = message.toByteArray();
        output.writeInt(ser.length, true);
        output.writeBytes(ser);
      }

      @Override
      public Message read(Kryo kryo, Input input, Class<Message> pbClass) {
        try {
          int size = input.readInt(true);
          byte[] barr = new byte[size];
          // readBytes fills the whole array; plain read() may return fewer bytes
          input.readBytes(barr);
          return (Message) getParse(pbClass).invoke(null, (Object) barr);
        } catch (Exception e) {
          throw new RuntimeException("Could not create " + pbClass, e);
        }
      }
    }
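With the serializer registered, the pipeline from the question should work unchanged. A minimal sketch of the wiring (assuming the ByteDes schema and the p properties object from the question):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Register before building the pipeline so PBAddress goes through Kryo
    // with the custom serializer instead of POJO analysis.
    env.getConfig().registerTypeWithKryoSerializer(PBAddress.class, PBAddressSerializer.class);

    FlinkKafkaConsumer09<byte[]> kafkaConsumer =
            new FlinkKafkaConsumer09<>("testArr", new ByteDes(), p);

    DataStream<PBAddress> protoData = env.addSource(kafkaConsumer)
            .map(new MapFunction<byte[], PBAddress>() {
                @Override
                public PBAddress map(byte[] value) throws Exception {
                    return PBAddress.parseFrom(value);
                }
            });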

Regarding java - using protobuf with flink, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/38278826/
