java - 关于 Guava ListenableFuture 的查询

标签 java concurrency guava future

我正在编写一个调用一些外部服务的服务。我使用 future 来表示所有这些外部服务调用的结果。我使用 Guava 库提供的 Futures.successfulAsList() 方法将所有 future 折叠为一个 future。

这是我的代码

List<ListenableFuture<List<T>>> futureList = new ArrayList<>();

for(int id: shardIds) {
    ListeningExecutorService service =
          (ListeningExecutorService) _shardMgr.getExecutorService(id);
    SelectTask task = new SelectTask(_shardMgr.getReadHandle(id), sql, mapper);


    ListenableFuture<List<T>> future = service.submit(task);
    //Add Callback
    Futures.addCallback(future, new ErrorCallBack(task),
            Executors.newFixedThreadPool(1));
    futureList.add(future);
}

ListenableFuture<List<List<T>>> combinedFuture =
        Futures.successfulAsList(futureList);
int timeout = _dbTimeout.get();
List<T> selectResult = new ArrayList<T>();

try {
    List<List<T>> result = combinedFuture.get(timeout, TimeUnit.MILLISECONDS);
    for(List<T> sublist: result) {
        for(T t : sublist) {
            //TODO: Do we want to put a cap on how many results we return here?
            //I think we should
            selectResult.add(t);
        }
    }
}
catch(Exception ex) {
    log.error("******************* Exception in parallelSelect ",ex);
    throw new RuntimeException("Error in parallelSelect");
}

当我的 future 之一(外部服务调用)失败时,会调用 ErrorCallBack 的 onFailure() ,但我仍然无法使用合并的Future.get(timeout, TimeUnit.MILLISECONDS);我在 for(T t : sublist) 行中得到 NullPointerException ...在迭代结果时。

我希望当一个外部服务调用失败时,我不应该通过combinedFuture.get()

我做错了什么吗?我什至尝试从 ErrorCallBack 的 onFailure 方法中抛出异常。

这是ErrorCallBack的实现

private class ErrorCallBack<T> implements FutureCallback<List<T>>  {
    private final SelectTask _task;

    public ErrorCallBack(SelectTask task) {
        _task = task;
    }

    @Override
    public void onFailure(Throwable t) {
        log.error("ErrorCallBack:onFailure(). Enter");
        DBErrorType type = DBErrorType.UNKNOWN;
        try {
            log.error("ErrorCallBack:onFailure(). Exception ",t);
            if(t instanceof InterruptedException || t instanceof CancellationException) {
                type = DBErrorType.UNKNOWN;
            } else if ( t instanceof SQLException || t.getCause() instanceof SQLException) {
                type = DBErrorType.SQL_SYNTAX_ERROR;
            } else if ( t instanceof MySQLSyntaxErrorException || t.getCause() instanceof MySQLSyntaxErrorException) {
                type = DBErrorType.SQL_SYNTAX_ERROR;
            } else if ( t instanceof ExecutionException) {
                type = DBErrorType.SQL_SYNTAX_ERROR;
            } else if (t instanceof TimeoutException) {
                type = DBErrorType.NETWORK_ERROR;
            } else {
                type = DBErrorType.UNKNOWN;
            }
            ShardHandle handle = _task.getShardHandle();
            _shardMgr.reportException(handle, type);
            DBException exception = new DBException(handle.getShardInfo(), type, ErrorSeverity.CRITICAL, t);
            _alertModule.handleAlert(exception.getAlertContext());
        } catch( Exception ex) {
        }
    }

    @Override
    public void onSuccess(List<T> result) {}
}

最佳答案

I expect that when one external service call fails, i should not get past combinedFuture.get()

好吧,不,因为您正在调用 Futures.successfulAsList() ,顾名思义,returns the results of the successful Futures (对于那些失败的则为空)。对于您想要的行为,您应该调用 Futures.allAsList()它为您提供了一个 Future,如果它的任何组件失败,它就会失败。

由于您没有检查结果中的空值,因此您会得到 NPE。

关于java - 关于 Guava ListenableFuture 的查询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21618273/

相关文章:

java - 正则表达式查找方法调用

java - mysql jdbc驱动中的通信链路故障

c# - 在多个任务中(一次?)编辑哈希集是线程安全的吗?

java - 如何在并发事务中实现高性能的对象隔离解决方案

java - 尝试在 Go 中实现 Java Guava sets.difference

java - Wildfly 13 gzip 过滤器属性

java - 如何使用junit测试java中的注释

multithreading - 使用 HT 在一个 Core 上执行的线程之间的数据交换将使用什么?

java - 加载缓存 : RemovalListener Returns null Value

java - getIfPresent 是否被视为 expireAfterAccess 的 get 操作?