Flink在实时在实时计算平台和实时数仓中的企业级应用小结
点击上方蓝色字体,选择“设为星标”
回复”资源“获取更多资源
大数据领域自 2010 年开始,以 Hadoop、Hive 为代表的离线计算开始进入各大公司的视野。大数据领域开始了如火如荼的发展。我个人在学校期间就开始关注大数据领域的技术迭代和更新,并且有幸在毕业后成为大数据领域的开发者。
Flink 实时计算
我是抖音主播,我想看带货销售情况的排行?
我是运营,我想看到我们公司销售商品的 TOP10?
我是开发,我想看到我们公司所有生产环境中服务器的运行情况?
......
Flink 实时数据仓库
技术选型
实时计算引擎
同步变异步
应用解耦
流量削峰
高吞吐:
可以满足每秒百万级别消息的生产和消费,并且可以通过横向扩展,保证数据处理能力可以得到线性扩展。
低延迟:
以时间复杂度为 O(1) 的方式提供消息持久化能力,即使对 TB 级以上数据也能保证常数时间复杂度的访问性能。
高容错:
Kafka 允许集群的节点出现失败。
可靠性:
消息可以根据策略进行磁盘的持久化,并且读写效率都很高。
生态丰富:
Kafka 周边生态极其丰富,与各个实时处理框架结合紧密。
强大的状态管理。
Flink 使用 State 存储中间状态和结果,并且有强大的容错能力;
非常丰富的 API。
Flink 提供了包含 DataSet API、DataStream API、Flink SQL 等等强大的API;
生态支持完善。
Flink 支持多种数据源(Kafka、MySQL等)和存储(HDFS、ES 等),并且和其他的大数据领域的框架结合完善;
批流一体。
Flink 已经在将流计算和批计算的 API 进行统一,并且支持直接写入 Hive。
高度汇总,高度汇总指标一般存储在 Redis、HBase 中供前端直接查询使用。
明细数据,在一些场景下,我们的运营和业务人员需要查询明细数据,有一些明细数据极其重要,比如双十一派送的包裹中会有一些丢失和破损。
实时消息,Flink 在计算完成后,有一个下游是发往消息系统,这里的作用主要是提供给其他业务复用;
另外,在一些情况下,我们计算好明细数据也需要再次经过消息系统才能落库,将原来直接落库拆成两步,方便我们进行问题定位和排查。
Hive、Hawq、Impala:
基于 SQL on Hadoop
Presto 和 Spark SQL 类似:
基于内存解析 SQL 生成执行计划
Kylin:
用空间换时间、预计算
Druid:
数据实时摄入加实时计算
ClickHouse:
OLAP 领域的 HBase,单表查询性能优势巨大
Greenpulm:
OLAP 领域的 PostgreSQL
Flink 实时数据仓库
强一致性
自动故障转移和容错性
极高的读写 QPS,非常适合存储 K-V 形式的指标
大厂的实时计算平台和实时数仓技术方案
作者的经验
数据源过多
数据源之间时间 GAP 巨大
离线数据和实时数据要求强一致性
这套数据架构引入了 Hbase 作为中间存储,数据链路变长。导致运维成本大量增加,整个架构的实时性能受制于 Hbase 的变更信息能不能及时发送。
指标没有分层,会导致 ADB 和 Hologres 成为查询瓶颈。在这套数据架构中,我们完全抛弃了中间指标层,完全依赖 SQL 直接汇总查询。一方面得益于省略中间层后指标的准确性,另一方面因为 SQL 直接查询会对 ADB 有巨大的查询压力,使得 ADB 消耗了巨大的资源和成本。
腾讯看点的实时数据系统设计
数据采集层
实时数据仓库层
实时数据存储层
多核 CPU 并行计算
SIMD 并行计算加速
分布式水平扩展集群
稀疏索引、列式存储、数据压缩
聚合分析优化
过去 30 分钟内容的查询,99% 的请求耗时在1秒内
过去 24 小时内容的查询,90% 的请求耗时在5秒内,99% 的请求耗时在 10 秒内
阿里巴巴批流一体数据仓库建设
统一元数据管理
统一计算引擎
统一数据存储
实战案例
架构设计
日志数据上报
日志数据清洗
实时计算程序
结果存储
Flume 和 Kafka 整合和部署
Kafka 模拟数据生成和发送
Flink 和 Kafka 整合时间窗口设计
Flink 计算 PV、UV 代码实现
Flink 和 Redis 整合以及 Redis Sink 实现
Flume 和 Kafka 整合和部署
tar zxf apache-flume-1.8.0-bin.tar.gz
# 定义这个 agent 中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source的配置,监听日志文件中的新增数据
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/logs/access.log
#sink配置,使用avro日志做数据的消费
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = flumeagent03
a1.sinks.k1.port = 9000
#channel配置,使用文件做数据的临时缓存
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/temp/flume/checkpoint
a1.channels.c1.dataDirs = /home/temp/flume/data
#描述和配置 source channel sink 之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c
$ flume-ng agent
-c conf
-n a1
-f conf/log_kafka.conf >/dev/null 2>&1 &
定义这个 agent 中各组件的名字
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#source配置
a1.sources.r1.type = avro
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 9000
#sink配置
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = log_kafka
a1.sinks.k1.brokerList = 127.0.0.1:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
#channel配置
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
#描述和配置 source channel sink 之间的连接关系
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
$ flume-ng agent
-c conf
-n a1
-f conf/flume_kafka.conf >/dev/null 2>&1 &
public class UserClick {
private String userId;
private Long timestamp;
private String action;
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public Long getTimestamp() {
return timestamp;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public UserClick(String userId, Long timestamp, String action) {
this.userId = userId;
this.timestamp = timestamp;
this.action = action;
}
}
enum UserAction{
//点击
CLICK("CLICK"),
//购买
PURCHASE("PURCHASE"),
//其他
OTHER("OTHER");
private String action;
UserAction(String action) {
this.action = action;
}
}
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
// 检查点配置,如果要用到状态后端,那么必须配置
env.setStateBackend(new MemoryStateBackend(true));
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("log_user_action", new SimpleStringSchema(), properties);
//设置从最早的offset消费
consumer.setStartFromEarliest();
DataStream<UserClick> dataStream = env
.addSource(consumer)
.name("log_user_action")
.map(message -> {
JSONObject record = JSON.parseObject(message);
return new UserClick(
record.getString("user_id"),
record.getLong("timestamp"),
record.getString("action")
);
})
.returns(TypeInformation.of(UserClick.class));
SingleOutputStreamOperator<UserClick> userClickSingleOutputStreamOperator = dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserClick>(Time.seconds(30)) {
@Override
public long extractTimestamp(UserClick element) {
return element.getTimestamp();
}
});
dataStream
.windowAll(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))
userClickSingleOutputStreamOperator
.keyBy(new KeySelector<UserClick, String>() {
@Override
public String getKey(UserClick value) throws Exception {
return DateUtil.timeStampToDate(value.getTimestamp());
}
})
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))
.evictor(TimeEvictor.of(Time.seconds(0), true))
...
public class DateUtil {
public static String timeStampToDate(Long timestamp){
ThreadLocal<SimpleDateFormat> threadLocal
= ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
String format = threadLocal.get().format(new Date(timestamp));
return format.substring(0,10);
}
}
public class MyProcessWindowFunction extends ProcessWindowFunction<UserClick,Tuple3<String,String, Integer>,String,TimeWindow>{
private transient MapState<String, String> uvState;
private transient ValueState<Integer> pvState;
public void open(Configuration parameters) throws Exception {
super.open(parameters);
uvState = this.getRuntimeContext().getMapState(new MapStateDescriptor<>("uv", String.class, String.class));
pvState = this.getRuntimeContext().getState(new ValueStateDescriptor<Integer>("pv", Integer.class));
}
public void process(String s, Context context, Iterable<UserClick> elements, Collector<Tuple3<String, String, Integer>> out) throws Exception {
Integer pv = 0;
Iterator<UserClick> iterator = elements.iterator();
while (iterator.hasNext()){
pv = pv + 1;
String userId = iterator.next().getUserId();
uvState.put(userId,null);
}
pvState.update(pvState.value() + pv);
Integer uv = 0;
Iterator<String> uvIterator = uvState.keys().iterator();
while (uvIterator.hasNext()){
String next = uvIterator.next();
uv = uv + 1;
}
Integer value = pvState.value();
if(null == value){
pvState.update(pv);
}else {
pvState.update(value + pv);
}
out.collect(Tuple3.of(s,"uv",uv));
out.collect(Tuple3.of(s,"pv",pvState.value()));
}
}
userClickSingleOutputStreamOperator
.keyBy(new KeySelector<UserClick, String>() {
@Override
public String getKey(UserClick value) throws Exception {
return value.getUserId();
}
})
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))
.evictor(TimeEvictor.of(Time.seconds(0), true))
.process(new MyProcessWindowFunction());
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.1.5</version>
</dependency>
public class MyRedisSink implements RedisMapper<Tuple3<String,String, Integer>>{
/**
* 设置redis数据类型
*/
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET,"flink_pv_uv");
}
//指定key
public String getKeyFromData(Tuple3<String, String, Integer> data) {
return data.f1;
}
//指定value
public String getValueFromData(Tuple3<String, String, Integer> data) {
return data.f2.toString();
}
}
...
userClickSingleOutputStreamOperator
.keyBy(new KeySelector<UserClick, String>() {
@Override
public String getKey(UserClick value) throws Exception {
return value.getUserId();
}
})
.window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8)))
.trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))
.evictor(TimeEvictor.of(Time.seconds(0), true))
.process(new MyProcessWindowFunction())
.addSink(new RedisSink<>(conf,new MyRedisSink()));
...