java - Akka 循环赛池

标签 java akka

我正在从数据库表或平面文件中获取 4000 万行(数据行)。我正在处理 groovy 的每一行 通过每行创建一个工作人员进行评估(因此在本例中我将创建 4000 万个工作人员)。 这里我使用AKKA的循环池。这种做法正确吗??如果不是,最好的方法是什么。

public class AkkaWay {

public static void main(String[] args) {
    System.out.println("************************** start *****************************");
    new AkkaWay().run();
    System.out.println("************************** END *****************************");
}

private void run() {
    ActorSystem system = ActorSystem.create("CalcSystem");
    ActorRef master = system.actorOf(Master.createMaster(), "master");
    master.tell(new Calculate(), ActorRef.noSender());
    while(!master.isTerminated()){
    try{
     //System.out.println("*********************************** Thread *************************************************");
     Thread.sleep(100);
    }catch(Exception e){
     e.printStackTrace();
    }
   }
}
}

public class Master extends UntypedActor 
{
private final Time time = new Time();

public Master() {
    workerRouter = this.getContext().actorOf(Worker.createWorker().withRouter(new RoundRobinRouter(4)),"workerRouter");
}

@Override
public void onReceive(Object message) {
    if (message instanceof Calculate) {
        time.start();
        processMessages();
    } else if (message instanceof Result) {
        list.add(((Result) message).getFactorial());
        if (list.size() == messages)
            end();
    } else {
        unhandled(message);
    }
}

private void processMessages() 
{
    //read data from file/database (40 millions rows )
    for (int i = 0; i < rows; i++) {

        workerRouter.tell(new Work(), getSelf());// each row send
    }
}

private void end() {
    time.end();
    System.out.println("Done: " + time.elapsedTimeMilliseconds()+"["+time.elapsedTimeMilliseconds()/1000+" secs]");
    getContext().system().shutdown();
}

public static Props createMaster() {
    return Props.create(Master.class, new ArraySeq<Object>(0));
}
}

public class Worker extends UntypedActor 
{

@Override
public void onReceive(Object message) {
    if (message instanceof Work) {
        //evaluate Groovy expression
        getSender().tell(new Result(bigInt), getSelf());
    } else
        unhandled(message);
}

public static Props createWorker() {
    return Props.create(Worker.class, new ArraySeq<Object>(0));
}
}

最佳答案

我认为这不是最好的方法,因为在最坏的情况下,可能会导致您在内存中加载 40 个 mio 行,并在 Actors 邮箱中等待。

使用 akka-stream 可以更好地解决此类问题,一次只加载所需的数据。

关于java - Akka 循环赛池,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43061940/

相关文章:

java - Akka 消息和参与者的命名约定

java - 在Java中设置Actor系统的默认ExecutionContext

java - 在派生类中重写基类方法

java - 如何使用 EWS JAVA api 搜索重定向的电子邮件?

java - JOptionPane.showInputDialog() 问题

java - ANTLR NoViableAltException 与 JAVA

java - JSONObject 到 Java 表示

java - Akka 流回调

scala - Play 2.1.1 未加载自定义 Akka 调度程序

java - 如何在 Play 框架 2 应用程序中存储 Akka Actor 列表?