java - Producer/consumer with batching and flushing

Tags: java multithreading concurrency queue producer-consumer

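I am trying to write a batching mail service that has two methods:

add(Mail mail): queues a mail for sending; called by the producers.

flushMailService(): flushes the service. The consumers should take a list of mails and call another (expensive) method. Normally, the expensive method should only be called once the batch size has been reached.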

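This is somewhat similar to this question: Producer/Consumer - producer adds data to collection without blocking, consumer consumes data from collection in batch

That can be done using poll() with a timeout. However, a producer should be able to flush the mail service if it does not want to wait for the timeout and instead wants the consumers to send whatever mails are currently in the queue.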
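To make the timed-poll batching idea concrete, here is a minimal sketch. BatchCollector and collectBatch are hypothetical names that do not appear in the question, and the sketch assumes one overall deadline per batch, whereas the implementation further down applies the timeout to each individual poll.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper for illustration; not part of the question's code.
final class BatchCollector {

    private BatchCollector() {
    }

    /**
     * Collects up to batchSize elements, waiting at most the given timeout
     * in total (assumption: one deadline per batch, not per element).
     */
    static <T> List<T> collectBatch(BlockingQueue<T> queue, int batchSize,
                                    long timeout, TimeUnit unit) throws InterruptedException {
        List<T> batch = new ArrayList<>(batchSize);
        long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (batch.size() < batchSize) {
            long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                break; // deadline passed: return the partial batch
            }
            T item = queue.poll(remaining, TimeUnit.NANOSECONDS); // blocks until an item arrives or time runs out
            if (item == null) {
                break; // timed out while waiting
            }
            batch.add(item);
        }
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("b");
        queue.add("c");
        System.out.println(collectBatch(queue, 2, 100, TimeUnit.MILLISECONDS)); // [a, b] (full batch)
        System.out.println(collectBatch(queue, 2, 100, TimeUnit.MILLISECONDS)); // [c] (partial batch after the timeout)
    }
}
```

This covers only the "batch size or timeout" half of the requirement; an explicit flush still needs some way to wake the blocked poll early, which is what the rest of the question is about.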

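poll(20, TimeUnit.SECONDS) can be interrupted. If it is interrupted, all mails in the queue should be sent, regardless of whether the batch size has been reached, until the queue is empty (using poll(), which returns null immediately if the queue is empty). Once the queue is empty, the mails submitted by the interrupting producer have been sent. The consumer should then call the blocking version of poll again until it is interrupted by another producer, and so on.

This seems to work with the given implementation.

I tried using an ExecutorService with Futures, but it seems a Future can only be interrupted once, because it is considered cancelled after the first interrupt. I therefore resorted to threads, which can be interrupted multiple times.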
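The Future limitation mentioned above can be reproduced with a small sketch. FutureCancelDemo and cancelTwice are hypothetical names, and the 30-second sleep is only a stand-in for a blocking poll.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class for illustration; not part of the question's code.
final class FutureCancelDemo {

    private FutureCancelDemo() {
    }

    /** Returns { result of first cancel, result of second cancel, isCancelled() }. */
    static boolean[] cancelTwice() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Future<?> future = executor.submit(() -> {
                try {
                    TimeUnit.SECONDS.sleep(30); // stand-in for a blocking queue.poll(...)
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            boolean first = future.cancel(true);  // cancels the task, interrupting it if it is already running
            boolean second = future.cancel(true); // no effect: the Future is already cancelled
            return new boolean[] {first, second, future.isCancelled()};
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(cancelTwice())); // [true, false, true]
    }
}
```

A Future therefore works as a one-shot cancellation handle rather than a repeatable wake-up signal, which fits the observation that a plain Thread can be interrupted again and again while a Future cannot.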

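Currently I have the following implementation, which seems to work (but uses "raw" threads).

Is this a reasonable approach? Or is there perhaps another approach that could be used?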

public class BatchMailService {   
   private LinkedBlockingQueue<Mail> queue = new LinkedBlockingQueue<>();
   private CopyOnWriteArrayList<Thread> threads = new CopyOnWriteArrayList<>();
   private static Logger LOGGER = LoggerFactory.getLogger(BatchMailService.class);

   public void checkMails() {

        int batchSize = 100;
        int timeout = 20;
        int consumerCount = 5;

        Runnable runnable = () -> {
            boolean wasInterrupted = false;

            while (true) {
                List<Mail> buffer = new ArrayList<>();
                while (buffer.size() < batchSize) {
                    try {
                        Mail mail;
                        wasInterrupted |= Thread.interrupted();
                        if (wasInterrupted) {
                            mail = queue.poll(); // non-blocking call
                        } else {
                            mail = queue.poll(timeout, TimeUnit.SECONDS); // blocking call
                        }
                        if (mail != null) {  // mail found immediately, or within timeout
                            buffer.add(mail);
                        } else { // no mail in queue, or timeout reached
                            LOGGER.debug("{} all mails currently in queue have been processed", Thread.currentThread());
                            wasInterrupted = false;
                            break;
                        }
                    } catch (InterruptedException e) {
                        LOGGER.info("{} interrupted", Thread.currentThread());
                        wasInterrupted = true;
                        break;
                    }
                }
                if (!buffer.isEmpty()) {
                    LOGGER.info("{} sending {} mails", Thread.currentThread(), buffer.size());
                    mailService.sendMails(buffer);
                }
            }
        };

        // use the configured consumer count instead of a hard-coded 5
        LOGGER.info("starting {} threads", consumerCount);
        for (int i = 0; i < consumerCount; i++) {
            Thread thread = new Thread(runnable);
            threads.add(thread);
            thread.start();
        }

    }

    public void addMail(Mail mail) {
        queue.add(mail);
    }

    public void flushMailService() {
        LOGGER.info("flushing BatchMailService");
        for (Thread t : threads) {
            t.interrupt();
        }
    }
}

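Another approach, without interrupts but using a variant of the poison pill (Mail POISON_PILL = new Mail()), could look like the following. It probably works best with a single consumer thread: with one poison pill, only one consumer will go on to flush.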

Runnable runnable = () -> {
      boolean flush = false;
      boolean shutdown = false;

      while (!shutdown) {
           List<Mail> buffer = new ArrayList<>();
           while (buffer.size() < batchSize && !shutdown) {
               try {
                   Mail mail;
                   if (flush){
                       mail = queue.poll();
                       if (mail == null) {
                           LOGGER.info(Thread.currentThread() + " all mails currently in queue have been processed");
                           flush = false;
                           break;
                       }
                   }else {
                      mail = queue.poll(5, TimeUnit.SECONDS); // blocking call
                   }

                   if (mail == POISON_PILL){  // flush
                       LOGGER.info(Thread.currentThread() + " got flush");
                       flush = true;
                   }
                   else if (mail != null){
                       buffer.add(mail);
                   }
               } catch (InterruptedException e) {
                   LOGGER.info(Thread.currentThread() + " interrupted");
                   shutdown = true;
               }
           }
           if (!buffer.isEmpty()) {
               // parameterized logging; the original left a stray "{}" in the concatenated message
               LOGGER.info("{} sending {} mails", Thread.currentThread(), buffer.size());
               mailService.sendEmails(buffer);
           }
       }
    };

public void flushMailService() {
    LOGGER.info("flushing BatchMailService");
    queue.add(POISON_PILL);
}

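Best answer

How about using signal and wait instead of interrupts?

If a flush is needed, the producer adds the mail and signals. The dispatcher waits for the signal or for the timeout, and then hands the emails over to be sent on a consumer thread.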

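import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchMailService {

    // LOGGER is used below but was never declared here; SLF4J is assumed, as in the question's code
    private static final Logger LOGGER = LoggerFactory.getLogger(BatchMailService.class);

    private LinkedBlockingQueue<Mail> queue = new LinkedBlockingQueue<>();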

    public static final int BATCH_SIZE = 100;
    public static final int TIMEOUT = 20;
    public static final int CONSUMER_COUNT = 5;

    private final Lock flushLock = new ReentrantLock();
    private final Condition flushCondition = flushLock.newCondition();

    MailService mailService = new MailService();

    public void checkMails() {

        ExecutorService consumerExecutor = Executors.newFixedThreadPool(CONSUMER_COUNT);

        while (true) {

            // acquire the lock before entering try, so finally never unlocks a lock that was not acquired
            flushLock.lock();
            try {
                // wait for timeout or for signal to come
                flushCondition.await(TIMEOUT, TimeUnit.SECONDS);

                // flush all present emails
                final List<Mail> toFLush = new ArrayList<>();
                queue.drainTo(toFLush);

                if (!toFLush.isEmpty()) {
                    consumerExecutor.submit(() -> {
                        LOGGER.info("{} sending {} mails", Thread.currentThread(), toFLush.size());
                        mailService.sendEmails(toFLush);
                    });
                }

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break; // terminate execution in case of external interrupt
            } finally {
                flushLock.unlock();
            }
        }

    }

    public void addMail(Mail mail) {

        queue.add(mail);

        // check batch size and flush if necessary
        if (queue.size() >= BATCH_SIZE) {

            flushLock.lock();
            try {
                if (queue.size() >= BATCH_SIZE) {
                    flushMailService();
                }
            } finally {
                flushLock.unlock();
            }
        }
    }

    public void flushMailService() {
        LOGGER.info("flushing BatchMailService");
        flushLock.lock();
        try {
            flushCondition.signal();
        } finally {
            flushLock.unlock();
        }
    }

}
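
One caveat about the signalling in the code above: Condition.signal() only wakes threads that are already waiting. If a flush is requested while the dispatcher is between two await calls, the signal may be lost and the dispatcher could wait for the full timeout. A common way around this is to guard the condition with a flag, as in the following minimal sketch. FlushSignal, requestFlush and awaitFlush are hypothetical names that are not part of the answer.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical helper for illustration; not part of the answer's code.
final class FlushSignal {

    private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    private boolean flushRequested; // guarded by lock

    /** Records a flush request and wakes the dispatcher if it is waiting. */
    void requestFlush() {
        lock.lock();
        try {
            flushRequested = true; // remembered even if nobody is waiting right now
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Waits until a flush is requested or the timeout elapses.
     * Returns true for a flush request, false for a timeout.
     */
    boolean awaitFlush(long timeout, TimeUnit unit) throws InterruptedException {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (!flushRequested) { // the loop also absorbs spurious wakeups
                if (nanos <= 0L) {
                    return false; // timed out without a request
                }
                nanos = condition.awaitNanos(nanos);
            }
            flushRequested = false; // consume the request
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        FlushSignal signal = new FlushSignal();
        signal.requestFlush(); // requested before anyone waits
        System.out.println(signal.awaitFlush(1, TimeUnit.SECONDS));       // true: the request was not lost
        System.out.println(signal.awaitFlush(50, TimeUnit.MILLISECONDS)); // false: timed out
    }
}
```

In the dispatcher loop, awaitFlush(TIMEOUT, TimeUnit.SECONDS) would take the place of the bare flushCondition.await(...), and the queue would be drained in both cases, on a flush request and on a timeout.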

Regarding java - Producer/consumer with batching and flushing, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/37727746/
