文章目录

慕课网用户行为日志分析实战(写于20年3月)

由 笔尖 发布

执行流程:

image-20200520173620423

一、数据清洗
1、spark.sparkContext.textFile("access-10000.log")获取原始数据,access.map(...).saveAsTextFile将ETL数据输出至本地/hdfs

2、由于使用的是local[2],static SimpleDataformat里的parse和format方法是线程不安全的,因此应采用FastDateFormat.getInstance(pattern)方式。

3、使用textfile对ETL数据进行二次清洗,不同的是这次需要先将数据转换成DF,因此在RDD输出的时候注意应输出ROW(),然后使用spark.createDataFrame(RDD,schemaStruct)转换为DF,写出数据时使用accessDF.coalesce(1).write.format("parquet").mode(SaveMode.Overwrite),将文件输出为1个,并进行覆写。

4、在ip地址解析使用了github的ipdatabase,通过idea的mvn将该包进行install,然后在项目中通过pom引入本地仓库的ipdatabase,之前的install可选assembly:assembly将ipdatabase依赖的jar包一并打包,这样就无需在项目中重复定义依赖。

5、分别使用sparksqlAPI和spark.sql方式进行流量统计,

import spark.implicits._
val videoAccessTopNDF = accessDF.filter($"day" === "20170511" && $"cmsType" === "video")
  .groupBy("day", "cmsId").agg(count("cmsId").as("times")).orderBy($"times".desc)
videoAccessTopNDF.show(false)

agg函数常配合group by用于对某一字段值进行分组聚合操作。

accessDF.createOrReplaceTempView("access_log")
val videoAccessTopNDF = spark.sql("select day,cmsId, count(1) as times from access_log " + "where day='20170511' and cmsType='video' " + "group by day,cmsId order by times desc")

6、链接数据库驱动时注意采用&useSSL=false&serverTimezone=UTC

7、通过DF.foreach将每一行数据读取出来,通过info.getAsString获取对应信息,然后构建形成一个entity,传入到list。禁止conncetion自动提交后,将pstmt添加至batch中,最后批量执行以减少IO。

val sql="insert into imooc_course.day_article_access_topnstat values (?,?,?) "
pstmt=conncetion.prepareStatement(sql)
for(ele<-list){
  pstmt.setString(1,el e.day)
 ....
  pstmt.addBatch()
}
pstmt.executeBatch()
conncetion.commit()

8、spark-sql的开窗函数

常用于将表中数据按照某一字段进行分组,然后按照某个规则进行排序,计算TopN问题。常用的开窗函数有:

row_ number、dense_rank()、rank()、percent_rank()

row_number().over(Window.partitionBy(cityAccessTopNDF("city")).orderBy(cityAccessTopNDF("times").desc)).as("topRank"))

9、通过Echarts进行数据可视化。


暂无评论

发表评论