我有一个用例,其中有一个数据源,比方说:每秒都有一个新字符串来自该数据源。
我想创建一个管道,如果有新字符串到达,它就会被推送到该管道进行处理。
我猜想 Java 8 引入的 Stream API 可以做到这一点,因为它具有处理任意集合的数据的便捷功能,但是我想跳过将数据收集到单独集合的部分,将到达的数据直接分派(dispatch)到我刚刚创建的流。
有什么办法可以做到这一点吗?
最佳答案
为了执行您所描述的操作,您需要某种阻止。我会使用 BlockingQueue
(任何类型都可以 - 如果您想避免集合,请使用 SynchronousQueue
,它根本没有内部状态),并使用 Stream.generate
从它创建一个无限的 Stream
.
示例:
class StreamableQueue<T> {
private BlockingQueue<T> dataSource;
Stream<T> asStream() {
return Stream.generate(this::takeFromDataSource);
}
private T takeFromDataSource() {
try {
return dataSource.take();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}
当然,作为 dataSource
提供给此类的 BlockingQueue
需要从不同的线程提供元素。
编辑:一个小补充 - 您可以使用:而不是使用 try-catch:
- Lombok的
@SneakyThrows
takeFromDataSource()
方法上的注释 - 或者 - 更好 - jOOλ的
Sneaky.supplier
/Unchecked.supplier
方法,它简化了转换为:Stream.generate(Sneaky.supplier(dataSource::take))
关于Java 流 : put new element through existing stream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51349384/