java - How do I read Avro-serialized binary data sent over HTTP?

Tags: java http serialization deserialization avro

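I am using Avro as my serialization protocol. My service is ready, and every serialization/deserialization round trip works fine in memory. Now I want to test whether it still works after the data has travelled over HTTP. I thought writing a test method would be simple, but after a while I still cannot figure it out. Here is what I have tried:

  1. Using HttpClient: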

        String itemIds = "abc123";
        System.out.println("itemIds are: " + itemIds + "\n\n\n");
        String endpoint = "http://localhost:8080/api/v1/items?itemIds=" + URLEncoder.encode(itemIds, "UTF-8");
        HttpClient client = HttpClientBuilder.create().build();
        HttpGet request = new HttpGet(endpoint);
        String USER_AGENT = "Mozilla/5.0";    
        request.addHeader("User-Agent", USER_AGENT);
        request.addHeader("Content-Type", "avro/binary");
        HttpResponse response = client.execute(request);
        System.out.println("Response Code : "
            + response.getStatusLine().getStatusCode());
        byte[] bytes = EntityUtils.toByteArray(response.getEntity());
        SearchMaterializationDto deserializedReponse = SearchMaterializationAvroObjectSerializer.deserializeToSearchMaterialization(bytes);
        System.out.println(deserializedReponse.toString());
    

This approach throws java.io.EOFException when the following line executes: SearchMaterializationDto deserializedReponse = SearchMaterializationAvroObjectSerializer.deserializeToSearchMaterialization(bytes);

Here is my SearchMaterializationAvroObjectSerializer.deserializeToSearchMaterialization(bytes) method:

public static SearchMaterializationDto deserializeToSearchMaterialization(byte[] buffer) {
        SearchMaterializationDto searchMaterializationDto = new SearchMaterializationDto();
        try {
            ObjectInput input = new ObjectInputStream(new ByteArrayInputStream(buffer));
            searchMaterializationDto.readExternal(input);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return searchMaterializationDto;
    }

Here is my SearchMaterializationDto.java class (only the methods that get called are listed):

@org.apache.avro.specific.AvroGenerated
public class SearchMaterializationDto extends org.apache.avro.specific.SpecificRecordBase implements org.apache.avro.specific.SpecificRecord {
private static final org.apache.avro.io.DatumReader
    READER$ = new org.apache.avro.specific.SpecificDatumReader(SCHEMA$);

  @Override public void readExternal(java.io.ObjectInput in)
    throws java.io.IOException {
    READER$.read(this, SpecificData.getDecoder(in));
  }
}
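
One detail of attempt 1 that can be checked without Avro: `ObjectInputStream` expects the bytes to start with the Java serialization stream header, which plain Avro binary does not have. The following is a minimal sketch of that behaviour using only the JDK. The class and method names are hypothetical, and whether this is the exact cause of the exception above depends on how the server wrote the response, which the question does not show.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;

public class StreamHeaderCheck {

    /** Returns "ok" if ObjectInputStream accepts the bytes, otherwise the exception's simple class name. */
    public static String tryOpen(byte[] bytes) {
        // The constructor reads and validates the stream header immediately.
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return "ok";
        } catch (IOException e) {
            return e.getClass().getSimpleName();
        }
    }

    /** Produces the bytes an ObjectOutputStream writes before any object: just the stream header. */
    public static byte[] javaSerializationHeader() throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
            oos.flush();
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        // An empty body (for example an error response with no content).
        System.out.println("empty body:       " + tryOpen(new byte[0]));
        // Arbitrary bytes standing in for a plain Avro binary payload (made-up values).
        System.out.println("raw binary bytes: " + tryOpen(new byte[] {0x02, 0x06, 0x66, 0x6f, 0x6f}));
        // Bytes that really came from an ObjectOutputStream.
        System.out.println("java-serialized:  " + tryOpen(javaSerializationHeader()));
    }
}
```

If the HTTP body was produced by a plain Avro binary encoder, reading it through `ObjectInputStream` and `readExternal` is therefore unlikely to work; the reader has to match whatever the writer used.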
  2. Using an Avro decoder, as shown in the Avro example:

    private static void decoderWay(String endpoint) throws IOException {
        byte[] bytes = getBytes(endpoint);
        Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        SpecificDatumReader<SearchMaterializationDto> reader = new SpecificDatumReader<SearchMaterializationDto>(SearchMaterializationDto.getClassSchema());
        SearchMaterializationDto searchMaterializationDto = reader.read(null, decoder);
    }
    
But this also throws an EOFException:

    Exception in thread "main" java.io.EOFException
        at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:473)
        at org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:160)
        at org.apache.avro.io.BinaryDecoder.doReadItemCount(BinaryDecoder.java:363)
        at org.apache.avro.io.BinaryDecoder.readMapStart(BinaryDecoder.java:408)
        at org.apache.avro.io.ValidatingDecoder.readMapStart(ValidatingDecoder.java:211)
        at org.apache.avro.generic.GenericDatumReader.readMap(GenericDatumReader.java:308)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:177)
        at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:116)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:230)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:174)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:152)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
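
The trace shows `BinaryDecoder.readLong` running out of input while reading the block count of a map field. Avro's binary encoding writes int and long values as variable-length zig-zag integers, so a payload that is truncated, or that was written in a different format than the reader expects, can end in the middle of a value. Below is a minimal standalone sketch of that decoding to illustrate the failure mode. It is not Avro's own implementation, and the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

public class ZigZagVarint {

    /** Decodes one variable-length zig-zag long, throwing EOFException if the input ends early. */
    public static long readLong(InputStream in) throws IOException {
        long raw = 0;
        int shift = 0;
        while (true) {
            int b = in.read();
            if (b < 0) {
                throw new EOFException("input ended inside a variable-length long");
            }
            // The low 7 bits carry data, least significant group first.
            raw |= (long) (b & 0x7F) << shift;
            // A clear high bit marks the last byte of the value.
            if ((b & 0x80) == 0) {
                break;
            }
            shift += 7;
            if (shift > 63) {
                throw new IOException("variable-length long is too long");
            }
        }
        // Undo the zig-zag mapping: 0, 1, 2, 3, ... becomes 0, -1, 1, -2, ...
        return (raw >>> 1) ^ -(raw & 1);
    }

    public static void main(String[] args) throws IOException {
        System.out.println(readLong(new ByteArrayInputStream(new byte[] {0x04})));
        System.out.println(readLong(new ByteArrayInputStream(new byte[] {(byte) 0x80, 0x01})));
        try {
            // A continuation byte with nothing after it, as in a truncated payload.
            readLong(new ByteArrayInputStream(new byte[] {(byte) 0x80}));
        } catch (EOFException e) {
            System.out.println("EOFException: " + e.getMessage());
        }
    }
}
```

Comparing the length of the byte array received over HTTP with the length of the array the server serialized is a quick way to tell truncation apart from a format mismatch.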
    

Best answer

It turned out to be a problem in my serialization/deserialization code. I changed it to the following, and it works:

    public static byte[] serializeSearchMaterializationToByteArray(SearchMaterializationDto searchMaterializationDto) {
            return avroSerialize(SearchMaterializationDto.class, searchMaterializationDto);
        }
    
    public static <T> byte[] avroSerialize(Class<T> clazz, Object object) {
        byte[] ret = null;
        try {
            if (object == null || !(object instanceof SpecificRecord)) {
                return null;
            }
    
            T record = (T) object;
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            Encoder e = EncoderFactory.get().directBinaryEncoder(out, null);
            SpecificDatumWriter<T> w = new SpecificDatumWriter<T>(clazz);
            w.write(record, e);
            e.flush();
            ret = out.toByteArray();
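        } catch (IOException e) {
            // Surface the failure instead of silently returning null.
            throw new RuntimeException("Avro serialization failed", e);
        }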
    
        return ret;
    }
    
    public static SearchMaterializationDto deserializeToSearchMaterialization(byte[] avroBytes) {
        return avroDeserialize(avroBytes, SearchMaterializationDto.class, SearchMaterializationDto.getClassSchema());
    }
    
    public static <T> T avroDeserialize(byte[] avroBytes, Class<T> clazz, Schema schema) {
        T ret = null;
        try {
            ByteArrayInputStream in = new ByteArrayInputStream(avroBytes);
            Decoder d = DecoderFactory.get().directBinaryDecoder(in, null);
            SpecificDatumReader<T> reader = new SpecificDatumReader<T>(clazz);
            ret = reader.read(null, d);
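        } catch (IOException e) {
            // Surface the failure instead of silently returning null.
            throw new RuntimeException("Avro deserialization failed", e);
        }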
    
        return ret;
    }
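
To exercise the HTTP leg on its own, one option is to serve a known byte array from an in-process server and compare it with what the client receives. The sketch below uses only the JDK (`com.sun.net.httpserver.HttpServer` and `java.net.http.HttpClient`, Java 11 or later) rather than the Apache HttpClient from the question. The `/items` path and the payload bytes are made up for illustration; in a real test the payload would be the output of `serializeSearchMaterializationToByteArray`.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.Arrays;

public class HttpBytesRoundTrip {

    /** Serves the payload from a local server and returns the bytes the client received. */
    public static byte[] roundTrip(byte[] payload) throws IOException, InterruptedException {
        // Port 0 asks the OS for any free port.
        HttpServer server = HttpServer.create(new InetSocketAddress("127.0.0.1", 0), 0);
        server.createContext("/items", exchange -> {
            exchange.getResponseHeaders().add("Content-Type", "avro/binary");
            exchange.sendResponseHeaders(200, payload.length);
            try (OutputStream body = exchange.getResponseBody()) {
                body.write(payload);
            }
        });
        server.start();
        try {
            URI uri = URI.create("http://127.0.0.1:" + server.getAddress().getPort() + "/items");
            HttpRequest request = HttpRequest.newBuilder(uri)
                    .header("Accept", "avro/binary")
                    .GET()
                    .build();
            // Read the body as raw bytes; converting to String first could corrupt binary data.
            HttpResponse<byte[]> response =
                    HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofByteArray());
            return response.body();
        } finally {
            server.stop(0);
        }
    }

    public static void main(String[] args) throws Exception {
        // Placeholder bytes standing in for an Avro-encoded record.
        byte[] payload = {0x02, 0x06, 0x66, 0x6f, 0x6f, (byte) 0x80, 0x01};
        byte[] received = roundTrip(payload);
        System.out.println("bytes identical: " + Arrays.equals(payload, received));
    }
}
```

If the bytes match after the round trip, any remaining decoding error points at a mismatch between the writer and the reader rather than at the transport.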
    

This question and answer come from a similar question on Stack Overflow: https://stackoverflow.com/questions/42078538/
