在Databricks中使用Sparklyr收集表

问题描述

我有一个parquet表,大约有5 billion rows。使用sparklyr进行所有操作后,它会减少为1,880,573 rows629 columns。当我尝试使用Factor Analysis收集sdf_collect()的信息时,它给了我这个内存错误

Error : org.apache.spark.sql.execution.OutOfMemorySparkException: Total memory usage during row decode exceeds spark.driver.maxResultSize (4.0 GB). The average row size was 5.0 KB

1,573 rows x 629 columns是否太大而无法收集sparklyr?此外,使用data %>% dplyr::count()检查行数花费了9 minutes-如何减少这个时间?

解决方法

是的。 # secondary yaxis column_implied_lst = [e for e in dfd2[available_crops].columns if e[:4]==selected_column[:4]] column_implied = column_implied_lst[0] fig.add_trace(go.Bar(x=dfd2[available_crops].index,y=dfd2[available_crops][column_implied],marker_color = "rgba(255,0.4)"),secondary_y=True) fig.update_layout(yaxis2=dict(title=dict(text='DF: ' + selected_produce +' | Crops: ' + available_crops + ' | Column: '+ column_implied))) column_implied_lst = [e for e in dfd2[available_crops].columns if e[:4]==selected_column[:4]]太大。这不仅是一个棘手的问题,但是您的R实例将其收集到本地内存时会遇到很多麻烦。

关于count(),当您处理这种大小的数据时,不需要9分钟。您可以尝试的一件事是将计数减少到一个变量。 from jupyter_dash import JupyterDash import dash_core_components as dcc import dash_html_components as html from dash.dependencies import Input,Output # data from jupyter_dash import JupyterDash import dash_core_components as dcc import dash_html_components as html from dash.dependencies import Input,Output,State,ClientsideFunction import dash_core_components as dcc import dash_html_components as html import pandas as pd import plotly.graph_objs as go from dash.dependencies import Input,Output import dash_bootstrap_components as dbc import numpy as np from plotly.subplots import make_subplots import plotly.express as px import pandas as pd from pandas import Timestamp import numpy as np # data ########################################################################## index1= [1,2,3,4] columns1 =['time','2m_temp_prod','total_precip_prod'] index2= [1,4] columns2 = ['time','2m_temp_area','total_precip_area'] df_vals_prod = {'corn': pd.DataFrame(index=index1,columns = columns1,data= np.random.randn(len(index1),len(columns1))).cumsum(),'soybeans' : pd.DataFrame(index=index1,len(columns1))).cumsum()} df_vals_area= {'corn': pd.DataFrame(index=index2,columns = columns2,data= np.random.randn(len(index2),len(columns2))).cumsum(),'soybeans' : pd.DataFrame(index=index2,len(columns2))).cumsum()} # mimic data properties of your real world data df_vals_prod['corn']['time'] = [Timestamp('2020-09-23 06:00:00'),Timestamp('2020-09-23 12:00:00'),Timestamp('2020-09-23 18:00:00'),Timestamp('2020-09-24 00:00:00')] df_vals_prod['corn'].set_index('time',inplace = True) df_vals_prod['soybeans']['time'] = [Timestamp('2020-09-23 06:00:00'),Timestamp('2020-09-24 00:00:00')] df_vals_prod['soybeans'].set_index('time',inplace = True) df_vals_area['corn']['time'] = [Timestamp('2020-09-23 06:00:00'),Timestamp('2020-09-24 00:00:00')] df_vals_area['corn'].set_index('time',inplace = True) df_vals_area['soybeans']['time'] = [Timestamp('2020-09-23 06:00:00'),Timestamp('2020-09-24 00:00:00')] df_vals_area['soybeans'].set_index('time',inplace = True) # dash ########################################################################## app = JupyterDash(__name__) # weighting all_options = { 'prod': list(df_vals_prod[list(df_vals_prod.keys())[0]].columns),'area': list(df_vals_area[list(df_vals_prod.keys())[0]].columns) } app.layout = html.Div([ dcc.Dropdown( id='produce-radio',options=[{'label': k,'value': k} for k in all_options.keys()],value='area' ),# dcc.Dropdown( # id='produce-radio',# options=[ # {'label': k,'value': k} for k in all_options.keys() # ],# value='prod',# clearable=False),html.Hr(),dcc.Dropdown( id='crop-radio','value': k} for k in list(df_vals_prod.keys())],value=list(df_vals_prod.keys())[0] ),dcc.Dropdown(id='columns-radio'),html.Div(id='display-selected-values'),dcc.Graph(id="crop-graph") ]) # Callbacks ##################################################################### # Weighting selection. @app.callback( # Dataframe PROD or AREA Output('columns-radio','options'),# layout element: dcc.RadioItems(id='produce-radio'...) [Input('produce-radio','value')]) def set_columns_options(selected_produce): varz = [{'label': i,'value': i} for i in all_options[selected_produce]] print('cb1 output: ') print(varz) return [{'label': i,'value': i} for i in all_options[selected_produce]] # Columns selection @app.callback( Output('columns-radio','value'),# layout element: dcc.RadioItems(id='columns-radio'...) [Input('columns-radio','options')]) def set_columns(available_options): return available_options[0]['value'] # Crop selection @app.callback( Output('crop-radio',# layout element: dcc.RadioItems(id='columns-radio'...) [Input('crop-radio','options')]) def set_crops(available_crops): return available_crops[0]['value'] # Display selections in its own div @app.callback( # Columns 2m_temp_prod,or.... Output('display-selected-values','children'),[Input('produce-radio',Input('crop-radio',Input('columns-radio','value')]) def set_display_children(selected_produce,available_crops,selected_column): return('DF: ' + selected_produce +' | Crops: ' + available_crops + ' | Column: '+ selected_column) # Make a figure based on the selections @app.callback( # Columns 2m_temp_prod,or.... Output('crop-graph','figure'),'value')]) def make_graph(selected_produce,selected_column): #global selected_column # data source / weighting if selected_produce == 'prod': dfd = df_vals_prod dfd2 = df_vals_area if selected_produce == 'area': dfd = df_vals_area dfd2 = df_vals_prod # plotly figure # primary yaxis fig = make_subplots(specs=[[{"secondary_y": True}]]) fig.add_trace(go.Scatter(x=dfd[available_crops].index,y=dfd[available_crops][selected_column]),secondary_y=False) fig.update_layout(yaxis1=dict(title=dict(text='DF: ' + selected_produce +' | Crops: ' + available_crops + ' | Column: '+ selected_column))) # secondary yaxis column_implied_lst = [e for e in dfd2[available_crops].columns if e[:4]==selected_column[:4]] column_implied = column_implied_lst[0] fig.add_trace(go.Bar(x=dfd2[available_crops].index,secondary_y=True) fig.update_layout(yaxis2=dict(title=dict(text='DF: ' + selected_produce +' | Crops: ' + available_crops + ' | Column: '+ column_implied))) # layout makeover fig.update_layout(title=dict(text='Column to match: '+ selected_column + '| Implied match: ' +column_implied)) fig['layout']['yaxis2']['showgrid'] = False return(fig) app.run_server(mode='inline',port = 8077,dev_tools_ui=True,dev_tools_hot_reload =True,threaded=True) 。话虽这么说,除了增加您的spark会话参数(即执行程序的数量)之外,我认为没有其他方法可以大大加快这次的速度。

如果可以的话,我建议在spark中进行因子分析,或者使用较小的数据样本。

,

使用 sparklyr::spark_write_* 将 spark 数据帧导出到磁盘,然后将其读入您的 R 会话。

Parquet 是一个不错的选择,因为它具有快速且紧凑的读/写能力。 在写入操作导致单个文件之前,使用 sparklyr::repartition 将 spark 数据帧重新分区为一部分。这比读入 R 更好,而不是多个文件然后进行后续的行绑定操作。

建议不要使用 collect 函数收集“大”(取决于您的 Spark 配置、RAM)数据帧,因为它可能会将所有数据带到驱动程序节点。

相关问答

Selenium Web驱动程序和Java。元素在(x,y)点处不可单击。其...
Python-如何使用点“。” 访问字典成员?
Java 字符串是不可变的。到底是什么意思?
Java中的“ final”关键字如何工作?(我仍然可以修改对象。...
“loop:”在Java代码中。这是什么,为什么要编译?
java.lang.ClassNotFoundException:sun.jdbc.odbc.JdbcOdbc...