Using functools reduce on a distributed Spark DataFrame

I am trying to add a list of columns to an existing Spark DataFrame.

Sample code:

from functools import reduce
from pyspark.sql.functions import lit
columns_list = ['col1', 'col2', 'col3', 'col4']
reduce(lambda df, col: df.withColumn(col, lit('NULL')), columns_list, df).show()

This gives the expected result.

Does using reduce() on a distributed Spark DataFrame try to execute everything on a single node?

Comments
  • kquis replied:

    The OP asks:

    Does using reduce() on a distributed Spark DataFrame try to execute everything on a single node?

    But I think what the OP really wants to know is:

    Are the following commands any different from Spark's execution point of view?

    Generate some toy data:

    data = [
        ('1',),
        ('2',),
        ('3',),
        ('4',),
    ]
    df = spark.createDataFrame(data, ['id'])
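
    For a quick sanity check, df.show() prints four rows in a single id column (the output below is fully determined by the toy data above, shown here as comments):

    df.show()
    # +---+
    # | id|
    # +---+
    # |  1|
    # |  2|
    # |  3|
    # |  4|
    # +---+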
    

    You can see the execution plan of your code using .explain()

    Scenario 1 (using functools.reduce)

    from functools import reduce
    from pyspark.sql.functions import lit
    columns_list = ['col1', 'col2', 'col3', 'col4']
    result1 = reduce(lambda df, col: df.withColumn(col, lit('NULL')), columns_list, df)
    result1.explain()
    
    == Physical Plan ==
    *(1) Project [id#0, NULL AS col1#122, NULL AS col2#125, NULL AS col3#129, NULL AS col4#134]
    +- Scan ExistingRDD[id#0]
    

    Scenario 2 (@anky's code using select and list comprehension)

    result2 = df.select("*",*[lit('NULL').alias(i) for i in columns_list])
    result2.explain()
    
    == Physical Plan ==
    *(1) Project [id#0, NULL AS col1#140, NULL AS col2#141, NULL AS col3#142, NULL AS col4#143]
    +- Scan ExistingRDD[id#0]
    

    Scenario 3 (using a for loop and iterative reassignment)

    result3 = df
    for i in columns_list:
        result3 = result3.withColumn(i, lit('NULL'))
    
    result3.explain()
    
    == Physical Plan ==
    *(1) Project [id#0, NULL AS col1#167, NULL AS col2#170, NULL AS col3#174, NULL AS col4#179]
    +- Scan ExistingRDD[id#0]
    

    Note that all three physical plans are identical apart from the auto-generated expression IDs, so the three versions are equivalent from Spark's point of view; functools.reduce() is just a functional way of writing the loop in Scenario 3, not something Spark needs. OP, I suggest reading about the difference between Transformations and Actions in Spark: withColumn is a lazy transformation, so Spark first generates a 'Plan' of execution on the driver, and nothing is pulled onto a single node. The plan still runs in a distributed fashion once an action is called, which is why reduce() does not change how the work is executed.
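
    To make the transformation/action distinction concrete, here is a minimal sketch (reusing df and columns_list from the toy example above; lazy_df is just an illustrative name): reduce() returns immediately because each withColumn is a lazy transformation chained on the driver, and no Spark job runs until an action is called.

    from functools import reduce
    from pyspark.sql.functions import lit

    # Builds the plan only: this returns instantly on the driver; no job is submitted.
    lazy_df = reduce(lambda acc, c: acc.withColumn(c, lit('NULL')), columns_list, df)

    # count() is an action: this is the first point at which the cluster does any work,
    # and that work is distributed exactly as the physical plans above describe.
    print(lazy_df.count())  # 4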