java - 如何从 Java 中的动态源创建对象流?

标签 java java-stream

我正在尝试解决一个问题,看起来它无法通过 Java 实现。

我有一些调用 processObject(SomeObject someObject) 的代码和处理它的方法。我试图封装这整个事情,并希望获得一些对象的流。

下面是我的示例程序:

import java.util.stream.Stream;

public class ProcessObject {

    public static void main(String[] args) {
        int i = 0;
        ProcessObject processObject = new ProcessObject();
        while (true) {
            processObject.processObject(new SomeObject("Hello " + i++));
        }
    }

    public void processObject(SomeObject someObject) {
        System.out.println(someObject);

    }
    //TODO
    public Stream<SomeObject> getStream(){
        //Producer here should wait and produce Objects as soon as 
        //they become available like "processObject" method.
        return Stream.generate(() -> new SomeObject("Hello "));
    }
}


class SomeObject {
    public String name;

    public SomeObject(String name) {
        this.name = name;
    }

    @Override
    public String toString() {
        return name;
    }
}

静态 main 方法不断生成 SomeObjects 并调用方法 processObject 来处理它们并打印它们。所以,一切都很好。

我想创建一个 SomeObject 流,这样我就不会调用方法来处理它们,而是使用流来处理它们,就像这样:

public Stream getStream();

现在在 Java 8 或 Java 9 中,可以使用 Streams。但是从它们生成的源中提出 IMMUTABLE 特性的条件。

如何创建流,然后像真正的管道一样在元素可用时立即将元素添加到流中?

我想使用 BlockingQueue 并在 Stream Generate 方法中使用它作为生产者,如 blockingQueue.take() 但它永远不会编译。

最佳答案

要使用 BlockingQueueStream 提供数据,您需要生产者和消费者在不同的线程上运行。

在这里,我使用我的主线程来使用流,并使用一个新线程通过 BlockingQueue 来提供它以进行演示。

public void test(String[] args) {
    // My queue
    BlockingQueue<BigInteger> queue = new ArrayBlockingQueue<>(10);

    // A Stream of it's contents.
    Stream<BigInteger> biStream = Stream.generate(() -> {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    });

    // Feed the queue from a thread.
    new Thread(new Runnable() {
        // Must be final to be accessible inside `run`.
        final AtomicInteger i = new AtomicInteger();

        @Override
        public void run() {
            // Slow feed to the queue.
            while (true) {
                // Add a new number to the queue.
                queue.add(BigInteger.valueOf(i.getAndIncrement()));
                try {
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
    }).start();

    // DEMO - Consumes the queue printing contents as they arrive.
    biStream.filter(x -> x.testBit(2))
            .limit(20)
            .forEach(x -> System.out.println(x));
}

关于java - 如何从 Java 中的动态源创建对象流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47279442/

相关文章:

Java 8 流 - 增量收集/部分减少/间歇映射/...这甚至叫什么?

java - 我们可以使用 java Stream 来重构它吗?

Java流合并或减少重复对象

java - 用Java模拟触摸滚动

java - 如何使用 Java 下载 WordPress

java - SharedPreferences 变量无法正常工作

java - 在一个流处理中收集匹配和不匹配?

java - java中stream、collect、forEach组合的代码流程

Java Regex matches() 返回 false 但在浏览器中有效

java - 在 Java 中将数字从字符串转换为文本