Java设置多线程处理

标签 java multithreading parallel-processing set

全部

我正在尝试检查包含从 1 到 N 的数字的某些数据集的多线程处理。例如,我想对所有这些数字求和:

1)保存总和(结果)。

public class ResultHolder {

    public static AtomicLong total_time = new AtomicLong(0);

    public static Long sum = 0l;

    public Long getSum() {

        return sum;

    } // END: getSum()

    @PostConstruct
    public void init() {

    } // END: init()

    public void setSum(Long sum) {

        this.sum = sum;

    } // END: setSum()

    public void printSum() {
        System.out.println("Sum is " + sum);
    }

    public void clearSum() {
        sum = 0l;
    }

} // ENDC: ResultHolder

2)处理部分号码集合:

public class SumProcessor {

    private static int global_id = 0;
    final public int processor_id;
    private final ArrayList<Long> numbers;
    private Long processor_sum = 0l;

    @Autowired
    private final ResultHolder sumHoldder = null;

    public SumProcessor(ArrayList<Long> numbers) {

        this.numbers = numbers;

        processor_id = ++global_id;

    } // END: constructor

    public void work() throws Exception {

        long t1 = new java.util.Date().getTime();

        int i = 0;

        try {

            if (numbers == null) throw new Exception("Не удалось получить массив чисел.");
            for (i = 0; i < numbers.size(); i++) {
                Long o = null;
                try {
                    o = numbers.get(i);
                    if (o == null) throw new Exception("no number");
                } catch (Exception e) {
                    throw new Exception("Ошибка извлечения числа из массива: " + e);
                }
                processor_sum += o;
            } // END: for

            if (sumHoldder == null) throw new Exception("No sum holder");

            synchronized (sumHoldder) {
                sumHoldder.setSum(sumHoldder.getSum() + processor_sum);
            }

            long t2 = new java.util.Date().getTime();

            this.sumHoldder.total_time.addAndGet(t2 - t1);

        } catch (Exception e) {

            System.out.println("Work() error (" + i + ") " + e);

        }

        return;

    } //END: method1

    @PostConstruct
    public void init() {

        System.out.println("Initializated B: " + this);

    } //END: method2

    @PreDestroy
    public void destroy() {

        System.out.println("Destroy B: " + this);

    } //END: method3

    @Override
    public String toString() {
        return "" +
                "Processor " + processor_id + " " +
                "contain " + numbers.size() + " " +
                "numbers from " + numbers.get(0) +
                " to " + numbers.get(numbers.size() - 1);

    } //END: toString()

} //END: class SumProcessor

3)非常简单的分析器(计算处理时间)

@Aspect
public class MethodLoggerBasic {

@Pointcut("execution(* *.work(..))")
void around_work() {};

@Around("around_work()")
public void logMethodName(ProceedingJoinPoint joinPoint) throws Throwable {

    long starttime = new Date().getTime();
    joinPoint.proceed();
    long endtime = new Date().getTime();
    long time = endtime - starttime;
    MainApp.time += time;

} // END:
} // ENDC

4)主程序(可以开始线性或并行处理)

public class MainApp {

static AnnotationConfigApplicationContext context;

public static long time = 0l;
public final static int SIZE = 40_000_000;
public final static int DIVIDE_FACTOR = 4;
public static ArrayList<Long>[] numbers = new ArrayList[DIVIDE_FACTOR];

public static ArrayList<SumProcessor> processors = new ArrayList<>();

public static void main(String[] args) throws Exception {

    context = new AnnotationConfigApplicationContext(myConfig.class);

    // form 4 datasets

    int part_size = SIZE / DIVIDE_FACTOR;

    int i;
    int j;

    for (j = 0; j < DIVIDE_FACTOR; j++) {
        numbers[j] = new ArrayList<>();
        for (i = 0; i < (int) part_size; i++) {
            numbers[j].add(((j * part_size) + i + 1l));
        }
    }

    // create 4 processors (bean)

    for (i = 0; i < DIVIDE_FACTOR; i++) {
        SumProcessor bean = context.getBean(SumProcessor.class, numbers[i]);
        if (bean == null) throw new Exception("Error recive bean SumProcessor.class");
        processors.add(bean);
    }

    // creates 4 threads fro processors
    thread_process thread1 = new thread_process();
    thread_process thread2 = new thread_process();
    thread_process thread3 = new thread_process();
    thread_process thread4 = new thread_process();

    ResultHolder a;

    a = context.getBean(ResultHolder.class);

    try {

        boolean isByPool = true; // flag

        time = 0;

        if (isByPool) {

            System.out.println("-------------------");
            System.out.println("Multithread compute");
            System.out.println("-------------------");
            ExecutorService pool = new ThreadPoolExecutor(
                    4,
                    4,
                    0,
                    TimeUnit.MICROSECONDS,
                    new ArrayBlockingQueue<>(4)
            );

            List<Callable<Boolean>> tasks = new ArrayList();

            tasks.add(thread1);
            tasks.add(thread2);
            tasks.add(thread3);
            tasks.add(thread4);

            pool.invokeAll(tasks);

            pool.shutdown();

            pool.awaitTermination(60, TimeUnit.SECONDS);

        } else {

            thread1.start();
            thread2.start();
            thread3.start();
            thread4.start();

            thread1.join();
            thread2.join();
            thread3.join();
            thread4.join();

        }

        a.printSum();
        a.clearSum();

        System.out.println("total time is " + a.total_time);
        System.out.println("basic time is " + MainApp.time);

        System.out.println("-------------");
        System.out.println("Single thread");
        System.out.println("-------------");

        ArrayList<Long> numbers_tolal = new ArrayList<>();
        for (i = 0; i < SIZE; i++) {
            numbers_tolal.add((i + 1l));
        }

        SumProcessor sumProcessor = context.getBean(SumProcessor.class, numbers_tolal);

        a.total_time.set(0l);
        time = 0l;

        sumProcessor.work();

        a.printSum();

        System.out.println("total time is " + a.total_time);
        System.out.println("basic time is " + MainApp.time);

    } catch (Exception e) {

        throw new Exception("MainApp error: " + e);

    }

    context.close();

} // END: main

} // END: class MainApp

5)线程进程:

公共(public)类 thread_process 扩展了 Thread 实现 Callable、Runnable {

static int index = 0;

@Override
public void run() {

    try {

        SumProcessor next = MainApp.processors.get(index++);

        if (next == null) {

            System.out.println("Нет процессора");

            System.exit(-1);

        }

        next.work();

        System.out.println("Thread " + this + " complete!");

    } catch (Exception e) {

        System.out.println("Error in thread " + this + ": " + e);

    }

} //END: run()

@Override
public Boolean call() throws Exception {

    run();

    return true;

} //END: call()
}; //END: class thread_process

输出为:

Initializated B: Processor 1 contain 10000000 numbers from 1 to 10000000
Initializated B: Processor 2 contain 10000000 numbers from 10000001 to 20000000
Initializated B: Processor 3 contain 10000000 numbers from 20000001 to 30000000
Initializated B: Processor 4 contain 10000000 numbers from 30000001 to 40000000
-------------------
Multithread compute
-------------------
Thread Thread[Thread-3,5,main] complete!
Thread Thread[Thread-4,5,main] complete!
Thread Thread[Thread-2,5,main] complete!
Thread Thread[Thread-1,5,main] complete!
Sum is 800000020000000
total time is 11254
basic time is 11254
-------------
Single thread
-------------
Initializated B: Processor 5 contain 40000000 numbers from 1 to 40000000
Sum is 800000020000000
total time is 6995
basic time is 6995

有没有一种方法可以使其并行比线性更快?或者我也许不需要 fork 这个任务?或者也许我的分析器不太好......

GitHub project

最佳答案

您正在尝试使用多线程执行顺序任务,这不是多线程的正确使用。在这里,您有一个资源需要执行一些工作。您正在使用多个线程来分配该工作,但同时,当另一个线程正在使用该资源时,您会阻塞一个线程。那么,如果您不希望工作线程并行访问资源,为什么首先要有工作线程呢?

如果没有必要,您可以删除数据集的 Set 实现并使用 List 或 Arrays,您可以在其中使用索引访问元素,而不会阻塞工作线程。

<小时/>

更新 1:只需在 pool.shutdown() 调用后添加一行即可。

pool.shutdown(); // starts thread shutdown, or force execution of worker threads
pool.awaitTermination(60, TimeUnit.SECONDS); // blocks main thread until thread pool finishes
// ...
// now you can do your single thread task

此外,不要创建太多线程,因为单个线程的速度足以处理数百万个数组元素。

<小时/>

更新 2:所以,我不知道为什么,但将单个线程放在 try block 之外似乎得到了预期的结果。

public class MainApp {static AnnotationConfigApplicationContext context;

    public static long time = 0;
    public final static int SIZE = 28_000_000;
    public final static int DIVIDE_FACTOR = 4;
    public static ArrayList<Long>[] numbers = new ArrayList[DIVIDE_FACTOR];

    public static ArrayList<SumProcessor> processors = new ArrayList<>();

    public static void main(String[] args) throws Exception {

        context = new AnnotationConfigApplicationContext(AppConfig.class);

        ResultHolder a = context.getBean(ResultHolder.class);

        // form 4 datasets

        int part_size = SIZE / DIVIDE_FACTOR;

        int i;
        int j;

        for (j = 0; j < DIVIDE_FACTOR; j++) {
            numbers[j] = new ArrayList<>(part_size);
            for (i = 0; i < (int) part_size; i++) {
                numbers[j].add(((j * part_size) + i + 1l));
            }
        }

        // create 4 processors (bean)

        for (i = 0; i < DIVIDE_FACTOR; i++) {
            SumProcessor bean = context.getBean(SumProcessor.class, numbers[i]);
            if (bean == null) throw new Exception("Error receive bean SumProcessor.class");
            processors.add(bean);
        }

        // creates 4 threads fro processors
        thread_process thread1 = new thread_process();
        thread_process thread2 = new thread_process();
        thread_process thread3 = new thread_process();
        thread_process thread4 = new thread_process();

        try {
            boolean isByThread = true; // flag
            time = 0;
            System.out.println("-------------------");
            System.out.println("Multithread compute");
            System.out.println("-------------------");
            ExecutorService pool = new ThreadPoolExecutor(
                    4,
                    4,
                    0,
                    TimeUnit.MICROSECONDS,
                    new LinkedBlockingDeque<>(4) // or ArrayBlockingDeque<>(4)
            );
            List<Callable<Boolean>> tasks = new ArrayList();
            tasks.add(thread1);
            tasks.add(thread2);
            tasks.add(thread3);
            tasks.add(thread4);
            List<Future<Boolean>> futures = pool.invokeAll(tasks);
            pool.shutdown();
            pool.awaitTermination(60, TimeUnit.SECONDS);
            System.out.println("Time is: " + time);

            a.printSum();
            a.clearSum();
            time = 0;

        } catch (Exception e) {
            throw new Exception("MainApp error: " + e);

        } // <---- moved single thread out of try block

        ArrayList<Long> numbers_total = new ArrayList<>(SIZE);
        for (i = 0; i < SIZE; i++) {
            numbers_total.add((i + 1l));
        }

        System.out.println("-------------");
        System.out.println("Single thread");
        System.out.println("-------------");
        SumProcessor sumProcessor = context.getBean(SumProcessor.class, numbers_total);
        sumProcessor.work();
        System.out.println("Time is: " + time);
        a.printSum();
        a.clearSum();
        time = 0;

        context.close();
    } // END: main
}

输出:

Initialized B: Processor 1 contain 7000000 numbers from 1 to 7000000
Initialized B: Processor 2 contain 7000000 numbers from 7000001 to 14000000
Initialized B: Processor 3 contain 7000000 numbers from 14000001 to 21000000
Initialized B: Processor 4 contain 7000000 numbers from 21000001 to 28000000
-------------------
Multithread compute
-------------------
Thread[Thread-3,5,main] complete task.
Thread[Thread-2,5,main] complete task.
Thread[Thread-1,5,main] complete task.
Thread[Thread-4,5,main] complete task.
Time is: 5472
Sum is 392000014000000
-------------
Single thread
-------------
Initialized B: Processor 5 contain 28000000 numbers from 1 to 28000000
Time is: 10653
Sum is 392000014000000

输出[逆序]:

-------------
Single thread
-------------
Initialized B: Processor 1 contain 28000000 numbers from 1 to 28000000
Time is: 2265
Sum is 392000014000000
Initialized B: Processor 2 contain 7000000 numbers from 1 to 7000000
Initialized B: Processor 3 contain 7000000 numbers from 7000001 to 14000000
Initialized B: Processor 4 contain 7000000 numbers from 14000001 to 21000000
Initialized B: Processor 5 contain 7000000 numbers from 21000001 to 28000000
-------------------
Multithread compute
-------------------
Thread[Thread-2,5,main] complete task.
Thread[Thread-4,5,main] complete task.
Thread[Thread-1,5,main] complete task.
Thread[Thread-3,5,main] complete task.
Time is: 2115
Sum is 392000014000000

关于Java设置多线程处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57869997/

相关文章:

java - 产生线程时,如何限制可能的最大数量?

VBA Excel 中的多线程

r - R脚本的分布式调度系统

Java -> 使用 SWIG over JNI 的 C++ 通信。防止 C++ 错误和内存泄漏

java - SPRING|Java - 在查询中使用带有 SpEl 的 IN 子句

java - 调用FutureTask的run方法时如何消除sonar qube问题?

java - 应该在微服务中使用多线程吗?

java - 使用 ViewPager 在 Xamarin.Android 上以轮播样式显示图像幻灯片并使用突出的子项进行填充,Viewpager 错误地计算参数

java - 如何完全摆脱 JPanel 及其中的所有内容?

multithreading - 互斥锁是如何工作的?互斥锁是否全局保护变量?定义它的范围重要吗?