java - 无限期地等待可能永远不会到达的消息

标签 java multithreading exception-handling akka message-queue

我有一个 Java 类型的 actor,它负责对可能暂时不可用的外部资源进行过滤/重试逻辑。 Actor的字段和常用方法有:

public class MyActorImpl implements MyActor {
    private static final long MINWAIT = 50;
    private static final long MAXWAIT = 1000;
    private static final long DEFAULTWAIT = 0;
    private static final double BACKOFFMULTIPLIER = 1.5;

    private long updateWait(long currentWait) {
        return Math.min(Math.max((long) (currentWait * BACKOFFMULTIPLIER), MINWAIT), MAXWAIT);
    }

    // mutable
    private long opWait = DEFAULTWAIT;
    private final Queue<OpInput> opBuffer = new ArrayDeque<>();

    // called from external actor
    public void operation(OpInput opInput) {
        operation(opInput, DEFAULTWAIT);
    }

    // called internally
    public void operation(OpInput opInput, long currentWait);
}

Actor 有多个操作,它们都或多或少具有相同的重试/缓冲逻辑;每个操作都有自己的 [op]Wait[op]Buffer字段。

  1. 父 Actor 调用void operation(OpInput opInput)
  2. 前面的方法调用void operation(OpInput opInput, long currentWait)使用 DEFAULTWAIT对于第二个参数
  3. 如果currentWait参数不等于 opWait然后输入存储在opBuffer中,否则将输入发送到外部资源。
  4. 如果外部资源返回成功,则opWait设置为 DEFAULTWAIT , 以及 opBuffer 的内容通过 operation(opInput) 发回方法。如果外部资源(或更可能是网络)返回错误,那么我更新 opWait = updateWait(opWait)和时间表 operation(opInput, opWait)在 actor 系统调度器上使用延迟 opWait女士。

即我正在使用 actor 系统调度程序来实现指数退避;我正在使用 currentWait参数来标识我正在重试的消息,并缓冲其他消息,直到主要消息被外部资源成功处理。

问题是如果预定operation(opInput, currentWait)消息丢失后我将永远缓冲消息,因为 currentWait == opWait守卫将对所有其他消息失败。我可以使用类似 spring-retry 的东西实现指数退避,但我没有看到合并操作的重试循环的方法,这意味着我可以在每个重试循环中使用一个线程(而使用 actor 系统的调度程序不会给系统带来更多压力).

我正在寻找一种容错能力更强的方法来在参与者和外部资源之间的接口(interface)上实现缓冲和指数退避,而不必为任务分配太多资源。

最佳答案

如果我对你的理解是正确的,如果唯一的问题是丢失预定消息,你为什么不使用像 Reliable Proxy Pattern 这样的东西呢?对于该特定消息,然后如果失败 opWait = DEFAULTWAIT;

关于您的代码,我得到了一些信息,当您说 public void operation(OpInput opInput) 被外部调用时,我不明白您的意思。您的意思是此方法正在与网络交互,而网络使用的资源有时不可用?

如果可以的话,我建议一个替代方案。据我了解,你的主要问题是你有一个有时不可用的资源,所以你有某种队列/缓冲区,你用某种等待逻辑来实现,这样一旦消息再次可用,消息就会被处理,不幸的是涉及一些可能丢失并导致无限等待的消息。我认为你可以使用带有超时的 Futures 来实现你想要的。如果 future 在一定时间内未完成,则重试,最多说 3 次重试。您甚至可以根据服务器负载和完成消息所需的时间来调整此时间。希望对您有所帮助。

关于java - 无限期地等待可能永远不会到达的消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32555552/

相关文章:

java - 如何在不使用访问器的情况下访问父类(super class)中的私有(private)变量?

java - 对 For 循环中的数字求和

c++ - 如何将 ifstream 作为参数传递给 std::thread 函数?

c# - 异常捕获和错误记录

java - 我们可以让 JVM 抛出我们自己的用户定义的异常吗?

java - Spring Roo 创建 Controller 和 View

java - 具有多个 JTable 的大型 JScrollPane — 如何分页

java - 我正在尝试了解java和多线程中的Singleton类

python - 定时器中断线程python

language-agnostic - 处理所有异常的正确时机