Java 流 : put new element through existing stream

标签 java java-8 stream java-stream

我有一个用例,其中有一个数据源,比方说:每秒都有一个新字符串来自该数据源。

我想创建一个管道,如果有新字符串到达​​,它就会被推送到该管道进行处理。

我猜想 Java 8 引入的 Stream API 可以做到这一点,因为它具有处理任意集合的数据的便捷功能,但是我想跳过将数据收集到单独集合的部分,将到达的数据直接分派(dispatch)到我刚刚创建的流。

有什么办法可以做到这一点吗?

最佳答案

为了执行您所描述的操作,您需要某种阻止。我会使用 BlockingQueue (任何类型都可以 - 如果您想避免集合,请使用 SynchronousQueue ,它根本没有内部状态),并使用 Stream.generate 从它创建一个无限的 Stream .

示例:

class StreamableQueue<T> {

    private BlockingQueue<T> dataSource;

    Stream<T> asStream() {
        return Stream.generate(this::takeFromDataSource);
    }

    private T takeFromDataSource() {
        try {
            return dataSource.take();
        } catch (InterruptedException ex) {
            throw new RuntimeException(ex);
        }
    }
}

当然,作为 dataSource 提供给此类的 BlockingQueue 需要从不同的线程提供元素。

<小时/>

编辑:一个小补充 - 您可以使用:而不是使用 try-catch:

关于Java 流 : put new element through existing stream,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51349384/

相关文章:

java - 有没有办法覆盖 JPA 中实体子类中的 ID?

java - 以毫秒为单位解析 CIM_DateTime 到 Java Date

java - 将对象传递给 rcp View

java - ObjectStream 中可变对象的影响

java - 如何检索音量参数并使其显示在 result.jsp 中

java - 我可以在泛型类参数上调用静态方法吗

java - 查找Java 8时尚中的最大单词数

java - 避免在控制逻辑中使用 isPresent() 和 get()

flutter - Navigator.pop之后,StreamBuilder不会重建

node.js - 使用node Writable流写入数据