使用sparklyr在R中调用Spark窗口函数

问题描述

我一直在尝试在sparklyr中复制以下pyspark代码段,但没有运气。

from pyspark.sql.window import Window
from pyspark.sql.functions import concat,col,lit,approx_count_distinct,countdistinct

df = spark.sql("select * from mtcars")

dff = df.withColumn("test",concat(col("gear"),lit(" "),col("carb")))
w = Window.partitionBy("cyl").orderBy("cyl")
  
dff.withColumn("distinct",approx_count_distinct("test").over(w)).show()

我确实设法像这样工作:

tbl(sc,"mtcars")%>% 
  spark_dataframe() %>% 
  invoke("withColumn","concat",invoke_static(sc,"org.apache.spark.sql.functions","expr","concat(gear,carb)")) %>% 
  sdf_register()

我似乎无法弄清楚如何调用Window.partitionBy()Window.orderBy()

# Doesn't work
w <- invoke_static(sc,"org.apache.spark.sql.expressions.Window","partitionBy","cyl")

一些指针会很有帮助!

解决方法

这应该可以帮助您:

w <- orderBy(windowPartitionBy("cyl"),"cyl")
dff <- select(dff,over(approx_count_distinct("test"),w))
,

您可以直接通过管道传递SQL。

mtcars_spk <- copy_to(sc,mtcars,"mtcars_spk",overwrite = TRUE)
mtcars_spk2 <- mtcars_spk %>%
                dplyr::mutate(test = paste0(gear," ",carb)) %>%
                dplyr::mutate(discnt = sql("approx_count_distinct(test) OVER (PARTITION BY cyl)"))

在这里值得注意的是,这是一种罕见的情况,sparklyr支持其他窗口功能。如果您只想将计数或最小(齿轮)按cyl划分,则可以轻松地做到这一点。

mtcars_spk <- copy_to(sc,overwrite = TRUE)
mtcars_spk <- mtcars_spk %>%
                group_by(cyl) %>%
                arrange(cyl) %>%
                mutate(cnt = count(),mindis= min(disp)

链接类似的线程:

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...