Summarise standard deviation and count non-NA values in sparklyr

Problem description

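I have a large data.frame, and I have been using summarise and across to compute summary statistics for many variables. Because of the size of the data.frame, I have had to start processing the data in sparklyr.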

Since sparklyr does not support across, I am using summarise_each instead. This works fine, except that summarise_each in sparklyr does not seem to support sd or sum(!is.na(.)).

Below is a sample dataset, along with how I would normally process it with dplyr:

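library(dplyr)

test <- data.frame(ID = c(rep("Group1", 7), rep("Group2", 7), rep("Group3", 3)),
                   Value1 = c(-100, -10, -5, -5, -5, 1, 2, 1, 2, 3, 4, 4, 4, 4, 1, 2, 3),
                   Value2 = c(50, 100, 10, -5, 3, 1, 2, 2, 2, 3, 4, 4, 4, 4, 1, 2, 3))

# Ungrouped summary over the whole data frame: one across() call per statistic
test %>% 
  summarise(across((Value1:Value2), ~sum(!is.na(.), na.rm = TRUE), .names = "{col}_count"),
            across((Value1:Value2), ~min(., na.rm = TRUE), .names = "{col}_min"),
            across((Value1:Value2), ~max(., na.rm = TRUE), .names = "{col}_max"),
            across((Value1:Value2), ~mean(., na.rm = TRUE), .names = "{col}_mean"),
            across((Value1:Value2), ~sd(., na.rm = TRUE), .names = "{col}_sd"))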

# A tibble: 1 x 10
  Value1_count Value2_count Value1_min Value2_min Value1_max Value2_max Value1_mean Value2_mean Value1_sd Value2_sd
         <int>        <int>      <dbl>      <dbl>      <dbl>      <dbl>       <dbl>       <dbl>     <dbl>     <dbl>
1           17           17       -100         -5          4        100       -5.53        11.2      24.7      25.8

I can also get the same result with summarise_each, as shown below:

test %>% 
  group_by(ID) %>%
  summarise_each(funs(min = min(., na.rm = TRUE),
                      max = max(., na.rm = TRUE),
                      mean = mean(., na.rm = TRUE),
                      sum = sum(., na.rm = TRUE),
                      sd = sd(., na.rm = TRUE)))

  ID     Value1_min Value2_min Value1_max Value2_max Value1_mean Value2_mean Value1_sum Value2_sum
  <fct>       <dbl>      <dbl>      <dbl>      <dbl>       <dbl>       <dbl>      <dbl>      <dbl>
1 Group1       -100         -5          2        100      -17.4        23          -122        161
2 Group2          1          2          4          4        3.14        3.29         22         23
3 Group3          1          1          3          3        2           2             6          6

With sparklyr, I have been able to compute min, max, mean and sum successfully, as shown below:

sc <- spark_connect(master = "local",version = "2.4.3")
test <- spark_read_csv(sc = sc,path = "C:\\path\\test space.csv")

test %>% 
  group_by(ID) %>%
  summarise_each(funs(min = min(., na.rm = TRUE),
                      max = max(., na.rm = TRUE),
                      mean = mean(., na.rm = TRUE),
                      sum = sum(., na.rm = TRUE)))
# Source: spark<?> [?? x 9]
  ID     Value1_min Value_2_min Value1_max Value_2_max Value1_mean Value_2_mean Value1_sum Value_2_sum
  <chr>       <int>       <int>      <int>       <int>       <dbl>        <dbl>      <dbl>       <dbl>
1 Group2          1           2          4           4        3.14         3.29         22          23
2 Group3          1           1          3           3        2            2             6           6
3 Group1       -100          -5          2         100      -17.4         23          -122         161

However, when I try to get sd and sum(!is.na(.)), I receive an error. Below are the code and the error message. Is there any way to summarise these values?

test %>% 
  group_by(ID) %>%
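  summarise_each(funs(min = min(., na.rm = TRUE),
                      max = max(., na.rm = TRUE),
                      mean = mean(., na.rm = TRUE),
                      sum = sum(., na.rm = TRUE),
                      sd = sd(., na.rm = TRUE)))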

Error: org.apache.spark.sql.catalyst.parser.ParseException: 
mismatched input 'AS' expecting ')'(line 1,pos 298)

== SQL ==
SELECT `ID`,MIN(`Value1`) AS `Value1_min`,MIN(`Value_2`) AS `Value_2_min`,MAX(`Value1`) AS `Value1_max`,MAX(`Value_2`) AS `Value_2_max`,AVG(`Value1`) AS `Value1_mean`,AVG(`Value_2`) AS `Value_2_mean`,SUM(`Value1`) AS `Value1_sum`,SUM(`Value_2`) AS `Value_2_sum`,stddev_samp(`Value1`,TRUE AS `na.rm`) AS `Value1_sd`,stddev_samp(`Value_2`,TRUE AS `na.rm`) AS `Value_2_sd`
----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------^^^
FROM `test_space_30172a44_c0aa_4305_9a5e_d45fa77ba0b9`
GROUP BY `ID`

    at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:241)
    at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:117)
    at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:48)
    at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:69)
    at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:642)
    at sun.reflect.GeneratedMethodAccessor66.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at sparklyr.Invoke.invoke(invoke.scala:147)
    at sparklyr.StreamHandler.handleMethodCall(stream.scala:136)
    at sparklyr.StreamHandler.read(stream.scala:61)
    at sparklyr.BackendHandler$$anonfun$channelRead0$1.apply$mcV$sp(handler.scala:58)
    at scala.util.control.Breaks.breakable(Breaks.scala:38)
    at sparklyr.BackendHandler.channelRead0(handler.scala:38)
    at sparklyr.BackendHandler.channelRead0(handler.scala:14)
    at io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:105)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)
In addition: Warning messages:
1: Named arguments ignored for SQL stddev_samp 
2: Named arguments ignored for SQL stddev_samp 

Solution

The problem is the na.rm argument. Spark's stddev_samp function has no such parameter, and sparklyr does not seem to handle it.

Missing values are always removed in SQL, so you do not need to specify na.rm:

test_spark %>% 
  group_by(ID) %>%
  summarise_each(funs(min = min(.),max = max(.),mean = mean(.),sum = sum(.),sd = sd(.)))
#> # Source: spark<?> [?? x 11]
#>   ID     Value1_min Value2_min Value1_max Value2_max Value1_mean Value2_mean
#>   <chr>       <dbl>      <dbl>      <dbl>      <dbl>       <dbl>       <dbl>
#> 1 Group2          1          2          4          4        3.14        3.29
#> 2 Group1       -100         -5          2        100      -17.4        23   
#> 3 Group3          1          1          3          3        2           2   
#>   Value1_sum Value2_sum Value1_sd Value2_sd
#>        <dbl>      <dbl>     <dbl>     <dbl>
#> 1         22         23      1.21     0.951
#> 2       -122        161     36.6     38.6  
#> 3          6          6      1        1  
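
To see why dropping na.rm is harmless here, it may help to compare against plain R. The sketch below uses a small made-up vector x (not taken from the question's data) and shows that sd(x, na.rm = TRUE) equals the standard deviation of the non-missing values, which is the behaviour a SQL aggregate such as stddev_samp gives by skipping NULLs.

```r
# Hypothetical values for illustration only; not taken from the question's data.
x <- c(1, 2, NA, 4, 4)

# R needs na.rm = TRUE to skip missing values ...
sd_with_na_rm <- sd(x, na.rm = TRUE)

# ... which is the same as dropping the NAs by hand first,
# i.e. what a SQL aggregate does implicitly with NULLs.
sd_manual <- sd(x[!is.na(x)])

# Without na.rm, R propagates the missing value instead.
sd_default <- sd(x)

print(c(with_na_rm = sd_with_na_rm, manual = sd_manual, default = sd_default))
```

The first two values agree, while the third is NA. On the Spark side there is no equivalent of the third case, so the argument has nothing to switch off.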

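This appears to be a bug specific to summarise, since sd with na.rm works in mutate:

test_spark %>% 
  group_by(ID) %>%
  mutate_each(funs(sd = sd(., na.rm = TRUE)))

For sum(!is.na(.)), the expression only needs to be written in a form that Spark SQL can sum, for example by converting the logical result to a number first.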
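
One way to count non-missing values that should also carry over to Spark is to turn the logical result into an integer before summing, since a SQL SUM expects numeric input. The sketch below runs on a local data frame with made-up values; df, Value1_count and Value2_count are illustrative names, not part of the question. The assumption, not checked here against a live Spark connection, is that the same summarise() call on a Spark table is translated into a SUM over a CAST(... AS INT) expression.

```r
library(dplyr)

# Made-up data for illustration; NA marks a missing value.
df <- data.frame(
  ID     = c("Group1", "Group1", "Group2", "Group2", "Group2"),
  Value1 = c(1, NA, 3, 4, NA),
  Value2 = c(NA, NA, 5, 6, 7)
)

counts <- df %>%
  group_by(ID) %>%
  summarise(
    # Cast the logical to integer so the sum runs over 0/1 values.
    Value1_count = sum(as.integer(!is.na(Value1))),
    Value2_count = sum(as.integer(!is.na(Value2)))
  )

print(counts)
```

If the cast is not translated as expected on a given sparklyr version, sum(ifelse(is.na(Value1), 0L, 1L)) is another form worth trying.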
