大数据入门:Spark+Kudu的广告业务项目实战笔记(六)
点击上方蓝色字体,选择“设为星标”
1.将数据放在HDFS上
[hadoop@hadoop000 ~]$ cd app/
[hadoop@hadoop000 app]$ ls
apache-maven-3.6.3 hive-1.1.0-cdh5.15.1 spark-2.4.5-bin-hadoop2.6
hadoop-2.6.0-cdh5.15.1 jdk1.8.0_91 tmp
[hadoop@hadoop000 app]$ cd hadoop-2.6.0-cdh5.15.1/
[hadoop@hadoop000 hadoop-2.6.0-cdh5.15.1]$ ls
bin etc include LICENSE.txt README.txt src
bin-mapreduce1 examples lib logs sbin
cloudera examples-mapreduce1 libexec NOTICE.txt share
[hadoop@hadoop000 hadoop-2.6.0-cdh5.15.1]$ cd sbin/
[hadoop@hadoop000 sbin]$ ls
distribute-exclude.sh slaves.sh stop-all.sh
hadoop-daemon.sh start-all.cmd stop-balancer.sh
hadoop-daemons.sh start-all.sh stop-dfs.cmd
hdfs-config.cmd start-balancer.sh stop-dfs.sh
hdfs-config.sh start-dfs.cmd stop-secure-dns.sh
httpfs.sh start-dfs.sh stop-yarn.cmd
kms.sh start-secure-dns.sh stop-yarn.sh
Linux start-yarn.cmd yarn-daemon.sh
mr-jobhistory-daemon.sh start-yarn.sh yarn-daemons.sh
refresh-namenodes.sh stop-all.cmd
[hadoop@hadoop000 sbin]$ ./start-all.sh
This script is Deprecated. Instead use start-dfs.sh and start-yarn.sh
Starting namenodes on [hadoop000]
hadoop000: starting namenode, logging to /home/hadoop/app/hadoop-2.6.0-cdh5.15.1/logs/hadoop-hadoop-namenode-hadoop000.out
hadoop000: starting datanode, logging to /home/hadoop/app/hadoop-2.6.0-cdh5.15.1/logs/hadoop-hadoop-datanode-hadoop000.out
Starting secondary namenodes [0.0.0.0]
0.0.0.0: starting secondarynamenode, logging to /home/hadoop/app/hadoop-2.6.0-cdh5.15.1/logs/hadoop-hadoop-secondarynamenode-hadoop000.out
starting yarn daemons
starting resourcemanager, logging to /home/hadoop/app/hadoop-2.6.0-cdh5.15.1/logs/yarn-hadoop-resourcemanager-hadoop000.out
hadoop000: starting nodemanager, logging to /home/hadoop/app/hadoop-2.6.0-cdh5.15.1/logs/yarn-hadoop-nodemanager-hadoop000.out
[hadoop@hadoop000 sbin]$ hadoop fs -mkdir -p /tai/access/20181007
[hadoop@hadoop000 sbin]$ hadoop fs -put ~/data/data-test.json /tai/access/20181007/
2.为定时调度重构代码(批次时间和输入路径改为通过 spark-submit --conf 传入)
package com.imooc.bigdata.cp08
import com.imooc.bigdata.cp08.business.{AppStatProcessor, AreaStatProcessor, LogETLProcessor, ProvinceCityStatProcessor}
import org.apache.commons.lang3.StringUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
//整个项目的入口
/**
 * Entry point of the whole project.
 *
 * Runs one batch: the log ETL step, then the province/city, area and app statistics.
 * The batch time is read from the `spark.time` configuration entry, e.g.
 * `spark-submit ... --conf spark.time=20181007`. The "spark." prefix is required because
 * spark-submit ignores --conf keys that do not start with "spark.".
 */
object SparkApp extends Logging {

  def main(args: Array[String]): Unit = {
    val builder = SparkSession.builder().appName("SparkApp")
    // A master set in code takes precedence over spark-submit's --master (so --master yarn
    // would be silently ignored). Only fall back to local mode when no master was supplied
    // externally, e.g. when running from the IDE.
    if (!sys.props.contains("spark.master")) {
      builder.master("local[2]")
    }
    val spark = builder.getOrCreate()

    // Use getOption rather than get: get throws NoSuchElementException for a missing key,
    // which would bypass the blank check below.
    val time = spark.sparkContext.getConf.getOption("spark.time").getOrElse("")
    if (StringUtils.isBlank(time)) {
      // Refuse to run without a batch. Exit non-zero so cron/schedulers see a failure
      // instead of a successful run.
      logError("处理批次不能为空")
      spark.stop()
      System.exit(1)
    }

    try {
      LogETLProcessor.process(spark)
      ProvinceCityStatProcessor.process(spark)
      AreaStatProcessor.process(spark)
      AppStatProcessor.process(spark)
    } finally {
      // Release the session even if one of the processors throws.
      spark.stop()
    }
  }
}
package com.imooc.bigdata.cp08.utils
import org.apache.spark.sql.SparkSession
object DateUtils {

  /**
   * Returns the per-batch table name: `tableName` + "_" + the batch time.
   *
   * The batch time comes from the `spark.time` configuration entry. With
   * `--conf spark.time=20181007`, `getTableName("ods", spark)` returns "ods_20181007".
   *
   * @param tableName base table name, e.g. "ods" or "province_city_stat"
   * @param spark     session whose configuration carries `spark.time`
   * @return the table name suffixed with the batch time
   * @throws NoSuchElementException if `spark.time` is not configured
   */
  def getTableName(tableName: String, spark: SparkSession): String = {
    val time = spark.sparkContext.getConf
      .getOption("spark.time")
      .getOrElse(throw new NoSuchElementException(
        "spark.time is not set; pass it with spark-submit --conf spark.time=<batch>"))
    s"${tableName}_$time"
  }
}
// NOTE(review): these lines appear to be excerpts from the processor objects, not a
// standalone script. They replace hard-coded names/paths with values from the Spark conf.
// Source/sink table names get the batch suffix via DateUtils.getTableName,
// i.e. "<name>_" + spark.time.
val sourceTableName = DateUtils.getTableName("ods",spark)
val sinkTableName = DateUtils.getTableName("province_city_stat",spark)
// Raw JSON input directory, supplied as --conf spark.raw.path=hdfs://.../tai/access/<time>.
// SparkConf.get(key) throws if the key was not passed.
val rawPath = spark.sparkContext.getConf.get("spark.raw.path")
var jsonDF = spark.read.json(rawPath)
// IP rule file, supplied as --conf spark.ip.path=hdfs://.../tai/access/ip.txt.
val ipRulePath = spark.sparkContext.getConf.get("spark.ip.path")
val ipRowRDD = spark.sparkContext.textFile(ipRulePath)
// NOTE(review): presumably this is the SparkApp session builder with the hard-coded
// .master(...) removed, so that spark-submit's --master takes effect. It cannot follow the
// lines above literally, because they already use `spark` — confirm placement.
val spark = SparkSession.builder().getOrCreate()
3.代码打包
# Start the Spark daemons from Spark's own sbin directory (not Hadoop's start-all.sh).
# NOTE(review): job.sh below submits with --master local, so it does not appear to use
# these daemons — confirm whether this step is needed.
cd ~/app/spark-2.4.5-bin-hadoop2.6/sbin/
sh start-all.sh
# Create the submit script. Everything from "time=..." down to the jar path is its content.
vi job.sh
# Batch to process (yyyyMMdd). It must match the HDFS directory created in step 1.
time=20181007
# Submit SparkApp in local mode. The Kudu jars are shipped with --jars. Every app setting
# carries a "spark." prefix because spark-submit ignores other --conf keys.
${SPARK_HOME}/bin/spark-submit \
--class com.imooc.bigdata.cp08.SparkApp \
--master local \
--jars /home/hadoop/lib/kudu-client-1.7.0.jar,/home/hadoop/lib/kudu-spark2_2.11-1.7.0.jar \
--conf spark.time=$time \
--conf spark.raw.path="hdfs://hadoop000:8020/tai/access/$time" \
--conf spark.ip.path="hdfs://hadoop000:8020/tai/access/ip.txt" \
/home/hadoop/lib/sparksql-train-1.0.jar
# Run the script.
sh job.sh
4.Spark on Yarn
# Same job as job.sh, but submitted to YARN instead of local mode.
# NOTE(review): --master yarn requires HADOOP_CONF_DIR or YARN_CONF_DIR to be set in the
# environment. Also, if SparkApp hard-codes .master(...) in its builder, that setting
# overrides --master here and the job silently runs locally.
vi jobyarn.sh
time=20181007
${SPARK_HOME}/bin/spark-submit \
--class com.imooc.bigdata.cp08.SparkApp \
--master yarn \
--jars /home/hadoop/lib/kudu-client-1.7.0.jar,/home/hadoop/lib/kudu-spark2_2.11-1.7.0.jar \
--conf spark.time=$time \
--conf spark.raw.path="hdfs://hadoop000:8020/tai/access/$time" \
--conf spark.ip.path="hdfs://hadoop000:8020/tai/access/ip.txt" \
/home/hadoop/lib/sparksql-train-1.0.jar
# Run the script.
sh jobyarn.sh
5.定时调度
crontab 时间表达式示例:`0 */1 * * *` 表示每小时的整点(第 0 分钟)执行一次。
`crontab -r`:删除当前用户的全部定时任务(会清空整个 crontab,而不是只删除一条,慎用)。
[hadoop@hadoop000 lib]$ date --date='1 day ago' +%Y%m%d
20200225
#!/bin/bash
# Daily batch job, intended to be run by cron: processes yesterday's data (yyyyMMdd).

# cron starts jobs with a minimal environment, so variables exported in the login profile
# (SPARK_HOME, JAVA_HOME, ...) are normally missing.
# NOTE(review): presumably they are exported in ~/.bash_profile — confirm.
if [ -f "$HOME/.bash_profile" ]; then
  . "$HOME/.bash_profile"
fi
# Fall back to this project's Spark install if SPARK_HOME is still unset.
SPARK_HOME=${SPARK_HOME:-/home/hadoop/app/spark-2.4.5-bin-hadoop2.6}

# Batch = yesterday, e.g. 20200225. It must match the HDFS directory /tai/access/<time>.
time=$(date --date='1 day ago' +%Y%m%d)

"${SPARK_HOME}/bin/spark-submit" \
--class com.imooc.bigdata.cp08.SparkApp \
--master local \
--jars /home/hadoop/lib/kudu-client-1.7.0.jar,/home/hadoop/lib/kudu-spark2_2.11-1.7.0.jar \
--conf spark.time="$time" \
--conf spark.raw.path="hdfs://hadoop000:8020/tai/access/$time" \
--conf spark.ip.path="hdfs://hadoop000:8020/tai/access/ip.txt" \
/home/hadoop/lib/sparksql-train-1.0.jar
crontab -e
# Run the daily job at 03:00.
# Invoking it through /bin/bash avoids depending on job.sh having the execute bit.
# Appending stdout/stderr to a log keeps the output for debugging; otherwise cron mails
# it or drops it.
0 3 * * * /bin/bash /home/hadoop/lib/job.sh >> /home/hadoop/lib/job.log 2>&1
版权声明:
本文为《大数据技术与架构》整理,原创或原作者独家授权。未经原作者允许违规转载追究侵权责任。
本文编辑:冷眼丶
微信公众号|import_bigdata
文章不错?点个【在看】吧!
评论