java - 如何使用 Java 使用 scala.collection.immutable.Stream 类

标签 java scala apache-spark

我在 Scala 中有一个现有代码,并试图在 Java 中编写相同的代码。但面临一些问题。

Scala 代码:

import java.io.{BufferedReader, InputStreamReader}
import java.util.zip.ZipInputStream
import org.apache.spark.SparkContext
import org.apache.spark.input.PortableDataStream
import org.apache.spark.rdd.RDD

def readFile(path: String,minPartitions: Int): RDD[String] = {

      if (path.endsWith(".zip")) {
        sc.binaryFiles(path, minPartitions)
          .flatMap {
              case (name: String, content: PortableDataStream) =>
            val zis = new ZipInputStream(content.open)
            val entry = zis.getNextEntry
            val br = new BufferedReader(new InputStreamReader(zis))
            Stream.continually(br.readLine()).takeWhile(_ != null)
          }
      }
    }

我在下面写了 java 代码 -

import org.apache.spark.input.PortableDataStream;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.rdd.RDD;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

        public RDD<String> readFile(String inputDir, int minPartitions) throws Exception {
    SparkSession sparkSession = null;
    sparkSession = SparkSession.builder().appName("zipPoc").config("spark.master", "yarn").getOrCreate();

    JavaSparkContext sc = new JavaSparkContext(sparkSession.sparkContext());
    if (inputDir.endsWith(".zip")) {
        sc.binaryFiles(inputDir, minPartitions).flatMap (
            (String name , PortableDataStream content) -> {
                ZipInputStream stream = new ZipInputStream(content.open());
                ZipEntry entry = stream.getNextEntry();
                BufferedReader br = new BufferedReader(new InputStreamReader(stream));
                scala.collection.immutable.Stream.continually(br.readLine()).takeWhile(_ != null);
            }
        );
    }

}

我遇到了错误。

enter image description here

任何人对此有线索并帮助编写适当的代码。

最佳答案

continually 期望没有参数和返回值的 lambda。 Java 等价物是:

() -> br.readLine()

Java 中也没有_,您必须使用显式参数。

(line) -> line != null

所以这应该可行:

Stream.continually(() -> {       
    try {
        return br.readLine();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }
}).takeWhile((line) -> line != null)

====

如您所见,readLine 抛出已检查的异常。最快的解决方法是将调用包装在 try/catch 中。

关于java - 如何使用 Java 使用 scala.collection.immutable.Stream 类,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47074765/

相关文章:

scala - Spark Scala 编译器不会提示双重与三重相等

java - EntityManagerFactory 关闭,Hibernate

java - 滚动()和添加()

scala - 使用Intellij在Scala中查找未使用的方法

java - Scala:SBT 为每个阶段捆绑不同的库依赖项

apache-spark - 如何根据条件添加新列(不面临 JaninoRuntimeException 或 OutOfMemoryError)?

scala - 使用匿名函数时的 Spark TaskNotSerializable

java - 刷新 JLabel 图标图像

Java 输入流子流。是否可以?

scala - 为什么我不能在 Option[List] 上调用 flatMap