数仓(八)从0到1简单搭建加载数仓DWD层(用户行为日志数据解析)
一、DWD层用户行为日志解析结构
DWD层是对用户的日志行为进行解析,以及对业务数据采用维度模型的方式重新建模(维度退化)。本节我们先来回顾一下用户行为日志的结构。
1、前端埋点日志信息
前端埋点日志信息都是JSON格式形式,主要包括两方面:
(1)启动日志;(2)事件日志;
我们之前已经把前端埋点的日志信息,写到ODS层ods_log表了,传入的参数是一个String类型字符串即一条日志信息一个String类型字符串。
二、DWD层-启动日志表
1、启动日志表结构
分区:
dt = 2020-06-14
过滤条件:
利用get_json_object函数,解析start内容不为空说明是启动日志信息;
范围
包括:公共信息common、启动信息start、启动app时间ts;
2、创建表结构
DROP TABLE IF EXISTS dwd_start_log;
CREATE EXTERNAL TABLE dwd_start_log(
`area_code` STRING COMMENT '地区编码',
`brand` STRING COMMENT '手机品牌',
`channel` STRING COMMENT '渠道',
`is_new` STRING COMMENT '是否首次启动',
`model` STRING COMMENT '手机型号',
`mid_id` STRING COMMENT '设备id',
`os` STRING COMMENT '操作系统',
`user_id` STRING COMMENT '会员id',
`version_code` STRING COMMENT 'app版本号',
`entry` STRING COMMENT 'icon手机图标 notice 通知 install 安装后启动',
`loading_time` BIGINT COMMENT '启动加载时间',
`open_ad_id` STRING COMMENT '广告页ID ',
`open_ad_ms` BIGINT COMMENT '广告总共播放时间',
`open_ad_skip_ms` BIGINT COMMENT '用户跳过广告时点',
`ts` BIGINT COMMENT '时间'
) COMMENT '启动日志表'
PARTITIONED BY (`dt` STRING) -- 按照时间创建分区
STORED AS PARQUET -- 采用parquet列式存储
LOCATION '/warehouse/gmall/dwd/dwd_start_log' -- 指定在HDFS上存储位置
TBLPROPERTIES('parquet.compression'='lzo') -- 采用LZO压缩;
3、装载数据
首日和每日加载数据分区都是一样的策略,每天DWD层从ODS层获取数据后加载。
insert overwrite table dwd_start_log partition(dt='2020-06-14')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.is_new'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.start.entry'),
get_json_object(line,'$.start.loading_time'),
get_json_object(line,'$.start.open_ad_id'),
get_json_object(line,'$.start.open_ad_ms'),
get_json_object(line,'$.start.open_ad_skip_ms'),
get_json_object(line,'$.ts')
from ods_log
where dt='2020-06-14'
and get_json_object(line,'$.start') is not null;
SET hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
三、DWD层-页面日志表
分区:
dt = 2020-06-14
过滤条件:
利用get_json_object函数,解析page内容不为空说明是页面日志信息;
范围
包括:公共信息common、页面信息page、启动app时间ts;
3、创建表结构
drop table if exists dwd_page_log;
CREATE EXTERNAL TABLE dwd_page_log(
`area_code` string COMMENT '地区编码',
`brand` string COMMENT '手机品牌',
`channel` string COMMENT '渠道',
`model` string COMMENT '手机型号',
`mid_id` string COMMENT '设备id',
`os` string COMMENT '操作系统',
`user_id` string COMMENT '会员id',
`version_code` string COMMENT 'app版本号',
`during_time` bigint COMMENT '持续时间毫秒',
`page_item` string COMMENT '目标id ',
`page_item_type` string COMMENT '目标类型',
`last_page_id` string COMMENT '上页类型',
`page_id` string COMMENT '页面ID ',
`source_type` string COMMENT '来源类型',
`ts` bigint
) COMMENT '页面日志表'
PARTITIONED BY (dt string)
stored as parquet
LOCATION '/warehouse/gmall/dwd/dwd_page_log'
TBLPROPERTIES('parquet.compression'='lzo');
insert overwrite table dwd_page_log partition(dt='2020-06-14')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(line,'$.ts')
from ods_log
where dt='2020-06-14'
and get_json_object(line,'$.page') is not null;
四、DWD层-动作日志表
根据图示我们可以看到:
(1)需要自定义创建UDTF函数
来完成对actions动作数组的“炸裂”,实现“一行输入,多行输出”的需求。即输入JSON数组字符串actions,输出每一个JSON数组元素action。
(2)然后通过get_json_object(action,$action元素字段)获取信息;
分区:
dt = 2020-06-14
过滤条件:
利用get_json_object函数,解析actions内容不为空;
范围
包括:公共信息common、页面信息page、动作信息、启动app时间ts;
2、创建表结构
drop table if exists dwd_action_log;
CREATE EXTERNAL TABLE dwd_action_log(
`area_code` string COMMENT '地区编码',
`brand` string COMMENT '手机品牌',
`channel` string COMMENT '渠道',
`model` string COMMENT '手机型号',
`mid_id` string COMMENT '设备id',
`os` string COMMENT '操作系统',
`user_id` string COMMENT '会员id',
`version_code` string COMMENT 'app版本号',
`during_time` bigint COMMENT '持续时间毫秒',
`page_item` string COMMENT '目标id ',
`page_item_type` string COMMENT '目标类型',
`last_page_id` string COMMENT '上页类型',
`page_id` string COMMENT '页面id ',
`source_type` string COMMENT '来源类型',
`action_id` string COMMENT '动作id',
`item` string COMMENT '目标id ',
`item_type` string COMMENT '目标类型',
`ts` bigint COMMENT '时间'
) COMMENT '动作日志表'
PARTITIONED BY (dt string)
stored as parquet
LOCATION '/warehouse/gmall/dwd/dwd_action_log'
TBLPROPERTIES('parquet.compression'='lzo');
org.apache.hive
hive-exec
3.1.2
package com.qiusheng.hive.udtf;
import org.apache.hadoop.hive.ql.exec.MapredContext;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import java.util.ArrayList;
import java.util.List;
/**
* @author qiusheng
* @date 2021年11月04日
*
*/
public class ExplodeJSONArray extends GenericUDTF
{
/**
* 1、第一步需要自定义的类继承GenericUDTF类
* 并且重写initialize方法;
* 返回StructObjectInspector对象
*
* @qiusheng
* @param argOIs
* @return
* @throws UDFArgumentException
*/
@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException
{
//1、判断参数的合法性argOI实际就是actions
//(1)判断参数的个数(需要传递一个参数)
//如果传递的参数个数不是1个,则抛异常;
if(argOIs.getAllStructFieldRefs().size() != 1)
{
throw new UDFArgumentException("参数个数错误!只需要1个参数......");
}
//(2)判断传递的参数类型,必须是String类型
//如果不是string类型,则抛异常;
if(!"String".equals(argOIs.getAllStructFieldRefs().get(0).getFieldObjectInspector().getTypeName()))
{
throw new UDFArgumentException("参数类型不对!应该是String类型......");
}
//2、返回StructObjectInspector对象
//第一个参数:变量名,类型是List
数组; //第二个参数:检验变量,类型是List
;
//定义返回值名称
List
fieldNames = new ArrayList ();
//定义返回值类型
List
fieldOIs = new ArrayList ();
//把items添加到返回值名称
fieldNames.add("items");
//调用检验方法对items
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
//3、返回类型
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames,fieldOIs);
}
/**
* 2、第二步重写process方法
*
* function:炸裂功能
* @author qiusheng
* @param objects
* @throws HiveException
*/
@Override
public void process(Object[] objects) throws HiveException
{
//1、获取传入的数据就是actions
String jsonArray = objects.toString();
//2、把传入的数据(string)类型转化为JSON数组JSONArray类型
JSONArray actions = new JSONArray(jsonArray);
//3、循环取出JSONArray对象中的JSON,并且写出来
for (int i = 0;i < actions.length();i++)
{
//定义一个字符串数组,长度就是1
String[] result = new String[1];
//getString(i)把元素取出来,添加到这个数组中
result[0] = actions.getString(i);
//写到String数组里面
forward(result);
}
}
/**
* @author qiusheng
* @throws HiveException
*/
@Override
public void close() throws HiveException
{
}
}
3.2、通过maven打成jar包
3.3、上传jar包到hadoop集群路径下
(1)先上传jar包到CentOS集群的node1节点
[qiusheng@node01 module]$ hadoop fs -mkdir -p /functions/hive/jars
上传jar包到这个目录/functions/hive/jars
hive (gmall)>
create function explode_json_array as 'com.qiusheng.hive.udtf.ExplodeJSONArray' using jar 'hdfs://mode01:8020/functions/hive/jars/HiveDWDActionLog-1.0-SNAPSHOT.jar';
insert overwrite table dwd_action_log partition(dt='2020-06-14')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.during_time'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(action,'$.action_id'),
get_json_object(action,'$.item'),
get_json_object(action,'$.item_type'),
get_json_object(action,'$.ts')
from ods_log lateral view explode_json_array(get_json_object(line,'$.actions')) tmp as action
where dt='2020-06-14'
and get_json_object(line,'$.actions') is not null;
from ods_log lateral view explode_json_array(get_json_object(line,'$.actions')) tmp as action
五、DWD层-曝光日志表
六、DWD层-错误日志表
drop table if exists dwd_error_log;
CREATE EXTERNAL TABLE dwd_error_log(
`area_code` string COMMENT '地区编码',
`brand` string COMMENT '手机品牌',
`channel` string COMMENT '渠道',
`model` string COMMENT '手机型号',
`mid_id` string COMMENT '设备id',
`os` string COMMENT '操作系统',
`user_id` string COMMENT '会员id',
`version_code` string COMMENT 'app版本号',
`page_item` string COMMENT '目标id ',
`page_item_type` string COMMENT '目标类型',
`last_page_id` string COMMENT '上页类型',
`page_id` string COMMENT '页面ID ',
`source_type` string COMMENT '来源类型',
`entry` string COMMENT ' icon手机图标 notice 通知 install 安装后启动',
`loading_time` string COMMENT '启动加载时间',
`open_ad_id` string COMMENT '广告页ID ',
`open_ad_ms` string COMMENT '广告总共播放时间',
`open_ad_skip_ms` string COMMENT '用户跳过广告时点',
`actions` string COMMENT '动作',
`displays` string COMMENT '曝光',
`ts` string COMMENT '时间',
`error_code` string COMMENT '错误码',
`msg` string COMMENT '错误信息'
) COMMENT '错误日志表'
PARTITIONED BY (dt string)
stored as parquet
LOCATION '/warehouse/gmall/dwd/dwd_error_log'
TBLPROPERTIES('parquet.compression'='lzo');
insert overwrite table dwd_error_log partition(dt='2020-06-14')
select
get_json_object(line,'$.common.ar'),
get_json_object(line,'$.common.ba'),
get_json_object(line,'$.common.ch'),
get_json_object(line,'$.common.md'),
get_json_object(line,'$.common.mid'),
get_json_object(line,'$.common.os'),
get_json_object(line,'$.common.uid'),
get_json_object(line,'$.common.vc'),
get_json_object(line,'$.page.item'),
get_json_object(line,'$.page.item_type'),
get_json_object(line,'$.page.last_page_id'),
get_json_object(line,'$.page.page_id'),
get_json_object(line,'$.page.sourceType'),
get_json_object(line,'$.start.entry'),
get_json_object(line,'$.start.loading_time'),
get_json_object(line,'$.start.open_ad_id'),
get_json_object(line,'$.start.open_ad_ms'),
get_json_object(line,'$.start.open_ad_skip_ms'),
get_json_object(line,'$.actions'),
get_json_object(line,'$.displays'),
get_json_object(line,'$.ts'),
get_json_object(line,'$.err.error_code'),
get_json_object(line,'$.err.msg')
from ods_log
where dt='2020-06-14'
and get_json_object(line,'$.err') is not null;