Flink入门 03.入门案例
1 前置说明
1.1 API
Flink提供了多个层次的API供开发者使用,越往上抽象程度越高,使用起来越方便;越往下越底层,使用起来难度越大
注意:在Flink1.12时支持流批一体,DataSet API已经不推荐使用了,所以课程中除了个别案例使用DataSet外,后续其他案例都会优先使用DataStream流式API,既支持无界数据处理/流处理,也支持有界数据处理/批处理!当然Table&SQL-API会单独学习
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/batch/
https://developer.aliyun.com/article/780123?spm=a2c6h.12873581.0.0.1e3e46ccbYFFrC
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html
1.2 编程模型
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html
Flink 应用程序结构主要包含三部分,Source/Transformation/Sink,如下图所示:
2 准备工程
2.1 pom文件
<!-- Build properties: Java 8 source/target level, and one property that pins the version shared by every Flink artifact below. -->
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.13.2</flink.version>
</properties>
<dependencies>
<!-- Flink Java API (provides the org.apache.flink.api.java.* classes imported by the DataSet example). -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink DataStream API; the _2.11 suffix selects the Scala 2.11 build and must match across all suffixed Flink artifacts. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink client classes; presumably required to launch jobs locally from the IDE - TODO confirm. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Lombok. NOTE(review): Lombok is normally declared with <scope>provided</scope> because it is only needed at compile time - confirm before changing. -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>
<!-- MySQL JDBC driver; not used by the WordCount examples in this section (presumably for later JDBC examples). -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
<!-- Flink JDBC connector; not used by the WordCount examples in this section. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink Kafka connector; not used by the WordCount examples in this section. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
2.2 log4j.properties
# Console-only logging configuration for running the examples locally.
# NOTE(review): this is log4j 1.x property syntax, and the pom in section 2.1 declares no
# logging binding - confirm that a matching log4j/slf4j binding is on the classpath.

# Root logger: only WARN and above, sent to the appender named "console".
log4j.rootLogger=WARN, console
# The "console" appender writes to the console.
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
# Pattern: time (HH:mm:ss,SSS), level left-aligned to 5 chars, logger name left-aligned to 60 chars,
# nested diagnostic context (%x), message, line separator.
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
3 Flink初体验
3.1 需求
使用Flink实现WordCount
3.2 编码步骤
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html
准备环境-env
准备数据-source
处理数据-transformation
输出结果-sink
触发执行-execute
其中创建环境可以使用如下3种方式:
getExecutionEnvironment() //推荐使用
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
3.3 代码实现
3.3.1 基于DataSet
package com.song.flink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;
/**
 * WordCount implemented with the legacy batch DataSet API (no longer the recommended API).
 *
 * <p>Steps:
 * <ol>
 *   <li>prepare the environment (env)</li>
 *   <li>prepare the data (source)</li>
 *   <li>process the data (transformation)</li>
 *   <li>output the result (sink)</li>
 *   <li>trigger execution - with the DataSet API, {@code print()} already triggers the job,
 *       so no explicit {@code env.execute()} is needed (DataStream programs do need it)</li>
 * </ol>
 *
 * <p>NOTE(review): the class name reads "WorkCount"; it probably was meant to be "WordCount".
 * It is left unchanged here because it is the public name of the class.
 */
public class WorkCountWithDataSet {

    /** Splits a line on single spaces and emits every resulting word. */
    private static final class LineSplitter implements FlatMapFunction<String, String> {
        @Override
        public void flatMap(String line, Collector<String> out) throws Exception {
            for (String token : line.split(" ")) {
                out.collect(token);
            }
        }
    }

    /** Turns a word into the pair (word, 1). */
    private static final class WordToPair implements MapFunction<String, Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> map(String word) throws Exception {
            return Tuple2.of(word, 1);
        }
    }

    public static void main(String[] args) throws Exception {
        // 1. env
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // 2. source
        DataSet<String> lines = env.fromElements("Hello Hadoop", "Hello Spark", "Hello Flink");

        // 3. transformation
        DataSet<Tuple2<String, Integer>> sortedCounts = lines
                // 3.1 line -> words
                .flatMap(new LineSplitter())
                // 3.2 word -> (word, 1)
                .map(new WordToPair())
                // 3.3 group by tuple field 0, i.e. the word
                .groupBy(0)
                // 3.4 sum tuple field 1, i.e. the count, within each group
                .sum(1)
                // 3.5 sort by count, descending; parallelism 1 for the sort operator
                .sortPartition(1, Order.DESCENDING)
                .setParallelism(1);

        // 4. sink
        // 5. execute: print() triggers the DataSet job (as would execute(), count() or collect()),
        //    so env.execute() is intentionally not called.
        sortedCounts.print();
    }
}
执行结果如下:
3.3.2 基于DataStream
package com.song.flink;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * WordCount implemented with the DataStream API.
 *
 * <p>Steps:
 * <ol>
 *   <li>prepare the environment (env)</li>
 *   <li>prepare the data (source)</li>
 *   <li>process the data (transformation)</li>
 *   <li>output the result (sink)</li>
 *   <li>trigger execution (execute)</li>
 * </ol>
 *
 * @author SongXitang
 */
public class WordCountWithDataStream {
    public static void main(String[] args) throws Exception {
        // 1. env - the unified stream/batch API supports both stream and batch processing.
        //    RuntimeExecutionMode.STREAMING and RuntimeExecutionMode.BATCH can be set instead of AUTOMATIC.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. source
        DataStream<String> lines = env.fromElements("Hello Hadoop", "Hello Spark", "Hello Flink");

        // 3. transformation: line -> words -> (word, 1)
        DataStream<Tuple2<String, Integer>> pairs = lines
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String line, Collector<String> out) throws Exception {
                        String[] tokens = line.split(" ");
                        for (int i = 0; i < tokens.length; i++) {
                            out.collect(tokens[i]);
                        }
                    }
                })
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String word) throws Exception {
                        return Tuple2.of(word, 1);
                    }
                });

        // Key by the word (field f0) through a key selector rather than the index-based keyBy(0),
        // then sum the counts in tuple field 1.
        SingleOutputStreamOperator<Tuple2<String, Integer>> counts =
                pairs.keyBy(pair -> pair.f0).sum(1);

        // 4. sink
        counts.print();

        // 5. execute - a DataStream program must call execute() explicitly.
        env.execute();
    }
}
执行结果如下:
3.3.3 Lambda版
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/java_lambdas.html#java-lambda-expressions
package com.song.flink;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
/**
 * WordCount on the DataStream API written with Java lambdas.
 *
 * <p>Each lambda-based operator is given its result type explicitly, either with
 * {@code returns(...)} or with a {@link TypeInformation} argument.
 */
public class WordCountLambda {
    public static void main(String[] args) throws Exception {
        // 1. env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2. source
        DataStream<String> lines = env.fromElements("Hello Hadoop", "Hello Spark", "Hello Flink");

        // 3. transformation
        // line -> words; the result type is declared through returns(Types.STRING).
        DataStream<String> words = lines
                .flatMap((String line, Collector<String> out) -> {
                    for (String token : line.split(" ")) {
                        out.collect(token);
                    }
                })
                .returns(Types.STRING);

        // word -> (word, 1); the result type is passed as a TypeHint-based TypeInformation.
        // An alternative is map(...).returns(Types.TUPLE(Types.STRING, Types.INT)).
        TypeInformation<Tuple2<String, Integer>> pairType =
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {});
        DataStream<Tuple2<String, Integer>> pairs =
                words.map((String word) -> Tuple2.of(word, 1), pairType);

        // Key by the word and sum tuple field 1 (the count).
        KeyedStream<Tuple2<String, Integer>, String> byWord = pairs.keyBy(pair -> pair.f0);
        DataStream<Tuple2<String, Integer>> counts = byWord.sum(1);

        // 4. sink
        counts.print();

        // 5. execute
        env.execute();
    }
}
执行结果如下:
3.3.4 在Yarn上运行
注意
写入HDFS如果存在权限问题:
进行如下设置:
hadoop fs -chmod -R 777 /
并在代码中添加:
System.setProperty("HADOOP_USER_NAME", "root")
修改代码
package com.song.flink;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
public class WordCountYarn {
/**
 * Runs the WordCount job and writes the result as text.
 *
 * <p>The target path is taken from the {@code --output} argument when present; otherwise a
 * path under {@code hdfs://nameservice1/data/flink/wordcount/} suffixed with the current
 * timestamp is used.
 *
 * @param args command-line arguments, parsed with {@link ParameterTool}
 * @throws Exception if job execution fails
 */
public static void main(String[] args) throws Exception {
    // User name for the YARN submission.
    System.setProperty("HADOOP_USER_NAME", "song");

    // Resolve the output location from the arguments, falling back to a timestamped default.
    ParameterTool cli = ParameterTool.fromArgs(args);
    String output = cli.has("output")
            ? cli.get("output")
            : "hdfs://nameservice1/data/flink/wordcount/output_" + System.currentTimeMillis();

    // env
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

    // source
    DataStream<String> lines = env.fromElements("Hello Hadoop", "Hello Spark", "Hello Flink");

    // transformation: line -> words -> (word, 1) -> keyed by word -> summed count
    DataStream<String> words = lines
            .flatMap((String line, Collector<String> out) -> {
                for (String token : line.split(" ")) {
                    out.collect(token);
                }
            })
            .returns(Types.STRING);
    TypeInformation<Tuple2<String, Integer>> pairType =
            TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {});
    DataStream<Tuple2<String, Integer>> pairs =
            words.map((String word) -> Tuple2.of(word, 1), pairType);
    KeyedStream<Tuple2<String, Integer>, String> byWord = pairs.keyBy(pair -> pair.f0);
    DataStream<Tuple2<String, Integer>> counts = byWord.sum(1);

    // sink: write as text, with the sink's parallelism set to 1
    counts.writeAsText(output).setParallelism(1);

    // execute
    env.execute();
}
}
打包
查看打包的jar
上传
[song@cdh68 jars]$ pwd
/home/song/data/jars
[song@cdh68 jars]$ ll
total 16
-rw-r--r-- 1 song song 15532 Aug 31 16:49 WordCount-1.0-SNAPSHOT.jar
[song@cdh68 jars]$ chmod 755 WordCount-1.0-SNAPSHOT.jar
[song@cdh68 jars]$ ll
total 16
-rwxr-xr-x 1 song song 15532 Aug 31 16:49 WordCount-1.0-SNAPSHOT.jar
提交任务
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html
[song@cdh68 ~]$ flink run-application -t yarn-application \
-Dexecution.runtime-mode=BATCH \
-yjm 4096 \
-ytm 16384 \
-ys 4 \
-c com.song.flink.WordCountYarn \
/home/song/data/jars/WordCount-1.0-SNAPSHOT.jar \
--output hdfs://nameservice1/data/flink/wordcount/output
在Web页面可以观察到提交的程序