java - Spark serializes nested structures incorrectly when group-reducing by key

Tags: java apache-spark apache-spark-sql

I want to reduce a DataFrame by key. The reduce logic is fairly complex and needs to update about 10-15 fields, which is why I want to convert the DataFrame to a Dataset and do the reduce on Java POJOs.

The problem

The problem is that after groupByKey/reduceGroups I get some very strange values, even though Encoders.bean(Entity.class) reads the data correctly. See the Code example section.

Workarounds

Replacing Encoders.bean with Encoders.kryo does not work and fails with the exception:

Try to map struct<broker_name:string,server_name:string,order:int,storages:array<struct<timestamp:timestamp,storage:double>>> to Tuple1, but failed as the number of fields does not line up.
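
For reference, the kryo attempt presumably looked like the snippet below (a hypothetical reconstruction: a kryo Encoder serializes the whole object into a single binary column, which is why the four-column struct above cannot be lined up against it):

    // Fails: Encoders.kryo expects a single binary column,
    // not the four-column struct produced by the JSON reader.
    Dataset<Entity> ds = createDataFrame("testData.json", "testSchema.json")
        .as(Encoders.kryo(Entity.class));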

I have also seen this workaround, but Encoders.product requires a TypeTag, and I do not know how to create a TypeTag from Java code.

Code example

    Dataset<Entity> ds = createDataFrame("testData.json", "testSchema.json")
        .as(Encoders.bean(Entity.class));

    // shows correct numbers
    ds.show(10, false);

    // correct data, please pay attention to `storages` column values
+-----------+-----------+-----+-------------------------------+
|broker_name|server_name|order|storages                       |
+-----------+-----------+-----+-------------------------------+
|A1         |S1         |1    |[[2018-10-29 23:11:44, 12.5]]  |
|A2         |S1         |1    |[[2018-10-30 14:43:05, 13.2]]  |
|A3         |S1         |2    |[[2019-11-02 10:00:03, 1001.0]]|
+-----------+-----------+-----+-------------------------------+


    //after reduce shows wrong numbers
    ds
        .groupByKey(o -> new RowKey(o.getBroker_name(), o.getServer_name(), o.getOrder()), Encoders.bean(RowKey.class))
        .reduceGroups((e1, e2) -> e1)
        .map(tuple -> tuple._2, Encoders.bean(Entity.class))
        .show(10, false);

    // wrong values, please pay attention to `storages` column
+-----------+-----+-----------+---------------------------------------------------------+
|broker_name|order|server_name|storages                                                 |
+-----------+-----+-----------+---------------------------------------------------------+
|A1         |2    |S1         |[[7.77011509161492E-309, 149386-07-09 23:48:5454.211584]]|
|A1         |1    |S1         |[[7.61283374479283E-309, 148474-03-19 21:14:3232.5248]]  |
+-----------+-----+-----------+---------------------------------------------------------+

Entity.java

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Entity implements Serializable {
    private String broker_name;
    private String server_name;
    private Integer order;
    private Storage[] storages;
}

Storage.java

@Data
@NoArgsConstructor
@AllArgsConstructor
public class Storage implements Serializable {
    private Timestamp timestamp;
    private Double storage;
}
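
RowKey.java is not shown in the question; a minimal sketch consistent with its usage in groupByKey (field names assumed from the getters it is constructed with) might look like:

@Data
@NoArgsConstructor
@AllArgsConstructor
public class RowKey implements Serializable {
    private String broker_name;
    private String server_name;
    private Integer order;
}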

testData.json:

[
  {
    "broker_name": "A1",
    "server_name": "S1",
    "order": 1,
    "storages": [
      {
        "timestamp": "2018-10-29 23:11:44.000",
        "storage": 12.5
      }
    ]
  },
  {
    "broker_name": "A1",
    "server_name": "S1",
    "order": 1,
    "storages": [
      {
        "timestamp": "2018-10-30 14:43:05.000",
        "storage": 13.2
      }
    ]
  },
  {
    "broker_name": "A1",
    "server_name": "S1",
    "order": 2,
    "storages": [
      {
        "timestamp": "2019-11-02 10:00:03.000",
        "storage": 1001.0
      }
    ]
  }
]

testSchema.json:

{
  "type": "struct",
  "fields": [
    {
      "name": "broker_name",
      "type": "string",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "server_name",
      "type": "string",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "order",
      "type": "integer",
      "nullable": true,
      "metadata": {}
    },
    {
      "name": "storages",
      "type": {
        "type": "array",
        "elementType": {
          "type": "struct",
          "fields": [
            {
              "name": "timestamp",
              "type": "timestamp",
              "nullable": true,
              "metadata": {}
            },
            {
              "name": "storage",
              "type": "double",
              "nullable": true,
              "metadata": {}
            }
          ]
        },
        "containsNull": true
      },
      "nullable": true,
      "metadata": {}
    }
  ]
}
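
The createDataFrame helper used in the code example is not part of the question; a minimal sketch of what it presumably does (hypothetical, assuming a SparkSession field named spark) is:

    // Hypothetical reconstruction of the helper: parse the schema JSON file with
    // DataType.fromJson and use it to read the multiline JSON data file.
    private Dataset<Row> createDataFrame(String dataPath, String schemaPath) throws IOException {
        String schemaJson = new String(Files.readAllBytes(Paths.get(schemaPath)), StandardCharsets.UTF_8);
        StructType schema = (StructType) DataType.fromJson(schemaJson);
        return spark.read()
            .option("multiline", "true")
            .schema(schema)
            .json(dataPath);
    }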

Best answer

This happens because deserialization uses structural matching against the schema inferred by the Encoder, and since bean classes have no natural field order, the fields of the schema are sorted by name.

So if you define a bean class like Entity, the schema inferred from the bean Encoder will be:

Encoders.bean(Storage.class).schema().printTreeString();
root
 |-- storage: double (nullable = true)
 |-- timestamp: timestamp (nullable = true)

not

root
 |-- timestamp: timestamp (nullable = true)
 |-- storage: double (nullable = true)

and that is the schema that should be used with the Dataset. In other words, a schema defined as:

StructType schema = Encoders.bean(Entity.class).schema();

or

StructType schema = StructType.fromDDL(
  "broker_name string, order integer, server_name string, " + 
  "storages array<struct<storage: double, timestamp: timestamp>>" 
);

would be valid and could be used to load testData directly:

Dataset<Entity> ds = spark.read()
  .option("multiline", "true")
  .schema(schema)
  .json("testData.json")
  .as(Encoders.bean(Entity.class));

while your current schema, which is equivalent to:

StructType valid = StructType.fromDDL(
  "broker_name string, order integer, server_name string, " + 
  "storages array<struct<timestamp: timestamp, storage: double>>" 
);

is not, even though it works with the JSON reader, which (in contrast to Encoders) matches data by name.
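
Putting the pieces together, a minimal end-to-end sketch of the fix (assuming an existing SparkSession named spark): read the data with the schema derived from the bean Encoder itself, so that the stored rows and the Encoder's deserialization logic line up, then run the original group-reduce:

// Use the bean Encoder's own schema (fields ordered by name) for reading.
Dataset<Entity> ds = spark.read()
    .option("multiline", "true")
    .schema(Encoders.bean(Entity.class).schema())
    .json("testData.json")
    .as(Encoders.bean(Entity.class));

// With the matching schema, the group-reduce should no longer corrupt `storages`.
Dataset<Entity> reduced = ds
    .groupByKey(
        e -> new RowKey(e.getBroker_name(), e.getServer_name(), e.getOrder()),
        Encoders.bean(RowKey.class))
    .reduceGroups((e1, e2) -> e1)
    .map(t -> t._2, Encoders.bean(Entity.class));

reduced.show(10, false);

In a real job the (e1, e2) -> e1 placeholder would of course be replaced by the actual merge logic for the 10-15 fields mentioned in the question.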

Arguably, this behavior should be reported as a bug: intuitively, an Encoder should never dump data that is incompatible with its own loading logic.

Related JIRA ticket: SPARK-27050

Regarding "java - Spark serializes nested structures incorrectly when group-reducing by key", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/54987724/
