java - Clojure 转换器与 Java 中流的中间操作的概念相同吗?

标签 java clojure java-8 java-stream transducer

当我学习 Clojure 中的转换器时,突然让我想起了它们让我想起的东西:Java 8 流!

Transducers are composable algorithmic transformations. They are independent from the context of their input and output sources and specify only the essence of the transformation in terms of an individual element.

A stream is not a data structure that stores elements; instead, it conveys elements from a source such as a data structure, an array, a generator function, or an I/O channel, through a pipeline of computational operations.

Clojure:

(def xf
  (comp
    (filter odd?)
    (map inc)
    (take 5)))

(println
  (transduce xf + (range 100)))  ; => 30
(println
  (into [] xf (range 100)))      ; => [2 4 6 8 10]

Java:

// Purposely using Function and boxed primitive streams (instead of
// UnaryOperator<LongStream>) in order to keep it general.
Function<Stream<Long>, Stream<Long>> xf =
        s -> s.filter(n -> n % 2L == 1L)
                .map(n -> n + 1L)
                .limit(5L);

System.out.println(
        xf.apply(LongStream.range(0L, 100L).boxed())
                .reduce(0L, Math::addExact));    // => 30
System.out.println(
        xf.apply(LongStream.range(0L, 100L).boxed())
                .collect(Collectors.toList()));  // => [2, 4, 6, 8, 10]

除了静态/动态类型的不同之外,这些在目的和用法上似乎与我非常相似。

类比 Java 流的转换是否是思考转换器的合理方式?如果不是,它有什么缺陷,或者两者在概念上有何不同(更不用说实现)?

最佳答案

主要区别在于,动词集(操作)在某种程度上对流是封闭的,而对转换器是开放的:例如尝试在流上实现 partition,感觉有点二流:

import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.Stream.Builder;

public class StreamUtils {
    static <T> Stream<T> delay(final Supplier<Stream<T>> thunk) {
        return Stream.of((Object) null).flatMap(x -> thunk.get());
    }

    static class Partitioner<T> implements Function<T, Stream<Stream<T>>> {
        final Function<T, ?> f;

        Object prev;
        Builder<T> sb;

        public Partitioner(Function<T, ?> f) {
            this.f = f;
        }

        public Stream<Stream<T>> apply(T t) {
            Object tag = f.apply(t);
            if (sb != null && prev.equals(tag)) {
                sb.accept(t);
                return Stream.empty();
            }
            Stream<Stream<T>> partition = sb == null ? Stream.empty() : Stream.of(sb.build());
            sb = Stream.builder();
            sb.accept(t);
            prev = tag;
            return partition;
        }

        Stream<Stream<T>> flush() {
            return sb == null ? Stream.empty() : Stream.of(sb.build());
        }
    }

    static <T> Stream<Stream<T>> partitionBy(Stream<T> in, Function<T, ?> f) {
        Partitioner<T> partitioner = new Partitioner<>(f);
        return Stream.concat(in.flatMap(partitioner), delay(() -> partitioner.flush()));
    }
}

与序列和缩减器一样,当您进行转换时,您不会创建“更大”的计算,而是创建“更大”的源。

为了能够传递计算,您引入了 xf 从 Stream 到 Stream 的函数,以将操作从方法提升到一流实体(以便将它们从源中解开)。通过这样做,您已经创建了一个换能器,尽管它的界面太大了。

下面是上述代码的更通用版本,用于将任何 (clojure) 转换器应用于流:

import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.Stream.Builder;

import clojure.lang.AFn;
import clojure.lang.IFn;
import clojure.lang.Reduced;

public class StreamUtils {
    static <T> Stream<T> delay(final Supplier<Stream<T>> thunk) {
        return Stream.of((Object) null).flatMap(x -> thunk.get());
    }

    static class Transducer implements Function {
        IFn rf;

        public Transducer(IFn xf) {
            rf = (IFn) xf.invoke(new AFn() {
                public Object invoke(Object acc) {
                    return acc;
                }

                public Object invoke(Object acc, Object item) {
                    ((Builder<Object>) acc).accept(item);
                    return acc;
                }
            });
        }

        public Stream<?> apply(Object t) {
            if (rf == null) return Stream.empty();
            Object ret = rf.invoke(Stream.builder(), t);
            if (ret instanceof Reduced) {
                Reduced red = (Reduced) ret;
                Builder<?> sb = (Builder<?>) red.deref();
                return Stream.concat(sb.build(), flush());
            }
            return ((Builder<?>) ret).build();
        }

        Stream<?> flush() {
            if (rf == null) return Stream.empty();
            Builder<?> sb = (Builder<?>) rf.invoke(Stream.builder());
            rf = null;
            return sb.build();
        }
    }

    static <T> Stream<?> withTransducer(Stream<T> in, IFn xf) {
        Transducer transducer = new Transducer(xf);
        return Stream.concat(in.flatMap(transducer), delay(() -> transducer.flush()));
    }
}

关于java - Clojure 转换器与 Java 中流的中间操作的概念相同吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/35137205/

相关文章:

java - 可以采用 Class 或 Collection 的通用 Java 5 方法?

java-8 - NoClassDefFoundError 无法初始化类 com.ibm.mq.headers.internal.HeaderType

java - lambda表达式是否有on对象与之关联,为什么 “this”不引用lambda表达式的对象?

java - Android 中 Bundle 的错误数字

java - 从异常类型中识别丢失的 jar - Java

java - Linux 上的 Quasar/Pulsar lein midje 构建错误

ubuntu - lein repl 中的 Control-d 行编辑?

build - Leiningen 在构建工作 uberjar 时遇到问题

Java 8 查找并替换匹配的字符串

java - Log4j2 使用 {} 反对使用 %d 或 %s