java - 组间最后值的总和

标签 java reactive-programming rx-java

假设我们有一个合作游戏,玩家可以获得和失去分数。任何玩家都可以随时加入游戏。一旦加入,玩家就永远不会离开(为了简单起见)。球队的总得分只是每个球员在任何时刻的得分之和。玩家应该能够看到自己当前的得分和团队的总得分。

因此,我们有一个与时间戳绑定(bind)的所有玩家的分数流。

Time:   0  1  2 3 4 5  6  7    8  9 10 11 12 13 14 15 16    17  18 19 20
Stream: A5 A4       B2 A3 A4B4 A9         K5    B6 A8 B10K9 A12

字母代表玩家,数字代表得分。像A4B4这样的值意味着两个玩家的得分同时改变。在 groupBy 之后,我们有三个流:每个流对应一个玩家。请注意,玩家集不是预定义的。

Time:   0  1  2 3 4 5  6  7    8  9 10 11 12 13 14 15 16    17  18 19 20

Alex:   5  4           3  4    9                   8        12
Ben:                2     4                     6     10
Katie:                                    5           9

我期望得到的是分数每次变化的总和流:

Time:   0  1  2 3 4 5  6  7    8  9 10 11 12 13 14 15 16    17  18 19 20
Total:  5  4        6  5  8    13         18    20 19 27    31

那是微不足道的。但解决方案变得复杂。

这是一个输入流。

Observable< Record > input = Observable.from( new Record[] {
  new Record( "Alex", 0, 5 ),
  new Record( "Alex", 1, 4 ),
  new Record( "Ben", 5, 2 ),
  new Record( "Alex", 6, 3 ),
  new Record( "Alex", 7, 4 ),
  new Record( "Ben", 7, 4 ),
  new Record( "Alex", 8, 9 ),
  new Record( "Katie", 12, 5 ),
  new Record( "Ben", 14, 6 ),
  new Record( "Alex", 15, 8 ),
  new Record( "Katie", 16, 9 ),
  new Record( "Ben", 16, 10 ),
  new Record( "Alex", 17, 12 )
} )
.delay( e -> Observable.interval( e.timestamp, TimeUnit.SECONDS, scheduler ) )
.doOnCompleted( latch::countDown )
.share();

Record 是一个简单的元组:Record( String player, int timestamp, double Score )

让我们将其分成几组:

Observable< Observable< Record > > output = input
  .groupBy( e -> e.player )
  .map( g -> g.cache() )

标准的 toList 方法需要 Observable 来完成。但组流仅在输入流完成时才完成(任何玩家都可以随时加入游戏)。因此需要一个新的变压器。它在 onNext 调用时返回一个列表,而不是 onComplete

  .compose( new ToListOnNextTransformer() )

然后,每次发出新组(每次有新玩家加入游戏时),我们都会生成一个新的 Observable。该 Observable 返回所有玩家最新得分的总和。

  .map( eachPlayerRecordsList -> Observable.combineLatest( eachPlayerRecordsList, ( Object... eachPlayerRecordsArray ) -> {
    double total = Arrays
      .stream( eachPlayerRecordsArray )
      .mapToDouble( e -> ( ( Record )e ).score )
      .sum();
    int timestamp = Arrays
      .stream( eachPlayerRecordsArray )
      .mapToInt( e -> ( ( Record )e ).timestamp )
      .max()
      .getAsInt();
    return new Record( "Total", timestamp, total );
} ) );

最后,我们输出第一组的值,直到第二个玩家加入游戏。然后我们输出第二组的值,直到第三个玩家加入,等等。这正是 switchOnNext 所做的。

Observable.switchOnNext( outputg )
  .subscribe( System.out::println, e -> { ( ( Throwable )e ).printStackTrace(); } );

自定义变压器类:

private static class ToListOnNextTransformer implements Observable.Transformer< Observable< Record >, List< Observable< Record > > > {
  private final List< Observable< Record > > list = new ArrayList<>();
  private final PublishSubject< List< Observable< Record > > > ret = PublishSubject.create();

  @Override
  public Observable< List< Observable< Record > > > call( Observable< Observable< Record > > eachPlayerRecords ) {
    eachPlayerRecords.subscribe( playerRecords -> {
      list.add( playerRecords );
      ret.onNext( new ArrayList<>( list ) );
    } );
    return ret;
  }
}

问题是:

  • 可以简化吗?最好采用响应式(Reactive)方法。
  • 我没有违反RxJava的一些规则吗?比如在订阅者内部使用订阅者等。

这是一项人为任务。我正在学习 RxJava 并试图理解响应式(Reactive)编程。我希望存在更好的方法,但我只是错过了一些明显的东西。

The full source code of a test file .

最佳答案

我不完全理解您的示例代码,但您可以通过 scan 进行运行求和:

source.scan(0, (a, b) -> a + b)

因此,给定人员及其点的事件流,您可以创建两个组:

ConnectableObservable<Record> co = input.publish(); // replay, cache, etc.

Observable<Pair<String, Integer>> userScores = co
.groupBy(r -> r.name)
.flatMap(g -> just(Pair.of(g.getKey(), g.scan(0, (a, b) -> a + b))));

Observable<Pair<String, Integer>> teamScores = co
.groupBy(r -> r.team)
.flatMap(g -> just(Pair.of(g.getKey(), g.scan(0, (a, b) -> a + b))));

userScores.filter(p -> p.name.equals("Alex"))
.concatMap(p -> p.second.map(s -> Pair.of(p.first, s)))
.subscribe(System.out::println);

co.connect();

关于java - 组间最后值的总和,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32717608/

相关文章:

javascript - 根据特定条件加入 RxJs 中的两个可观察流

java - RxJava - 在 Observable<List<Service>> 中等待所有服务的 Observable<Boolean>

angular - RxJS : Filter one observable using another observable

java - 使用单线程调度程序时,RxJava mergeLatest 的单元测试挂起

java - 有没有办法将 @Scheduled 与 Duration 字符串(如 15s 和 5m)一起使用?

java - 将源文件包含在可运行的 jar 文件中

Java 正则表达式以条件替换器开头和结尾

java - 运行 jar : Exception in thread "main" org. eclipse.swt.SWTException 时出现 SWT 异常:无效的线程访问

java - mergeDelayError() 之后 Observable.take() 是如何工作的?

java - 在没有事件总线的情况下触发事件时自动从 observable 获取项目