java - 处理 100 万条记录的执行器框架

标签 java multithreading file executorservice

我有一个要求,我必须处理一个包含100万条记录的文件并将其保存在 Redis 缓存中。我本来应该使用 redis 管道,但我没有得到任何有关它的信息。这是我的问题:Question

所以我决定使用多线程执行器框架。我是多线程新手 这是我的代码:

@Async
    public void createSubscribersAsync(Subscription subscription, MultipartFile file)throws EntityNotFoundException, InterruptedException, ExecutionException, TimeoutException {

        ExecutorService executorService = Executors.newFixedThreadPool(8);
        Collection<Callable<String>> callables = new ArrayList<>();


        List<Subscriber> cache = new ArrayList<>();
        int batchSize = defaultBatchSize.intValue();

        while ((line = br.readLine()) != null) {
            try {
                Subscriber subscriber = createSubscriber(subscription, line);
                cache.add(subscriber);
                if (cache.size() >= batchSize) {
                    IntStream.rangeClosed(1, 8).forEach(i -> {
                    callables.add(createCallable(cache, subscription.getSubscriptionId()));});
                }
            } catch (InvalidSubscriberDataException e) {
                invalidRows.add(line + ":" + e.getMessage());
                invalidCount++;
            }
        }
        List<Future<String>> taskFutureList = executorService.invokeAll(callables);
        for (Future<String> future : taskFutureList) {
            String value = future.get(4, TimeUnit.SECONDS);
            System.out.println(String.format("TaskFuture returned value %s", value));
        }
    }

    private Callable<String> createCallable(List<Subscriber> cache, String subscriptionId) {

        return new Callable<String>() {

            public String call() throws Exception {

                System.out.println(String.format("starting expensive task thread %s", Thread.currentThread().getName()));
                processSubscribers(cache,subscriptionId);
                System.out.println(String.format("finished expensive task thread %s", Thread.currentThread().getName()));
                return "Finish Thread:" + Thread.currentThread().getName();
            }
        };
    }

    private void processSubscribers(List<Subscriber> cache, String subscriptionId) {
        subscriberRedisRepository.saveAll(cache);
        cache.clear();
    }

这里的想法是我想将一个文件分成一个批处理并使用线程保存该批处理。我创建了 8 个线程池。

这是实现执行器框架的正确方法吗?如果没有,你能帮我解决这个问题吗?感谢您的帮助。

最佳答案

快速修改当前代码以实现要求:

在 while 循环中,一旦当前缓存超过批处理大小,就会在当前缓存中创建一个可调用传递。 重置缓存列表,创建一个新列表并将其指定为缓存。

您正在创建可调用对象列表以批量提交它们,为什么不在创建后立即提交可调用对象呢?这将开始将已读取的记录写入 Redis,同时主线程继续从文件中读取。

 List<Future<String>> taskFutureList = new LinkedList<Future<String>>();
 while ((line = br.readLine()) != null) {
    try {
        Subscriber subscriber = createSubscriber(subscription, line);
        cache.add(subscriber);
        if (cache.size() >= batchSize) {
                    taskFutureList.add(executorService.submit(createCallable(cache,subscription.getSubscriptionId())));
            List<Subscriber> cache = new ArrayList<>();
        }
     } catch (InvalidSubscriberDataException e) {
        invalidRows.add(line + ":" + e.getMessage());
        invalidCount++;
    }
}
//submit last batch that could be < batchSize
if(!cache.isEmpty()){ 
           taskFutureList.add(executorService.submit(createCallable(cache,subscription.getSubscriptionId())));
}

您不必存储单独的可调用列表。

关于java - 处理 100 万条记录的执行器框架,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58079900/

相关文章:

java - 为什么将我的应用程序安装到其他设备后会出现 IndexOutOfBoundsException?

java - Json Gson 期望 Begin Object,但实际上是字符串

c++ - Win32、MFC : Ending threads

c++ - C++中文件数据的动态分配

java - 按下按钮时如何绘制垂直线?

java - Tomcat 上的 Spring 3 JMS

java - java servlet 中多个线程访问的变量是否需要声明为 volatile?

Java 并发 - 内联初始化的非最终字段和安全发布

java - Filewriter 创建一个只读文件

c - 在C中打开文件如何防止读取错误的数据类型