java - How to add some information to a DataFrame?

Tags: java scala apache-spark

I have a very simple DataFrame.

val df = Seq(
    ("Alex", "2018-01-01 00:00:00", "2018-02-01 00:00:00", "OUT"),
    ("Bob", "2018-02-01 00:00:00", "2018-02-05 00:00:00", "IN"),
    ("Mark", "2018-02-01 00:00:00", "2018-03-01 00:00:00", "IN"),
    ("Mark", "2018-05-01 00:00:00", "2018-08-01 00:00:00", "OUT"),
    ("Meggy", "2018-02-01 00:00:00", "2018-02-01 00:00:00", "OUT")
).toDF("NAME", "START_DATE", "END_DATE", "STATUS")

df.show()

In my Scala project, I convert this DataFrame to a CSV file. I need to add some information at the beginning, as you can see in this example:

| REQUEST_DATE | 2019-02-05 20:00:00 |
| USER         | Kate                |
| SEARCH_TYPE  | Global              |

| NAME         | START_DATE          | END_DATE            | STATUS |
| Alex         | 2018-01-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |
| Bob          | 2018-02-01 00:00:00 | 2018-02-05 00:00:00 | IN     |
| Mark         | 2018-02-01 00:00:00 | 2018-03-01 00:00:00 | IN     |
| Mark         | 2018-05-01 00:00:00 | 2018-08-01 00:00:00 | OUT    |
| Meggy        | 2018-02-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |

I tried creating a new DataFrame and unioning them together. Unfortunately, you cannot union two DataFrames with different schemas.

Best Answer

Assuming you do not want to do this after the file has been written to disk, you can:

1. Cast everything in both dataframes to strings. However, the output will then look like this:

 | REQUEST_DATE | 2019-02-05 20:00:00 |                     |        |
 | USER         | Kate                |                     |        |
 | SEARCH_TYPE  | Global              |                     |        |
 |              |                     |                     |        |
 | NAME         | START_DATE          | END_DATE            | STATUS |
 | Alex         | 2018-01-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |
 | Bob          | 2018-02-01 00:00:00 | 2018-02-05 00:00:00 | IN     |
 | Mark         | 2018-02-01 00:00:00 | 2018-03-01 00:00:00 | IN     |
 | Mark         | 2018-05-01 00:00:00 | 2018-08-01 00:00:00 | OUT    |
 | Meggy        | 2018-02-01 00:00:00 | 2018-02-01 00:00:00 | OUT    |
2. Build your own custom output writer that adds the header before saving. You can find more information there - look for the save/write section.
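Option #2 could be sketched with plain Java I/O, independent of Spark. The header values and file name below are hypothetical, and in a real job the CSV lines would come from the DataFrame (e.g. by post-processing the part-file Spark wrote, or collecting the rows); this is only a minimal illustration of "write the header block first, then the data":

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

public class CsvWithHeader {

  /**
   * Writes the extra header lines first, a blank separator line,
   * then the regular CSV content.
   */
  public static void write(Path target, List<String> headerLines,
      List<String> csvLines) throws IOException {
    List<String> all = new ArrayList<>(headerLines);
    all.add(""); // blank line between header block and data, as in the example
    all.addAll(csvLines);
    Files.write(target, all);
  }

  public static void main(String[] args) throws IOException {
    Path out = Files.createTempFile("report", ".csv");
    write(out,
        List.of("REQUEST_DATE,2019-02-05 20:00:00",
            "USER,Kate",
            "SEARCH_TYPE,Global"),
        List.of("NAME,START_DATE,END_DATE,STATUS",
            "Alex,2018-01-01 00:00:00,2018-02-01 00:00:00,OUT"));
    System.out.println(Files.readAllLines(out).size()); // prints 6
  }
}
```

The advantage of this route is that the DataFrame schema is untouched; the downside is that you leave Spark's distributed write path and assemble the final file yourself.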
Update:

    If you want to do option #1, here is the code that transforms the first dataframe (the one with the data):

    Dataset<Row> transitionDf = dataDf
        .withColumn("_c1", dataDf.col("NAME"))
        .withColumn("_c2",
            dataDf.col("START_DATE").cast(DataTypes.StringType))
        .withColumn("_c3",
            dataDf.col("END_DATE").cast(DataTypes.StringType))
        .withColumn("_c4", dataDf.col("STATUS").cast(DataTypes.StringType))
        .drop("NAME")
        .drop("START_DATE")
        .drop("END_DATE")
        .drop("STATUS");
    

    The key is to cast() your columns so that you can use unionByName() to merge the two dataframes. The whole code in Java (I do not use Scala) would look like this:

    package net.jgp.labs.spark.l901Union;
    
    import java.sql.Date;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.SparkSession;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;
    
    /**
     * Use of unionByName() to create a complex header on a dataframe.
     * 
     * @author jgp
     */
    public class UnionApp {
      private SimpleDateFormat format = null;
    
      public UnionApp() {
        this.format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
      }
    
      /**
       * main() is your entry point to the application.
       * 
       * @param args
       * @throws ParseException
       */
      public static void main(String[] args) throws ParseException {
        UnionApp app = new UnionApp();
        app.start();
      }
    
      /**
       * The processing code.
       * 
       * @throws ParseException
       */
      private void start() throws ParseException {
        // Creates a session on a local master
        SparkSession spark = SparkSession.builder()
            .appName("expr()")
            .master("local")
            .getOrCreate();
    
        // DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd
        // HH:mm:ss", Locale.ENGLISH);
    
        // Data
        StructType dataSchema = DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField(
                "NAME",
                DataTypes.StringType,
                false),
            DataTypes.createStructField(
                "START_DATE",
                DataTypes.DateType,
                false),
            DataTypes.createStructField(
                "END_DATE",
                DataTypes.DateType,
                false),
            DataTypes.createStructField(
                "STATUS",
                DataTypes.StringType,
                false) });
        List<Row> dataRows = new ArrayList<Row>();
        dataRows.add(RowFactory.create("Alex", toDate("2018-01-01 00:00:00"),
            toDate("2018-02-01 00:00:00"), "OUT"));
        dataRows.add(RowFactory.create("Bob", toDate("2018-02-01 00:00:00"),
            toDate("2018-02-05 00:00:00"), "IN"));
        dataRows.add(RowFactory.create("Mark", toDate("2018-02-01 00:00:00"),
            toDate("2018-03-01 00:00:00"), "IN"));
        dataRows.add(RowFactory.create("Mark", toDate("2018-05-01 00:00:00"),
            toDate("2018-08-01 00:00:00"), "OUT"));
        dataRows.add(RowFactory.create("Meggy", toDate("2018-02-01 00:00:00"),
            toDate("2018-02-01 00:00:00"), "OUT"));
        Dataset<Row> dataDf = spark.createDataFrame(dataRows, dataSchema);
        dataDf.show();
        dataDf.printSchema();
    
        // Header
        StructType headerSchema = DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField(
                "_c1",
                DataTypes.StringType,
                false),
            DataTypes.createStructField(
                "_c2",
                DataTypes.StringType,
                false),
            DataTypes.createStructField(
                "_c3",
                DataTypes.StringType,
                false),
            DataTypes.createStructField(
                "_c4",
                DataTypes.StringType,
                false) });
        List<Row> headerRows = new ArrayList<Row>();
        headerRows.add(RowFactory.create("REQUEST_DATE",
            format.format(new java.util.Date()), "", ""));
        headerRows.add(RowFactory.create("USER", "Kate", "", ""));
        headerRows.add(RowFactory.create("SEARCH_TYPE", "Global", "", ""));
        headerRows.add(RowFactory.create("", "", "", ""));
        headerRows
            .add(RowFactory.create("NAME", "START_DATE", "END_DATE", "STATUS"));
        Dataset<Row> headerDf = spark.createDataFrame(headerRows, headerSchema);
        headerDf.show(false);
        headerDf.printSchema();
    
        // Transition
        Dataset<Row> transitionDf = dataDf
            .withColumn("_c1", dataDf.col("NAME"))
            .withColumn("_c2",
                dataDf.col("START_DATE").cast(DataTypes.StringType))
            .withColumn("_c3",
                dataDf.col("END_DATE").cast(DataTypes.StringType))
            .withColumn("_c4", dataDf.col("STATUS").cast(DataTypes.StringType))
            .drop("NAME")
            .drop("START_DATE")
            .drop("END_DATE")
            .drop("STATUS");
        transitionDf.show(false);
        transitionDf.printSchema();
    
        // Union
        Dataset<Row> unionDf = headerDf.unionByName(transitionDf);
        unionDf.show(false);
        unionDf.printSchema();
      }
    
      private Date toDate(String dateAsText) throws ParseException {
        java.util.Date parsed;
        parsed = format.parse(dateAsText);
        return new Date(parsed.getTime());
      }
    }
    

    I saved it as part of my Spark and Java labs on GitHub. The equivalent Scala code would probably be a bit more compact :).

    Regarding "java - How to add some information to a DataFrame?", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/54525966/
