当前位置: 代码网 > it编程>编程语言>Java > spark之时间序列预测(商品销量预测)

spark之时间序列预测(商品销量预测)

2024年08月02日 Java 我要评论
本案例使用前1913天的数据作为训练数据,来预测1914天到1941天的销量。以上数据下载后放入resources/advanced下,并在properties.properties中配置一下文件名和路径,以供程序读取和处理数据。2.模型的训练及预测利用python lightgbm进行操作,见time_series.ipynb,data.7z下是spark处理好的数据。1.数据处理以及特征工程利用java spark进行提取,见TimeSeries.java。二.特征工程代码解读,完整见项目中代码注释。

项目地址见:https://github.com/jiangnanboy/spark_data_mining/tree/master/src/main/java/com/sy/dataalgorithms/advanced/time_series

一.概要

此项目将围绕一个时间序列预测任务展开。该任务是kaggle上的一个比赛,m5 forecasting - accuarcy(https://www.kaggle.com/c/m5-forecasting-accuracy/notebooks )。m5的赛题目标是预测沃尔玛各种商品在未来28天的销量。本案例使用前1913天的数据作为训练数据,来预测1914天到1941天的销量。并且,我们只对最细粒度的30490条序列进行预测。 训练数据从kaggle中自行下载:

  • calendar.csv - contains information about the dates on which the products are sold.
  • sales_train_validation.csv - contains the historical daily unit sales data per product and store [d_1 - d_1913]
  • sample_submission.csv - the correct format for submissions. reference the evaluation tab for more info.
  • sell_prices.csv - contains information about the price of the products sold per store and date.
  • sales_train_evaluation.csv - includes sales [d_1 - d_1941] (labels used for the public leaderboard)

以上数据下载后放入resources/advanced下,并在properties.properties中配置一下文件名和路径,以供程序读取和处理数据。

1.数据处理以及特征工程利用java spark进行提取,见timeseries.java。

2.模型的训练及预测利用python lightgbm进行操作,见time_series.ipynb,data.7z下是spark处理好的数据。

二.特征工程代码解读,完整见项目中代码注释

/**
     * 分析和挖掘数据
     * @param session
     */
    public static void analysisdata(sparksession session) {

        // 一.数据集

        /*  1.这里是历史销量sales_train_validation数据
                    +--------------------+-------------+---------+-------+--------+--------+---+---+---+---+---+---+---+---+-
            |                  id|      item_id|  dept_id| cat_id|store_id|state_id|d_1|d_2|d_3|d_4|d_5|d_6|d_7|d_8|d_9|d_10|...
            +--------------------+-------------+---------+-------+--------+--------+---+---+---+---+---+---+---+---+---+----+
            |hobbies_1_001_ca_...|hobbies_1_001|hobbies_1|hobbies|    ca_1|      ca|  0|  0|  0|  0|  0|  0|  0|  0|  0|   0|...
            |hobbies_1_002_ca_...|hobbies_1_002|hobbies_1|hobbies|    ca_1|      ca|  0|  0|  0|  0|  0|  0|  0|  0|  0|   0|...
            |hobbies_1_003_ca_...|hobbies_1_003|hobbies_1|hobbies|    ca_1|      ca|  0|  0|  0|  0|  0|  0|  0|  0|  0|   0|...
            +--------------------+-------------+---------+-------+--------+--------+---+---+---+---+---+---+---+---+---+----+

         schema:
         |-- id: string (nullable = true)
         |-- item_id: string (nullable = true)
         |-- dept_id: string (nullable = true)
         |-- cat_id: string (nullable = true)
         |-- store_id: string (nullable = true)
         |-- state_id: string (nullable = true)
         |-- d_1: integer (nullable = true)
         |-- d_2: integer (nullable = true)
         |-- d_3: integer (nullable = true)
         |-- d_4: integer (nullable = true)
         |-- ......

         */
        string salestrainvalidationpath = timeseries.class.getclassloader().getresource(propertiesreader.get("advanced_timeseries_sales_train_validation_csv")).getpath().replacefirst("/", "");
        dataset<row> salestvdataset = session.read()
                .option("sep", ",")
                .option("header", true)
                .option("inferschema", true)
                .csv(salestrainvalidationpath);

        /*首先,我们只留下salestvdataset中的历史特征值,删去其他列。
            +---+---+---+---+---+---+---+---+---+----+
            |d_1|d_2|d_3|d_4|d_5|d_6|d_7|d_8|d_9|d_10|
            +---+---+---+---+---+---+---+---+---+----+
            |  0|  0|  0|  0|  0|  0|  0|  0|  0|   0|...
            |  0|  0|  0|  0|  0|  0|  0|  0|  0|   0|...
            |  0|  0|  0|  0|  0|  0|  0|  0|  0|   0|...
            +---+---+---+---+---+---+---+---+---+----+
         */
        column[]  columns = new column[1913];
        int index = 0;
        for(string column : salestvdataset.columns()) {
            if(column.contains("d_")) {
                columns[index] = functions.col(column);
                index++;
            }
        }
        dataset<row> xdataset = salestvdataset.select(columns);

        /* 2.这里是日历信息calendar数据
                        +----------+--------+--------+----+-----+----+---+------------+------------+------------+------------+-------+-------+-------+
            |      date|wm_yr_wk| weekday|wday|month|year|  d|event_name_1|event_type_1|event_name_2|event_type_2|snap_ca|snap_tx|snap_wi|
            +----------+--------+--------+----+-----+----+---+------------+------------+------------+------------+-------+-------+-------+
            |2011-01-29|   11101|saturday|   1|    1|2011|d_1|        null|        null|        null|        null|      0|      0|      0|
            |2011-01-30|   11101|  sunday|   2|    1|2011|d_2|        null|        null|        null|        null|      0|      0|      0|
            |2011-01-31|   11101|  monday|   3|    1|2011|d_3|        null|        null|        null|        null|      0|      0|      0|
            +----------+--------+--------+----+-----+----+---+------------+------------+------------+------------+-------+-------+-------+

            schema:
             |-- date: string (nullable = true)
             |-- wm_yr_wk: integer (nullable = true)
             |-- weekday: string (nullable = true)
             |-- wday: integer (nullable = true)
             |-- month: integer (nullable = true)
             |-- year: integer (nullable = true)
             |-- d: string (nullable = true)
             |-- event_name_1: string (nullable = true)
             |-- event_type_1: string (nullable = true)
             |-- event_name_2: string (nullable = true)
             |-- event_type_2: string (nullable = true)
             |-- snap_ca: integer (nullable = true)
             |-- snap_tx: integer (nullable = true)
             |-- snap_wi: integer (nullable = true)
         */
        string calendarpath = timeseries.class.getclassloader().getresource(propertiesreader.get("advanced_timeseries_calendar_csv")).getpath().replacefirst("/", "");
        dataset<row> calendardataset = session.read()
                .option("sep", ",")
                .option("header", true)
                .option("inferschema", true)
                .csv(calendarpath);

        /* 3.商品每周的价格信息sell_prices
            +--------+-------------+--------+----------+
            |store_id|      item_id|wm_yr_wk|sell_price|
            +--------+-------------+--------+----------+
            |    ca_1|hobbies_1_001|   11325|      9.58|
            |    ca_1|hobbies_1_001|   11326|      9.58|
            |    ca_1|hobbies_1_001|   11327|      8.26|
            +--------+-------------+--------+----------+

            schema:
             |-- store_id: string (nullable = true)
             |-- item_id: string (nullable = true)
             |-- wm_yr_wk: integer (nullable = true)
             |-- sell_price: double (nullable = true)
         */
//        string sellpricespath = timeseries.class.getclassloader().getresource(propertiesreader.get("advanced_timeseries_sell_prices_csv")).getpath().replacefirst("/", "");
//        dataset<row> sellpricesdataset = session.read()
//                .option("sep", ",")
//                .option("header", true)
//                .option("inferschema", true)
//                .csv(sellpricespath);

        // (1).测试集,我们只是计算了第1914天的数据的特征。这只些特征只能用来预测1914天的销量,也就是说,实际上是我们的测试数据。
        int targetday = 1914;
        // 使用历史数据中最后的7天构造特征
        int localrange = 7;
        // 由于使用前1913天的数据预测第1914天,历史数据与预测目标的距离只有1天,因此predictdistance=1
        // 如果使用前1913天的数据预测第1915天,则历史数据与预测目标的距离有2天,因此predictdistance=2,以此类推
        int predictdistance = 1;

        dataset<row> testdataset = gettestdataset(salestvdataset, calendardataset, xdataset, targetday, predictdistance);

        // (2).训练集,为了构造训练数据,我们对1914天之前的日期进行同样的特征计算操作,并附上它们的当天销量作为数据标签。
        int trainingdatadays = 7; // 为了简便,现只取7天的数据作训练集
        dataset<row> traindataset = gettraindataset(salestvdataset, calendardataset, xdataset, trainingdatadays, targetday, predictdistance);

        string salestrainevaluationpath = timeseries.class.getclassloader().getresource(propertiesreader.get("advanced_timeseries__sales_train_evaluation_csv")).getpath().replacefirst("/", "");
        dataset<row> labeldataset = session.read()
                .option("sep", ",")
                .option("header", true)
                .option("inferschema", true)
                .csv(salestrainevaluationpath);

        // (3).测试集的label
        dataset<row> testlabeldataset = gettestdatasetlabel(labeldataset, targetday);
        // (4).训练集的label
        dataset<row> trainlabeldataset = gettraindatasetlabel(labeldataset, targetday, trainingdatadays, predictdistance);

        // (5).保存为csv文件,供python lightgbm训练
        // 保存test dataset
        string testdatasetcsvpath = "e:\\idea_project\\spark_data_mining\\src\\main\\resources\\dataalgorithms\\advanced\\timeseries_data\\testdata.csv";
        savecsv(testdataset, testdataset.columns(), testdatasetcsvpath);

        // 保存train dataset
        string traindatasetcsvpath = "e:\\idea_project\\spark_data_mining\\src\\main\\resources\\dataalgorithms\\advanced\\timeseries_data\\traindata.csv";
        savecsv(traindataset, traindataset.columns(), traindatasetcsvpath);

        // 保存test label
        string testlabelcsvpath = "e:\\idea_project\\spark_data_mining\\src\\main\\resources\\dataalgorithms\\advanced\\timeseries_data\\testlabel.csv";
        savecsv(testlabeldataset, testlabeldataset.columns(), testlabelcsvpath);

        // 保存train label
        string trainlabelcsvpath = "e:\\idea_project\\spark_data_mining\\src\\main\\resources\\dataalgorithms\\advanced\\timeseries_data\\trainlabel.csv";
        savecsv(trainlabeldataset, trainlabeldataset.columns(), trainlabelcsvpath);
    }

三.模型训练

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com