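I am using Flink to read data from Kafka and convert it to protobuf. The problem is that when I run the Java application, I get the error shown below. If I rename the unknownFields variable to something else, it works, but making that change in every protobuf class is impractical.
I also tried deserializing directly while reading from Kafka, but I am not sure what TypeInformation the getProducedType() method should return.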
public static class ProtoDeserializer implements DeserializationSchema {
    // deserialize() and isEndOfStream() are not shown in the question
    @Override
    public TypeInformation getProducedType() {
        // TODO Auto-generated method stub
        return PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO;
    }
}
Any help is appreciated. Thanks.
java.lang.RuntimeException: The field protected com.google.protobuf.UnknownFieldSet com.google.protobuf.GeneratedMessage.unknownFields is already contained in the hierarchy of the class com.google.protobuf.GeneratedMessage.Please use unique field names through your classes hierarchy
    at org.apache.flink.api.java.typeutils.TypeExtractor.getAllDeclaredFields(TypeExtractor.java:1594)
    at org.apache.flink.api.java.typeutils.TypeExtractor.analyzePojo(TypeExtractor.java:1515)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1412)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateGetForClass(TypeExtractor.java:1319)
    at org.apache.flink.api.java.typeutils.TypeExtractor.createTypeInfoWithTypeHierarchy(TypeExtractor.java:609)
    at org.apache.flink.api.java.typeutils.TypeExtractor.privateCreateTypeInfo(TypeExtractor.java:437)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getUnaryOperatorReturnType(TypeExtractor.java:306)
    at org.apache.flink.api.java.typeutils.TypeExtractor.getFlatMapReturnTypes(TypeExtractor.java:133)
    at org.apache.flink.streaming.api.datastream.DataStream.flatMap(DataStream.java:529)
Code:
// ByteDes is the asker's own DeserializationSchema<byte[]>; p holds the Kafka properties
FlinkKafkaConsumer09<byte[]> kafkaConsumer = new FlinkKafkaConsumer09<>("testArr", new ByteDes(), p);
DataStream<byte[]> input = env.addSource(kafkaConsumer);
DataStream<PBAddress> protoData = input.map(new RichMapFunction<byte[], PBAddress>() {
    @Override
    public PBAddress map(byte[] value) throws Exception {
        // Parse the raw Kafka payload into the generated protobuf class
        PBAddress addr = PBAddress.parseFrom(value);
        return addr;
    }
});
Best answer
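For context, the stack trace in the question comes from TypeExtractor.getAllDeclaredFields, whose message asks for unique field names across the class hierarchy. The following is a simplified, JDK-only illustration of that kind of check, not Flink's actual code. BaseMessage and AddressMessage are hypothetical stand-ins for GeneratedMessage and a generated class such as PBAddress, and it is an assumption (suggested by the error message) that both declare a field named unknownFields.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;

// Simplified illustration only; this is not Flink's TypeExtractor code.
class DuplicateFieldCheck {

    // Hypothetical stand-in for com.google.protobuf.GeneratedMessage.
    static class BaseMessage {
        protected Object unknownFields;
    }

    // Hypothetical stand-in for a generated class such as PBAddress.
    static class AddressMessage extends BaseMessage {
        private Object unknownFields; // same name as the inherited field
        private String street;
    }

    // Walks the class hierarchy and returns the first field name that is
    // declared more than once, or null if every field name is unique.
    static String findDuplicateFieldName(Class<?> clazz) {
        List<String> seen = new ArrayList<>();
        for (Class<?> c = clazz; c != null && c != Object.class; c = c.getSuperclass()) {
            for (Field f : c.getDeclaredFields()) {
                if (f.isSynthetic()) {
                    continue; // skip compiler-generated fields
                }
                if (seen.contains(f.getName())) {
                    return f.getName();
                }
                seen.add(f.getName());
            }
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(findDuplicateFieldName(AddressMessage.class)); // prints "unknownFields"
        System.out.println(findDuplicateFieldName(BaseMessage.class));    // prints "null"
    }
}
```

This would explain why renaming the field makes the error go away, and why an approach that does not require touching the generated classes is preferable.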
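You could try the following (ProtobufSerializer here is presumably com.twitter.chill.protobuf.ProtobufSerializer from the chill-protobuf library):
env.getConfig().registerTypeWithKryoSerializer(PBAddress.class, ProtobufSerializer.class);
or, with a custom serializer such as the one below:
env.getConfig().registerTypeWithKryoSerializer(PBAddress.class, PBAddressSerializer.class);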
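import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.google.protobuf.Message;
import java.lang.reflect.Method;
import java.util.HashMap;
import java.util.Map;

public class PBAddressSerializer extends Serializer<Message> {
    // Cache of each message class's static parseFrom(byte[]) method
    private final Map<Class<?>, Method> parseMethods = new HashMap<>();

    protected Method getParse(Class<?> cls) throws NoSuchMethodException {
        Method method = parseMethods.get(cls);
        if (method == null) {
            method = cls.getMethod("parseFrom", byte[].class);
            parseMethods.put(cls, method);
        }
        return method;
    }

    @Override
    public void write(Kryo kryo, Output output, Message message) {
        // Variable-length size prefix followed by the protobuf bytes
        byte[] ser = message.toByteArray();
        output.writeInt(ser.length, true);
        output.writeBytes(ser);
    }

    @Override
    public Message read(Kryo kryo, Input input, Class<Message> pbClass) {
        try {
            int size = input.readInt(true);
            // readBytes(size) reads exactly size bytes; read(byte[]) may return fewer
            byte[] barr = input.readBytes(size);
            return (Message) getParse(pbClass).invoke(null, (Object) barr);
        } catch (Exception e) {
            throw new RuntimeException("Could not create " + pbClass, e);
        }
    }
}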
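The serializer above combines two ideas: a length-prefixed byte payload, and a cached reflective lookup of the static parseFrom(byte[]) method. A minimal JDK-only sketch of the same round trip follows, with no Kryo or protobuf involved. FakeAddress is a hypothetical stand-in for a generated message, and the sketch uses a fixed 4-byte length prefix from DataOutputStream rather than Kryo's variable-length int, so the byte layout is not the same as the serializer's.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// JDK-only sketch of the technique; not a drop-in replacement for the Kryo serializer.
class LengthPrefixedRoundTrip {

    // Hypothetical stand-in for a generated protobuf message.
    public static class FakeAddress {
        final String street;

        FakeAddress(String street) {
            this.street = street;
        }

        byte[] toByteArray() {
            return street.getBytes(StandardCharsets.UTF_8);
        }

        public static FakeAddress parseFrom(byte[] data) {
            return new FakeAddress(new String(data, StandardCharsets.UTF_8));
        }
    }

    // Caches the static parseFrom(byte[]) method per class.
    private static final Map<Class<?>, Method> PARSERS = new HashMap<>();

    static Method getParse(Class<?> cls) throws NoSuchMethodException {
        Method method = PARSERS.get(cls);
        if (method == null) {
            method = cls.getMethod("parseFrom", byte[].class);
            PARSERS.put(cls, method);
        }
        return method;
    }

    // Writes a 4-byte length followed by the message bytes.
    static byte[] write(FakeAddress message) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            byte[] ser = message.toByteArray();
            out.writeInt(ser.length);
            out.write(ser);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.toByteArray();
    }

    // Reads the length, then exactly that many bytes, then parses reflectively.
    static <T> T read(byte[] framed, Class<T> cls) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed))) {
            int size = in.readInt();
            byte[] payload = new byte[size];
            in.readFully(payload); // fails with EOFException instead of returning a short read
            return cls.cast(getParse(cls).invoke(null, (Object) payload));
        } catch (Exception e) {
            throw new RuntimeException("Could not create " + cls, e);
        }
    }

    public static void main(String[] args) {
        FakeAddress copy = read(write(new FakeAddress("1 Main St")), FakeAddress.class);
        System.out.println(copy.street); // prints "1 Main St"
    }
}
```

The point of readFully here mirrors the choice of a read call that returns exactly the requested number of bytes: a plain read(byte[]) is allowed to return fewer bytes than the buffer holds, which would hand a truncated payload to parseFrom.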
Regarding "java - Using protobuf with Flink", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/38278826/