java - 将显式和隐式并行与 java-8 流混合

标签 java multithreading parallel-processing java-8 java-stream

过去我用两个线程写过一些java程序。 第一个线程(生产者)正在从 API(C 库)读取数据,创建一个 java 对象,将该对象发送到另一个线程。 C API 正在传送事件流(无限)。 线程使用 LinkedBlockingQueue 作为管道来交换对象(放置、轮询)。 第二个线程(消费者)正在处理对象。 (我还发现线程内的代码更具可读性。第一个线程正在处理 C API 内容并生成 正确的 java 对象,第二个线程不受 C API 处理,正在处理数据)。

现在我感兴趣的是,如何使用 Java 8 中的新流 API 实现上述场景。 但假设我想保留这两个线程(生产者/消费者)! 第一个线程正在写入流。第二个线程正在从流中读取。 我也希望,我可以用这种技术处理更好的显式并行性(生产者/消费者) 在流中,我可以使用一些隐式并行机制(例如 stream.parallel())。

我对新的流 api 没有太多经验。 所以我在下面尝试了以下代码,以解决上面的想法。

  • 我使用“生成”来访问 C API 并将其提供给 java 流。
  • 我在消费者线程中使用 .parallel() 来测试和处理隐式并行性。看起来不错。但见下文。

问题:

  1. “生成”是生产者在这种情况下的最佳方式吗?
  2. 我对如何终止/关闭生产者中的流有理解上的问题, 如果 API 有一些错误AND 我想关闭整个管道。 我是使用 stream.close 还是抛出异常?
    • 2.1 我使用了 stream.close()。但是“生成”在关闭后仍在运行, 我发现只抛出一个异常来终止生成部分。 此异常将进入流并且消费者正在接收异常 (这对我来说很好,消费者可以识别并终止)。 但在这种情况下,生产者生产的数量超过消费者处理的数量,同时出现异常。
    • 2.2 如果消费者使用隐式并行 stream.parallel()。生产者正在处理更多的项目。 所以我看不到这个问题的任何解决方案。 (访问 C API,检查错误,做出决定)。
    • 2.3 producer抛出异常到达consumer stream,但没有处理完所有插入的对象。

再一次:这个想法是与线程有明确的并行性。 但在内部我可以处理新功能并尽可能使用并行处理

也感谢您提出这个问题。

package sandbox.test;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.LongStream;

public class MyStream {
    private volatile LongStream stream = null;
    private AtomicInteger producerCount = new AtomicInteger(0);
    private AtomicInteger consumerCount = new AtomicInteger(0);
    private AtomicInteger apiError = new AtomicInteger(0);

    public static void main(String[] args) throws InterruptedException {
    MyStream appl = new MyStream();
    appl.create();
    }

    private static void sleep(long sleep) {
    try {
        Thread.sleep(sleep);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
    }

    private static void apiError(final String pos, final int iteration) {
    RuntimeException apiException = new RuntimeException("API error pos=" + pos + " iteration=" + iteration);
    System.out.println(apiException.getMessage());
    throw apiException;
    }

    final private int simulateErrorAfter = 10;

    private Thread produce() {
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
        System.out.println("Producer started");
        stream = LongStream.generate(() -> {
            int localCount;
            // Detect error, while using stream.parallel() processing
            int error = apiError.get();
            if ( error > 0 )
                apiError("1", error);
            // ----- Accessing the C API here -----
            localCount = producerCount.incrementAndGet(); // C API access; delegate for accessing the C API
            // ----- Accessing the C API here -----

            // Checking error code from C API
            if ( localCount > simulateErrorAfter ) { // Simulate an API error
                producerCount.decrementAndGet();
                stream.close();
                apiError("2", apiError.incrementAndGet());
            }
            System.out.println("P: " + localCount);
            sleep(200L);
            return localCount;
            });
        System.out.println("Producer terminated");
        }
    });
    thread.start();
    return thread;
    }

    private Thread consume() {
    Thread thread = new Thread(new Runnable() {
        @Override
        public void run() {
        try {
            stream.onClose(new Runnable() {
            @Override
            public void run() {
                System.out.println("Close detected");
            }
            }).parallel().forEach(l -> {
            sleep(1000);
            System.out.println("C: " + l);
            consumerCount.incrementAndGet();
            });
        } catch (Exception e) {
            // Capturing the stream end
            System.out.println(e);
        }
        System.out.println("Consumer terminated");
        }
    });
    thread.start();
    return thread;
    }

    private void create() throws InterruptedException {
    Thread producer = produce();
    while ( stream == null )
        sleep(10);
    Thread consumer = consume();
    producer.join();
    consumer.join();
    System.out.println("Produced: " + producerCount);
    System.out.println("Consumed: " + consumerCount);

    }
}

最佳答案

您需要了解有关 Stream API 的一些基本要点:

  • 在流上应用的所有操作都是惰性的,在应用终端操作之前不会做任何事情。使用“生产者”线程创建流是没有意义的,因为这个线程不会做任何事情。所有操作都在您的“消费者”线程中执行,后台线程由 Stream 实现本身启动。创建 Stream 实例的线程完全无关

  • 关闭流与 Stream 操作本身无关,即不会关闭线程。它旨在释放额外的资源,例如关闭与 Files.lines(…) 返回的流关联的文件。您可以使用 onClose 安排此类清理操作,Stream 将在您调用 close 时调用它们,仅此而已。对于 Stream 类本身,它没有任何意义。

  • Stream 不会模拟“一个线程正在写入,另一个线程正在读取”这样的场景。他们的模型是“一个线程调用您的Supplier,然后调用您的Consumer,另一个线程执行相同的操作,x 个其他线程...... ”

    如果你想用不同的生产者和消费者线程实现生产者/消费者方案,你最好使用 ThreadExecutorService 和线程安全队列.

但您仍然可以使用 Java 8 的功能。例如。无需使用内部类来实现 Runnable;您可以为它们使用 lambda 表达式。

关于java - 将显式和隐式并行与 java-8 流混合,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/28484298/

相关文章:

java - 作为 Java 的 Scala 片段

java - 有用于 Java 的 IKVM 吗?我可以在 JVM 上运行 .NET 程序集吗?

c# - MaxDegreeOfParallelism = 2 显示 3 个线程

matlab - 在并行matlab中运行两个函数

javascript - 多个 Javascript 并行运行

java - 使用 Android 从 SIM 卡中检索 SMS 消息

java - java中使用While循环计算列表的元素数量

python - 使用xterm打开新控制台: How to while the current console is printing,也在新控制台上打印

java - 线程上下文类加载器可以为空吗?

c++ - NET-SNMP 和多线程