java - 如何并行而不是顺序执行多个查询?

标签 java multithreading executorservice callable

我正在查询所有 10 个表以从中获取用户 ID,并将所有用户 ID 加载到 HashSet 中,以便我可以拥有唯一的用户 ID。

到目前为止,是按顺序进行的。我们转到一个表并从中提取所有 user_id 并将其加载到哈希集中,然后加载到第二个和第三个表并继续。

    private Set<String> getRandomUsers() {
        Set<String> userList = new HashSet<String>();

        // is there any way to make this parallel?
        for (int table = 0; table < 10; table++) {
            String sql = "select * from testkeyspace.test_table_" + table + ";";

            try {
                SimpleStatement query = new SimpleStatement(sql);
                query.setConsistencyLevel(ConsistencyLevel.QUORUM);
                ResultSet res = session.execute(query);

                Iterator<Row> rows = res.iterator();
                while (rows.hasNext()) {
                    Row r = rows.next();

                    String user_id = r.getString("user_id");
                    userList.add(user_id);
                }
            } catch (Exception e) {
                System.out.println("error= " + ExceptionUtils.getStackTrace(e));
            }
        }

        return userList;
    }

有什么方法可以使其成为多线程,以便对于每个表,他们可以并行地从我的表中获取数据?最后,我需要 userList 哈希集,它应该包含所有 10 个表中的所有唯一用户 ID。

我正在使用 Cassandra 数据库,并且仅建立一次连接,因此我不需要创建多个连接。

最佳答案

如果您能够使用 Java 8,您可能可以对表列表使用 parallelStream 来执行此操作,并使用 lambda 将表名称扩展为相应的唯一 ID 列表每个表,然后将结果连接到一个散列中。

如果没有 Java 8,我会使用 Google Guava 的可监听 futures 和执行器服务,如下所示:

public static Set<String> fetchFromTable(int table) {
    String sql = "select * from testkeyspace.test_table_" + table + ";";
    Set<String> result = new HashSet<String>();
    // populate result with your SQL statements
    // ...
    return result;
}

public static Set<String> fetchFromAllTables() throws InterruptedException, ExecutionException {
    // Create a ListeningExecutorService (Guava) by wrapping a 
    // normal ExecutorService (Java) 
    ListeningExecutorService executor = 
            MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());

    List<ListenableFuture<Set<String>>> list = 
            new ArrayList<ListenableFuture<Set<String>>>(); 
    // For each table, create an independent thread that will 
    // query just that table and return a set of user IDs from it
    for (int i = 0; i < 10; i++) {
        final int table = i;
        ListenableFuture<Set<String>> future = executor.submit(new Callable<Set<String>>() {
            public Set<String> call() throws Exception {
                return fetchFromTable(table);
            }
        });
        // Add the future to the list
        list.add(future);
    }
    // We want to know when ALL the threads have completed, 
    // so we use a Guava function to turn a list of ListenableFutures
    // into a single ListenableFuture
    ListenableFuture<List<Set<String>>> combinedFutures = Futures.allAsList(list);

    // The get on the combined ListenableFuture will now block until 
    // ALL the individual threads have completed work.
    List<Set<String>> tableSets = combinedFutures.get();

    // Now all we have to do is combine the individual sets into a
    // single result
    Set<String> userList = new HashSet<String>();
    for (Set<String> tableSet: tableSets) {
        userList.addAll(tableSet);
    }

    return userList;
}

Executors 和 Futures 的使用都是 Java 的核心。 Guava 唯一做的就是让我将 Futures 变成 ListenableFutures。请参阅here讨论为什么后者更好。

可能仍然有方法可以提高这种方法的并行性,但是如果您的大部分时间都花在等待数据库响应或处理网络流量上,那么这种方法可能会有所帮助。

关于java - 如何并行而不是顺序执行多个查询?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28776674/

相关文章:

java - 此代码的答案对于每个输入都是错误的,无论它是正确的还是错误的

java - 如何在多线程场景中使用自行生成的_id更新插入mongodb文档/记录

java - JPA使用flush来触发异常并停止执行

java - 如何为两组不同的任务正确使用 CyclicBarrier 的循环行为?

java - java executorService 与 tomcat 中的最大线程数

java - 从 ExecutorService 获取当前正在运行的线程对象

java - ListenableFutureTask/ExecutorService

java - 在 Java 中使用 addAll 超过 Set 的 ConcurrentModification 异常

java - 等待并行 RX 订阅者完成

java - Eclipse 中缺少项目依赖项