java - 聚合函数计算Spark中groupBy的使用情况

我正试图在pyspark的一行代码中进行多个操作,
不知道这对我来说是否可行。
我的目的是不必将输出保存为新的数据帧。
我当前的代码相当简单:

encodeUDF = udf(encode_time, StringType())
new_log_df.cache().withColumn('timePeriod', encodeUDF(col('START_TIME')))
  .groupBy('timePeriod')
  .agg(
    mean('DOWNSTREAM_SIZE').alias("Mean"),
    stddev('DOWNSTREAM_SIZE').alias("Stddev")
  )
  .show(20, False)

我的目的是在使用count()之后添加groupBy,以获得与每个值匹配的记录计数,打印显示为输出。
当尝试使用timePeriod时,我会得到异常。
是否有任何方法可以同时实现groupBy(..).count().agg(..)count()。show()打印,而无需将代码拆分为两行命令,例如:
new_log_df.withColumn(..).groupBy(..).count()
new_log_df.withColumn(..).groupBy(..).agg(..).show()

或者更好的方法是,将合并的输出转换为agg()output-一个额外的列,该列说明与行值匹配的记录计数数。例如。:
timePeriod | Mean | Stddev | Num Of Records
    X      | 10   |   20   |    315


最佳答案:

count()可用于agg()内部,因为groupBy表达式相同。
用蟒蛇

import pyspark.sql.functions as func

new_log_df.cache().withColumn("timePeriod", encodeUDF(new_log_df["START_TIME"])) 
  .groupBy("timePeriod")
  .agg(
     func.mean("DOWNSTREAM_SIZE").alias("Mean"), 
     func.stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     func.count(func.lit(1)).alias("Num Of Records")
   )
  .show(20, False)

pySpark SQL functions doc
带scala
import org.apache.spark.sql.functions._ //for count()

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
  .groupBy("timePeriod")
  .agg(
     mean("DOWNSTREAM_SIZE").alias("Mean"), 
     stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     count(lit(1)).alias("Num Of Records")
   )
  .show(20, false)

count(1)将按等于count("timePeriod")
用Java
import static org.apache.spark.sql.functions.*;

new_log_df.cache().withColumn("timePeriod", encodeUDF(col("START_TIME"))) 
  .groupBy("timePeriod")
  .agg(
     mean("DOWNSTREAM_SIZE").alias("Mean"), 
     stddev("DOWNSTREAM_SIZE").alias("Stddev"),
     count(lit(1)).alias("Num Of Records")
   )
  .show(20, false)