姓名: 总分:
hadoop、hive、hbase、数据集成、scala阶段测试
一、选择题(共20道,每道0.5分)
1、下面哪个程序负责hdfs数据存储( c )
a. namenode b. jobtracher
c. datanode d. secondarynamenode
2、下列哪个属性是hdfs-site.xml中的配置( c ) b
a. fs.defaultfs b. dfs.replication
c. yarn.resourcemanager.address d. mapreduce.framework.name
3、hadoop-3.x集群中的hdfs的默认的数据块的大小是( d )
a. 256m b.32m
c.64m d.128m
4、hadoop-3.x集群中的hdfs的默认的副本块的个数是( c )
a. 1 b. 2
c. 3 d. 4
5、请问以下哪个命令组成是错误的( c )
a.bin/hadoop fs -cat /data/c.txt b. sbin/hdfs dfsadmin -report
c. bin/hdfs namenode -format d.sbin/stop-dfs.sh
6、以下与hdfs类似的框架是( c )
a. ntfs b. fat32
c. gfs d.ext3
7、hbase启动不需要哪个进程( d )
a. hmaster b. hregionserver
c. quorumpeermain d. nodemanager
8、下列哪个是纯离线数据采集工具( b )
a. flinkx b. sqoop
c. flume d. canal
9、map的输出结果首先被写入( a )
a. 内存 b. 缓存
c. 磁盘 d. 以上都正确
10、mapreduce与hbase的关系,哪些描述是正确的?( b )
a. 两者不可或缺,mapreduce是hbase可以正常运行的保证
b. 两者不是强关联关系,没有mapreduce,hbase可以正常运行
c. mapreduce不可以直接访问hbase
d. 它们之间没有任何关系
11.下列哪个不是spark的执行模式? ( c )
a. local b. yarn
c. mesos d. hdfs
12.在spark中,什么机制用于加速迭代计算? ( b )
a. checkpointing b. caching
c. broadcasting d. partitioning
-
下列哪个函数属于转换操作(transformation)而不是行动操作(action)?( c )
a. count() b. collect()
c. filter() d. saveastextfile()
-
在spark中,
persist
和cache
方法有何区别? ( b )
a) cache
是persist
的一个别名,二者完全相同。
b) persist
提供多种存储级别,而cache
总是使用默认的存储级别。
c) cache
用于dataframe,而persist
用于rdd。
d) persist
用于数据的持久化,而cache
用于数据的临时存储。
- 在spark中,什么是“窄依赖”(narrow dependency)与“宽依赖”(wide dependency)?它们如何影响数据的并行处理? ?(a )
a) 窄依赖表示每个父rdd分区映射到子rdd的一个分区,而宽依赖涉及多个父分区到一个子分区,导致shuffle。
b) 宽依赖意味着数据不需要重分布,而窄依赖则需要shuffle。
c) 窄依赖和宽依赖都涉及到数据的shuffle,只是程度不同。
d) 窄依赖与宽依赖仅在dataframe中存在,对rdd没有意义。
- spark的
job
和stage
在执行过程中如何划分? ( d )
a) job
由一系列stage
组成,每个stage
对应于一个shuffle操作。
b) job
和stage
是同义词,没有区别。
c) stage
由一系列job
组成,用于并行执行不同的任务。
d) job
是由用户提交的任务,stage
是dagscheduler为优化执行计划而创建的最小执行单元。
- 在spark中,
mappartitions
与map
操作有何区别,以及在什么情况下使用mappartitions
更合适? ( b )
a) mappartitions
和map
都是对每个元素进行操作,没有区别。
b) mappartitions
可以访问整个分区的数据,适用于需要对分区内的数据进行全局操作的场景。
c) map
操作可以改变分区的数量,而mappartitions
不能。
d) mappartitions
是map
的别名,用于提高代码可读性。
- spark的
kryo
序列化库如何帮助提高性能? ( b )
a) kryo增加了序列化的复杂度,但提高了数据的完整性。
b) kryo序列化库提供了一种更紧凑、更快的序列化方式,减少了网络传输和磁盘i/o的开销。
c) kryo只用于spark的内部通信,对外部数据无影响。
d) kryo序列化库是默认的序列化方式,不需要配置。
- 在spark中,
sparksession
与sparkcontext
的关系是什么?为何推荐使用sparksession
? ( a )
a) sparksession
是sparkcontext
的封装,提供了更高级的功能,如sql查询和数据源管理,sparksession
简化了api,提高了易用性。
b) sparksession
和sparkcontext
可以互换使用,没有推荐使用的原因。
c) sparkcontext
是sparksession
的前身,sparksession
仅用于spark sql。
d) sparksession
用于管理执行器,sparkcontext
用于管理driver程序。
- spark的
broadcast join
与shuffle hash join
有何区别?在何种情况下应优先考虑使用broadcast join
? ( d )a
a) broadcast join
将较小的表广播到每个节点,减少shuffle成本;shuffle hash join
需要更多网络传输,适用于大表间的连接。
b) broadcast join
和shuffle hash join
没有区别,只是名称不同。
c) shuffle hash join
总是优于broadcast join
,因为它更通用。
d) broadcast join
用于小数据集,shuffle hash join
用于大数据集,但具体选择与数据大小无关。
二、填空题(共20分,每空0.5分)
1、启动hdfs的shell脚本是:( sh xxx.sh )start-dfs.sh
2、block是hdfs的基本存储单元,默认大小是( 128 ) mb
3、mapreduce默认输入的格式化类:( inputformat )textinputformat
4、hadoop三大组件:( hdfs )、( mapreduce )、( yarn )
5、hiveserver2默认的端口:( 10000 )
6、hbase的rowkey设计三大原则:( 唯一性 )、( 散列性 )、( 长度适中 )
7、在spark中,一个( rdd )由一系列的( 分区 )组成,每个stage由一组( rdd )构成,而stage之间的依赖关系通常由( 行动算子 )操作触发。
8、hive中数据文件默认存储格式是( txt )textfile
9、spark core中缓存的实现方式有几种( 使用cache存储到内存中 )( 使用checkpoint存储到磁盘中 )
10、hive中sql转变成mr经过4个器,分别是解析器,( 编译器 )、( 优化器 )、( 执行器 )
三、判断题(共10道,每道1分)
1、block size是不可以修改的( f )
2、如果namenode意外终止,secondarynamenode会接替它使集群继续工作( f )
3、mapreduce 切片的大小等于 block的大小( f )
4、在hbase中由hmaster负责用户的io请求( f )
5、mapreduce中map任务的数量可以自己指定( t )
6、datax只能用于离线数据采集( t )
7、flume运行时需要依赖mapreduce( t )f
8、mapreduce中环形缓冲区默认大小为128m( f )
9、spark的sparkcontext
和sparksession
可以同时存在于同一个应用中,sparkcontext
提供了更多低级的api,而sparksession
则提供了高层的api,包括sql和数据源支持。( t )
10、spark的map
操作是懒惰求值的,只有在触发行动操作时才会执行计算。( t )
四、简答题(共5道,每道4分)
1、用自己的语言描述secondarynamenode的作用。
secondarynamenode作为namenode的秘书,帮助namenode处理事务。
secondarynamenode是用来帮助namenode完成元数据信息合并,从角色上看属于namenode的“秘书”
1.定期合并fsimage和edits文件
-
提供hdfs元数据的冷备份
-
监控hdfs状态
-
提升hdfs的可靠性和性能
2、用自己的语言描述spark的数据倾斜优化方式。
1.使用hive etl预处理数据
如果导致数据倾斜的是hive表。如果该hive表中的数据本身很不均匀(比如某个 key对应了100万数据,其他key才对应了10条数据)。
此时可以评估一下,是否可以通过hive来进行数据预处理(即通过hive etl预先对 数据按照key进行聚合,或者是预先和其他表进行join),然后在spark作业中针对的数据源就不是 原来的hive表了,而是预处理后的hive表。此时由于数据已经预先进行过聚合或join操作了,那么 在spark作业中也就不需要使用原先的shuffle类算子执行这类操作了。
2.过滤少数导致倾斜的key
如果发现导致倾斜的key就少数几个,而且对计算本身的影响并不大。
如果我们判断那少数几个数据量特别多的key,对作业的执行和计算结果不是特别 重要的话,那么干脆就直接过滤掉那少数几个key。比如,在spark sql中可以使用where子句过滤 掉这些key或者在spark core中对rdd执行filter算子过滤掉这些key。如果需要每次作业执行时, 动态判定哪些key的数据量最多然后再进行过滤,那么可以使用sample算子对rdd进行采样,然后 计算出每个key的数量,取数据量最多的key过滤掉即可。
3.提高shuffle操作的并行度
在对rdd执行shuffle算子时,给shuffle算子传入一个参数,比如 reducebykey(1000),该参数就设置了这个shuffle算子执行时shuffle read task的数量。对于 spark sql中的shuffle类语句,比如group by、join等,需要设置一个参数,即 spark.sql.shuffle.partitions,该参数代表了shuffle read task的并行度,该值默认是200,对于很 多场景来说都有点过小。
增加shuffle read task的数量,可以让原本分配给一个task的多个key分配给多个 task,从而让每个task处理比原来更少的数据。
4.双重聚合
对rdd执行reducebykey等聚合类shuffle算子或者在spark sql中使用group by 语句进行分组聚合时,比较适用这种方案。
这个方案的核心实现思路就是进行两阶段聚合。第一次是局部聚合,先给每个key 都打上一个随机数,比如10以内的随机数,此时原先一样的key就变成不一样的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就会变成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接着 对打上随机数后的数据,执行reducebykey等聚合操作,进行局部聚合,那么局部聚合结果,就会 变成了(1_hello, 2) (2_hello, 2)。然后将各个key的前缀给去掉,就会变成(hello,2)(hello,2),再次 进行全局聚合操作,就可以得到最终结果了,比如(hello, 4)。
5.将reduce join转为map join
在对rdd使用join类操作,或者是在spark sql中使用join语句时,而且join操作中 的一个rdd或表的数据量比较小(比如几百m或者一两g),比较适用此方案。
不使用join算子进行连接操作,而使用broadcast变量与map类算子实现join操作, 进而完全规避掉shuffle类的操作,彻底避免数据倾斜的发生和出现。将较小rdd中的数据直接通过 collect算子拉取到driver端的内存中来,然后对其创建一个broadcast变量;接着对另外一个rdd 执行map类算子,在算子函数内,从broadcast变量中获取较小rdd的全量数据,与当前rdd的每 一条数据按照连接key进行比对,如果连接key相同的话,那么就将两个rdd的数据用你需要的方式 连接起来。
6.采样倾斜key并分拆join操作
两个rdd/hive表进行join的时候,如果数据量都比较大,无法采用“解决方案五 ”,那么此时可以看一下两个rdd/hive表中的key分布情况。如果出现数据倾斜,是因为其中某一 个rdd/hive表中的少数几个key的数据量过大,而另一个rdd/hive表中的所有key都分布比较均 匀,那么采用这个解决方案是比较合适的。
思路:
对包含少数几个数据量过大的key的那个rdd,通过sample算子采样出一份样本来,然后统计一下每个 key的数量,计算出来数据量最大的是哪 几个key。
然后将这几个key对应的数据从原来的rdd中拆分出来,形成一个单独的rdd,并给每个key都打上n以 内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个rdd。
接着将需要join的另一个rdd,也过滤出来那几个倾斜key对应的数据并形成一个单独的rdd,将每条数 据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个 rdd。
再将附加了随机前缀的独立rdd与另一个膨胀n倍的独立rdd进行join,此时就可以将原先相同的key打 散成n份,分散到多个task中去进行join了。
而另外两个普通的rdd就照常join即可。
最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。
7.使用随机前缀和扩容rdd进行join
如果在进行join操作时,rdd中有大量的key导致数据倾斜,那么进行分拆key也没 什么意义。
该方案的实现思路基本和“解决方案六”类似,首先查看rdd/hive表中的数据分布情况,找到那个造成 数据倾斜的rdd/hive表,比如有多个key都对应了超过1万条数据。
然后将该rdd的每条数据都打上一个n以内的随机前缀。
同时对另外一个正常的rdd进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一 个0~n的前缀。
最后将两个处理后的rdd进行join即可。
3、用自己的语言描述诉mapreduce流程。
一、输入分片(input splitting)
- 过程描述:在mapreduce作业开始前,输入文件(或文件夹)首先被划分为多个inputsplit(输入分片)。默认情况下,每个hdfs的block(数据块)对应一个inputsplit。这样做的目的是将大文件分割成多个小块,以便并行处理。
- 分片大小:分片的大小通常与hdfs的数据块大小相同(默认是128mb),但也可以根据作业需求进行调整。分片时不考虑数据集整体,而是逐个针对每一个文件单独切片。
二、map阶段
- inputformat类:使用inputformat类的子类(如textinputformat)把输入文件(夹)划分为inputsplit。
- recordreader类:每个inputsplit通过recordreader类被解析成一个个<key,value>键值对。在textinputformat中,默认是每行的起始偏移量作为key,每行的内容作为value。
- mapper类:框架调用mapper类中的map函数,输入是<k1,v1>键值对,输出是<k2,v2>键值对。程序员可以覆盖map函数,实现自己的逻辑。
三、combiner阶段(可选)
- 过程描述:combiner是一个本地化的reduce操作,发生在map端。它的主要作用是减少map端的输出,从而减少shuffle过程中网络传输的数据量,提高作业的执行效率。
- 注意:combiner的输出是reducer的输入,因此它绝不能改变最终的计算结果。combiner适合于等幂操作,如累加、最大值等。
四、shuffle阶段
- 分区:在map函数处理完数据后,输出的<k2,v2>键值对会根据key进行分区,不同的分区由不同的reduce task处理。分区操作通常通过哈希函数实现。
- 排序与合并:在写入环形缓冲区之前,数据会先进行排序(默认采用快速排序算法)。当缓冲区满(默认为80%容量)时,数据会溢出到磁盘文件中,并在溢出前完成排序。多个溢出文件在最终输出前会进行归并排序,合并成一个大的有序文件。
- 数据传输:map任务完成后,reduce任务会启动数据copy线程,通过http方式请求map任务所在的nodemanager以获取输出文件。
五、reduce阶段
- 数据合并:reduce任务将接收到的所有map任务的输出数据(已分区且区内有序)进行合并,相同key的value值会被放到同一个集合中。
- reducer类:框架调用reducer类中的reduce函数,对合并后的数据进行处理,最终输出结果。
- outputformat类:使用outputformat类的子类(如textoutputformat)将最终结果输出到文件或数据库等存储介质中。
4、谈谈hive的优化。
1.本地模式运行,当处理一些小任务时可以选择本地模式运行,这样会使得任务执行的速度会很快。
2.jvm模式,在处理一些需要很多资源的任务时,可以先申请一部分的资源,等运行结束后再将资源释放。
3.严格模式,启动严格模式,禁止分区表的全表扫描,查询数据时必须加limit,禁止笛卡尔积。
4.hive join的数据倾斜问题,当小表join小表时,不用去管它;当小表join大表时,小表放在join的左边;当大表join大表时,应当考虑是否会出现某个reduce数据量过大的情况。空key过滤:当有大量数据同时放入一个reduce时,应当观察该rowkey,一般来说该rowkey对应的数据都是异常数据,需要使用sql语句对其进行过滤。空key转换:当有大量的数据都对应一个空的rowkey时,需要给这些数据随机分配一个rowkey,使它们均匀的分布到一些reduce中。
5.自定义map和reduce的数量,一般不去修改它。
5、用自己的语言描述诉spark的资源调度和任务调度流程。
spark的资源调度:driver向resourcemanager申请资源,resourcemanager选择一个空闲的子节点开启applicationmaster任务,applicationmaster向resourcemanager提交申请资源开启executor的任务。applicationmaster选择一个空闲子节点开启executor,
开启完毕后applicationmaster将executor开启的消息发送给driver,让driver发送执行任务。
spark的任务调度流程:driver端,遇到action算子触发任务执行,将任务提交到有向无环图,dagscheduler中,根据rdd的血缘关系划分划分stage,将rdd中的分区封装成taskset任务,发送到taskscheduler。taskscheduler取出taskset任务,根据rdd 的提供最优的任务执行计划,只移动计算不移动数据,开始对执行任务。
spark的资源调度:
1、driver提交作业命令
2、向resourcemananger申请资源
3、resourcemananger检查一些权限、资源空间,在一个相对空闲的子节点上开启一个applicationmaster的进程
4、applicationmaster向resourcemananger申请资源,启动executor
5、resourcemananger启动executor
6、executor反向注册给driver,告诉driver资源申请成功,可以发送任务
spark的任务调度流程:
7、driver遇到一个行动算子,触发整个作业调度
8、先将整个作业交给dag有向无环图
dag scheduler
9、根据rdd之间的血缘关系,找出宽窄依赖(有没有shuffle的产生)
10、通过宽窄依赖划分stage阶段
11、根据stage阶段,将stage中的task任务封装成一个taskset对象
12、发送给后续的 task scheduler
task scheduler
13、从dag scheduler发送过来的taskset中取出task任务
14、根据rdd五大特性的最后一大特性,只移动计算不移动数据,将task任务发送到对应的executor的线程池中执行
五、代码题(50分)
1、spark sql数据分析以及可视化(30分)
提示:
(数据字段为:日期date、省份province、城市city、新增确诊confirm、新增出院heal、新增死亡dead、消息来源source)
部分数据截图:
题目:
-
1、统计湖北省每月新增出院病例总数最多的前3个城市(8分)
create table bigdata30_test3.covid (
dates string,
province string,
city string,
confirm int,
heal int,
dead int,
source string
)row format delimited fields terminated by ','
location '/bigdata30/data/';
select
tt1.month,
tt1.city,
tt1.counts as counts,
tt1.rank
from
(select
t1.month,
t1.city,
t1.counts as counts,
-- row_number:数据相同也不会导致排名重复
row_number() over(partition by t1.month order by t1.counts desc) rank
from
(select
province,
city,
-- 只取出月份
substring(dates,0,2) as month,
count(heal) as counts
from bigdata30_test3.covid
where province = "湖北" and city != "境外输入-英国"
-- 只取出月份用来分组
group by city,province,substring(dates,0,2)) t1) tt1
-- 取每个排序的前三名
where tt1.rank <= 3;
结果:
1月 武汉市 24 1
1月 荆门市 10 2
1月 荆州市 10 3
2月 武汉市 62 1
2月 黄石市 54 2
2月 黄冈市 46 3
3月 武汉市 41 1
3月 鄂州市 32 2
3月 孝感市 28 3
4月 武汉市 29 1
4月 荆门市 1 2
4月 襄阳市 1 3
-
2、统计安徽省每月新增确诊人数同比
-- 纯sql dbeaver中执行 select t1.month, t1.counts, lag(counts,1,-1) over(order by t1.month) as last_counts, case when lag(counts,1,-1) over(order by t1.month) < 0 then '没有上一个月的数据' else round(((t1.counts - lag(counts,1,-1) over(order by t1.month)) / lag(counts,1,-1) over(order by t1.month)),2) end as rate from (select substring(dates,0,2) as month, province, count(confirm) as counts from bigdata30_test3.covid where province = "安徽" group by province,substring(dates,0,2)) t1; -- mysql建表 create table datamysql ( months varchar(30), counts varchar(30), last_counts varchar(30), rate varchar(30) )
// 为了将数据写入到mysql,使用sparksql package com.shujia.dslexam import org.apache.spark.rdd.rdd import org.apache.spark.sql.{dataframe, row, sparksession} import java.sql.{connection, drivermanager, preparedstatement} object exam2_1 { def main(args: array[string]): unit = { val sparksession: sparksession = sparksession.builder() .master("local") .appname("考试大题一") //参数设置的优先级:代码优先级 > 命令优先级 > 配置文件优先级 .config("spark.sql.shuffle.partitions", "1") .enablehivesupport() // 开启hive的配置 .getorcreate() sparksession.sql("use bigdata30_test3") //truncate = false 时,完整地显示某列值,不进行任何截断。 val datadf: dataframe = sparksession.sql( """ |select |t1.month, |t1.counts, |lag(counts,1,-1) over(order by t1.month) as last_counts, |case | when lag(counts,1,-1) over(order by t1.month) < 0 then 0 | else round(((t1.counts - lag(counts,1,-1) over(order by t1.month)) / lag(counts,1,-1) over(order by t1.month)),2) |end as rate |from |(select |substring(dates,0,2) as month, |province, |count(confirm) as counts |from |bigdata30_test3.covid |where province = "安徽" |group by province,substring(dates,0,2)) t1 |""".stripmargin) datadf.foreach((rdd: row) => { //注册驱动 class.forname("com.mysql.jdbc.driver") //创建数据库连接对象 val conn: connection = drivermanager.getconnection( "jdbc:mysql://master:3306/exam?useunicode=true&characterencoding=utf-8&usessl=false", "root", "123456" ) //创建预编译对象 val statement: preparedstatement = conn.preparestatement("insert into datamysql values(?,?,?,?)") statement.setstring(1, rdd.getas[string]("month")) statement.setlong(2, rdd.getas[long]("counts")) statement.setlong(3, rdd.getas[long]("last_counts")) statement.setdouble(4, rdd.getas[double]("rate")) // 执行sql语句 statement.executeupdate() statement.close() conn.close() }) } }
-
3、统计安徽省各城市连续新增确诊人数、连续新增确诊开始日期、连续新增确诊结束日期及连续新增确诊天数(12分)
select distinct
tt1.city,
max(tt1.confirm) over(partition by tt1.city,tt1.flag,split(tt1.new_day, '-')[0]) - min(tt1.confirm) over(partition by tt1.city,tt1.flag,split(tt1.new_day, '-')[0]) as add_confirm,
min(tt1.new_day) over(partition by tt1.city,tt1.flag,split(tt1.new_day, '-')[0]) as start_date,
max(tt1.new_day) over(partition by tt1.city,tt1.flag,split(tt1.new_day, '-')[0]) as end_date,
count(1) over(partition by tt1.city,tt1.flag,split(tt1.new_day, '-')[0]) as counts
from
(select
*,
cast(split(t1.new_day, '-')[1] as int) as day_of_month,
(cast(split(t1.new_day, '-')[1] as int) - t1.rank) as flag
from
(select * ,
from_unixtime(unix_timestamp(dates,'mm月dd日'),'mm-dd') as new_day,
row_number() over(partition by city order by dates) rank
from bigdata30_test3.covid
where province = "安徽" and source = "安徽卫健委" and heal is not null and confirm is not null and dead is not null) t1) tt1
-- 连续新增确诊人数应该是,求出的数据应该是本组的最后一条的confirm减去本组第一天的confirm,而不是下面的一组中的最大的confirm减去最小的confirm ??? 该如何求解
max(tt1.confirm) over(partition by tt1.city,tt1.flag,split(tt1.new_day, '-')[0]) -min(tt1.confirm) over(partition by tt1.city,tt1.flag,split(tt1.new_day, '-')[0]) as add_confirm,
-- 解决方案:first_value()和last_value()函数分别获取了每个分组的第一天和最后一天的确诊人数
-- 为了避免下述中出现的分组全出现在结果中的问题,使用first_value()和last_value()函数时,
-- 最好指定rows between unbounded preceding and unbounded following
select distinct
tt1.city,
last_value(tt1.confirm) over (partition by tt1.city, tt1.flag, split(tt1.new_day, '-')[0] order by tt1.new_day rows between unbounded preceding and unbounded following) -
first_value(tt1.confirm) over (partition by tt1.city, tt1.flag, split(tt1.new_day, '-')[0] order by tt1.new_day rows between unbounded preceding and unbounded following) as add_confirm,
min(tt1.new_day) over (partition by tt1.city, tt1.flag, split(tt1.new_day, '-')[0]) as start_date,
max(tt1.new_day) over (partition by tt1.city, tt1.flag, split(tt1.new_day, '-')[0]) as end_date,
count(1) over (partition by tt1.city, tt1.flag, split(tt1.new_day, '-')[0]) as counts
from
(select
*,
cast(split(t1.new_day, '-')[1] as int) as day_of_month,
(cast(split(t1.new_day, '-')[1] as int) - t1.rank) as flag
from
(select *,
from_unixtime(unix_timestamp(dates, 'mm月dd日'), 'mm-dd') as new_day,
row_number() over (partition by city order by dates) as rank
from bigdata30_test3.covid
where province = '安徽' and source = '安徽卫健委' and heal is not null and confirm is not null and dead is not null
) t1
) tt1;
-- 注:
在sql中,rows between unbounded preceding and unbounded following是窗口函数(如first_value(), last_value(), row_number(), sum(), avg()等)的一个子句,用于指定窗口的范围。
unbounded preceding表示窗口的起始点是分区中的第一行。
unbounded following表示窗口的结束点是分区中的最后一行。
对于first_value()和last_value()这样的函数,它们通常需要一个明确的窗口定义来确定“第一”和“最后”是基于什么范围来计算的。如果不提供rows between子句,某些数据库系统可能会报错,因为它们不知道应该基于哪些行来计算这些值。
-- 不加也可正常执行,但是不能加上order by tt1.new_day,否则会出现整个分组都出现在最终的结果里
形如:
|合肥市| 0| 01-28| 01-30| 3|
|合肥市| 10| 01-28| 01-30| 3|
|合肥市| 7| 01-28| 01-30| 3|
select distinct
tt1.city,
last_value(tt1.confirm) over (partition by tt1.city, tt1.flag, split(tt1.new_day, '-')[0]) -
first_value(tt1.confirm) over (partition by tt1.city, tt1.flag, split(tt1.new_day, '-')[0]) as add_confirm,
min(tt1.new_day) over (partition by tt1.city, tt1.flag, split(tt1.new_day, '-')[0]) as start_date,
max(tt1.new_day) over (partition by tt1.city, tt1.flag, split(tt1.new_day, '-')[0]) as end_date,
count(1) over (partition by tt1.city, tt1.flag, split(tt1.new_day, '-')[0]) as counts
from
(select
*,
cast(split(t1.new_day, '-')[1] as int) as day_of_month,
(cast(split(t1.new_day, '-')[1] as int) - t1.rank) as flag
from
(select *,
from_unixtime(unix_timestamp(dates, 'mm月dd日'), 'mm-dd') as new_day,
row_number() over (partition by city order by dates) as rank
from bigdata30_test3.covid
where province = '安徽' and source = '安徽卫健委' and heal is not null and confirm is not null and dead is not null
) t1
) tt1;
2、spark dsl数据分析(20分)
现有三份数据,结构如下:
-
live_types 直播间信息表
-
live_events 用户访问直播间记录表
-
user_info 用户信息表
题目:
1、统计每位用户观看不同类型直播的次数(6分)
import org.apache.spark.sql.expressions.window
import org.apache.spark.sql.{dataframe, sparksession}
object exam2 {
def main(args: array[string]): unit = {
val sparksession: sparksession = sparksession.builder()
.master("local")
.appname("社保练习")
.getorcreate()
import org.apache.spark.sql.functions._
import sparksession.implicits._
// user_id live_id start_time end_time
// 用户id 直播间id 开始时间 结束时间
val live_events: dataframe = sparksession.read
.format("csv")
.option("sep", "\t")
.schema("user_id int,live_id int,start_time timestamp,end_time timestamp")
.load("spark/data_exam/live_events.txt")
// live_events.show()
//live_id live_type
// 直播间id 直播间类型
val live_types: dataframe = sparksession.read
.format("csv")
.option("sep", "\t")
.schema("live_id int,live_type string")
.load("spark/data_exam/live_types.txt")
// live_types.show()
//user_id user_name
// 用户id 用户名
val user_info: dataframe = sparksession.read
.format("csv")
.option("sep", "\t")
.schema("user_id int,user_name string")
.load("spark/data_exam/user_info.txt")
// user_info.show()
/**
* 1、统计每位用户观看不同类型直播的次数(6分)
* > 输出:[用户id,用户名,直播间类型,次数]
*/
live_events
.withcolumn("count",count(expr("1")) over window.partitionby($"user_id",$"live_id"))
// todo 设置表与表之间进行左连接
.join(live_types, live_events("live_id") === live_types("live_id"), "left")
.join(user_info, live_events("user_id") === user_info("user_id"), "left")
/**
* 两张表中的字段名相同,要注明字段所属表
* 否则会报 reference 'user_id' is ambiguous, could be: user_id, user_id.
*/
.select(live_events("user_id"),$"user_name",$"live_type",$"count")
.distinct()
.show()
+-------+---------+---------+-----+
|user_id|user_name|live_type|count|
+-------+---------+---------+-----+
| 106| lucy| music| 1|
| 100| lihua| game| 1|
| 102| tom| food| 1|
| 104| bush| game| 1|
| 105| jam| game| 1|
| 102| tom| music| 1|
| 100| lihua| food| 2|
| 101| bob| food| 1|
| 101| bob| game| 1|
| 102| tom| game| 1|
| 104| bush| food| 1|
+-------+---------+---------+-----+
2、统计每位用户累计观看直播时长,按时长降序排列(6分)
/**
* 2、统计每位用户累计观看直播时长,按时长降序排列(6分)
* > 输出:[用户id,用户名,累计时长]
*/
// 100 1 2021-12-01 19:00:00 2021-12-01 19:28:00 start_time timestamp,end_time
live_events
.withcolumn("times",(unix_timestamp($"end_time","yyyy-mm-dd hh:mm:ss") - unix_timestamp($"start_time","yyyy-mm-dd hh:mm:ss")))
.withcolumn("all_times", sum($"times") over window.partitionby($"user_id"))
.join(user_info,"user_id")
// todo 时间戳 / 60 ,在最后查询时,可以将秒转换成分钟
.select($"user_id",$"user_name",$"all_times" / 60)
.distinct()
.orderby($"all_times".desc)
.show()
+-------+---------+----------------+
|user_id|user_name|(all_times / 60)|
+-------+---------+----------------+
| 104| bush| 178.0|
| 101| bob| 163.0|
| 102| tom| 140.0|
| 106| lucy| 129.0|
| 100| lihua| 110.0|
| 105| jam| 8.0|
+-------+---------+----------------+
3、统计不同类型直播用户累计观看时长降序排名(8分)
/**
* 3、统计不同类型直播用户累计观看时长降序排名(8分)
* > 输出:[直播间id,直播间类型,用户id,用户名,累计时长,排名]
*/
live_events
//todo 在开始得出时间戳的时候就将其转换成以分钟为单位
.withcolumn("times", (unix_timestamp($"end_time", "yyyy-mm-dd hh:mm:ss") - unix_timestamp($"start_time", "yyyy-mm-dd hh:mm:ss")) / 60)
.withcolumn("all_times", sum($"times") over window.partitionby($"user_id"))
.join(user_info, "user_id")
.join(live_types, "live_id")
.withcolumn("rank", row_number() over window.partitionby($"live_type").orderby($"all_times".desc))
.select($"live_id",$"live_type",$"user_id",$"user_name",$"all_times",$"rank")
.show()
+-------+---------+-------+---------+---------+----+
|live_id|live_type|user_id|user_name|all_times|rank|
+-------+---------+-------+---------+---------+----+
| 1| food| 104| bush| 178.0| 1|
| 1| food| 101| bob| 163.0| 2|
| 1| food| 102| tom| 140.0| 3|
| 1| food| 100| lihua| 110.0| 4|
| 1| food| 100| lihua| 110.0| 5|
| 3| music| 102| tom| 140.0| 1|
| 3| music| 106| lucy| 129.0| 2|
| 2| game| 104| bush| 178.0| 1|
| 2| game| 101| bob| 163.0| 2|
| 2| game| 102| tom| 140.0| 3|
| 2| game| 100| lihua| 110.0| 4|
| 2| game| 105| jam| 8.0| 5|
+-------+---------+-------+---------+---------+----+
发表评论