java - 保存部分集合,同时由其他线程添加新条目

标签 java concurrency

我有一个批处理作业,它异步发送 http 请求并将它们收集在结果列表中。这非常有效。

public File addLine(CsvLine line) throws IOException {
    lines.add(line);
    return this;
}

当尝试优化时,我希望将这些行存储到持久位置。

public File addLine(CsvLine line) throws IOException {
    lines.add(line);
    if (lines.size() > uploadService.getPartSize()) {
        List<CsvLine> copy;
        copy = new ArrayList<>(lines);
        lines.removeAll(copy);
        uploadService.save(copy);
    }
    return this;
}

多个线程仍在添加到行集合中,但在制作副本时,我确保只删除要保存的行(副本集合)。 所以这不起作用,我尝试在这部分添加同步关键字

public File addLine(CsvLine line) throws IOException {
    lines.add(line);
    if (lines.size() > uploadService.getPartSize()) {
        List<CsvLine> copy;
        synchronized (this) {
            copy = new ArrayList<>(lines);
            lines.removeAll(copy);
        }
        uploadService.save(copy);
    }
    return this;
}

但没有任何成功。我假设 ArrayList 的构造函数不是线程保存的,如果我没有记错的话,这也是唯一需要同步的部分。

有人可以指出我做错了什么吗?

最佳答案

我整理了一个DoubleBufferedList反对这样做。

/**
 * Lock free - thread-safe.
 *
 * Write from many threads - read with fewer threads.
 *
 * Write items of type T.
 *
 * Read items of type List<T>.
 *
 * @author OldCurmudgeon
 * @param <T>
 */
public class DoubleBufferedList<T> {

    /**
     * Atomic reference so I can atomically swap it through.
     *
     * Mark = true means I am adding to it so momentarily unavailable for iteration.
     */
    private final AtomicMarkableReference<List<T>> list = new AtomicMarkableReference<>(newList(), false);

    // Factory method to create a new list - may be best to abstract this.
    protected List<T> newList() {
        return new ArrayList<>();
    }

    /**
     * Get and replace the current list.
     *
     * Used by readers.
     *
     * @return List<T> of a number (possibly 0) of items of type T.
     */
    public List<T> get() {
        // Atomically grab and replace the list with an empty one.
        List<T> empty = newList();
        List<T> it;
        // Replace an unmarked list with an empty one.
        if (!list.compareAndSet(it = list.getReference(), empty, false, false)) {
            // Failed to replace!
            // It is probably marked as being appended to but may have been replaced by another thread.
            // Return empty and come back again soon.
            return Collections.<T>emptyList();
        }
        // Successfull replaced an unmarked list with an empty list!
        return it;
    }

    /**
     * Grab and lock the list in preparation for append.
     *
     * Used by add.
     */
    private List<T> grab() {
        List<T> it;
        // We cannot fail so spin on get and mark.
        while (!list.compareAndSet(it = list.getReference(), it, false, true)) {
            // Spin on mark - waiting for another grabber to release (which it must).
        }
        return it;
    }

    /**
     * Release the grabbed list.
     *
     * Opposite of grab.
     */
    private void release(List<T> it) {
        // Unmark it - should this be a compareAndSet(it, it, true, false)?
        if (!list.attemptMark(it, false)) {
            // Should never fail because once marked it will not be replaced.
            throw new IllegalMonitorStateException("It changed while we were adding to it!");
        }
    }

    /**
     * Add an entry to the list.
     *
     * Used by writers.
     *
     * @param entry - The new entry to add.
     */
    public void add(T entry) {
        List<T> it = grab();
        try {
            // Successfully marked! Add my new entry.
            it.add(entry);
        } finally {
            // Always release after a grab.
            release(it);
        }
    }

    /**
     * Add many entries to the list.
     *
     * @param entries - The new entries to add.
     */
    public void add(List<T> entries) {
        List<T> it = grab();
        try {
            // Successfully marked! Add my new entries.
            it.addAll(entries);
        } finally {
            // Always release after a grab.
            release(it);
        }
    }

    /**
     * Add a number of entries.
     *
     * @param entries - The new entries to add.
     */
    @SafeVarargs
    public final void add(T... entries) {
        // Make a list of them.
        add(Arrays.<T>asList(entries));
    }

}

像普通一样使用 List但是get返回给您 List<T>包含可用于处理的项目的安全列表。

final String BYE = "BYE!";

public void test() throws InterruptedException {
    final DoubleBufferedList<String> l = new DoubleBufferedList<>();
    // Producer.
    Thread p = new Thread(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 100000; i++) {
                // Pump the queue.
                l.add("" + i);
            }
            l.add(BYE);
        }
    });
    // Consumer.
    Thread c = new Thread(new Runnable() {
        @Override
        public void run() {
            boolean done = false;
            do {
                List<String> got = l.get();
                System.out.println("Got: " + got);
                for (String s : got) {
                    if (s.equals(BYE)) {
                        done = true;
                    }
                }
            } while (!done);
        }
    });
    // Fire it up.
    p.start();
    c.start();
    // Wait for them to finish.
    p.join();
    c.join();
}

关于java - 保存部分集合,同时由其他线程添加新条目,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36766094/

相关文章:

java - 谁能推荐一个 Java 或 Scala DOS/基于终端的 UI 框架?

java - ProcessBuilder 进程未运行

java - 如何启动、暂停和停止生产者/消费者关系?

multithreading - 每秒处理数百万个请求 : how does load balancer(main server thread) works

java - 并发链表读取器/写入器无法正常工作

java - 为什么 Thread.stop() 方法使用不安全?

java - 如何使用 CSS 设置 JavaFX 圆半径?

java - 带有cloudera hive jdbc pom问题的Spring引导

java - 如何使用 <dsp :a> in Oracle Commerce(ATG) 生成动态 URL

java - 单个java信号量上的死锁?