Spark merge (replace) on keys that span multiple rows

I am using Apache Spark and I want to merge two DataFrames, one holding existing data and the other holding (potential) updates. The merge should happen on a given set of key attributes; however, for one combination of key attributes there may be multiple existing rows that need to be replaced by multiple new rows (if the timestamp of the "update" data is more recent than the timestamp of the existing data).

As a visualization, assume the existing data is

+------+------+---------+-----------+------------+
| key1 | key2 | subkey3 | timestamp | attributes |
+------+------+---------+-----------+------------+
|    1 |    1 |       0 |         0 | something1 |
|    1 |    1 |       1 |         0 | something2 |
|    1 |    2 |       0 |         0 | something3 |
|    1 |    2 |       1 |         0 | something4 |
|    1 |    3 |       0 |         0 | something5 |
+------+------+---------+-----------+------------+

and the update data is

+------+------+---------+-----------+----------------------+
| key1 | key2 | subkey3 | timestamp |      attributes      |
+------+------+---------+-----------+----------------------+
|    1 |    1 |       0 |         1 | something_new1       |
|    1 |    1 |       1 |         1 | something_new2       |
|    1 |    1 |       2 |         1 | something_new3       |
|    1 |    2 |       0 |         1 | something_new4       |
|    1 |    2 |       0 |         2 | something_even_newer |
|    1 |    4 |       0 |         1 | something6           |
+------+------+---------+-----------+----------------------+

then the resulting DataFrame should look like this:

+------+------+---------+-----------+----------------------+
| key1 | key2 | subkey3 | timestamp |      attributes      |
+------+------+---------+-----------+----------------------+
|    1 |    1 |       0 |         1 | something_new1       |
|    1 |    1 |       1 |         1 | something_new2       |
|    1 |    1 |       2 |         1 | something_new3       |
|    1 |    2 |       0 |         2 | something_even_newer |
|    1 |    3 |       0 |         0 | something5           |
|    1 |    4 |       0 |         1 | something6           |
+------+------+---------+-----------+----------------------+

So the merge, in this case, happens on the two keys key1 and key2 and if there are more recent rows for this compound key in the update data, then all existing rows of the same key will be replaced by the latest rows in the update data. Note that the number of rows for a given compound key may change in either direction after applying an update.
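To pin down these semantics before reaching for Spark, here is a minimal pure-Python sketch (plain tuples rather than DataFrames; the `merge` helper is hypothetical, only for illustration): within each (key1, key2) group of the combined data, keep only the rows carrying that group's newest timestamp.

```python
from collections import defaultdict

# Rows are (key1, key2, subkey3, timestamp, attributes), as in the tables above.
existing = [
    (1, 1, 0, 0, "something1"),
    (1, 1, 1, 0, "something2"),
    (1, 2, 0, 0, "something3"),
    (1, 2, 1, 0, "something4"),
    (1, 3, 0, 0, "something5"),
]
updates = [
    (1, 1, 0, 1, "something_new1"),
    (1, 1, 1, 1, "something_new2"),
    (1, 1, 2, 1, "something_new3"),
    (1, 2, 0, 1, "something_new4"),
    (1, 2, 0, 2, "something_even_newer"),
    (1, 4, 0, 1, "something6"),
]

def merge(existing, updates):
    # Group the union of both datasets by the compound key (key1, key2).
    groups = defaultdict(list)
    for row in existing + updates:
        groups[(row[0], row[1])].append(row)
    # Per group, keep every row that carries the group's maximum timestamp.
    merged = []
    for rows in groups.values():
        latest = max(r[3] for r in rows)
        merged.extend(r for r in rows if r[3] == latest)
    return sorted(merged)

for row in merge(existing, updates):
    print(row)
```

Running this reproduces the six rows of the expected result table, including the case where a group grows (key2 = 1) and the case where it shrinks (key2 = 2).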

One solution would be a window ranking like this:

from pyspark.sql import Window
from pyspark.sql.functions import col, rank

df_merged = (
    df_old
    .union(df_update)
    # rank rows within each (key1, key2) group by descending timestamp
    .withColumn("rank",
        rank().over(
            Window.partitionBy(col("key1"), col("key2"))
            .orderBy(col("timestamp").desc())
        )
    )
    # keep only the newest rows per group; rank() assigns 1 to all rows
    # sharing the maximum timestamp, so ties survive as intended
    .filter(col("rank") == 1)
    .drop("rank")
)

Assuming all data is stored in Parquet or Delta tables, what is the most efficient way to get the desired behavior in Spark?
