Java react 器通量未根据预期进行映射

标签 java spring-boot project-reactor

我有以下演示应用程序设置:

  • mongodb 包含 2 个集合:1 个包含加密货币,1 个包含这些加密货币的汇率
  • spring webflux 项目使用服务器发送事件获取这些汇率的实时更新

我有一个返回 Flux 的服务的 List<CryptoCurrencyRateDTO>基于加密货币集合中存在的货币。我为这些货币中的每一种生成一个随机汇率并将它们流式传输到 Web 客户端。

服务是这样的:

@Service
public class CryptoCurrencyRateService {
  @Autowired private CryptoCurrencyRateRepository rateRepository;
  @Autowired private CryptoCurrencyRepository currencyRepository;

  // constructor

  public Flux<List<CryptoCurrencyRateDTO>> realtimeRates() {
    return currencyRepository.findAll()
      .map(CryptoCurrency::getSymbol)
      .flatMap(rateRepository::findTopBySymbolOrderByTimestamp)
      .zipWith(
        Flux.<Long>generate(sink -> sink.next(Instant.now().toEpochMilli())),
        (rate, timestamp) -> new CryptoCurrencyRate(rate.getSymbol(), timestamp, randomRateBasedOnPrevious )
      )
      .flatMap(rateRepository::save)
      .map(rateMapper::toDto)
      .collectList()
      .delayElement(Duration.ofSeconds(5))
      .repeat();
  }
}

CryptoCurrencyRateRepository如下:

@Repository
public interface CryptoCurrencyRateRepository extends ReactiveMongoRepository<CryptoCurrencyRate, String> {
    Mono<CryptoCurrencyRate> findTopBySymbolOrderByTimestamp(String symbol);
}

但是在调用 .flatMap(rateRepository::findTopBySymbolOrderByTimestamp) 之后我只得到一个 Flux包含 1 个项目,而我认为我会得到 Flux包含 currencyRepository.findAll().map(CryptoCurrency::getSymbol) 中每个符号的最高汇率打电话是因为我的加密货币 Collection 包含 3 种货币。

当我查看日志记录时,我看到对 findTopBySymbolOrderByTimestamp 的调用执行3次

2018-11-16 16:04:33.626 DEBUG 3387 --- [ntLoopGroup-2-3] o.s.d.m.core.ReactiveMongoTemplate       : find using query: { "symbol" : "BTC" } fields: Document{{}} for class: class nl.reactive.charts.server.domain.CryptoCurrencyRate in collection: cryptoCurrencyRate
2018-11-16 16:04:33.627 DEBUG 3387 --- [ntLoopGroup-2-3] o.s.d.m.core.ReactiveMongoTemplate       : find using query: { "symbol" : "ETH" } fields: Document{{}} for class: class nl.reactive.charts.server.domain.CryptoCurrencyRate in collection: cryptoCurrencyRate
2018-11-16 16:04:33.629 DEBUG 3387 --- [ntLoopGroup-2-3] o.s.d.m.core.ReactiveMongoTemplate       : find using query: { "symbol" : "XRP" } fields: Document{{}} for class: class nl.reactive.charts.server.domain.CryptoCurrencyRate in collection: cryptoCurrencyRate

最佳答案

我无法重现您的问题。我是这样模仿的

public static void main(String[] args) {
    Flux<String> stringFlux = Flux.fromStream(Stream.of("a", "b", "c"));
    System.out.println(realtimeRates(stringFlux).blockFirst());
}

static Flux<List<String>> realtimeRates(Flux<String> list) {
    Flux<String> symbols = list.map(Scratch::getSymbol);
    Flux<String> topRates = symbols.flatMap(Scratch::findTopBySymbolOrderByTimestamp);
    Flux<String> zip = topRates.zipWith(
        Flux.<Long>generate(sink -> sink.next(Instant.now().toEpochMilli())),
        (rate, timestamp) -> rate + timestamp.toString());
    Mono<List<String>> listMono = zip.collectList();
    Mono<List<String>> delayElement = listMono.delayElement(Duration.ofSeconds(5));
    Flux<List<String>> repeat = delayElement.repeat();
    return repeat;
}

static Mono<String> findTopBySymbolOrderByTimestamp(String symbol) {
    return Mono.just("other-" + symbol);
}

static String getSymbol(String rate) {
    return rate.toLowerCase();
}

如您所见,您将得到类似 [other-a1542821666133, other-b1542821666133, other-c1542821666133] 的信息.

您如何检查平面图结果?请注意,如果您使用 blockFirst()blockLast()方法你只会得到一个元素,因为它是 Flux<String> (检查上面代码中的 topRates 变量)

关于Java react 器通量未根据预期进行映射,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53340911/

相关文章:

java - 在混合 Java/Kotlin 项目中使用 Dagger 2 的 Maven 配置

java - 地理位置权限不足

java - 在 Spring Security 中,UserDetails 将密码存储为 String,但我将其存储为 byte[]

java - for循环只读取最后一个值

java - 在Java中修改来自外部库的文件

java - Spring Boot Auth 服务器中的多个 SSO 提供程序

spring - 有记录 Spring @Value 字段的好方法吗?

java - Observable.map 抛出错误未在订阅中捕获

java - 如何正确管理 Reactor 中的可关闭资源

kotlin - 在一个响应中发送多个不同类型的 Flux