博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spark综合使用及用户行为案例页面转化率统计分析实战-Spark商业应用实战
阅读量:7219 次
发布时间:2019-06-29

本文共 7140 字,大约阅读时间需要 23 分钟。

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何技术交流,可随时联系。

1 页面转化率概念

  • 页面转化率的求解思路是通过UserAction表获取一个session的所有UserAction,根据时间顺序排序后获取全部PageId 然后将PageId组合成PageFlow,即1,2,3,4,5的形式(按照时间顺序排列),之后,组合为1_2, 2_3, 3_4, ...的形式 然后筛选出出现在targetFlow中的所有A_B

  • 每个A_B进行数量统计,然后统计startPage的PV,之后根据targetFlow的A_B顺序,计算每一层的转化率

2 页面转化率业务分析

2.1 创建Spark客户端

// 任务的执行ID,用户唯一标示运行后的结果,用在MySQL数据库中val taskUUID = UUID.randomUUID().toString// 构建Spark上下文val sparkConf = new SparkConf().setAppName("SessionAnalyzer").setMaster("local[*]")// 创建Spark客户端val spark = SparkSession.builder().config(sparkConf).enableHiveSupport().getOrCreate()val sc = spark.sparkContext复制代码

2.2 查询指定日期范围内的用户访问行为数据

查询指定日期范围内的用户访问行为数据  val actionRDD = this.getActionRDDByDateRange(spark, taskParam)    def getActionRDDByDateRange(spark:SparkSession, taskParam:JSONObject): RDD[UserVisitAction] = {    val startDate = ParamUtils.getParam(taskParam, Constants.PARAM_START_DATE)    val endDate = ParamUtils.getParam(taskParam, Constants.PARAM_END_DATE)    import spark.implicits._    spark.sql("select * from user_visit_action where date>='" + startDate + "' and date<='" + endDate + "'")      .as[UserVisitAction].rdd  }复制代码

2.3 具体业务分析

将用户行为信息转换为 K-V 结构val sessionid2actionRDD = actionRDD.map(item => (item.session_id, item))将数据进行内存缓存sessionid2actionRDD.persist(StorageLevel.MEMORY_ONLY)对
RDD,做一次groupByKey操作,生成页面切片val sessionid2actionsRDD = sessionid2actionRDD.groupByKey()复制代码

2.4 最核心的一步,每个session的单跳页面切片的生成,以及页面流的匹配,算法

val pageSplitRDD = generateAndMatchPageSplit(sc, sessionid2actionsRDD, taskParam)def generateAndMatchPageSplit(sc:SparkContext, sessionid2actionsRDD:RDD[(String, Iterable[UserVisitAction])], taskParam:JSONObject ):RDD[(String, Int)] = {/* 对目标PageFlow进行解析 *///1,2,3,4,5,6,7val targetPageFlow = ParamUtils.getParam(taskParam, Constants.PARAM_TARGET_PAGE_FLOW)//将字符串转换成为了List[String]val targetPages = targetPageFlow.split(",").toList//targetPages.slice(0, targetPages.length-1) :[1,2,3,4,5,6]//targetPages.tail :[2,3,4,5,6,7]//targetPages.slice(0, targetPages.length-1).zip(targetPages.tail):(1,2)(2,3)(3,4)(4,5)(5,6)(6,7)//map(item => item._1 + "_" + item._2):(1_2,2_3,3_4,4_5,5_6,6_7)val targetPagePairs = targetPages.slice(0, targetPages.length-1).zip(targetPages.tail).map(item => item._1 + "_" + item._2)//将结果转换为广播变量//targetPagePairs类型为List[String]val targetPageFlowBroadcast = sc.broadcast(targetPagePairs)/* 对所有PageFlow进行解析 */// 对全部数据进行处理sessionid2actionsRDD.flatMap{ case (sessionid, userVisitActions) =>  // 获取使用者指定的页面流  // 使用者指定的页面流,1,2,3,4,5,6,7  // 1->2的转化率是多少?2->3的转化率是多少?  // 这里,我们拿到的session的访问行为,默认情况下是乱序的  // 比如说,正常情况下,我们希望拿到的数据,是按照时间顺序排序的  // 但是问题是,默认是不排序的  // 所以,我们第一件事情,对session的访问行为数据按照时间进行排序  // 举例,反例  // 比如,3->5->4->10->7  // 3->4->5->7->10  // userVisitActions是Iterable[UserAction],toList.sortWith将Iterable中的所有UserAction按照时间进行排序  // 按照时间排序  val sortedUVAs = userVisitActions.toList.sortWith((uva1, uva2) => DateUtils.parseTime(uva1.action_time).getTime() < DateUtils.parseTime(uva2.action_time).getTime())  // 提取所有UserAction中的PageId信息  val soredPages = sortedUVAs.map(item => if(item.page_id != null) item.page_id)  //【注意】页面的PageFlow是将session的所有UserAction按照时间顺序排序后提取PageId,再将PageId进行连接得到的  // 按照已经排好的顺序对PageId信息进行整合,生成所有页面切片:(1_2,2_3,3_4,4_5,5_6,6_7)  val sessionPagePairs = soredPages.slice(0, soredPages.length-1).zip(soredPages.tail).map(item => item._1 + "_" + item._2)  /* 由此,得到了当前session的PageFlow */  // 只要是当前session的PageFlow有一个切片与targetPageFlow中任一切片重合,那么就保留下来  // 目标:(1_2,2_3,3_4,4_5,5_6,6_7)   当前:(1_2,2_5,5_6,6_7,7_8)  // 最后保留:(1_2,5_6,6_7)  // 输出:(1_2, 1) (5_6, 1) (6_7, 1)  sessionPagePairs.filter(targetPageFlowBroadcast.value.contains(_)).map((_,1))}复制代码

}

2.5 计算跳转率

// 返回:(1_2, 1),(3_4, 1), ..., (100_101, 1)// 统计每个跳转切片的总个数// pageSplitPvMap:(1_2, 102320), (3_4, 90021), ..., (100_101, 45789)val pageSplitPvMap = pageSplitRDD.countByKey复制代码

2.6 首先计算首页PV的数量

// 使用者指定的页面流是3,2,5,8,6// 咱们现在拿到的这个pageSplitPvMap,3->2,2->5,5->8,8->6// 首先计算首页PV的数量val startPagePv = getStartPagePv(taskParam, sessionid2actionsRDD)def getStartPagePv(taskParam:JSONObject, sessionid2actionsRDD:RDD[(String, Iterable[UserVisitAction])]) :Long = {// 获取配置文件中的targetPageFlowval targetPageFlow = ParamUtils.getParam(taskParam, Constants.PARAM_TARGET_PAGE_FLOW)// 获取起始页面IDval startPageId = targetPageFlow.split(",")(0).toLong// sessionid2actionsRDD是聚合后的用户行为数据// userVisitAction中记录的是在一个页面中的用户行为数据val startPageRDD = sessionid2actionsRDD.flatMap{ case (sessionid, userVisitActions) =>  // 过滤出所有PageId为startPageId的用户行为数据  userVisitActions.filter(_.page_id == startPageId).map(_.page_id)}// 对PageId等于startPageId的用户行为数据进行技术startPageRDD.count()复制代码

}

2.7 计算目标页面流的各个页面切片的转化率(比如:2_3/1_2)

  • 版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何技术交流,可随时联系。

    计算目标页面流的各个页面切片的转化率 val convertRateMap = computePageSplitConvertRate(taskParam, pageSplitPvMap, startPagePv)  def computePageSplitConvertRate(taskParam:JSONObject, pageSplitPvMap:collection.Map[String, Long], startPagePv:Long):collection.Map[String, Double] = {  val convertRateMap = new mutable.HashMap[String, Double]()  //1,2,3,4,5,6,7  val targetPageFlow = ParamUtils.getParam(taskParam, Constants.PARAM_TARGET_PAGE_FLOW)  val targetPages = targetPageFlow.split(",").toList  //(1_2,2_3,3_4,4_5,5_6,6_7)  val targetPagePairs = targetPages.slice(0, targetPages.length-1).zip(targetPages.tail).map(item => item._1 + "_" + item._2)  // lastPageSplitPv:存储最新一次的页面PV数量  var lastPageSplitPv = startPagePv.toDouble  // 3,5,2,4,6  // 3_5  // 3_5 pv / 3 pv  // 5_2 rate = 5_2 pv / 3_5 pv  // 通过for循环,获取目标页面流中的各个页面切片(pv)  for(targetPage <- targetPagePairs){    // 先获取pageSplitPvMap中记录的当前targetPage的数量    val targetPageSplitPv = pageSplitPvMap.get(targetPage).get.toDouble    println((targetPageSplitPv, lastPageSplitPv))    // 用当前targetPage的数量除以上一次lastPageSplit的数量,得到转化率    val convertRate = NumberUtils.formatDouble(targetPageSplitPv / lastPageSplitPv, 2)    // 对targetPage和转化率进行存储    convertRateMap.put(targetPage, convertRate)    // 将本次的targetPage作为下一次的lastPageSplitPv    lastPageSplitPv = targetPageSplitPv  }  convertRateMap}复制代码

2.8 持久化页面转化率

persistConvertRate(spark, taskUUID, convertRateMap)def persistConvertRate(spark:SparkSession, taskid:String, convertRateMap:collection.Map[String, Double]) {    val convertRate = convertRateMap.map(item => item._1 + "=" + item._2).mkString("|")    val pageSplitConvertRateRDD = spark.sparkContext.makeRDD(Array(PageSplitConvertRate(taskid,convertRate)))    import spark.implicits._    pageSplitConvertRateRDD.toDF().write      .format("jdbc")      .option("url", ConfigurationManager.config.getString(Constants.JDBC_URL))      .option("dbtable", "page_split_convert_rate")      .option("user", ConfigurationManager.config.getString(Constants.JDBC_USER))      .option("password", ConfigurationManager.config.getString(Constants.JDBC_PASSWORD))      .mode(SaveMode.Append)      .save()复制代码

}

3 总结

温故而知新,本文为了综合复习,进行代码总结,内容粗鄙,勿怪

版权声明:本套技术专栏是作者(秦凯新)平时工作的总结和升华,通过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。QQ邮箱地址:1120746959@qq.com,如有任何技术交流,可随时联系。

秦凯新 于深圳

转载地址:http://xvxym.baihongyu.com/

你可能感兴趣的文章
抓取存储quota超过80%的users
查看>>
C语言经典算法100例
查看>>
速成CAD版本转换的教程
查看>>
CAD文件图纸过大,该怎么解决?
查看>>
Spring aop 切不进去原因。。
查看>>
PHP获取客户端IP
查看>>
php开发APP接口-封装通信接口改进版
查看>>
Android系统性能演变历程
查看>>
OSChina 周三乱弹 —— 打醒精神去瞌睡
查看>>
SSH 密钥登录linux
查看>>
你必须掌握的 21 个 Java 核心技术!
查看>>
告诉你WHT中文站是什么?
查看>>
4、Juniper SSG520 PPTP映射到ROS后MAC无法连接解决方法
查看>>
利用批处理文件来建立一个记录3389登陆者信息
查看>>
Linux 系统下双机HA的实现
查看>>
02_swarm mode key concepts
查看>>
Eclipse打包插件Fat Jar 解压打包
查看>>
Apache Shiro 使用手册
查看>>
CentOS mini 6.5 安装DB2 Express-C 问题处理记录
查看>>
DirectByteBuffer
查看>>