java - 如何并行处理文件的行?

标签 java future executorservice

我想读取一个大文件,处理每一行并将结果插入数据库。 我的目标是并行处理线条,因为每个过程都是一项长时间运行的任务。因此我希望一个线程继续读取,多个线程继续处理,一个线程继续插入 block 到 db。

我把它分解如下:

1) 按顺序逐行读取文件(简单)

2) 将每一行发送到线程池(3 个线程),因为处理是长时间运行的任务。在线程池繁忙时阻止进一步的行读取。

3) 将每个处理过的行从每个 theadpool 写入 StringBuffer

4) 监控缓冲区大小,并将结果以 block 的形式写入数据库(例如,每 1000 个条目)

ExecutorService executor = Executors.newFixedThreadPool(3);

StringBuffer sb = new StringBuffer();

String line;
AtomicInteger count = new AtomicInteger(0);
while ((line = reader.read()) != null) {
    count.getAndIncrement();
    Future<String> future = executor.submit(() -> {
        return processor.process(line);
    });

    //PROBLEM: this blocks until the future returns
    sb.append(future.get());

    if (count.get() == 100) {
        bufferChunk = sb;
        count = new AtomicInteger(0);
        sb = new StringBuffer();

        databaseService.batchInsert(bufferChunk.toString());
    }
}

问题:

  • future.get() 将始终阻塞读取器,直到一个 future 返回结果

  • 缓冲区“监控”可能没做好

可能我没有以正确的方式这样做。但我怎样才能做到这一点?

旁注:文件大小约为 10GB,因此我无法先将整个文件读入内存来准备并行任务。

最佳答案

我发现以下解决方案很优雅。它只是众多可能中的一种,但它在概念上很简单并且

  • 它会限制读取,
  • 仅累积最少量的状态以在最后报告就绪
  • 不需要显式处理线程

我只是将实际的测试方法与完整的测试设置和可用的辅助数据结构放在一起 dedicated GitHub repo :

private final AtomicInteger count = new AtomicInteger();

private final Consumer<String> processor = (value) -> {
    count.incrementAndGet();
};

@Test
public void onlyReadWhenExecutorAvailable() throws Exception {

    Executor executor = Executors.newCachedThreadPool();

    CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
    for (Semaphore semaphore = new Semaphore(CONCURRENCY_LEVEL); ; ) {
        String value = reader.read();
        if (value == null) {
            break;
        }

        semaphore.acquire();

        CompletableFuture<Void> future = CompletableFuture.completedFuture(value)
            .thenAcceptAsync(v -> {
                processor.accept(v);
                semaphore.release();
            }, executor);

        done = done.thenCompose($ -> future);
    }
    done.get();

    assertEquals(ENTRIES, count.get());
}

关于java - 如何并行处理文件的行?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50374271/

相关文章:

java - 更改我的正则表达式后,在线程 "main"java.lang.NullPointerException 中出现异常

java - ant getResourceAsStream null 但在 Eclipse 中正常

c++ - std::async "store"如何成为任意异常?

java - 等待 kafkaTemplate 待处理的 future

executorservice - 一段时间后调用 ExecutorService

java - 如何将使用 eclipse 和 tomcat 构建的 Java EE 项目部署到万维网?

java - 返回按钮帮助

Flutter/Dart - 加载 future 的 CircularProgressIndicator 的透明背景?

java - 有没有办法在 Hazelcast 中使用自定义执行程序?

java - shutdown 和 awaitTermination 哪个第一次调用有什么区别?