Flatten nested JSON in a Scala Spark DataFrame

I have multiple JSON documents coming from various REST APIs, and I don't know their schema in advance. I can't use the DataFrame explode function directly because I don't know the column names that the Spark API will create.

1. Can we recover the keys of the nested array elements by decoding dataframe.schema.fields? Spark only exposes the value part in the rows of the DataFrame and uses the top-level key as the column name.
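To question 1: yes, the key names are recoverable from df.schema. A minimal sketch (the helper name collectKeys is mine, not part of any Spark API) that walks StructType/ArrayType recursively and collects the fully qualified key paths:

import org.apache.spark.sql.types.{ArrayType, DataType, StructType}

// Minimal sketch: recursively collect fully qualified key paths from a schema,
// descending into structs and into the element type of arrays.
def collectKeys(dt: DataType, prefix: String = ""): Seq[String] = dt match {
  case st: StructType =>
    st.fields.flatMap { f =>
      val path = if (prefix.isEmpty) f.name else s"$prefix.${f.name}"
      path +: collectKeys(f.dataType, path)
    }
  case at: ArrayType => collectKeys(at.elementType, prefix) // arrays keep the parent prefix
  case _             => Seq.empty
}

// collectKeys(df.schema) on the sample below yields paths such as
// "stackoverflow", "stackoverflow.tag", "stackoverflow.tag.frameworks.name", ...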

DataFrame -

+--------------------+
|       stackoverflow|
+--------------------+
|[[[Martin Odersky...|
+--------------------+

Is there a recommended way to flatten the JSON using DataFrame methods, determining the schema at runtime?

Sample JSON -

{
  "stackoverflow": [{
    "tag": {
      "id": 1,
      "name": "scala",
      "author": "Martin Odersky",
      "frameworks": [
        {
          "id": 1,
          "name": "Play Framework"
        },
        {
          "id": 2,
          "name": "Akka Framework"
        }
      ]
    }
  },
    {
      "tag": {
        "id": 2,
        "name": "java",
        "author": "James Gosling",
        "frameworks": [
          {
            "id": 1,
            "name": "Apache Tomcat"
          },
          {
            "id": 2,
            "name": "Spring Boot"
          }
        ]
      }
    }
  ]
}
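For reference, a DataFrame like the one shown above can be produced by letting Spark infer the schema at read time. A minimal sketch (the file path is hypothetical):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("flatten-json").getOrCreate()

// multiLine is needed because the sample document spans multiple lines;
// Spark infers the nested array/struct schema automatically.
val df = spark.read.option("multiLine", "true").json("/path/to/sample.json")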

Note - we need to do everything with DataFrame operations, because a large volume of data is coming in and we cannot afford to parse each JSON document by hand.

Answer by hquas:

I created a helper function, so you can directly call df.explodeColumns on a DataFrame.

The code below flattens multi-level array and struct type columns.

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.annotation.tailrec
import scala.util.Try

implicit class DFHelpers(df: DataFrame) {
    // One flattening pass: every StructType column is replaced by its child
    // fields. A child keeps its bare name unless that name would collide with
    // an existing column, in which case it is prefixed with the parent name.
    def columns = {
      val dfColumns = df.columns.map(_.toLowerCase)
      df.schema.fields.flatMap {
        case column if column.dataType.isInstanceOf[StructType] =>
          column.dataType.asInstanceOf[StructType].fields.map { field =>
            val columnName = column.name
            val fieldName = field.name
            dfColumns.count(_ == fieldName.toLowerCase) match {
              case 0 => col(s"${columnName}.${fieldName}").as(s"${fieldName}")
              case _ => col(s"${columnName}.${fieldName}").as(s"${columnName}_${fieldName}")
            }
          }.toList
        case column => List(col(s"${column.name}"))
      }
    }

    // Recursively selects struct children until no StructType column remains.
    def flatten: DataFrame = {
      val noStructs = df.schema.filter(_.dataType.isInstanceOf[StructType]).isEmpty
      if (noStructs) df else df.select(columns: _*).flatten
    }

    // Repeatedly explodes every array column (explode_outer keeps rows whose
    // array is null or empty) and re-flattens the structs each explode exposes,
    // until the schema contains no arrays at all.
    def explodeColumns = {
      @tailrec
      def columns(cdf: DataFrame): DataFrame =
        cdf.schema.fields.filter(_.dataType.typeName == "array") match {
          case c if c.nonEmpty => columns(c.foldLeft(cdf)((dfa, field) =>
            dfa.withColumn(field.name, explode_outer(col(s"${field.name}"))).flatten))
          case _ => cdf
        }
      columns(df.flatten)
    }
}

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.annotation.tailrec
import scala.util.Try
defined class DFHelpers

scala> df.printSchema
root
 |-- stackoverflow: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- tag: struct (nullable = true)
 |    |    |    |-- author: string (nullable = true)
 |    |    |    |-- frameworks: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)


scala> df.explode
explode   explodeColumns

scala> df.explodeColumns.printSchema
root
 |-- author: string (nullable = true)
 |-- frameworks_id: long (nullable = true)
 |-- frameworks_name: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)


scala>
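With the sample JSON above, the flattened DataFrame should contain one row per tag/framework pair, four rows in total. A sketch of the expected output (row order is not guaranteed):

scala> df.explodeColumns.show(false)
+--------------+-------------+---------------+---+-----+
|author        |frameworks_id|frameworks_name|id |name |
+--------------+-------------+---------------+---+-----+
|Martin Odersky|1            |Play Framework |1  |scala|
|Martin Odersky|2            |Akka Framework |1  |scala|
|James Gosling |1            |Apache Tomcat  |2  |java |
|James Gosling |2            |Spring Boot    |2  |java |
+--------------+-------------+---------------+---+-----+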