我正在尝试解决一个问题,看起来它无法通过 Java 实现。
我有一些调用 processObject(SomeObject someObject) 的代码和处理它的方法。我试图封装这整个事情,并希望获得一些对象的流。
下面是我的示例程序:
import java.util.stream.Stream;
public class ProcessObject {
public static void main(String[] args) {
int i = 0;
ProcessObject processObject = new ProcessObject();
while (true) {
processObject.processObject(new SomeObject("Hello " + i++));
}
}
public void processObject(SomeObject someObject) {
System.out.println(someObject);
}
//TODO
public Stream<SomeObject> getStream(){
//Producer here should wait and produce Objects as soon as
//they become available like "processObject" method.
return Stream.generate(() -> new SomeObject("Hello "));
}
}
class SomeObject {
public String name;
public SomeObject(String name) {
this.name = name;
}
@Override
public String toString() {
return name;
}
}
静态 main 方法不断生成 SomeObjects 并调用方法 processObject 来处理它们并打印它们。所以,一切都很好。
我想创建一个 SomeObject 流,这样我就不会调用方法来处理它们,而是使用流来处理它们,就像这样:
public Stream getStream();
现在在 Java 8 或 Java 9 中,可以使用 Streams。但是从它们生成的源中提出 IMMUTABLE 特性的条件。
如何创建流,然后像真正的管道一样在元素可用时立即将元素添加到流中?
我想使用 BlockingQueue 并在 Stream Generate 方法中使用它作为生产者,如 blockingQueue.take() 但它永远不会编译。
最佳答案
要使用 BlockingQueue
为 Stream
提供数据,您需要生产者和消费者在不同的线程上运行。
在这里,我使用我的主线程来使用流,并使用一个新线程通过 BlockingQueue
来提供它以进行演示。
public void test(String[] args) {
// My queue
BlockingQueue<BigInteger> queue = new ArrayBlockingQueue<>(10);
// A Stream of it's contents.
Stream<BigInteger> biStream = Stream.generate(() -> {
try {
return queue.take();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
// Feed the queue from a thread.
new Thread(new Runnable() {
// Must be final to be accessible inside `run`.
final AtomicInteger i = new AtomicInteger();
@Override
public void run() {
// Slow feed to the queue.
while (true) {
// Add a new number to the queue.
queue.add(BigInteger.valueOf(i.getAndIncrement()));
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}).start();
// DEMO - Consumes the queue printing contents as they arrive.
biStream.filter(x -> x.testBit(2))
.limit(20)
.forEach(x -> System.out.println(x));
}
关于java - 如何从 Java 中的动态源创建对象流?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47279442/