Spark SQL快速入门系列之Hive

程序源代码

共 4579字,需浏览 10分钟

 ·

2020-08-31 09:41

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

大数据技术与架构
点击右侧关注,大数据开发领域最强公众号!

暴走大数据
点击右侧关注,暴走大数据!



目录

  • 一.hive和spark sql的集成方式(面试可能会问到)

  • 二.spark_shell和spark_sql操作

    • spark_shell

    • spark_sql

    • 使用hiveserver2 + beeline

  • 三.脚本使用spark-sql

  • 四.idea中读写Hive数据

    • 1.从hive中读数据

    • 2.从hive中写数据

    • 使用hive的insert语句去写

    • 使用df.write.saveAsTable("表名")(常用)

    • 使用df.write.insertInto("表名")

    • 3.saveAsTable和insertInto的原理

  • 五.聚合后的分区数


一.hive和spark sql的集成方式(面试可能会问到)


hive on spark(版本兼容)
官网https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started

spark on hive(版本兼容)
官网
http://spark.apache.org/docs/2.1.1/sql-programming-guide.html#hive-tables

二.spark_shell和spark_sql操作

spark_shell


如果你在集群上使用了tez,你需要在spark/conf下spark-defaults.conf添加lzo的路径

spark.jars=/export/servers/hadoop-2.7.7/share/hadoop/common/hadoop-lzo-0.4.20.jar
spark-yarn模式启动
bin/spark-shell --master yarn

spark_sql

完全跟sql一样


使用hiveserver2 + beeline

spark-sql 得到的结果不够友好, 所以可以使用hiveserver2 + beeline
1.启动thriftserver(后台)

sbin/start-thriftserver.sh
2.启动beeline
bin/beeline# 然后输入!connect jdbc:hive2://hadoop102:10000# 然后按照提示输入用户名和密码


三.脚本使用spark-sql

四.idea中读写Hive数据

1.从hive中读数据


添加依赖


    org.apache.spark    spark-hive_2.11    2.1.1

代码实现
import org.apache.spark.sql.SparkSession
object HiveRead { def main(args: Array[String]): Unit = { val spark = SparkSession .builder() .master("local[*]") .appName("HiveRead") //添加支持外置hive .enableHiveSupport() .getOrCreate()
spark.sql("show databases") spark.sql("use guli") spark.sql("select count(*) from gulivideo_orc").show()
spark.close()  }}

结果

2.从hive中写数据

使用hive的insert语句去写

import org.apache.spark.sql.SparkSession
object HiveWrite { def main(args: Array[String]): Unit = { System.setProperty("HADOOP_USER_NAME", "root"); val spark = SparkSession .builder() .master("local[*]") .appName("HiveRead") //添加支持外置hive .enableHiveSupport() .config("spark.sql.warehouse.dir","hdfs://hadoop102:9000/user/hive/warehouse") .getOrCreate()
//先创建一个数据库 spark.sql("create database spark1602") spark.sql("use spark1602") spark.sql("create table user1(id int,name string)") spark.sql("insert into user1 values(10,'lisi')").show()
spark.close()
}}


使用df.write.saveAsTable(“表名”)(常用)

import org.apache.spark.sql.SparkSession
object HiveWrite { def main(args: Array[String]): Unit = { System.setProperty("HADOOP_USER_NAME", "root"); val spark = SparkSession .builder() .master("local[*]") .appName("HiveRead") //添加支持外置hive .enableHiveSupport() .config("spark.sql.warehouse.dir","hdfs://hadoop102:9000/user/hive/warehouse") .getOrCreate()

val df = spark.read.json("D:\\idea\\spark-sql\\input\\user.json") spark.sql("use spark1602") //直接把数据写入到hive中,表可以存在也可以不存在 df.write.saveAsTable("user2") //也可以进行追加 //df.write.mode("append").saveAsTable("user2") spark.close()
  }}

使用df.write.insertInto(“表名”)

import org.apache.spark.sql.SparkSession
object HiveWrite { def main(args: Array[String]): Unit = { System.setProperty("HADOOP_USER_NAME", "root"); val spark = SparkSession .builder() .master("local[*]") .appName("HiveRead") //添加支持外置hive .enableHiveSupport() .config("spark.sql.warehouse.dir","hdfs://hadoop102:9000/user/hive/warehouse") .getOrCreate()

val df = spark.read.json("D:\\idea\\spark-sql\\input\\user.json") spark.sql("use spark1602") df.write.insertInto("user2")
spark.close()  }}

3.saveAsTable和insertInto的原理

saveAsTable
使用列名进行分配值

insertInto
按照位置进行1对1

五.聚合后的分区数

import org.apache.spark.sql.SparkSession
object HiveWrite { def main(args: Array[String]): Unit = { System.setProperty("HADOOP_USER_NAME", "root"); val spark = SparkSession .builder() .master("local[*]") .appName("HiveRead") //添加支持外置hive .enableHiveSupport() .config("spark.sql.warehouse.dir","hdfs://hadoop102:9000/user/hive/warehouse") .getOrCreate()
val df = spark.read.json("D:\\idea\\spark-sql\\input\\user.json") df.createOrReplaceTempView("a")
spark.sql("use spark1602") val df1 = spark.sql("select * from a ") val df2 = spark.sql("select sum(age) sum_age from a group by name") println(df1.rdd.getNumPartitions) println(df2.rdd.getNumPartitions) df1.write.saveAsTable("a3") df2.write.saveAsTable("a4")
spark.close()  }}

结果:聚合函数分区数默认200个


如果数据量小,没必要200两个分区,简直浪费。

 df2.write.saveAsTable("a4")

修改为

 df2.coalesce(1).write.saveAsTable("a4")

版权声明:

本文为大数据技术与架构整理,原作者独家授权。未经原作者允许转载追究侵权责任。
编辑|谭林平丶
微信公众号|import_bigdata


欢迎点赞+收藏+转发朋友圈素质三连




文章不错?点个【在看】吧! ?


浏览 45
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报