问题描述
m = Prophet()
m.fit(df)
遇到以下错误:
Unrecognized token 'Initial': was expecting 'null','true','false' or NaN
at [Source: Initial log joint probability = -13.932; line: 1,column: 8]
上述错误不断出现。尝试降级 numpy
,重新安装 pystan
和 fbprophet
,但问题仍未解决。
解决方法
我在尝试在 AWS EMR Spark 集群(使用 jupyter notebook 界面)上使用 prophet
时遇到了同样的问题/错误。经过多次故障排除后,我们意识到这是因为 Spark 期望返回一个特定的数据格式——我相信一个具有特定字段的 json——但是 prophet
返回一个 pandas
数据帧。
我通过在 pyspark 中编写用户定义的函数 (udf) 解决了这个问题,该函数允许我在 Spark 数据帧上使用先知并指定将从该 Spark 函数返回的数据。
我自己的解决方案基于 this example 和 this example 中 Spark 上 pandas_udf
的 prophet
函数。
下面是我写的函数的通用版本。为清楚起见,我试图在我拥有的数据上拟合时间序列模型以检测异常值,因此为什么我拟合并预测相同的数据。您还需要确保安装了 pyarrow
以在 Spark 中正确处理 pandas_udf
:
# Import relevant packages
import pyspark.sql.functions as F
import pyspark.sql.types as types
import prophet
# Define output schema of prophet model
output_schema = types.StructType([
types.StructField('id',types.IntegerType(),True),#args: name (string),data type,nullable (boolean)
types.StructField('ds',types.TimestampType(),types.StructField('yhat',types.DoubleType(),types.StructField('yhat_lower',types.StructField('yhat_upper',True)
])
# Function to fit Prophet timeseries model
@F.pandas_udf(output_schema,F.PandasUDFType.GROUPED_MAP)
def fit_prophet_model(df):
"""
:param df: spark dataframe containing our the data we want to model.
:return: returns spark dataframe following the output_schema.
"""
# Prep the dataframe for use in Prophet
formatted_df = df[['timestamp','value_of_interest']] \
.rename(columns = {'timestamp': 'ds','value_of_interest': 'y'}) \
.sort_values(by = ['ds'])
# Instantiate model
model = prophet.Prophet(interval_width = 0.99,growth = 'linear',daily_seasonality = True,weekly_seasonality = True,yearly_seasonality = True,seasonality_mode = 'multiplicative')
# Fit model and get fitted values
model.fit(formatted_df)
model_results = model.predict(formatted_df)[['ds','yhat','yhat_lower','yhat_upper']] \
.sort_values(by = ['ds'])
model_results['id'] = formatted_df['id'] #add grouping id
model_results = model_results[['id','ds','yhat_upper']] #get columns in correct order
return model_results
然后要在您的数据上运行该函数,只需执行以下操作:
results = (my_data.groupBy('id') \
.apply(fit_prophet_model)
)
results.show(10) #show first ten rows of the fitted model results