java - 如何在同一线程中并行通量内运行单声道

标签 java reactive-programming spring-webflux

我正在尝试用 Mono 的值填充 Flux 内的对象。当我尝试这样做时,它只是忽略我的“设置”操作。我认为这是因为 Flux 是并行工作的,而 Mono 不是。我该如何解决这个问题?

Flux.fromIterable(proxyParserService.getProxyList())
            .parallel()
            .runOn(Schedulers.parallel())
            .filter(proxy -> proxy.getCorrupted() == null || !proxy.getCorrupted())
            .subscribe(proxy -> {
                        try {
                            RestTemplate restTemplate = getProxiedTemplate(proxy.getHost(), proxy.getPort());
                            restTemplate.exchange(URI, HttpMethod.GET, HttpEntity.EMPTY, String.class);
                            geoDataService.getData(proxy.getHost()) // Here comes the Mono object, that contains needed value to set into "proxy"
                                    .subscribe(geoData ->
                                    {
                                        log.info("GEODATA: {} ", geoData);
                                        proxy.setCountryCode(geoData.getCountryCode()); // ignored somehow
                                    });
                            proxy.setCorrupted(false);
                            addresses.add(proxy);
                            log.info("IP {}:{} is OK", proxy.getHost(), proxy.getPort());
                            log.info("Final result: {}", proxy.toString());
                        } catch (ResourceAccessException e) {
                            log.info("IP {}:{} is corrupted!", proxy.getHost(), proxy.getPort());
                            proxy.setCorrupted(true);
                            addresses.add(proxy);
                        }
                    },
                    throwable -> log.error(String.format("Exception caught while trying to fill map: %s", throwable.getCause())));

}

Here's some logs

如您所见,我正在尝试将国家/地区代码设置为代理。

最佳答案

已解决。在“flatMap”运算符中添加了 Mono 对象。 示例:

Flux.fromIterable(proxyParserService.getProxyList())
            .parallel()
            .runOn(Schedulers.parallel())
            .filter(poxy -> !valueExist(addresses.values(), poxy))
            .flatMap(geoDataService::getData) // Now it runs in parallel threads
            .subscribe(proxy -> {
                        try {
                            RestTemplate restTemplate = getProxiedTemplate(proxy.getHost(), proxy.getPort());
                            restTemplate.exchange(URI, HttpMethod.GET, HttpEntity.EMPTY, String.class);
                            proxy.setCorrupted(false);
                            addresses.put(proxy.getCountryCode(), proxy);
                            log.info("IP {}:{} is OK", proxy.getHost(), proxy.getPort());
                            log.info("Final result: {}", proxy.toString());
                        } catch (ResourceAccessException e) {
                            log.info("IP {}:{} is corrupted!", proxy.getHost(), proxy.getPort());
                            proxy.setCorrupted(true);
                            addresses.put(proxy.getCountryCode(), proxy);
                        }
                    },
                    throwable -> log.error(String.format("Exception caught while trying to fill map: %s", throwable.getCause())));

关于java - 如何在同一线程中并行通量内运行单声道,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59652807/

相关文章:

Java写入关闭的输出流不抛出ioexception

java - 创建 Realm 对象时出现问题( IllegalStateException : Cannot modify managed objects outside of a write transaction)

rxjs - 当第一个作为参数传递的 observables 完成时完成 combineLatest 的变体

reactive-programming - 使用响应式(Reactive) Couchbase java 驱动程序进行批处理

java - 如何将基本 URI 设置为 Spring WebClient

java - 可能导致错误的原因是什么

java - 如何将初始参数放入小程序中?

android - 订阅 Android UI 线程

java - 网络流量中的网络过滤器是什么样的

java - 使用 Spring WebFlux 的 webclient 在 Mono 上有条件地重复或重试