javascript - 遍历树结构时如何避免 `shareReplay`

标签 javascript rxjs reactive-programming

我正在尝试使用 RxJS 重写包管理器(PM 是 pnpm ,PR 是 here )。

在重写过程中,我使用了很多 .shareReplay(Infinity),有人告诉我这很糟糕(我是响应式(Reactive)编程的初学者)

有人可以建议一种替代方法,如何重写如下代码,而不使用 .shareReplay(Infinity):

'use strict'
const Rx = require('@reactivex/rxjs')

const nodes = [
  {id: 'a', children$: Rx.Observable.empty()},
  {id: 'b', children$: Rx.Observable.empty()},
  {id: 'c', children$: Rx.Observable.from(['a', 'b', 'd'])},
  {id: 'd', children$: Rx.Observable.empty()},
  {id: 'e', children$: Rx.Observable.empty()},
]

// I want this stream to be executed once, that is why the .shareReplay
const node$ = Rx.Observable.from(nodes).shareReplay(Infinity)

const children$ = node$.mergeMap(node => node.children$.mergeMap(childId => node$.single(node => node.id === childId)))

children$.subscribe(v => console.log(v))

最佳答案

<罢工>groupBy运算符(operator)应该在这里工作。从 PR 来看,这可能过于简单化了,但这里是:

'use strict'
const Rx = require('@reactivex/rxjs')

const nodes = [
  {id: 'a', children$: Rx.Observable.empty()},
  {id: 'b', children$: Rx.Observable.empty()},
  {id: 'c', children$: Rx.Observable.from(['a', 'b', 'd'])},
  {id: 'd', children$: Rx.Observable.empty()},
  {id: 'e', children$: Rx.Observable.empty()},
]

Rx.Observable.from(nodes)
 // Group each of the nodes by its id
 .groupBy(node => node.id)
 // Flatten out each of the children by only forwarding children with the same id
 .flatMap(group$ => group$.single(childId => group$.key === childId))
 .subscribe(v => console.log(v));

<罢工>

编辑:比我想象的更难

好吧,在我第二次通读时,我发现这需要比我最初想象的更多的工作,所以它不能这么简单地简化。基本上,您必须在内存复杂度和时间复杂度之间进行选择,因为没有 Elixir 。

从优化的 Angular 来看,如果初始源只是一个数组,那么您可以删除 shareReplay它将以完全相同的方式工作,因为当订阅 ArrayObservable 时,唯一的开销将是迭代数组,重新运行源实际上没有任何额外成本。

基本上,我认为你可以想到两个维度,即节点数 m平均 child 数量n 。在速度优化版本中,您最终将不得不运行 m两次,您将需要迭代“n”个节点。既然你有 m*n 个 child ,最坏的情况就是他们都是独一无二的。这意味着你需要做(m + m*n)操作简化为 O(m*n)如果我没有记错的话。这种方法的缺点是您需要同时拥有 (nodeId -> Node) 的 Map 和用于删除重复依赖项的 Map。

'use strict'
const Rx = require('@reactivex/rxjs')

const nodes = [
  {id: 'a', children$: Rx.Observable.empty()},
  {id: 'b', children$: Rx.Observable.empty()},
  {id: 'c', children$: Rx.Observable.from(['a', 'b', 'd'])},
  {id: 'd', children$: Rx.Observable.empty()},
  {id: 'e', children$: Rx.Observable.empty()},
]

const node$ = Rx.Observable.from(nodes);

// Convert Nodes into a Map for faster lookup later
// Note this will increase your memory pressure.
const nodeMap$ = node$
  .reduce((map, node) => {
    map[node.id] = node;
    return map;
  });


node$
  // Flatten the children
  .flatMap(node => node.children$)
  // Emit only distinct children (you can remove this to relieve memory pressure
  // But you will still need to perform de-duping at some point.
  .distinct()
  // For each child find the associated node
  .withLatestFrom(nodeMap$, (childId, nodeMap) => nodeMap[childId])
  // Remove empty nodes, this could also be a throw if that is an error
  .filter(node => !!node)
  .subscribe(v => console.log(v));

另一种方法是使用与您类似的方法,该方法侧重于以性能为代价减少内存压力。请注意,就像我说的,如果您的源是一个数组,您基本上可以删除 shareReplay ,因为它在重新评估时所做的就是重新迭代该数组。这消除了额外 Map 的开销。虽然我认为你仍然需要不同的来删除重复项。最坏情况下的运行时复杂度将是 O(m^2*n)或者简单地O(m^2)如果n很小,因为您需要迭代所有子项,并且对于每个子项,您还需要迭代 m再次查找匹配节点。

const node$ = Rx.Observable.from(nodes);
node$
  // Flatten the children
  .flatMap(node => node.children$)
  // You may still need a distinct to do the de-duping
  .flatMap(childId => node$.single(n => n.id === childId)));

我想说第一个选项在几乎所有情况下都更可取,但我将其留给您来确定您的用例。您可能会建立一些启发式算法,在某些情况下选择一种算法而不是另一种算法。

旁注:抱歉,这不是那么容易,但是喜欢 pnpm,所以继续努力!

关于javascript - 遍历树结构时如何避免 `shareReplay`,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45908003/

相关文章:

javascript - 如何清空 Angular 4 中的可观察对象

Scala 我可以创建多少个 future

rxjs - 我们应该为每个 Action 类型创建一个 Epic 吗?在 redux-observable 中

rx-java - 除了 System.currentTimeMillis() 之外,使用 .defer() 的一个好用例是什么

java - react 堆项目 : Multiple Publishers making HTTP calls and one Subscriber to handle all results

javascript - 我应该如何将异步服务中的数据设置到 Controller $scope 中?

javascript - 如何动态地用右 CSS 属性替换左 CSS 属性(在 .css 文件中设置)?

2个不同输入的JavaScript非常简单的表单验证

javascript - 客户端项目在端口 x 上运行,服务器 api 端点在端口 y 上运行。 AJAX如何知道webapi端点在哪里

javascript - 如何在订阅时获取观察者的 "current"值