apache-spark - Spark DataFrame: 计算每个列的不同值

112 1

问题几乎在标题中: 是否有一种有效的方法来计算DataFrame中每个列的不同值?

描述方法只提供计数, 但不提供非重复计数, 我想知道是否有办法获得所有( 或某些选定的) 列的非重复计数 。

时间: 原作者:

117 5

计算多个聚合是相当昂贵的。 我建议你用近似方法代替。 在这种情况下,接近不同的计数:


val df = Seq((1,3,4),(1,2,3),(2,3,4),(2,3,5)).toDF("col1","col2","col3")



val exprs = df.columns.map((_ ->"approx_count_distinct")).toMap


df.agg(exprs).show()


//+---------------------------+---------------------------+---------------------------+


//|approx_count_distinct(col1)|approx_count_distinct(col2)|approx_count_distinct(col3)|


//+---------------------------+---------------------------+---------------------------+


//| 2| 2| 3|


//+---------------------------+---------------------------+---------------------------+



approx_count_distinct 方法依赖于hood下的HyperLogLog 。

HyperLogLog算法及其变体 HyperLogLog++ ( 在Spark中实现) 依赖于以下巧妙的观察。

如果数字在一个范围内均匀分布,那么不同元素的计数可以从数字的二进制表示中的最大前导零数来近似。

例如,如果我们观察到二进制形式的数字是 0…(k times)…01…1 形式的数字,那么我们可以估计该集合中的 2 ^k元素的顺序 。 这是一个非常粗略的估计,但是,它可以通过草图算法精确到精确。

可以在原始论文中找到对该算法背后的机制的详尽解释 。

注:启动 Spark 1.6时启动 Spark SELECT SOME_AGG(DISTINCT foo)), SOME_AGG(DISTINCT bar)) FROM df 每个子句都应该为每个子句触发单独的聚合。 而这与 SELECT SOME_AGG(foo), SOME_AGG(bar) FROM df 我们聚合一次的地方。因此当使用 count(distinct(_))approxCountDistinct ( 或 approx_count_distinct ) 时性能将无法比较 。

它是 Spark 1.6之后行为的变化之一:

利用针对具有不同聚合的查询的改进的查询规划器(SPARK-9241),具有单个不同聚合的查询的计划已被改变为更健壮的版本。 要切换回由 Spark 1.5计划程序生成的计划,请将 spark.sql.specializeSingleDistinctAggPlanning 设置为 true 。 ( SPARK-12077 )

参考:Apache Spark 中的近似算法: HyperLogLog和 Quantiles 。

原作者:
131 5

pySpark 中,你可以使用 countDistinct() 执行类似的操作:


from pyspark.sql.functions import col, countDistinct



df.agg(*(countDistinct(col(c)).alias(c) for c in df.columns))



Scala 中类似:


import org.apache.spark.sql.functions.countDistinct


import org.apache.spark.sql.functions.col



df.select(df.columns.map(c => countDistinct(col(c)).alias(c)): _*)



如果你想在可能的精度损失的情况下加快速度,你也可以使用approxCountDistinct()。

原作者:
...