java - 如何在热源上使用 groupBy

标签 java spring-webflux project-reactor

给出以下代码;

我有一个假的“热源”,我想在其中每 2 秒打印每个城市的最后一个值。我看到 log 点 A 和 B 的行为符合我的预期。但是,代码在 groupBy 上阻塞,并且只有 every 在 log 点 C 处发出最终值。我怎样才能让“C”每 2 秒发出一次。

public class Weather {
    String city;
    Integer temperature;

public Weather(String city, Integer temperature) {
    super();
    this.city = city;
    this.temperature = temperature;
}

@Override
public String toString() {
    return "Weather [city=" + city + ", temperature=" + temperature + "]";
}

public static void main(String[] args) {

    BlockingQueue<Weather> queue = new LinkedBlockingQueue<>();

    new Thread(() -> {
        for (int d = 1; d < 100; d += 1) {
            for (String s: new String[] {"LDN", "NYC", "PAR", "ZUR"}) {
                queue.add(new Weather(s, d));
                try { Thread.sleep(250); } catch (InterruptedException e) {}
            }
        }
    }).start(); 

    Flux<Weather> outgoing = Flux.create(
        sink -> {
            for (int i = 0; i < 100; i++) {
                try {
                    sink.next(queue.take());
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            sink.complete();
        }
    );

    ConnectableFlux<Weather> subscriber = outgoing.publish();
    subscriber
    .buffer(Duration.ofSeconds(2))
    .log("A")
    .flatMap(Flux::fromIterable)
    .log("B")
    .groupBy(c -> c.city)
    .flatMap(Flux::last)
    .log("C")           

    .subscribe(s -> System.out.println(">>>>>" + s));


    subscriber.connect();
    System.exit(0);
}

}

最佳答案

这似乎有效;

        subscriber
          .groupBy(c -> c.city)
          .flatMap(g -> g
            .take(Duration.ofSeconds(5))
            .takeLast(1)
          )
          .subscribe(s -> System.out.println(">>>>>" + s));

关于java - 如何在热源上使用 groupBy,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55162934/

相关文章:

java - PhoneAuthProvider.OnVerificationStateChangedCallbacks 没有被解雇

java - 如何并行调度多个 Flux 并限制 onNext 和 onRequest 事件

spring-mvc - 如何在 Spring Webflux 中返回 Mono<Map<String, Flux<Integer>>> 响应?

kotlin - Swagger 与 Spring webflux 依赖构建问题

java - 线程安全响应式(Reactive)缓存

java - Flux.repeat() 不重复

java - 缓存和使缓存的 Mono 失效

Java 设置文件夹所有者,以便只有程序才能访问它

java - 在数据库中存储 Java 类并加载它 使用它 销毁它

java - 自定义图标 JOptionPane.showInputDialog