java - 即使使用线程安全实现,如何修复异步java代码失败的问题?

标签 java concurrency thread-safety

我正在开发一个实现与此类似的代码库。当 count 的值递增时,我们遇到一个线程无法与其他线程同步的问题,从而进入无限循环。

问题似乎来自后递增运算符的非原子行为。

您可以找到代码Repl here注意:您可能需要运行代码至少 3 次才能观察到它。

我需要支持以线程安全的方式通过尽可能多的线程实现计数增量。

class Main {

    static volatile Integer count = new Integer(0); //boxed integer is intentional to demonstrate mutable instance

    static final void Log(Object o) {
        System.out.println(o);
    }

    static synchronized void increaseCount(){
        count++;
    }

    static synchronized Integer getCount(){
        return count;
    }

    public static void main(String[] arg) throws InterruptedException {

        new Thread(() -> {
            while (getCount() != 60) {
                increaseCount();
                Log(count +" thread A");
            }
        }).start();

        new Thread(() -> {
            while (getCount() != 20) {
                increaseCount();
                Log(count +" thread B");
            }
        }).start();

        new Thread(() -> {
            while (getCount() != 50) {
                increaseCount();
                Log(count+" thread C");
            }
        }).start();
    }
}

最佳答案

如果许多线程正在递增共享计数器,则无法保证哪个线程将看到计数器的特定值。为了确保特定线程看到特定值,该线程必须看到计数器的每个值。然后您也可能只有一个线程,因为它们都彼此同步工作。

如果您想对计数器的每个值执行一些工作,并对特定值进行特殊处理,并且您希望并行化该工作负载,则每个线程都需要准备好执行特殊处理。以下是如何执行此操作的示例:

class Main {

    private static class Worker implements Runnable {

        private final AtomicInteger counter;
        private final Set<Integer> triggers;

        Worker(AtomicInteger counter, Set<Integer> triggers) {
            this.counter = counter;
            this.triggers = triggers;
        }

        public void run() {
            String name = Thread.currentThread().getName();
            while (!triggers.isEmpty()) {
                int value = counter.getAndIncrement();
                try { /* Simulate actually doing some work by sleeping a bit. */
                    long delay = (long) (-100 * Math.log(1 - ThreadLocalRandom.current().nextDouble()));
                    TimeUnit.MILLISECONDS.sleep(delay);
                } catch (InterruptedException ex) {
                    break;
                }
                boolean triggered = triggers.remove(value);
                if (triggered) {
                    System.out.println(name + " handled " + value);
                } else {
                    System.out.println(name + " skipped " + value);
                }
            }
        }
    }

    public static void main(String[] arg) throws InterruptedException {
        AtomicInteger counter = new AtomicInteger();
        Set<Integer> triggers = new ConcurrentSkipListSet<>();
        triggers.add(60);
        triggers.add(20);
        triggers.add(50);
        int concurrency = 4;
        ExecutorService workers = Executors.newFixedThreadPool(concurrency);
        for (int i = 0; i < concurrency; ++i) {
            workers.execute(new Worker(counter, triggers));
        }
        workers.shutdown();
    }

}

可以调整工作线程的数量,以便根据计算机上的核心数量和实际工作负载(任务的 CPU 或 I/O 密集程度)来调整工作线程的数量。

在这种方法中,计数器的每个值仅由一个线程处理,并且哪个线程获得“哨兵”值并不重要。但是,当所有哨兵值都已处理后,所有线程都会关闭。线程通过计数器以及它们需要处理的一组“触发器”或哨兵值相互协调。

关于java - 即使使用线程安全实现,如何修复异步java代码失败的问题?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55940306/

相关文章:

java - 通知运行在不同对象中的线程

java - 可重入锁 VS 可重入读写锁

java - 无法使用@Value注释从Spring Boot中的属性文件中读取值

java - 我如何将 bytearray 存储为 HashMap 中的值?

java - 为了学习 CDI 我应该了解 EJB

go - 有人可以解释这个使用 channel 的 Go 代码块吗?我不明白它是如何一次执行 500 个 Action 的

Java 的 Fork/Join vs ExecutorService - 什么时候使用?

java - 如何在给定执行时间的情况下每隔固定时间调用方法

java - 在所有线程完成之前从 java 方法返回

java - for循环的非法类型开始?