在Scala Spark Dataframe中展平嵌套的json

我有来自不同 REST API 的多个 JSON,但事先不知道它们的架构(schema)。我无法直接使用 DataFrame 的 explode 函数,因为我不知道 Spark API 推断出的列名。

1. Can we recover the keys of the nested array elements by decoding `dataframe.schema.fields`? Spark only provides the value part in the rows of the DataFrame, and takes the top-level key as the column name.

数据框-

+--------------------+
|       stackoverflow|
+--------------------+
|[[[Martin Odersky...|
+--------------------+

是否有任何最佳方法可以通过在运行时确定架构来使用数据框方法来展平json。

样本Json-:

{
  "stackoverflow": [{
    "tag": {
      "id": 1,
      "name": "scala",
      "author": "Martin Odersky",
      "frameworks": [
        {
          "id": 1,
          "name": "Play Framework"
        },
        {
          "id": 2,
          "name": "Akka Framework"
        }
      ]
    }
  },
    {
      "tag": {
        "id": 2,
        "name": "java",
        "author": "James Gosling",
        "frameworks": [
          {
            "id": 1,
            "name": "Apache Tomcat"
          },
          {
            "id": 2,
            "name": "Spring Boot"
          }
        ]
      }
    }
  ]
}

注意-我们需要在dataframe中进行所有操作,因为有大量数据即将到来,因此我们无法解析每个json。

评论
  • hquas
    hquas 回复

    Created a helper function — you can directly call `df.explodeColumns` on a DataFrame.

    下面的代码将展平多级数组和结构类型列。

    scala> :paste
    // Entering paste mode (ctrl-D to finish)
    
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import scala.annotation.tailrec
    import scala.util.Try
    
    /**
     * DataFrame helpers that recursively flatten nested struct columns and
     * explode array columns, so deeply nested JSON (with a schema unknown at
     * compile time) can be queried as a flat set of columns.
     */
    implicit class DFHelpers(df: DataFrame) {
        /**
         * The current columns with every top-level StructType expanded one
         * level. A child field keeps its own name unless that name (compared
         * case-insensitively) collides with an existing top-level column, in
         * which case it is prefixed with the parent column's name, e.g.
         * "frameworks_id".
         */
        def columns = {
          // Lower-cased top-level column names, for case-insensitive
          // collision detection when aliasing child fields.
          val existing = df.columns.map(_.toLowerCase)
          df.schema.fields.flatMap {
            case parent if parent.dataType.isInstanceOf[StructType] =>
              parent.dataType.asInstanceOf[StructType].fields.map { field =>
                val alias =
                  if (existing.contains(field.name.toLowerCase)) s"${parent.name}_${field.name}"
                  else field.name
                col(s"${parent.name}.${field.name}").as(alias)
              }.toList
            case plain => List(col(plain.name))
          }
        }

        /**
         * Recursively expands struct columns (via `columns`) until no
         * StructType columns remain. Recursion depth equals the nesting
         * depth of the schema, so this will not blow the stack in practice.
         */
        def flatten: DataFrame =
          if (df.schema.fields.exists(_.dataType.isInstanceOf[StructType]))
            df.select(columns: _*).flatten
          else df

        /**
         * Explodes every array column (using `explode_outer`, so rows with
         * null/empty arrays are kept) and re-flattens the result, repeating
         * until no array columns remain.
         */
        def explodeColumns = {
          // Renamed from the original inner `columns` to avoid shadowing the
          // `columns` member above.
          @tailrec
          def loop(cdf: DataFrame): DataFrame =
            cdf.schema.fields.filter(_.dataType.typeName == "array") match {
              case arrays if arrays.nonEmpty =>
                loop(arrays.foldLeft(cdf) { (acc, field) =>
                  acc.withColumn(field.name, explode_outer(col(s"${field.name}"))).flatten
                })
              case _ => cdf
            }
          loop(df.flatten)
        }
    }
    
    // Exiting paste mode, now interpreting.
    
    import org.apache.spark.sql.{DataFrame, SparkSession}
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.types._
    import scala.annotation.tailrec
    import scala.util.Try
    defined class DFHelpers
    
    scala> df.printSchema
    root
     |-- stackoverflow: array (nullable = true)
     |    |-- element: struct (containsNull = true)
     |    |    |-- tag: struct (nullable = true)
     |    |    |    |-- author: string (nullable = true)
     |    |    |    |-- frameworks: array (nullable = true)
     |    |    |    |    |-- element: struct (containsNull = true)
     |    |    |    |    |    |-- id: long (nullable = true)
     |    |    |    |    |    |-- name: string (nullable = true)
     |    |    |    |-- id: long (nullable = true)
     |    |    |    |-- name: string (nullable = true)
    
    
    scala> df.explode
    explode   explodeColumns
    
    scala> df.explodeColumns.printSchema
    root
     |-- author: string (nullable = true)
     |-- frameworks_id: long (nullable = true)
     |-- frameworks_name: string (nullable = true)
     |-- id: long (nullable = true)
     |-- name: string (nullable = true)
    
    
    scala>