java - 如果未收到数据,请发送并重试数据?

标签 java multithreading design-patterns thread-safety guava

我正在开发一个项目,我需要做以下事情:

  • 将特定套接字上的某些数据发送到另一个系统。我必须在给定的套接字上发送特定的字节数组。每个字节数组都有一个唯一的长地址。
  • 然后使用下面实现的任一 RetryStrategy 继续重试发送相同的数据。
  • 启动一个后台轮​​询器线程,告诉您其他系统是否收到了您发送的数据。如果已收到,我们会将其从 pending 队列中删除,这样就不会重试,如果由于某种原因未收到,我们将使用我们使用的 RetryStrategy 再次尝试发送相同的数据。

例如:如果我们发送了 byteArrayA,其具有唯一的长地址作为 addressA,并且如果它在其他系统中被接收,那么我的轮询器线程将得到此 addressA 作为确认,这意味着它已被接收,所以现在我们可以从待处理队列中删除该地址,以便它不会再次重试。

我有两个RetryStrategy实现了ConstantBackoffExponentialBackoff。所以我想出了下面的模拟器来模拟上述流程。

public class Experimental {
  /** Return the desired backoff delay in millis for the given retry number, which is 1-based. */
  interface RetryStrategy {
    long getDelayMs(int retry);
  }

  public enum ConstantBackoff implements RetryStrategy {
    INSTANCE;
    @Override
    public long getDelayMs(int retry) {
      return 1000L;
    }
  }

  public enum ExponentialBackoff implements RetryStrategy {
    INSTANCE;
    @Override
    public long getDelayMs(int retry) {
      return 100 + (1L << retry);
    }
  }

  /** A container that sends messages with retries. */    
  private static class Sender {
    private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(20);
    private final ConcurrentMap<Long, Retrier> pending = new ConcurrentHashMap<>();

    /** Send the given (simulated) data with given address on the given socket. */
    void sendTo(long addr, byte[] data, int socket) {
      System.err.println("Sending " + Arrays.toString(data) + "@" + addr + " on " + socket);
    }

    /** The state of a message that's being retried. */
    private class Retrier implements Runnable {
      private final RetryStrategy retryStrategy;
      private final long addr;
      private final byte[] data;
      private final int socket;
      private int retry;
      private Future<?> future;

      Retrier(RetryStrategy retryStrategy, long addr, byte[] data, int socket) {
        this.retryStrategy = retryStrategy;
        this.addr = addr;
        this.data = data;
        this.socket = socket;
        this.retry = 0;
      }

      private synchronized void start() {
        if (future == null) {
          future = executorService.submit(this);
          pending.put(addr, this);
        }
      }

      private synchronized void cancel() {
        if (future != null) {
          future.cancel(true);
          future = null;
        }
      }

      private synchronized void reschedule() {
        if (future != null) {
          future = executorService.schedule(this, retryStrategy.getDelayMs(++retry), MILLISECONDS);
        }
      }

      @Override
      synchronized public void run() {
        sendTo(addr, data, socket);
        reschedule();
      }
    }

   /** 
    * Get a (simulated) verified message address. Just picks a pending 
    * one. Returns zero if none left.
    */      
    long getVerifiedAddr() {
      System.err.println("Pending messages: " + pending.size());
      Iterator<Long> i = pending.keySet().iterator();
      long addr = i.hasNext() ? i.next() : 0;
      return addr;
    }

    /** A polling loop that cancels retries of (simulated) verified messages. */        
    class CancellationPoller implements Runnable {
      @Override
      public void run() {
        while (!Thread.currentThread().isInterrupted()) {
          try {
            Thread.sleep(1000);
          } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
          }
          long addr = getVerifiedAddr();
          if (addr == 0) {
            continue;
          }
          System.err.println("Verified message (to be cancelled) " + addr);
          Retrier retrier = pending.remove(addr);
          if (retrier != null) {
            retrier.cancel();
          }
        }
      }
    }

    private Sender initialize() {
      executorService.submit(new CancellationPoller());
      return this;
    }

    private void sendWithRetriesTo(RetryStrategy retryStrategy, long addr, byte[] data, int socket) {
      new Retrier(retryStrategy, addr, data, socket).start();
    }
  }

  public static void main(String[] args) {
    Sender sender = new Sender().initialize();
    for (long i = 1; i <= 10; i++) {
      sender.sendWithRetriesTo(ConstantBackoff.INSTANCE, i, null, 42);
    }
    for (long i = -1; i >= -10; i--) {
      sender.sendWithRetriesTo(ExponentialBackoff.INSTANCE, i, null, 37);
    }
  }
}

我想看看上面的代码中是否存在竞争条件或任何线程安全问题?因为在多线程中获得正确的东西是很困难的。

让我知道是否有更好或更有效的方法来做同样的事情。

最佳答案

如果你的代码中没有任何反对第三方库的内容,你可以看看这个https://github.com/nurkiewicz/async-retry 。假设我正确理解了你的问题,这个库允许你处理这类问题,而无需重新发明轮子。它写得很清楚并且有很好的记录,所以我希望你能找到适合你的方法。 :)

关于java - 如果未收到数据,请发送并重试数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42322970/

相关文章:

c++ - 编码和等待多个相互依赖的线程的算法、模式或最佳实践?

java - 如何在不使用 Future.get() 的情况下使 Java 中的 Future 超时,因为它是一个阻塞操作?

java - 带有 url 模式的 Tomcat 问题

windows - 为什么 WaitForSingleObject(INVALID_HANDLE_VALUE, INFINITE) 会阻塞?

C++:应用复合模式

iOS 5 iPhone 突破风格的应用程序设计

java - 如何将 Netbeans 项目导入 Eclipse

java - 三重嵌套循环打印过多的字符

c# - 多线程访问 System.Collection.Concurrent 中的非线程安全值?

c - 并行化斐波那契数列生成器