在我的 Android 应用程序中,我有一个处理用户交互的演示器,包含某种请求管理器,如果需要,可以通过请求管理器将用户输入发送到请求管理器。
请求管理器本身包含服务器 API 并使用此 RxJava 处理服务器请求。
我有一个代码,每当用户输入一条消息并显示服务器的响应时,它就会向服务器发送一个请求:
private Observable<List<Answer>> sendRequest(String request) {
MyRequest request = new MyRequest();
request.setInput(request);
return Observable.fromCallable(() -> serverApi.process(request))
.doOnNext(myResponse -> {
// store some data
})
.map(MyResponse::getAnswers)
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread());
}
但是现在我需要某种队列。用户可以在服务器响应之前发送一条新消息。队列中的每条消息都应按顺序处理。 IE。第二条消息将在我们收到对第一条消息的响应后发送,依此类推。
如果发生错误,则不应处理进一步的请求。
我还需要在 RecyclerView 中显示答案。
我不知道如何更改上面的代码来实现上述处理
我看到了某种问题。一方面,这个队列可以由用户随时更新,另一方面,只要服务器发送响应,消息就应该从队列中删除。
也许有一个 rxjava 运算符或我刚刚错过的特殊方式。
我在这里看到了类似的答案,但是,那里的“队列”是不变的。 Making N sequential api calls using RxJava and Retrofit
我将非常感谢任何解决方案或链接
最佳答案
我没有找到任何优雅的原生 RxJava 解决方案。所以我会定制一个Subscriber
做你的工作。
对于你的 3 点:
为了顺序执行,我们创建了一个单线程调度器
Scheduler sequential = Schedulers.from(Executors.newFixedThreadPool(1));
为了在发生错误时停止所有请求,我们应该一起订阅所有请求而不是创建一个
Flowable
每次。所以我们定义了以下函数(这里我请求是Integer
和响应String
):void sendRequest(Integer request)
Flowable<String> reciveResponse()
并定义一个字段来关联请求和响应流:
FlowableProcessor<Integer> requestQueue = UnicastProcessor.create();
为了重新运行未发送的请求,我们定义了重新运行函数:
void rerun()
然后我们就可以使用它了:
reciveResponse().subscribe(/**your subscriber**/)
现在让我们来实现它们。
发送请求时,我们简单地将其插入requestQueue
public void sendRequest(Integer request) {
requestQueue.onNext(request);
}
首先,要按顺序执行请求,我们应该将工作安排到 sequential
:
requestQueue
.observeOn(sequential)
.map(i -> mockLongTimeRequest(i)) // mock for your serverApi.process
.observeOn(AndroidSchedulers.mainThread());
其次,发生错误时停止请求。这是一种默认行为。如果我们什么都不做,一个错误将破坏订阅并且不会发出任何进一步的项目。
第三,重新运行未发送的请求。首先是因为 native 运算符将取消流,例如 MapSubscriber
做(RxJava-2.1.0-FlowableMap#63):
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);// fail will call cancel
return;
}
我们应该包装错误。在这里我使用我的 Try
类来包装可能的异常,您可以使用任何其他可以包装异常而不是抛出异常的实现:
.map(i -> Try.to(() -> mockLongTimeRequest(i)))
然后是自定义 OnErrorStopSubscriber implements Subscriber<Try<T>>, Subscription
.
它正常请求和发出项目。当错误发生时(实际上是失败的 Try
发出)它停在那里并且不会请求或发出甚至下游请求它。后拨rerun
方法,它会回到运行状态并正常发射。该类大约有80行。您可以在 my github 上查看代码.
现在我们可以测试我们的代码了:
public static void main(String[] args) throws InterruptedException {
Q47264933 q = new Q47264933();
IntStream.range(1, 10).forEach(i -> q.sendRequest(i));// emit 1 to 10
q.reciveResponse().subscribe(e -> System.out.println("\tdo for: " + e));
Thread.sleep(10000);
q.rerun(); // re-run after 10s
Thread.sleep(10000);// wait for it complete because the worker thread is deamon
}
private String mockLongTimeRequest(int i) {
Thread.sleep((long) (1000 * Math.random()));
if (i == 5) {
throw new RuntimeException(); // error occur when request 5
}
return Integer.toString(i);
}
和输出:
1 start at:129
1 done at:948
2 start at:950
do for: 1
2 done at:1383
3 start at:1383
do for: 2
3 done at:1778
4 start at:1778
do for: 3
4 done at:2397
5 start at:2397
do for: 4
error happen: java.lang.RuntimeException
6 start at:10129
6 done at:10253
7 start at:10253
do for: 6
7 done at:10415
8 start at:10415
do for: 7
8 done at:10874
9 start at:10874
do for: 8
9 done at:11544
do for: 9
您可以看到它按顺序运行。并在发生错误时停止。后拨rerun
方法,它继续处理左边未发送的请求。
关于android - RxJava。顺序执行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47264933/