java - 当 Source 有大量记录时,Akka 流不会运行

标签 java akka akka-stream reactive-streams

我正在尝试编写一个非常简单的使用 Akka Streams 的介绍性示例。我试图基本上创建一个流,该流将一系列整数作为源,并过滤掉所有非素数整数,生成素数整数流作为其输出。

构造流的类相当简单;为此,我有以下内容。

import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Flow;
import com.aparapi.Kernel;
import com.aparapi.Range;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class PrimeStream {
    private final AverageRepository averageRepository = new AverageRepository();
    private final ActorSystem actorSystem;

    public PrimeStream(ActorSystem actorSystem) {
        this.actorSystem = actorSystem;
    }

    public Flow<Integer, Integer, NotUsed> filterPrimes() {
        return Flow.of(Integer.class).grouped(10000).mapConcat(PrimeKernel::filterPrimes).filter( v -> v != 0);
    }
}

当我运行以下测试时,它工作正常。

private final ActorSystem actorSystem = ActorSystem.create("Sys");

@Test
public void testStreams() {
    Flow<Integer, Integer, NotUsed> filterStream = new PrimeStream(actorSystem).filterPrimes();
    Source<Integer, NotUsed> flow = Source.range(10000000, 10001000).via(filterStream);
    flow.runForeach(System.out::println, ActorMaterializer.create(actorSystem));
}

但是,当我通过将测试中的行更改为以下内容来将范围增加 10 倍时,它不再起作用。

Source<Integer, NotUsed> flow = Source.range(10000000, 10010000).via(filterStream);

现在,当测试运行时,不会引发异常,也不会发出警告。它只是运行,然后退出,根本不向控制台显示任何文本。

为了更加确定问题不在于我的素性测试本身,我在不使用 Akka Streams 的情况下在相同的范围内运行了测试,并且运行良好。下面的代码运行没有问题。

@Test
public void testPlain() {
    List<Integer> in = IntStream.rangeClosed(10000000, 10010000).boxed().collect(Collectors.toList());
    List<Integer> out = PrimeKernel.filterPrimes(in);
    System.out.println(out);
}

为了清楚起见,素性测试本身接受一个整数列表,如果列表中的任何元素不是素数,则将其设置为 0。

正如 @RamonJRomeroyVigil 所建议的,如果我将 mapConcat 部分全部删除,但保留所有内容不变,事实上,会打印出 10,000 个整数。但是,如果我保留所有内容相同,只是将 filterPrimes 替换为仅按原样返回方法参数而不触及它的方法,那么它根本不会在屏幕上打印任何内容。我还尝试在开始的filterPrime中添加一个println来调试它。每当它不打印任何包含调试语句的输出时。因此根本没有尝试调用filterPrimes。

最佳答案

runForeach 返回一个 CompletionStage,因此如果您想查看打印的所有数字,那么您必须在 CompletionStage 上等待,否则测试函数返回并且程序终止,而 CompletionStage 未完成。

示例:

flow.runForeach(System.out::println, ActorMaterializer.create(actorSystem)).toCompletableFuture().join();

关于java - 当 Source 有大量记录时,Akka 流不会运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50264569/

相关文章:

java - 在应用自定义约束 validator 之前验证类型的字段

java - LocalDateTime (Java 8 API) 的 Setter 被调用两次

scala - Scala 如何实现并行性?

java - 如何在 Java 中将元素发送到 Source.actorRef 或 Source.queue

java - 如何在基于 Java 的配置中使用@Autowired?

java - 什么是NullPointerException,我该如何解决?

Scala akka http 服务器 - 打印 POST 消息

akka-stream - 物化值是什么意思,为什么在同一图形的以下逻辑中它不同?

akka - akka Stream 在一个简单的流程中创建了多少个 Actor?

akka-stream - 在项目 react 堆或Akka流中,汇和订户之间的概念区别是什么?