java - 在 Scala 中使用 RxJava 和平面 map

标签 java scala observable rx-java

我正在尝试在 Scala 应用程序中使用 RX Java 并行执行多个请求

运行成功的java代码为

List<rx.Observable<Response<MyObject>>> observables = requests.stream().map(
    request -> client.getResponse(request).collect(Collectors.toList());

Observable<MyObject> mergedObservable = Observable.merge(observables)
    .flatMap(page -> {
        List<MyObject> objects = page.getResults();
        Observable<MyObject> objectsObservable =  Observable.from(objects);
        return objectsObservable;
}).toBlocking().getIterator()

我想在 Scala 中编写此代码段的等效内容,我所拥有的是

val observables: Array[Observable[Response[MyObject]]] = requests.map(request => client.getResponse(request));
Observable.merge(observables.toList)
       .flatMap[Observable[MyObject]]((page: Response[MyObject]) => {
       val resutls: java.util.List[MyObject] = page.getResults
       val resultObservable: Observable[MyObject] = Observable.from(resutls)
       resultObservable
    }).toBlocking
      .getIterator

代码失败并出现异常

 overloaded method value flatMap with alternatives:
  (x$1: rx.functions.Func1[_ >: com.mypackage.Response[com.mypackage.MyObject], _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x$2: rx.functions.Func1[_ >: Throwable, _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x$3: rx.functions.Func0[_ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x$4: Int)rx.Observable[rx.Observable[com.mypackage.MyObject]] <and>
  (x$1: rx.functions.Func1[_ >: com.mypackage.Response[com.mypackage.MyObject], _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x$2: rx.functions.Func1[_ >: Throwable, _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x$3: rx.functions.Func0[_ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]])rx.Observable[rx.Observable[com.mypackage.MyObject]] <and>
  (x$1: rx.functions.Func1[_ >: com.mypackage.Response[com.mypackage.MyObject], _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]],x$2: Int)rx.Observable[rx.Observable[com.mypackage.MyObject]] <and>
  (x$1: rx.functions.Func1[_ >: com.mypackage.Response[com.mypackage.MyObject], _ <: rx.Observable[_ <: rx.Observable[com.mypackage.MyObject]]])rx.Observable[rx.Observable[com.mypackage.MyObject]]
 cannot be applied to (com.mypackage.Response[com.mypackage.MyObject] => rx.Observable[com.mypackage.MyObject])

关于如何更改 Scala 代码以避免这种情况有什么想法吗?

最佳答案

如果您仔细阅读,错误表明它期望看到类似 Func1[Response[MyObject], Observable[Observable[MyObject]]] 的内容,因为 上的类型参数>平面 map 。删除它或更改为正确的 flatMap[MyObject] 可能足以解决问题(假设您使用的是 Scala 2.12 或使用 -Xexperimental 的 Scala 2.11;否则您可以不要直接对 Func 使用 lambda,并且需要一个辅助函数)。

如果不是,我只需提取 lambda 并为其提供所需的类型(仍然假设 Scala 2.12):

val f: Func1[Response[MyObject], Observable[MyObject]] = 
  page => Observable.from(page.getResults)

Observable.merge(observables.toList).flatMap(f).toBlocking.getIterator

您也可以内联完成:

.flatMap({ page =>
   val resutls: java.util.List[MyObject] = page.getResults
   val resultObservable: Observable[MyObject] = Observable.from(resutls)
   resultObservable
}: Func1[Response[MyObject], Observable[MyObject]])...

关于java - 在 Scala 中使用 RxJava 和平面 map ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55716542/

相关文章:

scala - 递归列表连接

javascript - Angular - 正确嵌套 Observables 及其订阅

java - ubuntu 上 maven jar 插件的性能不佳

java - 如何在 Spring MVC 中解码 Gzip 压缩请求体

java - 线程中出现异常 "main"java.lang.StringIndexOutOfBoundsException

java - 如何在准备好的语句查询中使用oracle SYSTIMESTAMP?

scala - 查找 Seq 中满足条件 X 的第一个元素

Scala 匹配错误

javascript - 如何取消/恢复对可观察模型的更改(或用未更改的副本替换数组中的模型)

javascript - 如何将带有异步等待功能的 promise 更改为可观察的?