java - 如何将多个项目从自定义 Retrofit Converter 发送到 RxJavaCallAdapterFactory

标签 java android json retrofit rx-java

我的应用程序需要下载并解析一个大的 JSON 文件。 为了避免任何与内存相关的问题,我正在自定义转换器中分批解析来自响应对象的输入流,每批 1000 个 json 对象。

一切都运行良好,直到我想将解析后的对象返回给调用者的可观察对象。

我的API方法是这样调用的:

Observable<MyResponseStream> typesObs = api.getTypes(request.method, request.options);

响应由自定义转换器处理

public class MyResponseConverterFactory extends Converter.Factory {

    public MyResponseConverterFactory() {
    }


    @Override
    public Converter<ResponseBody, ?> responseBodyConverter(Type type, Annotation[] annotations, Retrofit retrofit) {
        if (MyResponseStream.class.equals(type)) {
            // We will process only response where the client wait for I2ctResponseStream
            return MyResponseConverter.INSTANCE;
        }

        return null;
    }

    public static MyResponseConverterFactory create() {
        return new MyResponseConverterFactory();
    }

    final static class MyResponseConverter implements Converter<ResponseBody, MyResponseStream> {
        static final MyResponseConverter INSTANCE = new MyResponseConverter();

        @Override
        public MyResponseStream convert(ResponseBody responseBody) throws IOException {
            return new MyResponseStream(responseBody.byteStream());
        }
    }
}

MyResponseStream 看起来像

public class MyResponseStream extends MyResponse<ArrayList<JSONObject>> {

    private final static int BATCH_SIZE = 1000;

    public interface ObjectsStreamListener {
        void onObjectsParsed(String parentKey, ArrayList<ObjectNode> items);
    }

    private ArrayList<ObjectNode> mItems;
    private ObjectMapper mMapper;
    private ObjectsStreamListener mListener;
    private InputStream mInputStream;                

    public MyResponseStream(InputStream inputStream) {
        super();
        mInputStream = inputStream;
        mItems = new ArrayList<>();
    }

   public void start(ObjectsStreamListener listener) {
        mListener = listener;
        if (mInputStream != null) {
            parse();
        }
    }

    private void parse() {
        try {
            mMapper = new ObjectMapper();
            JsonParser parser = mMapper.getFactory().createParser(mInputStream);
            String key;
            JsonToken currentToken = parser.nextToken();

            while (currentToken != null) {
                parser.nextFieldName();
                key = parser.getCurrentName();

                if ("method".equals(key)) {
                    method = parser.nextTextValue();
                } else if ("success".equals(key)) {
                    isSuccess = parser.nextIntValue(0) == 1;
                    Cs.e("isSuccess " + isSuccess);
                } else if ("data".equals(key)) {
                    currentToken = parser.nextToken();
                    parseData(parser);
                } else {
                    currentToken = parser.nextToken();
                }
            }
            parser.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private void parseData(JsonParser parser) throws IOException {
        String currentKey;

        ObjectNode node;
        while (parser.nextToken() != null) {
            // Consume FIELD_NAME token
            parser.nextFieldName();

            // Get parent key (ex groups)
            currentKey = parser.getCurrentName();

            while (parser.getCurrentToken() == JsonToken.START_ARRAY) {
                while (parser.nextToken() == JsonToken.START_OBJECT) {
                    node = mMapper.readTree(parser);
                    mItems.add(node);
                    if (mItems.size() == BATCH_SIZE) {
                        if (mListener != null) {
                            mListener.onObjectsParsed(currentKey, mItems);
                            mItems.clear();
                        }
                    }
                }
            }

            if (!mItems.isEmpty()) {
                if (mListener != null) {
                    mListener.onObjectsParsed(currentKey, mItems);
                    mItems.clear();
                }
            }
        }
    }
}

为了获取解析的对象,我注册了一个监听器

typesObs.map(responseStream -> {
            responseStream.start(new MyResponseStream.ObjectsStreamListener() {
                @Override public void onObjectsParsed(String parentKey, ArrayList<ObjectNode> items) {
                    Cs.e("parentKey " + parentKey + " items " + items);
                }
            });
            return responseStream;
        })

这种方法有效,但它看起来不是一个好的解决方案,因为我没有以任何方式利用 RxJava observable。

我的问题:有没有办法从转换器调用可观察的结果onNext()? 我要更换

mListener.onObjectsParsed(currentKey, mItems);

类似的东西

retrofit.getRxCallAdapterFactory().getCallerObservable().onNext(items)

最佳答案

您可以采取的一种方法是创建适配器而不是转换器。

这种方法将按以下顺序起作用:

  1. 检查类型是否为Observable<MyResponseStream>
  2. 使用retrofit.callAdapter(...)检索 Observable<Response> 的适配器
  3. 创建一个自定义适配器,将可观察对象的创建委托(delegate)给之前检索到的适配器,然后应用操作来创建 Observable<MyResponseStream>返回的可观察值。

关于java - 如何将多个项目从自定义 Retrofit Converter 发送到 RxJavaCallAdapterFactory,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42931542/

相关文章:

java - 如何为android 3.0以上版本定义广播接收器?

javascript - 语法错误: unterminated string literal with JSON array

c# - 如何干净地反序列化字符串值包含在同名对象中的 JSON

java - 从 JSONObject 中删除除一个之外的所有键

java - JPA hibernate 。在一次调用中将具有外键引用的父实体和子实体持久化到父实体

java - 如何设置 swagger 文档以使用 dd/MM/yyyy 格式?

android - PreferenceActivity的优势

java - 如何使用 fireauth 确认登录以确认密码是否正确

java - 安卓横向设计

java - 将 jasperreport 导出为 .pdf 时如何设置 jasperreport 字体