Notes on Handling Data Skew in Spark
Problems Caused by Data Skew

When the data behind a shuffle is skewed, most tasks finish quickly while a few straggler tasks process the bulk of the rows. A stage is only as fast as its slowest task, so the whole job slows down, and the oversized tasks are prone to executor OOM and outright failure.
Causes of Data Skew

Skew is produced by shuffle operators that route all rows with the same key to one task. The common culprits:
| Type | RDD | SQL |
|---|---|---|
| Deduplication | distinct | distinct |
| Aggregation | groupByKey, reduceByKey, aggregateByKey | group by |
| Join | join, left join, right join | join, left join, right join |
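As a minimal, self-contained illustration (hypothetical data, not from the original write-up): a shuffle keyed on one hot value routes every row with that key into a single partition, which is exactly where the straggler tasks come from.

```scala
import org.apache.spark.sql.SparkSession

object SkewDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").appName("skew-demo").getOrCreate()
    val sc = spark.sparkContext

    // 90% of the rows share key 0; the rest are spread over keys 1..99
    val pairs = sc.parallelize(1 to 100000).map { i =>
      val key = if (i % 10 != 0) 0 else i % 100
      (key, 1L)
    }

    // After the shuffle, almost everything lands in the partition that owns key 0
    val sizes = pairs.groupByKey(numPartitions = 10).mapValues(_.size).collect()
    sizes.sortBy(-_._2).take(5).foreach(println) // key 0 dominates

    spark.stop()
  }
}
```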

Locating the Skewed Keys

Two quick ways to find the hot keys: sample the key column with an RDD job, or run a plain GROUP BY count in SQL.
```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

val cscTopKey: Array[(Int, Row)] = sampleSkew(sparkSession, "default.tab_spark", "id")
println(cscTopKey.mkString("\n"))

def sampleSkew(sparkSession: SparkSession, tableName: String, keyColumn: String): Array[(Int, Row)] = {
  val df: DataFrame = sparkSession.sql("select " + keyColumn + " from " + tableName)
  // Sample 10% of the rows without replacement, count each key,
  // and return the 10 most frequent keys with their sampled counts
  val top10Key: Array[(Int, Row)] = df
    .select(keyColumn)
    .sample(withReplacement = false, 0.1)
    .rdd
    .map(k => (k, 1))
    .reduceByKey(_ + _)
    .map(k => (k._2, k._1))
    .sortByKey(ascending = false)
    .take(10)
  top10Key
}
```
```sql
SELECT
  id,
  count(1) AS cn
FROM
  default.tab_spark_test_3
GROUP BY id
ORDER BY cn DESC
LIMIT 100;

-- Result set:
-- 100000,2000012
-- 100001,1600012
-- 100002,1
```

The counts make the skew obvious: ids 100000 and 100001 carry about 2,000,000 and 1,600,000 rows respectively, while the next key appears just once.
Single-Table Data Skew Optimization
Solution: two-stage aggregation. Register a pair of UDFs that salt the key with a random prefix and strip it again; aggregate once on the salted key to spread the hot key across tasks, then strip the prefix and aggregate a second time for the final result.
```scala
import java.util.Random

sparkSession.udf.register("random_prefix", (value: Int, num: Int) => randomPrefixUDF(value, num))
sparkSession.udf.register("remove_random_prefix", (value: String) => removeRandomPrefixUDF(value))

// t1 salts the key with a random prefix, t2 aggregates on the salted key,
// t3 strips the salt, and the outer query aggregates again
val sql =
  """
    |select
    |  id,
    |  sum(sell) totalSell
    |from
    |(
    |  select
    |    remove_random_prefix(random_id) id,
    |    sell
    |  from
    |  (
    |    select
    |      random_id,
    |      sum(pic) sell
    |    from
    |    (
    |      select
    |        random_prefix(id, 6) random_id,
    |        pic
    |      from
    |        default.tab_spark_test_3
    |    ) t1
    |    group by random_id
    |  ) t2
    |) t3
    |group by
    |  id
  """.stripMargin

def randomPrefixUDF(value: Int, num: Int): String = {
  new Random().nextInt(num).toString + "_" + value
}

def removeRandomPrefixUDF(value: String): String = {
  value.split("_")(1)
}
```
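For reference, a sketch of the same two-stage aggregation in the DataFrame API with no UDFs; it assumes the same tab_spark_test_3 table with columns id and pic as above:

```scala
import org.apache.spark.sql.functions._

// Stage 1: salt the key with a random prefix 0..5 and pre-aggregate per salted key
val partial = sparkSession.table("default.tab_spark_test_3")
  .withColumn("random_id",
    concat((rand() * 6).cast("int").cast("string"), lit("_"), col("id").cast("string")))
  .groupBy("random_id")
  .agg(sum("pic").as("sell"))

// Stage 2: strip the salt and combine the partial sums into the final result
val totals = partial
  .withColumn("id", split(col("random_id"), "_").getItem(1))
  .groupBy("id")
  .agg(sum("sell").as("totalSell"))
```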
Join Data Skew Optimization
1. Applicable scenario

A join in which a few keys in the large table are heavily skewed (here, courseid 101 and 103 in the shopping-cart table), while the other side is a small table that can afford to be replicated a fixed number of times.
2. Solution logic

- Split the large table into the rows carrying the skewed keys and the remaining common rows.
- Scatter the skewed rows by prefixing the join key with a random salt in [0, N), and expand the small table N times, one copy per salt value, so every salted key finds its match.
- Join the scattered rows against the expanded small table, join the common rows against the original small table, then union the two results (implemented below).
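A minimal, self-contained sketch of the scatter/expand idea on toy DataFrames (the names and sample rows are hypothetical, chosen to mirror the course tables used below):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("salt-join-sketch").getOrCreate()
import spark.implicits._

val N = 36 // salt fan-out, matching the implementation below

// Toy stand-ins for the skewed big table and the small table
val big   = Seq((101L, "order1"), (101L, "order2"), (103L, "order3")).toDF("courseid", "orderid")
val small = Seq((101L, "Spark"), (103L, "Flink")).toDF("courseid", "coursename")

// Big side: prefix the join key with a random salt 0..N-1
val bigSalted = big.withColumn("rand_courseid",
  concat((rand() * N).cast("int").cast("string"), lit("_"), $"courseid".cast("string")))

// Small side: replicate every row once per salt value so each salted key finds its match
val smallExpanded = small
  .withColumn("salt", explode(array((0 until N).map(i => lit(i)): _*)))
  .withColumn("rand_courseid", concat($"salt".cast("string"), lit("_"), $"courseid".cast("string")))
  .drop("salt", "courseid")

bigSalted.join(smallExpanded, Seq("rand_courseid")).show()
```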
The full implementation below scatters the big-table keys and expands the small table:
```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}

/**
 * Scatter the big table and expand the small table to resolve join skew.
 * SaleCourse and CourseShoppingCart are case classes matching the two tables'
 * schemas (defined elsewhere in the project), each with a trailing rand_courseid field.
 *
 * @param sparkSession the active SparkSession
 */
def scatterBigAndExpansionSmall(sparkSession: SparkSession): Unit = {
  import sparkSession.implicits._

  val saleCourse = sparkSession.sql("select * from sparktuning.sale_course")
  val coursePay = sparkSession.sql("select * from sparktuning.course_pay")
    .withColumnRenamed("discount", "pay_discount")
    .withColumnRenamed("createtime", "pay_createtime")
  val courseShoppingCart = sparkSession.sql("select * from sparktuning.course_shopping_cart")
    .withColumnRenamed("discount", "cart_discount")
    .withColumnRenamed("createtime", "cart_createtime")

  // TODO 1: split off the skewed keys (courseid 101 and 103)
  val commonCourseShoppingCart: Dataset[Row] = courseShoppingCart
    .filter(item => item.getAs[Long]("courseid") != 101 && item.getAs[Long]("courseid") != 103)
  val skewCourseShoppingCart: Dataset[Row] = courseShoppingCart
    .filter(item => item.getAs[Long]("courseid") == 101 || item.getAs[Long]("courseid") == 103)

  // TODO 2: scatter the skewed keys across 36 salt values
  val newCourseShoppingCart = skewCourseShoppingCart.mapPartitions((partitions: Iterator[Row]) => {
    partitions.map(item => {
      val courseid = item.getAs[Long]("courseid")
      val randInt = Random.nextInt(36)
      CourseShoppingCart(courseid, item.getAs[String]("orderid"),
        item.getAs[String]("coursename"), item.getAs[String]("cart_discount"),
        item.getAs[String]("sellmoney"), item.getAs[String]("cart_createtime"),
        item.getAs[String]("dt"), item.getAs[String]("dn"), randInt + "_" + courseid)
    })
  })

  // TODO 3: expand the small table 36x, one copy per salt value
  val newSaleCourse = saleCourse.flatMap(item => {
    val list = new ArrayBuffer[SaleCourse]()
    val courseid = item.getAs[Long]("courseid")
    val coursename = item.getAs[String]("coursename")
    val status = item.getAs[String]("status")
    val pointlistid = item.getAs[Long]("pointlistid")
    val majorid = item.getAs[Long]("majorid")
    val chapterid = item.getAs[Long]("chapterid")
    val chaptername = item.getAs[String]("chaptername")
    val edusubjectid = item.getAs[Long]("edusubjectid")
    val edusubjectname = item.getAs[String]("edusubjectname")
    val teacherid = item.getAs[Long]("teacherid")
    val teachername = item.getAs[String]("teachername")
    val coursemanager = item.getAs[String]("coursemanager")
    val money = item.getAs[String]("money")
    val dt = item.getAs[String]("dt")
    val dn = item.getAs[String]("dn")
    for (i <- 0 until 36) {
      list.append(SaleCourse(courseid, coursename, status, pointlistid, majorid, chapterid, chaptername,
        edusubjectid, edusubjectname, teacherid, teachername, coursemanager, money, dt, dn, i + "_" + courseid))
    }
    list
  })

  // TODO 4: join the scattered skewed rows with the expanded small table
  val df1: DataFrame = newSaleCourse
    .join(newCourseShoppingCart.drop("courseid").drop("coursename"), Seq("rand_courseid", "dt", "dn"), "right")
    .join(coursePay, Seq("orderid", "dt", "dn"), "left")
    .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername",
      "edusubjectid", "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid",
      "cart_discount", "sellmoney", "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")

  // TODO 5: join the non-skewed rows with the original small table
  val df2: DataFrame = saleCourse
    .join(commonCourseShoppingCart.drop("coursename"), Seq("courseid", "dt", "dn"), "right")
    .join(coursePay, Seq("orderid", "dt", "dn"), "left")
    .select("courseid", "coursename", "status", "pointlistid", "majorid", "chapterid", "chaptername",
      "edusubjectid", "edusubjectname", "teacherid", "teachername", "coursemanager", "money", "orderid",
      "cart_discount", "sellmoney", "cart_createtime", "pay_discount", "paymoney", "pay_createtime", "dt", "dn")

  // TODO 6: union the skewed-key result with the common-key result and write out
  df1.union(df2)
    .write.mode(SaveMode.Overwrite)
    .insertInto("sparktuning.salecourse_detail")
}
```
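Two notes on this design. The fan-out of 36 is a tuning knob, not a magic number: a larger salt range balances the shuffle better but multiplies the expanded table's size, so pick it from the observed skew ratio. And correctness relies on the salt construction: every salted big-table key randInt + "_" + courseid matches exactly the one replicated small-table copy carrying the same salt, so the unioned result equals the plain unsalted join.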
