android - 使用 retrofit2 和 rx java2 发送大量 POST 时出现 OutOfMemoryException

标签 android post mobile

我有一个带有本地数据库(房间)的应用程序和一个使用 retrofit 2rxjava<POST 来自数据库的所有“事件”的服务。当我发送大量 POST(即 1500+)时,应用会抛出 OutOfMemoryException。我认为发生这种情况是因为每次客户端发送新的 POST 时它都会启动一个新线程。有什么方法可以防止 retrofit/rxJava 创建这么多线程?还是等待服务器响应更好?这是我的代码:

从本地数据库检索所有事件的类

public class RetreiveDbContent {

private final EventDatabase eventDatabase;

public RetreiveDbContent(EventDatabase eventDatabase) {
    this.eventDatabase = eventDatabase;
}

@Override
public Maybe<List<Event>> eventsList() {

 return eventDatabase.eventDao().getAllEvents()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
}
}

接下来,我有一项服务可以遍历数据库事件列表并发布所有事件。如果后端发回成功,则该事件将从本地数据库中删除。

    private void sendDbContent() {

    mRetreiveDbContent.eventsList()
            .subscribe(new MaybeObserver<List<Event>>() {

        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onSuccess(final List<Event> events) {


            Timber.e("Size of list from db " + events.size());
            final CompositeDisposable disposable = new CompositeDisposable();

            Observable<Event> eventObservable = Observable.fromIterable(events);
            eventObservable.subscribe(new Observer<Event>() {
                @Override
                public void onSubscribe(Disposable d) {
                    disposable.add(d);
                }

                @Override
                public void onNext(Event event) {
                    Timber.d("sending event from db " + event.getAction());
                    mPresenter.postEvent(Event);
                }

                @Override
                public void onError(Throwable e) {
                    Timber.e("error while emitting db content " + e.getMessage());
                }

                @Override
                public void onComplete() {
                    Timber.d("Finished looping through db list");
                    disposable.dispose();
                }
            });

        }

        @Override
        public void onError(Throwable e) {
            Timber.e("Error occurred while attempting to get db content " + e.getMessage());
        }

        @Override
        public void onComplete() {
            Timber.d("Finished getting the db content");
        }
    });
}

这是我的 postEvent()deleteEvent() 方法,存在于演示者中

    public void postEvent(final Event event) {

    mSendtEvent.sendEvent(event)
          .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new DisposableObserver<Response<ResponseBody>>() {
                @Override
                public void onNext(Response<ResponseBody> responseBodyResponse) {

                    switch (responseBodyResponse.code()) {
                        case CREATED_RESPONSE:
                            Timber.d("Event posted successfully " + responseBodyResponse.code());
                            deleteEventFromRoom(event);
                            break;
                        case BAD_REQUEST:
                            Timber.e("Client sent a bad request! We need to discard it!");
                            break;
                    }
                }

                @Override
                public void onError(Throwable e) {
                    Timber.e("Error " + e.getMessage());
                    mView.onErrorOccurred();
                }

                @Override
                public void onComplete() {

                }
            });
}


    public void deleteEventFromRoom(final Event event) {

    final CompositeDisposable disposable = new CompositeDisposable();
    mRemoveEvent.removeEvent(event)
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer() {
                @Override
                public void onSubscribe(Disposable d) {
                    disposable.add(d);
                }

                @Override
                public void onNext(Object o) {
                    Timber.d("Successfully deleted event from database " + event.getAction());
                }

                @Override
                public void onError(Throwable e) {

                }

                @Override
                public void onComplete() {
                    disposable.dispose();
                }
            });
}

最后是mRemoveEvent 交互器

public class RemoveEvent {

private final EventDatabase eventDatabase;

public RemoveEvent(EventDatabase eventDatabase) {
    this.eventDatabase = eventDatabase;
}

@Override
public Observable removeEvent(final Event event) {
    return Observable.fromCallable(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            return eventDatabase.eventDao().delete(event);
        }
    });
}
}

注意:我是 RXJava 世界的新手。 提前谢谢你

最佳答案

您正在使用不支持背压的 Observable

Fom RxJava github 页面:

Backpressure

When the dataflow runs through asynchronous steps, each step may perform different things with different speed. To avoid overwhelming such steps, which usually would manifest itself as increased memory usage due to temporary buffering or the need for skipping/dropping data, a so-called backpressure is applied, which is a form of flow control where the steps can express how many items are they ready to process. This allows constraining the memory usage of the dataflows in situations where there is generally no way for a step to know how many items the upstream will send to it.

In RxJava, the dedicated Flowable class is designated to support backpressure and Observable is dedicated for the non-backpressured operations (short sequences, GUI interactions, etc.). The other types, Single, Maybe and Completable don't support backpressure nor should they; there is always room to store one item temporarily.

您应该使用Flowable,您正在将所有事件发送到下游以使用所有可用资源进行处理。

这是一个简单的例子:

Flowable.range(1, 1000)
        .buffer(10)//Optional you can process single event
        .flatMap(buf -> {
            System.out.println(String.format("100ms for sending events to server: %s ", buf));
            Thread.sleep(100);
            return Flowable.fromIterable(buf);
        }, 1)// <-- How many concurrent task should be executed
        .map(x -> x + 1)
        .doOnNext(i -> System.out.println(String.format("doOnNext: %d", i)))
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.single(), false, 1)//Overrides the 128 default buffer size
        .subscribe(new DefaultSubscriber<Integer>() {
    @Override
    public void onStart() {
        request(1);
    }

    @Override
    public void onNext(Integer t) {
        System.out.println(String.format("Received response from server for event : %d", t));
        System.out.println("Processing value would take some time");
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        //You can request for more data here
        request(1);
    }

    @Override
    public void onError(Throwable t) {
        t.printStackTrace();
    }

    @Override
    public void onComplete() {
        System.out.println("ExampleUnitTest.onComplete");
    }
});

最后一个提示:您不应该一次将所有事件都提取到内存中,基本上您将所有“数据库事件”保存在内存中,考虑分页或类似Cursor 的东西,提取 100 行每个操作并在处理完它们后请求下 100 个,我希望您使用 JobScheduler 或 WorkManager API 执行此操作

关于android - 使用 retrofit2 和 rx java2 发送大量 POST 时出现 OutOfMemoryException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49295363/

相关文章:

java - Android 中的 JSON 解码

java - 在我的 HTC Evo 上使用 Eclipse 进行 Android 调试 : Augh!

java - fragment 未显示在 Mainactivity 中,在我的 Activity fragment 内容中仅在我单击工具栏时显示

android - 如何使用 Okhttpclient 创建带参数的 Http Delete 或 Put 方法?安卓okhttp客户端

javascript - 为什么我在实际工作时得到 "Fetch failed loading"?

post - 如何将图像发送到 dart/flutter 中的 api?

javascript - 如何为移动设备实现滑动手势?

android - 复制保护对免费应用有意义吗?

css - 网站不会在移动设备上滚动

ruby-on-rails - Rails + Devise + API + 用户注册