我有一个 Java 类型的 actor,它负责对可能暂时不可用的外部资源进行过滤/重试逻辑。 Actor的字段和常用方法有:
public class MyActorImpl implements MyActor {
private static final long MINWAIT = 50;
private static final long MAXWAIT = 1000;
private static final long DEFAULTWAIT = 0;
private static final double BACKOFFMULTIPLIER = 1.5;
private long updateWait(long currentWait) {
return Math.min(Math.max((long) (currentWait * BACKOFFMULTIPLIER), MINWAIT), MAXWAIT);
}
// mutable
private long opWait = DEFAULTWAIT;
private final Queue<OpInput> opBuffer = new ArrayDeque<>();
// called from external actor
public void operation(OpInput opInput) {
operation(opInput, DEFAULTWAIT);
}
// called internally
public void operation(OpInput opInput, long currentWait);
}
Actor 有多个操作,它们都或多或少具有相同的重试/缓冲逻辑;每个操作都有自己的 [op]Wait
和 [op]Buffer
字段。
- 父 Actor 调用
void operation(OpInput opInput)
- 前面的方法调用
void operation(OpInput opInput, long currentWait)
使用DEFAULTWAIT
对于第二个参数 - 如果
currentWait
参数不等于opWait
然后输入存储在opBuffer
中,否则将输入发送到外部资源。 - 如果外部资源返回成功,则
opWait
设置为DEFAULTWAIT
, 以及opBuffer
的内容通过operation(opInput)
发回方法。如果外部资源(或更可能是网络)返回错误,那么我更新opWait = updateWait(opWait)
和时间表operation(opInput, opWait)
在 actor 系统调度器上使用延迟opWait
女士。
即我正在使用 actor 系统调度程序来实现指数退避;我正在使用 currentWait
参数来标识我正在重试的消息,并缓冲其他消息,直到主要消息被外部资源成功处理。
问题是如果预定operation(opInput, currentWait)
消息丢失后我将永远缓冲消息,因为 currentWait == opWait
守卫将对所有其他消息失败。我可以使用类似 spring-retry 的东西实现指数退避,但我没有看到合并操作的重试循环的方法,这意味着我可以在每个重试循环中使用一个线程(而使用 actor 系统的调度程序不会给系统带来更多压力).
我正在寻找一种容错能力更强的方法来在参与者和外部资源之间的接口(interface)上实现缓冲和指数退避,而不必为任务分配太多资源。
最佳答案
如果我对你的理解是正确的,如果唯一的问题是丢失预定消息,你为什么不使用像 Reliable Proxy Pattern 这样的东西呢?对于该特定消息,然后如果失败 opWait = DEFAULTWAIT;
关于您的代码,我得到了一些信息,当您说 public void operation(OpInput opInput)
被外部调用时,我不明白您的意思。您的意思是此方法正在与网络交互,而网络使用的资源有时不可用?
如果可以的话,我建议一个替代方案。据我了解,你的主要问题是你有一个有时不可用的资源,所以你有某种队列/缓冲区,你用某种等待逻辑来实现,这样一旦消息再次可用,消息就会被处理,不幸的是涉及一些可能丢失并导致无限等待的消息。我认为你可以使用带有超时的 Futures 来实现你想要的。如果 future 在一定时间内未完成,则重试,最多说 3 次重试。您甚至可以根据服务器负载和完成消息所需的时间来调整此时间。希望对您有所帮助。
关于java - 无限期地等待可能永远不会到达的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32555552/