Why does clustering (bucketing) seem to have no effect in Spark's cogroup function?

I have two bucketed Hive tables, t1 and t2:

    CREATE EXTERNAL TABLE `t1`(
      `t1_req_id` string,
      ...
    )
    PARTITIONED BY (`t1_stats_date` string)
    CLUSTERED BY (t1_req_id) INTO 1000 BUCKETS

    -- t2 looks similar, with the same number of buckets
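As a sanity check (my own snippet, not part of the original setup), I can confirm that Spark actually sees the bucketing metadata in the metastore; for Hive tables the DESCRIBE FORMATTED output should contain "Num Buckets" and "Bucket Columns" rows:

    // Verify that the bucketing metadata is visible to Spark.
    spark.sql("DESCRIBE FORMATTED t1").show(100, truncate = false)
    spark.sql("DESCRIBE FORMATTED t2").show(100, truncate = false)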

The code looks like this:

    val t1 = spark.table("t1").as[T1].rdd.map(v => (v.t1_req_id, v))
    val t2 = spark.table("t2").as[T2].rdd.map(v => (v.t2_req_id, v))

    val outRdd = t1.cogroup(t2)
      .flatMap { coGroupRes =>
        val key = coGroupRes._1
        val value: (Iterable[T1], Iterable[T2]) = coGroupRes._2
        val t3List = ... // create a list with some logic on Iterable[T1] and Iterable[T2]
        t3List
      }
    outRdd.write....
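(For context: the only RDD-level mechanism I'm aware of that lets cogroup itself skip a shuffle is explicit co-partitioning. The sketch below is my assumption of how that would look, with HashPartitioner(1000) mirroring the bucket count; note it only moves the shuffle into partitionBy rather than removing it.)

    import org.apache.spark.HashPartitioner

    // Pre-partition both RDDs with one shared partitioner so that the
    // cogroup itself adds no further shuffle. The shuffle is not
    // eliminated; partitionBy pays for it up front instead.
    val partitioner = new HashPartitioner(1000)
    val t1Part = t1.partitionBy(partitioner)
    val t2Part = t2.partitionBy(partitioner)
    val coGrouped = t1Part.cogroup(t2Part) // reuses the existing partitioning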

I made sure that both the t1 and t2 tables have the same number of partitions, and the spark-submit command sets the spark.sql.sources.bucketing.enabled=true and spark.sessionState.conf.bucketingEnabled=true flags.
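A quick runtime check (my own addition, reusing the session and the RDDs from above) to rule out the flag not being picked up or the partition counts diverging:

    // Confirm the flag is visible to the running session.
    println(spark.conf.get("spark.sql.sources.bucketing.enabled"))

    // Confirm both sides really do have the same number of partitions.
    println(t1.getNumPartitions)
    println(t2.getNumPartitions)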

However, the Spark DAG does not show any effect from the bucketing; there still seems to be a full shuffle of the data. What am I missing? Is there some other configuration or tuning needed? How can I make sure no full shuffle happens? My Spark version is 2.3.1.
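(Besides the DAG in the UI, the RDD lineage shows the same information in text form; as far as I understand, a change of indentation level in the output marks a shuffle boundary:)

    // Print the lineage of the cogrouped RDD; shuffle boundaries appear
    // as new indentation levels in the output.
    println(outRdd.toDebugString)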

[screenshot of the Spark DAG showing the shuffle]
