简介
Prophet是开源的时间序列预测工具,使用时间序列分解与机器学习拟合的方法进行建模预测
,关于prophet模型优点本文不再累述,网络上的文章也比较多了,各种可视化,参数的解释与demo演示,但是真正用到工业上大规模的可供学习的中文材料并不多。
本文打算使用PySpark进行多序列预测建模,会给出一个比较详细的脚本,供交流学习,重点在于使用hive数据/分布式,数据预处理,以及pandas_udf对多条序列进行循环执行。
tips:背景说明,在十万级别的sku序列上使用prophet预测每个序列未来七天的销售。
1.导入库和初始化设置
Pandas Udf 构建在 Apache Arrow 之上,因此具有低开销,高性能的特点,udf对每条记录都会操作一次,数据在 JVM 和 Python 中传输,pandas_udf就是使用 Java 和 Scala 中定义 UDF,然后在 python 中调用。
import datetime
from dateutil.relativedelta import relativedelta
from fbprophet import Prophet
import pandas as pd
import numpy as np
import warnings
warnings.filterwarnings('ignore')
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
spark = SparkSession. \
Builder(). \
config("spark.sql.execution.arrow.enabled", "true"). \
enableHiveSupport(). \
getOrCreate()
其中初始化config:开启spark df与pandas df 相互转化的性能优化配置.
2.数据预处理
def sale_ds(df):
df['ds'] = pd.to_datetime(df['ds'])
df = df[['store_sku', 'ds', 'y']]
start_day = (
df['ds'].max() -
relativedelta(
days=63)).strftime('%Y-%m-%d')
df = df[df['ds'] >= start_day][['store_sku', 'ds', 'y']]
sale_set = df.groupby(
['store_sku']).filter(
lambda x: len(x) >= 14 and np.sum(
x['y']) > 7)
return sale_set
def replace_fill(data):
"""
先尝试使用上周的数据填补,再针对极端的数据进行cap,保障序列的完整和平滑性
:param data:单个序列
:param name: 序列名称,store_sku
:return: 修复后的一条序列
"""
data['ds'] = pd.to_datetime(data['ds'], format='%Y-%m-%d')
data['y'] = data['y'].astype(float)
data.loc[data['y'] <= 0, 'y'] = np.NaN
data.loc[data['y'].isnull(), 'y'] = data['y'].shift(7).values[0]
data.loc[data['y'].isnull(), 'y'] = data['y'].shift(-7).values[0]
data.loc[data['y'].isnull(), 'y'] = data['y'].shift(-14).values[0]
data.loc[data['y'].isnull(), 'y'] = data['y'].shift(14).values[0]
data.loc[data['y'].isnull(), 'y'] = data['y'].interpolate(methon='nearest', order=3)
low = data[data['y'] > 0]['y'].quantile(0.10)
high = data[data['y'] > 0]['y'].quantile(0.90)
data.loc[data['y'] < low, 'y'] = np.NaN
data.loc[data['y'] > high, 'y'] = np.NaN
data['y'] = data['y'].fillna(data['y'].mean())
data['y'] = np.log1p(data['y'])
return data
以上为数据预处理,具体内容见注释.
放入模型中的时间和y值名称必须是ds和y,首先控制数据的周期长度,如果预测天这种粒度的任务,则使用最近的4-6周即可。
因为是放入了长度不一的多个序列,为了让预测更加可靠,对序列的长度有一定的限定,比如,序列长度至少有14天,还要一个需要注意的问题是,如果出现0,0,0,0,0,0,1,0,1这样数据稀疏的数据的时候,prophet会报错,报错内容大致为,std太低,反推回去就是放入的数据类似于常量,模型无法拟合。
至于缺失值的填充,prophet可以设置y为nan,模型在拟合过程中也会自动填充一个预测值,因为我们预测的为sku销量,是具有星期这种周期性的,所以如果出现某一天的缺失,我们倾向于使用最近几周同期数据进行填充,没有优先使用均值或众数进行填充,是因为,均值和众数会掩盖序列的周期性,破坏整个序列的规律,为了进一步对数据进行平滑,对于异常值还进行了分位数盖帽,因为时序数据往往是偏态分布,所以我们对原始值做了取对数处理。
以上的数据预处理比较简单,其中多数可以使用hive进行操作,会更加高效,这里放出来的目的是演示一种思路以及python函数和最后的pandas_udf交互。
3.建模
def prophet_train(data):
model = Prophet(
daily_seasonality=False,
yearly_seasonality=False,
holidays=holiday_df,
holidays_prior_scale=10)
model.add_seasonality(
name='weekly',
period=7,
fourier_order=3,
prior_scale=0.10)
model.fit(data)
future = model.make_future_dataframe(periods=7, freq='d')
forecast = model.predict(future)
forecast['pro_pred'] = np.expm1(forecast['yhat'])
forecast_df=forecast[['store_sku','ds','pro_pred']]
forecast_df.loc[forecast_df['pro_pred'] < 0, 'pro_pred'] = 0
low = (1 + 0.1) * data['y'].min()
hight = min((1 + 0.05) * data['y'].max(), 10000)
forecast_df.loc[forecast_df['pro_pred'] < low, 'pro_pred'] = low
forecast_df.loc[forecast_df['pro_pred'] > hight, 'pro_pred'] = hight
return forecast_df
以上参数设置详见
函数内部的holiday_df是假日数据,数据格式需要按照文档要求进行定义,改函数部分也会和整个代码一起放在github,如果序列中最近呈现出较大的下滑或者增长,那么预测值很容易得到负数或者非常大,这个时候我们依然需要对预测值进行修正,而非完全交给模型,当然你也可以在放入数据中设置上下限。
data['cap'] = 1000
data['floor'] = 6
该函数把前面的数据预处理函数和模型训练函数放在一个函数中,类似于主函数,目的是使用统一的输入和输出。
def prophet_main(data):
true_time = pd.datetime.now().strftime('%Y-%m-%d')
data.dropna(inplace=True)
data['ds'] = pd.to_datetime(data['ds'])
data = data[data['ds'] < true_time]
data['ds'] = data['ds'].astype(str)
data['ds'] = pd.to_datetime(data['ds'])
data = replace_fill(data)
pro_back = prophet_train(data)
return pro_back
4.读取hive数据,调用spark进行prophet模型预测
schema = StructType([
StructField("store_sku", StringType()),
StructField("ds", StringType()),
StructField("pro_pred", DoubleType())
])
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def run_model(data):
data['store_sku']=data['store_sku'].astype(str)
df = prophet_main(data)
uuid = data['store_sku'].iloc[0]
df['store_sku']=unid
df['ds']=df['ds'].astype(str)
df['pro_pred']=df['pro_pred'].astype(float)
cols=['store_sku','ds','pro_pred']
return df[cols]
假设我们希望输出的结果为三列,分别是store_sku,ds,pro_pred,则定义它们的数据类型,定义的数据类型和顺序要和放入的数据类型一致,然后通过@pandas_udf进行装饰,PandasUDFType有两种类型一种是Scalar(标量映射),另一种是Grouped Map(分组映射).我们显然是要使用分组映射,通过store_sku作为id进行分组,从而实现split-apply-combine
以上是纯python内容,下面展示通过hive数据库读取和运行python并把结果写入hive中。
data = spark.sql(
"""
select concat(store_code,'_',goods_code) as store_sku,qty_fix as y,ds
from scmtemp.redsku_store_sku_sale_fix_d""")
data.createOrReplaceTempView('data')
sale_predict = data.groupby(['store_sku']).apply(run_model)
sale_predict.createOrReplaceTempView('test_read_data')
spark.sql(f"drop table if exists scmtemp.store_sku_sale_prophet")
spark.sql(f"create table scmtemp.store_sku_sale_prophet as select * from store_sku_predict_29 ")
print('完成预测')
当然也可以不用pandas_udf的形式进行
,在旧版spark中使用sc.parallelize()实现分组并行化
如:sc.parallelize(data,800).map(run_model).reduce(merge)
上文还有一个节假日数据没有给出来,限于篇幅有限,整个代码就放在github上了,如需要请自取。
基本交代清楚了,暂更于此。
完整代码