java - CompletionService 中的重试策略

标签 java asynchronous executorservice retrypolicy completion-service

我需要配置重试策略以通过 ExecutorCompletionService 调用 API。

示例代码:

public void func() throws Exception{
    ExecutorService executorService = Executors.newFixedThreadPool(5);
    CompletionService<String> completionService = new ExecutorCompletionService<String>(executorService);
    List<Future<String>> list = new ArrayList<Future<String>>();
    for(int i=0; i<10; i++) {
        AsyncTest asyncTest = new AsyncTest();
        Future<String> futureString = completionService.submit(asyncTest);
        list.add(futureString);
    }
    while (list.size() > 0) {
        Future<String> futureResponse = completionService.take();
        System.out.println(futureResponse.get());
        list.remove(futureResponse);
        }
    executorService.shutdown();
}
public class AsyncTest implements Callable<String> {
       public String call() throws Exception {
              //returns a response from api call
              //this is a network call and throws TimeoutException
       }
}

对于调用 API 时抛出的 TimeoutException 实现重试策略的最佳方法是什么?

最佳答案

我增强了您的 AsyncTest 类:

public class RetryableAsyncTest implements Callable<RetryableAsyncTest> {

   private final String  _name;
   private /* */ String  _value;
   private /* */ boolean _timeouted;
   private /* */ int     _retryCount;

   public RetryableAsyncTest( String name ) {
      _name = name;
   }

   @Override
   public RetryableAsyncTest call() throws Exception {
      try {
         ++_retryCount;
         _timeouted = false;
         //-------- Begin of functionnal code
         if( Math.random() > 0.5 ) {      // Simulation of
            throw new TimeoutException(); // timeout condition
         }
         _value = "computation result";
         //-------- End of functionnal code
      }
      catch( final TimeoutException x ) {
         _timeouted = true;
      }
      return this;
   }

   public String getName() {
      return _name;
   }

   public String getValue() {
      return _value;
   }

   public boolean isTimeouted() {
      return _timeouted;
   }

   public int getRetryCount() {
      return _retryCount;
   }
}

RetryableAsyncExecutor 类:

public class RetryableAsyncExecutor {

   private final ExecutorService                       _exec;
   private final CompletionService<RetryableAsyncTest> _comp;

   public RetryableAsyncExecutor( int nThreads ) {
      _exec = Executors.newFixedThreadPool( nThreads );
      _comp = new ExecutorCompletionService<>( _exec );
   }

   public void submit( RetryableAsyncTest task ) {
      _comp.submit( task );
   }

   public RetryableAsyncTest get() throws Exception {
      final Future<RetryableAsyncTest> f = _comp.take();
      final RetryableAsyncTest task = f.get();
      if( task.isTimeouted()) {
         _comp.submit( task );
      }
      return task;
   }

   public void shutdown() {
      _exec.shutdown();
   }
}

测试用例:

public class Main {

   public static void main( String[] args ) {
      final int COUNT = 8;
      final RetryableAsyncExecutor re = new RetryableAsyncExecutor( 5 );
      try {
         for( int i = 0; i < COUNT; ++i ) {
            re.submit( new RetryableAsyncTest("Async#"+(i+1)));
         }
         int count = 0;
         while( count < COUNT ) {
            final RetryableAsyncTest task = re.get();
            if( task.isTimeouted()) {
               System.err.printf( "%s: retrying (%d)\n",
                  task.getName(), task.getRetryCount());
            }
            else {
               System.err.printf( "%s: done with '%s'.\n",
                  task.getName(), task.getValue());
               ++count;
            }
         }
      }
      catch( final Throwable t ) {
         t.printStackTrace();
      }
      re.shutdown();
      System.exit( 0 );
   }
}

执行日志:

Async#4: done with 'computation result'.
Async#1: done with 'computation result'.
Async#6: retrying (1)
Async#3: done with 'computation result'.
Async#8: done with 'computation result'.
Async#7: retrying (1)
Async#2: done with 'computation result'.
Async#5: retrying (1)
Async#6: done with 'computation result'.
Async#7: done with 'computation result'.
Async#5: retrying (2)
Async#5: done with 'computation result'.

如果您想限制重试次数,此逻辑将发生在 RetryableAsyncExecutor.get() 方法中,作为 _comp.submit( task ) 周围的 if-then-else 条件;

关于java - CompletionService 中的重试策略,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45525466/

相关文章:

java - 如何通过对话框收集用户的输入?

java - 在 oracle.jdbc.pool.OracleDataSource 上设置池属性

java - JAXB 解码 xml @XmlValue 和 @XmlElement

java - Spring 任务异步和延迟

java - 使用Java并发API建模动态数据流的技术

java - 在 Tomcat 中使用 ExecutorService/Futures?

java - 每当您单击它时,都会使用不同的图像重新绘制 JPanel

multithreading - 从线程本身内部取消异步线程

java - ExecutorService.execute() 静默退出 run() 方法

ios - 如何在 swift 3 中专门化通用高阶函数?