observable - 使用 JavaRX 将文档插入 couchbase 时抛出 DocumentAlreadyExistsException 异常时如何继续处理下一个文档?

标签 observable rx-java couchbase couchbase-java-api

我正在处理批处理作业,该作业读取、转换文档并将其写入 coucbase。我正在使用 Java 批量插入文档。我不想使用 upsert,因为我想记录文档是否已存在于 couchbase 中并调用 insert。请在下面找到我的代码。当文档已经存在时,将引发 DocumentAlreadyExistsException 并且作业将停止,而不是继续处理下一个文档。如何处理这个问题?

代码:

    public void insertAll(Collection<JsonDocument> documents) {

    Observable.from(documents).flatMap(new Func1<JsonDocument, Observable<JsonDocument>>() {
        @Override
        public Observable<JsonDocument> call(final JsonDocument docToInsert) {
            return couchbaseConfig.catalogBucket().async().insert(docToInsert)
                    .doOnError((Throwable throwable) -> log.error(
                            "Exception {} occured while inerting document {} to cb", throwable.getMessage(),
                            docToInsert));
        }
    }).last().toBlocking().single();

}

异常(exception):

com.couchbase.client.java.error.DocumentAlreadyExistsException: null
at com.couchbase.client.java.bucket.api.Mutate$1$1.call(Mutate.java:154) ~[java-client-2.7.4.jar:na]
at com.couchbase.client.java.bucket.api.Mutate$1$1.call(Mutate.java:132) ~[java-client-2.7.4.jar:na]
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:69) ~[rxjava-1.3.8.jar:1.3.8]
at rx.observers.Subscribers$5.onNext(Subscribers.java:235) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeDoOnEach$DoOnEachSubscriber.onNext(OnSubscribeDoOnEach.java:101) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.producers.SingleProducer.request(SingleProducer.java:65) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:211) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeMap$MapSubscriber.setProducer(OnSubscribeMap.java:102) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.Subscriber.setProducer(Subscriber.java:205) ~[rxjava-1.3.8.jar:1.3.8]
at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:103) ~[rxjava-1.3.8.jar:1.3.8]
at com.couchbase.client.core.endpoint.AbstractGenericHandler.completeResponse(AbstractGenericHandler.java:508) ~[core-io-1.7.4.jar:na]
at com.couchbase.client.core.endpoint.AbstractGenericHandler.access$000(AbstractGenericHandler.java:86) ~[core-io-1.7.4.jar:na]
at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:526) ~[core-io-1.7.4.jar:na]
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) ~[rxjava-1.3.8.jar:1.3.8]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[na:1.8.0_131]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[na:1.8.0_131]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) ~[na:1.8.0_131]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) ~[na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_131]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_131]
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.couchbase.client.core.message.kv.InsertResponse.class
at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:118) ~[rxjava-1.3.8.jar:1.3.8]
at rx.internal.operators.OnSubscribeMap$MapSubscriber.onNext(OnSubscribeMap.java:73) ~[rxjava-1.3.8.jar:1.3.8]
... 21 common frames omitted

最佳答案

您可以使用onErrorResumeNext来抑制错误,如下所示:

Observable.from(docs)
  .flatMap(docToInsert ->
    bucket.insert(docToInsert)
      .doOnError(t -> System.out.println("oops, error inserting " + docToInsert.id() + " : " + t))
      .onErrorResumeNext(t ->
        t instanceof DocumentAlreadyExistsException ? Observable.empty() : Observable.error(t))
  ).toCompletable().await();

请注意,即使没有发出任何项目,toCompleteable().await() 也会起作用;如果所有文档都已存在,single() 将抛出异常。

关于observable - 使用 JavaRX 将文档插入 couchbase 时抛出 DocumentAlreadyExistsException 异常时如何继续处理下一个文档?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56083434/

相关文章:

android - 在android应用程序中获取异常 "getViewID() < 0, Status: 404 (HTTP 404 not_found)"

javascript - 为什么当我的变量从可观察到的值每秒接收到一个新值时,ngModal不会更新输入值

Angular 5 同步 HTTP 调用

rxjs - 对一次性事件使用 Observable

java - 如何在正确(实时)的时间内传递可观察的元素?

java - RxJava 无法调用路由处理程序

android - 如何在没有丑陋的 instanceof 的情况下处理 Retrofit Rx onError 中的不同类型的错误

服务中的 Angular 4+ ngOnDestroy() - 销毁 observable

java - 使用 Couchbase Java 客户端的第一个查询不返回任何行

spring-boot - 是否可以在 Spring Boot 中连接到两个不同的沙发底座桶