java - 顺序和并行处理

标签 java multithreading synchronization locking

我有一个生产者和许多消费者。

  • 生产者速度快,产出多
  • 具有相同值的 token 需要顺序处理
  • 具有不同值的 token 必须并行处理
  • 创建新的 Runnable 将非常昂贵,而且生产代码可以使用 100k 的 token (为了创建一个 Runnable,我必须向构造函数传递一些复杂的构建对象)

我可以使用更简单的算法获得相同的结果吗?嵌套带有可重入锁的同步块(synchronized block)似乎有点不自然。 您可能会注意到任何竞争条件吗?

更新:我发现的第二个解决方案是使用 3 个集合。一个用于缓存生产者结果,第二个用于阻塞队列,第三个使用列表来跟踪正在进行的任务。又有点复杂。

我的代码版本

import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.locks.ReentrantLock;

public class Main1 {
    static class Token {
        private int order;
        private String value;
        Token() {

        }
        Token(int o, String v) {
            order = o;
            value = v;
        }

        int getOrder() {
            return order;
        }

        String getValue() {
            return value;
        }
    }

    private final static BlockingQueue<Token> queue = new ArrayBlockingQueue<Token>(10);
    private final static ConcurrentMap<String, Object> locks = new ConcurrentHashMap<String, Object>();
    private final static ReentrantLock reentrantLock = new ReentrantLock();
    private final static Token STOP_TOKEN = new Token();
    private final static List<String> lockList = Collections.synchronizedList(new ArrayList<String>());

    public static void main(String[] args) {
        ExecutorService producerExecutor = Executors.newSingleThreadExecutor();
        producerExecutor.submit(new Runnable() {
            public void run() {
                Random random = new Random();
                    try {
                        for (int i = 1; i <= 100; i++) {
                            Token token = new Token(i, String.valueOf(random.nextInt(1)));

                            queue.put(token);
                        }

                        queue.put(STOP_TOKEN);
                    }catch(InterruptedException e){
                        e.printStackTrace();
                    }
                }
        });

        ExecutorService consumerExecutor = Executors.newFixedThreadPool(10);
        for(int i=1; i<=10;i++) {

            // creating to many runnable would be inefficient because of this complex not thread safe object
            final Object dependecy = new Object(); //new ComplexDependecy()
            consumerExecutor.submit(new Runnable() {
                public void run() {
                    while(true) {
                        try {
                            //not in order


                            Token token = queue.take();
                            if (token == STOP_TOKEN) {
                                queue.add(STOP_TOKEN);
                                return;
                            }


                            System.out.println("Task start" + Thread.currentThread().getId() + " order "  + token.getOrder());

                            Random random = new Random();
                            Thread.sleep(random.nextInt(200)); //doLongRunningTask(dependecy)
                            lockList.remove(token.getValue());

                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    }
            }});

    }
}}

最佳答案

您可以预先创建一组 Runnables,它将挑选传入的任务( token )并根据它们的顺序值将它们放入队列中。

正如评论中所指出的,保证具有不同值的 token 将始终并行执行(总而言之,您至少受到盒子中物理核心的限制) .但保证相同顺序的代币按到达顺序执行。

示例代码:

/**
 * Executor which ensures incoming tasks are executed in queues according to provided key (see {@link Task#getOrder()}).
 */
public class TasksOrderingExecutor {

    public interface Task extends Runnable {
        /**
         * @return ordering value which will be used to sequence tasks with the same value.<br>
         * Tasks with different ordering values <i>may</i> be executed in parallel, but not guaranteed to.
         */
        String getOrder();
    }

    private static class Worker implements Runnable {

        private final LinkedBlockingQueue<Task> tasks = new LinkedBlockingQueue<>();

        private volatile boolean stopped;

        void schedule(Task task) {
            tasks.add(task);
        }

        void stop() {
            stopped = true;
        }

        @Override
        public void run() {
            while (!stopped) {
                try {
                    Task task = tasks.take();
                    task.run();
                } catch (InterruptedException ie) {
                    // perhaps, handle somehow
                }
            }
        }
    }

    private final Worker[] workers;
    private final ExecutorService executorService;

    /**
     * @param queuesNr nr of concurrent task queues
     */
    public TasksOrderingExecutor(int queuesNr) {
        Preconditions.checkArgument(queuesNr >= 1, "queuesNr >= 1");
        executorService = new ThreadPoolExecutor(queuesNr, queuesNr, 0, TimeUnit.SECONDS, new SynchronousQueue<>());
        workers = new Worker[queuesNr];
        for (int i = 0; i < queuesNr; i++) {
            Worker worker = new Worker();
            executorService.submit(worker);
            workers[i] = worker;
        }
    }

    public void submit(Task task) {
        Worker worker = getWorker(task);
        worker.schedule(task);
    }

    public void stop() {
        for (Worker w : workers) w.stop();
        executorService.shutdown();
    }

    private Worker getWorker(Task task) {
        return workers[task.getOrder().hashCode() % workers.length];
    }
}

关于java - 顺序和并行处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34684663/

相关文章:

java - java中的线图

java - 避免线程安全内存供应商中的 volatile 读取

java - 如何使一个功能与其他功能同步,但其他功能之间不同步?

mysql - 同步数据的最佳方式: MySQL -> SQL Server

synchronization - Vulkan 中统一缓冲区的最佳实践

java - MongoDB 在多线程中插入

java.lang.IllegalArgumentException : No view found for id 0x7f0c0098

C#:使用虚拟对象初始化事件处理程序

c - 为什么创建窗口没有反应?

java - 使用 entityManager 从数据库中获取列表