java - Spark 打印数据帧而不会耗尽内存

标签 java apache-spark memory dataset partitioning

如何在 Java 中打印整个数据帧而不耗尽内存?

Dataset<Row> df = ...

我知道:

df.show() 

将显示数据帧,但如果数据帧足够大,则可能会耗尽内存。

我知道我可以使用以下方法限制内容:

df.show(rowCount, false)

但是想要打印整个数据框,我不想限制内容...

我已经尝试过:

df.foreachPartition(iter -> {
    while(iter.hasNext()){
       System.out.println(rowIter.next().mkString(",");)
     }
});

但这将打印在每个相应的节点上,而不是打印在驱动程序上......

是否有任何方法可以在不耗尽内存的情况下打印驱动程序中的所有内容?

最佳答案

您必须将所有数据传送到驱动程序,这会消耗您的内存:(...

解决方案可能是拆分数据帧并在驱动程序中逐段打印。当然,这取决于数据本身的结构,它看起来像:

long count = df.count();
long inc = count / 10;
for (long i = 0; i < count; i += inc) {
  Dataset<Row> filteredDf =
      df.where("id>=" + i + " AND id<" + (i + inc));

  List<Row> rows = filteredDf.collectAsList();
  for (Row r : rows) {
    System.out.printf("%d: %s\n", r.getAs(0), r.getString(1));
  }
}

我将数据集分成 10 个,但我知道我的 id 是从 1 到 100...

完整的示例可以是:

package net.jgp.books.sparkWithJava.ch20.lab900_splitting_dataframe;

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Splitting a dataframe to bring it back to the driver for local
 * processing.
 * 
 * @author jgp
 */
public class SplittingDataframeApp {

  /**
   * main() is your entry point to the application.
   * 
   * @param args
   */
  public static void main(String[] args) {
    SplittingDataframeApp app = new SplittingDataframeApp();
    app.start();
  }

  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("Splitting a dataframe to collect it")
        .master("local")
        .getOrCreate();

    Dataset<Row> df = createRandomDataframe(spark);
    df = df.cache();

    df.show();
    long count = df.count();
    long inc = count / 10;
    for (long i = 0; i < count; i += inc) {
      Dataset<Row> filteredDf =
          df.where("id>=" + i + " AND id<" + (i + inc));

      List<Row> rows = filteredDf.collectAsList();
      for (Row r : rows) {
        System.out.printf("%d: %s\n", r.getAs(0), r.getString(1));
      }
    }
  }

  private static Dataset<Row> createRandomDataframe(SparkSession spark) {
    StructType schema = DataTypes.createStructType(new StructField[] {
        DataTypes.createStructField(
            "id",
            DataTypes.IntegerType,
            false),
        DataTypes.createStructField(
            "value",
            DataTypes.StringType,
            false) });

    List<Row> rows = new ArrayList<Row>();
    for (int i = 0; i < 100; i++) {
      rows.add(RowFactory.create(i, "Row #" + i));
    }
    Dataset<Row> df = spark.createDataFrame(rows, schema);
    return df;
  }
}

你认为这有帮助吗?

它不像将其保存在数据库中那么优雅,但它可以避免在架构中添加额外的组件。这段代码不是很通用,我不确定您是否可以在当前版本的 Spark 中使其通用。

关于java - Spark 打印数据帧而不会耗尽内存,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55125909/

相关文章:

apache-spark - 如何重命名 pyspark 中的列,类似于使用 Spark 兼容的 SQL PIVOT 语句?

java - spark-shell 命令给出 java 错误

scala - 使用 Spark 将对象发送到特定分区

Delphi forms.pas内存泄漏?

caching - Redis 作为缓存和使用相同实例的队列

java.sql.SQLIntegrityConstraintViolationException Spring 启动

java - 重写 Kotlin/Java 子类型中的函数类型

java - 在一长串文件中查找 3 个最近修改的文件

java - Servlet 检索预先选择的单选按钮

javascript - 删除数据库中的一行 (JSP)