java - 使用 Actor Supervised ,如果发生失败,如何以定义的时间间隔重试相同的消息定义的次数

标签 java akka actor akka-actor

我正在关注akka.io容错的代码http://doc.akka.io/docs/akka/current/java/fault-tolerance.html。我将此代码作为引用。我的要求如下: 假设 actor 因一条消息而崩溃 并由他的主管重新启动。然后他开始处理下一个 他的邮箱里的消息。导致崩溃的消息是 'dropped'。但是我想在特定次数(假设 3 次)内处理相同的操作,并在它们之间定义一个时间间隔(假设 1 秒)。如何使用 akka 监督来做到这一点。实际上,通过 actor,我试图检查特定服务 api 是否正常工作(即给出一些异常)。因此,如果特定尝试出现任何异常(假设未找到 404),则将消息重新发送给失败的工作人员 直到达到supervisorStrategy指定的maxNrOfRetries。如果工作人员失败了“maxNrOfRetries”次,则只需记录“此 xx 消息达到的最大尝试次数”。我将如何在 java 中执行此操作。

我的主管类(class):

public class Supervisor extends UntypedActor {


 private static SupervisorStrategy strategy =

 new OneForOneStrategy(3, Duration.create("1 minute"),
  new Function<Throwable, Directive>() {
    @Override
    public Directive apply(Throwable t) {
      if (t instanceof Exception) {
        return restart();
      }else if (t instanceof IllegalArgumentException) {
        return stop();
      } else {
        return escalate();
      }
    }
  });

 @Override
 public SupervisorStrategy supervisorStrategy() {
 return strategy;


}
public void onReceive(Object o) {
if (o instanceof Props) {
  getSender().tell(getContext().actorOf((Props) o), getSelf());
} else {
  unhandled(o);
}


 }
}

子类:

public class Child extends UntypedActor {


  public void onReceive(Object o) throws Exception {
if (o instanceof String) {
Object response = someFunction( (String) message);//this function returns either successfull messgae as string or exception
if(response instanceOf Exception) {
     throw (Exception) response;
   } 
   else
     getSender().tell(response, getSelf())
}else {
  unhandled(o);
}


}

}

创建 Actor :

Props superprops = Props.create(Supervisor.class);
ActorRef supervisor = system.actorOf(superprops, "supervisor");
ActorRef child = (ActorRef) Await.result(ask(supervisor,
Props.create(Child.class), 5000), timeout);
child.tell("serVice_url", ActorRef.noSender());

对于service_url,如果发生故障,我想重复该过程。但它没有发生。如果将 creatng actor 中的下一行写为 child.tell("serVice_url_2", ActorRef.noSender()); 那么该行将被执行,但我想在特定的时间(假设 3 次)内处理相同的操作(发生失败),并在它们之间定义一个时间间隔。 请指导我实现这一目标。

最佳答案

我认为我已经开发出一种方法。尽管我仍然需要在生产级别进行测试。我在下面写下答案,因为它可能对尝试实现相同目标的人有所帮助。如果有人找到更好的方法,那么欢迎他/她。 在这里要提一下,通过这种方法Supervisor 在一个时间范围内处理相同的操作(带有发生失败的消息)特定次数(假设 3 次)。我无法定义它们之间的间隔。 这是代码。 Supervisor 类。

public class MyUntypedActor extends UntypedActor {
//here I have given Max no retrilas as 10.I will controll this number from logic as per my own requirements.But user given number of retrials can not exceed 10.
private static SupervisorStrategy strategy = new AllForOneStrategy(10, Duration.create(5, TimeUnit.MINUTES),
        new Function<Throwable, SupervisorStrategy.Directive>() {
            @Override
            public SupervisorStrategy.Directive apply(Throwable t) {
                if (t instanceof Exception) {
                    //System.out.println("exception" + "*****" + t.getMessage() + "***" + t.getLocalizedMessage());
                    return restart();
                } else if (t instanceof NullPointerException) {
                    return restart();
                } else if (t instanceof IllegalArgumentException) {
                    return stop();
                } else {
                    return escalate();
                }
            }
        });

@Override
public SupervisorStrategy supervisorStrategy() {
    return strategy;
}

public void onReceive(Object o) {
    if (o instanceof Props) {
        getSender().tell(getContext().actorOf((Props) o), getSelf());
    } else {
        unhandled(o);
    }
}
}

我们将在其中编写逻辑的子类。

public class Child extends UntypedActor {
//Through preRestart it will push the message for which exception occured before the restart of the child
@Override
public void preRestart(final Throwable reason, final scala.Option<Object> message) throws Exception {
    System.out.println("reStarting :::" + message.get());
    SetRules.setRemainingTrials(SetRules.remainingTrials + 1);
    getSelf().tell(message.get(), getSender());
};

public void onReceive(Object o) throws Exception {

    if (o instanceof Exception) {
        throw (Exception) o;
    } else if (o instanceof Integer) {
    } else if (o.equals("get")) {
        getSender().tell("get", getSelf());
    } else if (o instanceof String) {

        try {
            // here either we can write our logic directly or for a better
            // approach can call a function where the logic will be excuted.
            getSender().tell("{\"meggase\":\"Succesfull after " + SetRules.remainingTrials + " retrials\"}",
                    getSelf());
        } catch (Exception ex) {
            if (SetRules.remainingTrials == SetRules.noOfRetries) {
                getSender().tell("{\"meggase\":\"Failed to connect after " + SetRules.noOfRetries + " retrials\"}",
                        getSelf());
            } else {
                Exception value1 = ex;
                throw (Exception) value1;
            }
        }
    } else {
        unhandled(o);
    }
}
}

SetRules类提供了有关用户的信息,提供noOfReTrials,并通过remainingTrials存储每个重试状态的重试次数信息

public class SetRules {

public static int noOfRetries;
public static int remainingTrials;

public SetRules(int noOfRetries, int remainingTrials) {
    super();
    SetRules.noOfRetries = noOfRetries;
    SetRules.remainingTrials = remainingTrials;
}

public int getRemainingTrials() {
    return remainingTrials;
}

public static void setRemainingTrials(int remainingTrials) {
    SetRules.remainingTrials = remainingTrials;
}
}

现在让我们创建 Actor 。

Props superprops = Props.create(MyUntypedActor.class);
SetRules setRules=new SetRules(3,0);
ActorSystem system = ActorSystem.create("helloakka");
ActorRef supervisor = system.actorOf(superprops, "supervisor");
ActorRef child = (ActorRef) Await.result(ask(supervisor, Props.create(Child.class), 5000), Duration.create(5, "minutes"));
Future<Object> future = Patterns.ask(child, service_Url, new Timeout(Duration.create(5, "minutes")));
Object result =  Await.result(future, Duration.create(5, "minutes"));
System.out.println(result);

关于java - 使用 Actor Supervised ,如果发生失败,如何以定义的时间间隔重试相同的消息定义的次数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42346361/

相关文章:

java - 将JavaCV作为依赖项添加到Raspberry PI上运行?

java - 我可以创建一个生成多个可执行 Jars 的 Eclipse 项目吗?

scala - Akka Actors简单解释

scala - 使用 Akka Actors 处理多个 TCP 连接

multithreading - Akka:如何确保已收到消息?

java - MVC设计困惑

java - 阻止标题 View 在 ListView 中被单击?

java-8 - Akka actor 和 Java8 CompletableFuture 可以安全地结合在一起吗?

architecture - 实时数据处理架构

scala - 通过将带有参数的方法传递给 "become"来更改 Akka actor 状态