当前位置: 代码网 > 科技>人工智能>数据分析 > 全国职业院校技能大赛-大数据应用赛项-数据挖掘-01

全国职业院校技能大赛-大数据应用赛项-数据挖掘-01

2024年08月05日 数据分析 我要评论
-----------------------推荐Top5结果如下--------------------------------------------第一条数据前10列结果展示为:----------------------------------------相同种类前10的id结果展示为:--------------------相似度top5(商品id:14,平均相似度:0.522356)相似度top1(商品id:1,平均相似度:0.983456)若属于该spu_id,则内容为1否则为0。

子任务一:特征工程
剔除订单信息表与订单详细信息表中用户id与商品id不存在现有的维表中的记录,同时建议多利用缓存并充分考虑并行度来优化代码,达到更快的计算效果。1、根据hive的dwd库中相关表或mysql中shtd_store中相关表(order_detail、sku_info),计算出与用户id为6708的用户所购买相同商品种类最多的前10位用户(只考虑他俩购买过多少种相同的商品,不考虑相同的商品买了多少次),将10位用户id进行输出,若与多个用户购买的商品种类相同,则输出结果按照用户id升序排序,输出格式如下,将结果截图粘贴至客户端桌面【release\任务c提交结果.docx】中对应的任务序号下;
结果格式如下:
-------------------相同种类前10的id结果展示为:--------------------
1,2,901,4,5,21,32,91,14,52
 

 def main(args: array[string]): unit = {
   val spark: sparksession = sparksessionutils.getsession
   import spark.implicits._

   val order_detail: dataframe = mysqlutils.read(spark, "ds_db01", "order_detail")
   val order_info: dataframe = mysqlutils.read(spark, "ds_db01", "order_info")

//    根据order_id连接起来,查出每个用户购买的商品并去重
   val data: dataset[row] = order_detail.join(order_info, order_detail("order_id") === order_info("id"))
     .select("user_id", "sku_id")
     .distinct()

//    6708用户所购买的商品
   val user6708_skuids: array[int] = data.filter(col("user_id") === 6708).select("sku_id").map((_: row) (0).tostring.toint).collect()

   val user_ids: string = data.withcolumn("cos", when(col("sku_id").isin(user6708_skuids: _*), 1.0).otherwise(0.0))
   .groupby("user_id")
     .agg(sum("cos").as("same"))
     .filter(col("user_id") !== 6708)
   .orderby(desc("same"), asc("user_id"))
     .limit(10)
     .map((_: row) (0).tostring)
     .collect()
     .mkstring(",")

val str = user_ids.map(_(0).tostring)
 .collect()
 .mkstring(",")

println("-------------------相同种类前10的id结果展示为:--------------------")
println(str)

   
   sparksessionutils.close(spark)
 }

2、根据hive的dwd库中相关表或mysql中shtd_store中相关商品表(sku_info),获取id、spu_id、price、weight、tm_id、category3_id 这六个字段并进行数据预处理,对price、weight进行规范化(standardscaler)处理,对spu_id、tm_id、category3_id进行one-hot编码处理(若该商品属于该品牌则置为1,否则置为0),并按照id进行升序排序,在集群中输出第一条数据前10列(无需展示字段名),将结果截图粘贴至客户端桌面【release\任务c提交结果.docx】中对应的任务序号下。

字段

类型

中文含义

备注

id

double

主键

price

double

价格

weight

double

重量

spu_id#1

double

spu_id 1

若属于该spu_id,则内容为1否则为0

spu_id#2

double

spu_id 2

若属于该spu_id,则内容为1否则为0

.....

double

tm_id#1

double

品牌1

若属于该品牌,则内容为1否则为0

tm_id#2

double

品牌2

若属于该品牌,则内容为1否则为0

……

double

category3_id#1

double

分类级别3 1

若属于该分类级别3,则内容为1否则为0

category3_id#2

double

分类级别3 2

若属于该分类级别3,则内容为1否则为0

……

结果格式如下:
--------------------第一条数据前10列结果展示为:---------------------
1.0,0.892346,1.72568,0.0,0.0,0.0,0.0,1.0,0.0,0.0

def main(args: array[string]): unit = {
   val spark = sparksessionutils.getsession

   val windowspec1: windowspec = window.orderby("spu_id")
   val windowspec2: windowspec = window.orderby("tm_id")
   val windowspec3: windowspec = window.orderby("category3_id")

   val source: dataframe = mysqlutils.read(spark, "ds_db01", "sku_info")
     .select("id", "spu_id", "price", "weight", "tm_id", "category3_id")
     .withcolumn("spu_id_index", dense_rank().over(windowspec1) - 1)
     .withcolumn("tm_id_index", dense_rank().over(windowspec2) - 1)
     .withcolumn("category3_id_index", dense_rank().over(windowspec3) - 1)

   source.show()
   val df = source.write.saveastable("dwd.sku_info_vector")

   val hotencoder: onehotencoder = new onehotencoder()
     .setinputcols(array("spu_id_index", "tm_id_index", "category3_id_index"))
     .setoutputcols(array("spu_id_hot", "tm_id_hot", "category3_id_hot"))
     .setdroplast(false)

   val vectorassembler1: vectorassembler = new vectorassembler()
     .setinputcols(array("price"))
     .setoutputcol("price_v")

   val vectorassembler2: vectorassembler = new vectorassembler()
     .setinputcols(array("weight"))
     .setoutputcol("weight_v")

   val standardscaler1: standardscaler = new standardscaler()
     .setinputcol("price_v")
     .setoutputcol("price_sca")
     .setwithmean(true)

   val standardscaler2: standardscaler = new standardscaler()
     .setinputcol("weight_v")
     .setoutputcol("weight_sca")
     .setwithmean(true)

   val pipelinemodel: pipelinemodel = new pipeline()
     .setstages(array(vectorassembler1, vectorassembler2, standardscaler1, standardscaler2, hotencoder))
     .fit(source)

   spark.udf.register("vectortoarray", (v1: sparsevector) => {
     v1.toarray.mkstring(",")
   })

   spark.udf.register("vectortodouble", (v1: densevector) => {
     v1.apply(0)
   })

   val result = pipelinemodel.transform(source)
     .withcolumn("spu_id_hot", expr("vectortoarray(spu_id_hot)"))
     .withcolumn("tm_id_hot", expr("vectortoarray(tm_id_hot)"))
     .withcolumn("category3_id_hot", expr("vectortoarray(category3_id_hot)"))
     .withcolumn("price_sca", expr("vectortodouble(price_sca)"))
     .withcolumn("weight_sca", expr("vectortodouble(weight_sca)"))
     .select("id", "price_sca", "weight_sca", "spu_id_hot", "tm_id_hot", "category3_id_hot")
     .orderby(asc("id"))
     .limit(1)

println("--------------------第一条数据前10列结果展示为:---------------------")
result.collect().foreach(r => {
 println(r.toseq.flatmap(r => r.tostring.split(",")).take(10).mkstring(","))
})


   sparksessionutils.close(spark)
 }


子任务二:推荐系统
1、根据子任务一的结果,计算出与用户id为6708的用户所购买相同商品种类最多的前10位用户id(只考虑他俩购买过多少种相同的商品,不考虑相同的商品买了多少次),并根据hive的dwd库中相关表或mysql数据库shtd_store中相关表,获取到这10位用户已购买过的商品,并剔除用户6708已购买的商品,通过计算这10位用户已购买的商品(剔除用户6708已购买的商品)与用户6708已购买的商品数据集中商品的余弦相似度累加再求均值,输出均值前5商品id作为推荐使用,将执行结果截图粘贴至客户端桌面【release\任务c提交结果.docx】中对应的任务序号下。

结果格式如下:
------------------------推荐top5结果如下------------------------
相似度top1(商品id:1,平均相似度:0.983456)
相似度top2(商品id:71,平均相似度:0.782672)
相似度top3(商品id:22,平均相似度:0.7635246)
相似度top4(商品id:351,平均相似度:0.7335748)
相似度top5(商品id:14,平均相似度:0.522356)
 

 def main(args: array[string]): unit = {
   val spark = sparksessionutils.getsession
   val order_detail = mysqlutils.read(spark, "ds_db01", "order_detail_copy1")
   val order_info = mysqlutils.read(spark, "ds_db01", "order_info")
//  找出每个用户所购买的商品
   val user_buy_sku = order_detail.join(order_info, order_detail("order_id") === order_info("id"))
     .select("user_id", "sku_id")
     // 题目要求不考虑同一个商品多次购买的情况所以需要去重
     .distinct()

   user_buy_sku.show()

   import spark.implicits._
//  找出用户6708所购买的商品
   val user_6708_sku_ids: array[double] = user_buy_sku
     .filter(col("user_id") === 2790)
     .select("sku_id")
     .map(_(0).tostring.todouble)
     .collect()

   println(user_6708_sku_ids.mkstring(","))
//  找出与用户6708所购买的相同商品最多的前10位用户
   val other_10_user_ids = user_buy_sku
     .filter(col("user_id") !== 2790)
     .withcolumn("is_cos", when(col("sku_id").cast(doubletype).isin(user_6708_sku_ids: _*), 1).otherwise(0))
     .groupby(col("user_id"))
     .agg(sum("is_cos").as("count_cos"))
     .orderby(col("count_cos").desc)
     .select("user_id")
     .map(_(0).tostring.tolong)
     .limit(10)
     .collect()

   println(other_10_user_ids.mkstring(","))
//  找出相似度最高的前10位用户所购买的商品
   val other_10_sku_ids = user_buy_sku.filter(col("user_id").isin(other_10_user_ids: _*))
     .select("sku_id")
     .map(_(0).tostring.todouble)
     .collect()

   println(other_10_sku_ids.mkstring(","))
//  读取上题商品的特征向量表并将除id列的其他列转换为向量/ mysql中的sku_info表
   val sku_info_vector = spark.table("dwd.sku_info_vector")
   println("---------------------")
   sku_info_vector.show()

//    val sku_info_vector = mysqlutils.read(spark,"ds_db01","sku_info")
//  将数据进行标准化
   val vectorassembler = new vectorassembler()
     .setinputcols(sku_info_vector.columns.tail)
     .setoutputcol("features")
   val dataframe = new pipeline()
     .setstages(array(vectorassembler))
     .fit(sku_info_vector)
     .transform(sku_info_vector)
   println("+++++++++++++++++++++")
   dataframe.show()

   val mapdata = dataframe
     .select("id", "features")
     .map(r => {
       labeledpoint(r(0).tostring.toint, r(1).asinstanceof[linalg.vector])
     })
   println(s"mapdata ----------------------------")
   mapdata.show()

   val normalizer = new normalizer()
     .setinputcol("features")
     .setoutputcol("norm_features")
     .setp(2.0)

   val normalized_data = normalizer
     .transform(mapdata)
     .select("label", "norm_features")

   normalized_data.show()
//  定义余弦相似度udf函数
   spark.udf.register("cos", (v1: densevector, v2: densevector) => {
     1 - breeze.linalg.functions.cosinedistance(breeze.linalg.densevector(v1.values), breeze.linalg.densevector(v2.values))
   })

//    注册自定义的余弦相似度udf,前提是的
//    def cosinesimilarity(v1: densevector, v2: densevector): double = {
//      1 - cosinedistance(breeze.linalg.densevector(v1.values), breeze.linalg.densevector(v2.values))
//    }

//  将数据进行条件自连接并根据余弦相似度算出最高的5个商品输出
   val result = normalized_data.crossjoin(normalized_data)
     .todf("left_label", "left_norm_vector", "right_label", "right_norm_vector")
     .filter(col("left_label") !== col("right_label"))
     .withcolumn("cos", expr("cos(left_norm_vector,right_norm_vector)"))
     .orderby(desc("cos"))
     // left_label 用户购买的   right_label 其他用户购买的并剔除指定用户购买
     .filter(col("left_label").isin(user_6708_sku_ids: _*))
     .filter(!col("right_label").isin(user_6708_sku_ids: _*) && col("right_label").isin(other_10_sku_ids: _*))  //完成了剔除
     .groupby("right_label")
     .agg(avg("cos").as("cos"))
     .orderby(desc("cos"))
     .limit(5)

println("------------------------推荐top5结果如下------------------------")
result.collect().zipwithindex.foreach {
 case (row, index) =>
   val right_label = row.getas[double]("right_label").toint
   val cos = row.getas[double]("cos")
   val output = s"相似度top${index + 1} (商品id:$right_label,平均相似度:$cos)" 
   println(output)
}

  sparksessionutils.close(spark)
 }

(0)

相关文章:

版权声明:本文内容由互联网用户贡献,该文观点仅代表作者本人。本站仅提供信息存储服务,不拥有所有权,不承担相关法律责任。 如发现本站有涉嫌抄袭侵权/违法违规的内容, 请发送邮件至 2386932994@qq.com 举报,一经查实将立刻删除。

发表评论

验证码:
Copyright © 2017-2025  代码网 保留所有权利. 粤ICP备2024248653号
站长QQ:2386932994 | 联系邮箱:2386932994@qq.com