问题描述
这个问题是this thread的后续行动
我想在磁盘框架上执行三个操作
- 计算由两列(key_a和key_b)分组的
id
字段的不同值 - 计算按两列中的第一列(key_a)分组的字段
id
的不同值 - 添加一列,其中第一列具有不同的值/两列均具有不同的值
这是我的代码
my_df <-
data.frame(
key_a = rep(letters,384),key_b = rep(rev(letters),id = sample(1:10^6,9984)
)
my_df %>%
select(key_a,key_b,id) %>%
chunk_group_by(key_a,key_b) %>%
# stage one
chunk_summarize(count = n_distinct(id)) %>%
collect %>%
group_by(key_a,key_b) %>%
# stage two
mutate(count_summed = sum(count)) %>%
group_by(key_a) %>%
mutate(count_all = sum(count)) %>%
ungroup() %>%
mutate(percent_of_total = count_summed / count_all)
我的数据采用磁盘框架而不是数据框架的格式,并且具有100M行和8列。
我正在遵循此documentation
中描述的两步说明我担心collect
会使我的机器崩溃,因为它将一切都带到了内存中
我是否必须使用collect
才能在磁盘框架中使用dplyr group bys?
解决方法
您应始终使用srckeep
仅将所需的列加载到内存中。
my_df %>%
srckeep(c("key_a","key_b","id")) %>%
# select(key_a,key_b,id) %>% # no need if you use srckeep
chunk_group_by(key_a,key_b) %>%
# stage one
chunk_summarize(count = n_distinct(id)) %>%
collect %>%
group_by(key_a,key_b) %>%
# stage two
mutate(count_summed = sum(count)) %>%
group_by(key_a) %>%
mutate(count_all = sum(count)) %>%
ungroup() %>%
mutate(percent_of_total = count_summed / count_all)
collect
仅将计算chunk_group_by
和chunk_summarize
的结果带入RAM。它不应该使您的计算机崩溃。
您必须像使用Spark之类的其他系统一样使用collect
。
但是,如果您正在计算n_distinct
,则可以分一个阶段完成
my_df %>%
srckeep(c("key_a","id")) %>%
#select(key_a,id) %>%
group_by(key_a,key_b) %>%
# stage one
summarize(count = n_distinct(id)) %>%
collect
如果您真的很担心RAM的使用,则可以将worker数量减少到1
setup_disk.frame(workers=1)
my_df %>%
srckeep(c("key_a",key_b) %>%
# stage one
summarize(count = n_distinct(id)) %>%
collect
setup_disk.frame()