java - 如何在向另一个应用程序发送数据时实现重试策略?

标签 java multithreading design-patterns guava java-failsafe

我正在处理将数据发送到 zeromq 的应用程序。以下是我的应用程序的作用:

  • 我有一个类 SendToZeroMQ 可以将数据发送到 zeromq。
  • 将相同的数据添加到同一类中的 retryQueue,以便在未收到确认时可以稍后重试。它使用具有 maximumSize 限制的 Guava 缓存。
  • 有一个单独的线程从 zeromq 接收对先前发送的数据的确认,如果未收到确认,则 SendToZeroMQ 将重试发送相同的数据数据。如果收到确认,我们会将其从 retryQueue 中删除,这样就无法再次重试。

想法非常简单,我必须确保我的重试策略正常工作,这样我才不会丢失我的数据。这种情况非常罕见,但以防万一我们没有收到确认。

我正在考虑构建两种类型的RetryPolicies,但我无法理解如何在此处构建与我的程序相对应的类型:

  • RetryNTimes:在此它将重试 N 次,每次重试之间有一个特定的 sleep ,之后,它将删除记录。
  • ExponentialBackoffRetry: 在此它将以指数方式不断重试。我们可以设置一些最大重试限制,之后它不会重试并会丢弃记录。

下面是我的 SendToZeroMQ 类,它向 zeromq 发送数据,每 30 秒从后台线程重试一次并启动 ResponsePoller runnable 永远运行:

public class SendToZeroMQ {
  private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5);
  private final Cache<Long, byte[]> retryQueue =
      CacheBuilder
          .newBuilder()
          .maximumSize(10000000)
          .concurrencyLevel(200)
          .removalListener(
              RemovalListeners.asynchronous(new CustomListener(), executorService)).build();

  private static class Holder {
    private static final SendToZeroMQ INSTANCE = new SendToZeroMQ();
  }

  public static SendToZeroMQ getInstance() {
    return Holder.INSTANCE;
  }

  private SendToZeroMQ() {
    executorService.submit(new ResponsePoller());
    // retry every 30 seconds for now
    executorService.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        for (Entry<Long, byte[]> entry : retryQueue.asMap().entrySet()) {
          sendTo(entry.getKey(), entry.getValue());
        }
      }
    }, 0, 30, TimeUnit.SECONDS);
  }

  public boolean sendTo(final long address, final byte[] encodedRecords) {
    Optional<ZMQSocketInfo> liveSockets = PoolManager.getInstance().getNextSocket();
    if (!liveSockets.isPresent()) {
      return false;
    }
    return sendTo(address, encodedRecords, liveSockets.get().getSocket());
  }

  public boolean sendTo(final long address, final byte[] encodedByteArray, final Socket socket) {
    ZMsg msg = new ZMsg();
    msg.add(encodedByteArray);
    boolean sent = msg.send(socket);
    msg.destroy();
    // adding to retry queue
    retryQueue.put(address, encodedByteArray);
    return sent;
  }

  public void removeFromRetryQueue(final long address) {
    retryQueue.invalidate(address);
  }
}

下面是我的 ResponsePoller 类,它轮询来自 zeromq 的所有确认。如果我们从 zeromq 得到确认,那么我们将从重试队列中删除该记录,这样它就不会被重试,否则它会被重试。

public class ResponsePoller implements Runnable {
  private static final Random random = new Random();

  @Override
  public void run() {
    ZContext ctx = new ZContext();
    Socket client = ctx.createSocket(ZMQ.PULL);
    String identity = String.format("%04X-%04X", random.nextInt(), random.nextInt());
    client.setIdentity(identity.getBytes(ZMQ.CHARSET));
    client.bind("tcp://" + TestUtils.getIpaddress() + ":8076");

    PollItem[] items = new PollItem[] {new PollItem(client, Poller.POLLIN)};

    while (!Thread.currentThread().isInterrupted()) {
      // Tick once per second, pulling in arriving messages
      for (int centitick = 0; centitick < 100; centitick++) {
        ZMQ.poll(items, 10);
        if (items[0].isReadable()) {
          ZMsg msg = ZMsg.recvMsg(client);
          Iterator<ZFrame> it = msg.iterator();
          while (it.hasNext()) {
            ZFrame frame = it.next();
            try {
                long address = TestUtils.getAddress(frame.getData());
                // remove from retry queue since we got the acknowledgment for this record
                SendToZeroMQ.getInstance().removeFromRetryQueue(address);               
            } catch (Exception ex) {
                // log error
            } finally {
              frame.destroy();
            }
          }
          msg.destroy();
        }
      }
    }
    ctx.destroy();
  }
}

问题:

正如您在上面看到的,我正在使用 SendToZeroMQ 类将 encodedRecords 发送到 zeromq,然后每 30 秒重试一次,具体取决于是否我们是否从 ResponsePoller 类得到了确认。

对于每个 encodedRecords 都有一个名为 address 的唯一键,这就是我们将从 zeromq 返回的作为确认的内容。

我如何继续扩展此示例以构建我上面提到的两个重试策略,然后我可以选择在发送数据时要使用的重试策略。我想出了下面的界面,但后来我无法理解我应该如何继续实现这些重试策略并在上面的代码中使用它。

public interface RetryPolicy {
    /**
     * Called when an operation has failed for some reason. This method should return
     * true to make another attempt.
     */
    public boolean allowRetry(int retryCount, long elapsedTimeMs);
}

我可以使用guava-retrying吗?或 failsafe这里是因为这些库已经有很多我可以使用的重试策略?

最佳答案

我无法计算出有关如何使用相关 API 的所有细节,但至于算法,您可以尝试:

  • 重试策略需要为每条消息附加某种状态(至少重试当前消息的次数,可能是当前延迟)。您需要决定 RetryPolicy 是应该保留它本身还是要将它存储在消息中。
  • 代替 allowRetry,您可以使用一种方法来计算下一次重试应该发生的时间(以绝对时间或 future 的毫秒数),这将是上述状态的函数
  • 重试队列应包含有关何时应重试每条消息的信息。
  • 不使用 scheduleAtFixedRate,而是在重试队列中找到具有最低 when_is_next_retry 的消息(可能通过按绝对重试时间戳排序并选择第一个),以及让 executorService 使用 scheduletime_to_next_retry
  • 重新安排自己
  • 对于每次重试,将其从重试队列中拉出,发送消息,使用 RetryPolicy 计算下一次重试的时间(如果要重试)并使用新值插入重试队列 when_is_next_retry(如果RetryPolicy返回-1,可能意味着消息不再重试)

关于java - 如何在向另一个应用程序发送数据时实现重试策略?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42056618/

相关文章:

java - 查看 SharedPreference 变量是否为空

java - 在类数据字段中声明 Scanner 变量时,什么情况下不会发生资源泄漏?

JavaFX:控制台登录 TextArea + 多线程和任务

javascript - 将单例与模型一起使用的缺点

python - 是否有标准的第 3 方 Python 缓存类?

design-patterns - 存储库类中的非 CRUD 操作

java - 通过 testng 执行一个又一个的 Selenium 测试,无需重新启动浏览器

c# - 在多线程代码中读取 .NET Int32 时,我们需要锁定它吗?

python - 在python中,子进程调用父进程的方法而不调用父进程的__init__

java - flash .swf 文件在 java swing 中运行