Spark处理的一些业务场景

浪尖聊大数据

共 5907字,需浏览 12分钟

 ·

2021-07-21 22:17

Sparksql在处理一些具体的业务场景的时候,可以通过算子操作,或者RDD之间的转换来完成负责业务的数据处理,在日常做需求的时候,整理出来一下几个经典的业务场景的解决方案,供大家参考。

1、取商家任务(task=1,2,3)全部完成的最早时间(注意如果任务3没有完成,则表中无3的数据,这种情况下全部完成时间为空)

业务背景:

商家在开通店铺服务的时候,会由商家服务人员去跟进商家完成开店任务,如:创建店铺(task_id=1),完成交易(task_id=2),创建营销活动(task_id=3),那么在考核服务人员是否做好服务的定义是:商家在一个月内是否完成所有开店的任务,因此需要统计商家完成全部任务的最早时间,以判断服务的好坏。

原始数据:

原始数据:
table:test
shop_id |task_id |finish_time
001 |1 |2020-03-01 09:00:00
001 |1 |2020-04-01 09:00:00
001 |2 |2020-03-12 09:00:00
001 |3 |2020-03-10 09:00:00
001 |3 |2020-03-02 09:00:00
002 |1 |2020-04-01 09:00:00

输出结果:
shop_id |finish_time
001 |2020-03-12 09:00:00
002 |

分析:
1、每个店铺都会有3个流程,只有流程走完才会有最早完成时间。

2、每个流程都会有多次的完成时间,同一个店铺同一个流程要取最早的完成时间。

3、不同流程完成时间中取最早的完成时间为这个店铺的最后的最早完成时间。

解决方案:
1、先按照shopid,task_id作为主键来获取每个店铺、每个任务节点的最早完成时间,那么得出结果如下:

shop_id    |task_id    |finish_time
001 |1 |2020-03-01 09:00:00
001 |2 |2020-03-12 09:00:00
001 |3 |2020-03-02 09:00:00
002 |1 |2020-04-01 09:00:00

2、然后按照shopid做为主键,对task_id进行聚合统计,对finish_time进行排序获取最新的时间,得出结果如下:

shop_id    |task_num    |finish_time
001 |3 |2020-03-12 09:00:00
002 |1 |2020-04-01 09:00:00

3、判断task_num个数是否为3,如果为3,那么店铺完成所有的业务,就输出这一行,如果不为3,那么未完成所有业务,finish_time变为null,结果如下:

shop_id    |task_num    |finish_time
001 |3 |2020-03-12 09:00:00
002 |1 |

Spark的处理逻辑:

  val DF = Spark.sql("select shop_id,task_id,unix_timestamp(finish_time) as ft from test")  val RDD = DF.rdd.map(f => ((f.getAs[String]("shop_id"),     f.getAs[Int]("task_id")),     f.getAs[Long]("ft"))).groupByKey().map(f => {    val shop_id = f._1._1    val task_id = f._1._2    val list = f._2.toList.sortWith(_ < _)    (shop_id,task_id,list.head)  }).map(f => (f._1,(f._2,f._3))).groupByKey().map(f => {    val shop_id = f._1    val list = f._2.toList.sortWith(_._2 > _._2)    if (list.length == 3)      {        (shop_id,list.length,list.head._2)      }    else      (shop_id,list.length,0L)  })


2、取登陆用户的最大连续登陆天数。

业务场景:

某C端APP,每天会记录登陆用户的登陆时间,然后需要统计用户在一段周期内的最长连续登陆的天数/或者没有登陆的天数。

同时这个业务场景在监控里面也可以使用:例如取数据表中最近连续稳定(数据量不变)的天数等等。

原始数据:

user_id    |ds001        |2020-03-01001        |2020-03-02001        |2020-03-03001        |2020-03-04001        |2020-03-06001        |2020-03-07002        |2020-03-01002        |2020-03-04001        |2020-03-05

结果:

user_id    |num001        |4002        |2

分析:

这块主要处理的问题是连续登陆的问题,如何取判断用户是连续登陆。

1、对用户的登陆时间进行排序;

2、计算每两个时间的时间差,如果对应的时间差为1天,那么就是连续登陆,如果大于1,则为非连续;

3、统计时间差对应数组中连续为1的最大长度就是最大的连续登陆天数。

Spark的处理逻辑:

val DF = Spark.sql("select uid,unix_timestamp(ds) as dt from test")  val RDD = DF.rdd.map(f => (f.getAs[String]("uid"),     f.getAs[Long]("dt"))).groupByKey().map(f => {    val uid = f._1    val ir = f._2.toBuffer.sortWith(_ < _)    var array: Array[Long] = Array()    var num = 0L    for (i <- 0 to ir.length - 2) {      val subTime = ir(i + 1) - ir(i)      val during = subTime/86400L      if (during == 1L){        num = num+1L        array = array :+ num      }      else{        num = 0L      }    }    (uid,array.max)  })

原理:

例如:array如下(也就是时间差对应的数组):Array[Long] = Array(1, 2, 1, 1, 1, 2, 1, 1, 1, 1)
var num = 0L var arr: Array[Long] = Array() for (i <- 0 to array.length - 1) { if (array(i) == 1L){ num = num + 1L arr = arr :+ num } else{ num = 0L } }输出:arr:Array[Long] = Array(1, 1, 2, 3, 1, 2, 3, 4)而arr.max = 4 也就是最大连续登陆的天数。

3、如何让业务方能够自由筛选当天分钟级别的新增访问用户数。

业务背景:

在做flink的实时大屏统计的时候,只能选在到当天当前这个时刻的新增用户数有多少,但是业务方需要通过时间筛选,可能在8点30的时候,需要去看到8点25的时候,今天新增了多少访问用户,而且这个时间区间是随机的,而且是到分钟维度的。

分析:
如果数据量小的情况下:

通过canal监听业务库的binlog,然后写到Kafka通过flink进行binlog解析,生成用户的第一次登陆时间写到mysql,供后端同学通过业务逻辑进行筛选,就可以达到任意区间,任意范围的新增访问用户的圈选。

但是在C端数据量偏大的情况下,显然不能存储全量数据,就算存储也不能按照hive的方式存储,uid + fisrt_time这种模式进行存储。

那么数据量大的情况下,如何解决呢:

1、可以按照分钟进行存储,数据的主键就是时间戳到分钟级别的,然后统计每分钟第一次访问的用户量,那么一天的数据也就是1440行,每一行存的就是第一次访问时间在这个分钟内的用户量。

time_min    |num2020-08-18 09:01:00        |40022020-08-18 09:02:00        |50022020-08-18 09:03:00        |5202

这样存储之后后端可以通过时间区间进行筛选后相加得到某个分钟级别区间端的第一次访问的用户数据。

2、不过上面的方案有个缺陷,虽然将用户维度为主键修改为分钟维度的主键,数据量减少了很多,但是可能业务方需要的不仅仅是用户量,还要具体的用户ID,来针对性进行投放,那么上面的方案就不太适合了。

针对上面的业务场景,可以选用Hbase进行优化。

rowKey    |uid2020-08-18 09:01:00+timetamp        |{uid1,uid2,uid3}2020-08-18 09:02:00+timetamp        |{uid21,uid23,uid33}2020-08-18 09:03:00+timetamp        |{uid13,uid24,uid35}

由于Hbase本身是列存储的,如果将分钟级别的时间戳作为RowKey,是可以很快的定位到数据所在的位置,不必进行全表扫描,这样查询效率会很快。

不过这个场景没有验证过,但是在用户画像的需求中是通过这个逻辑来实现秒级别的查询的。

4、递归的方式来解析JSON串(树结构)

业务背景:

在处理IM需求的时候,需要对客服的评价进行打分,而客服的评价系统是分为多个层级,不同类型,当初设计这个层级关系的时候是按照树结构进行涉及的,最多能下层4集合,但是每一层的都会有具体行为的选择和对应的得分情况。某一个层级可以包含多个下属层级。

具体结构如下:



层级架构如下:



分析:
1、本身是一个数组,数组的元素是JSON串,基本字段一致,每一层级都是包含基本字符串信息:level,id,lbalel,value,parentID,children。

2、children的value也是一个数组,和上面的数组模式一样同时包含全部字段。

3、最后的层级最多到第四层结束,或者说是判断最后对应的children的值是一个空数组结束。

所以这个模式可以利用递归进行调用解析,最后的判定条件是children的值是否为空为止。

代码模式:

var res = new ArrayBuffer[(String,String,String,String,String)]()    def JsonFunc(Son: String,rest:ArrayBuffer[(String,String,String,String,String)]): ArrayBuffer[(String,String,String,String,String)] = {      val jsonArray = JSON.parseArray(Son)      if (jsonArray.size() == 0)  {        println("last")      }      else      {        for (i <- 0 to (jsonArray.size() - 1)) {          val jsonObj = jsonArray.getJSONObject(i)          val label = jsonObj.getOrDefault("label", null).toString          val value = jsonObj.getOrDefault("value", null).toString          val id = jsonObj.getOrDefault("id", null).toString          val level = jsonObj.getOrDefault("level", null).toString          val parentID = jsonObj.getOrDefault("parentID", null).toString          res =  res :+ (id,level,parentID,label,value)          JsonFunc(jsonObj.getOrDefault("children", null).toString,res)        }      }      res    }
浏览 16
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报