Flink Getting Started 03: Introductory Examples

大数据AI · 2021-09-04

1   Preliminaries

1.1   API

Flink offers several layers of API. The higher the layer, the more abstract and convenient it is to use; the lower the layer, the more low-level control it gives and the harder it is to use.

Note: Flink 1.12 introduced unified stream/batch execution, and the DataSet API is no longer recommended. Except for the occasional DataSet example, the remaining examples in this course use the DataStream API, which handles both unbounded data (stream processing) and bounded data (batch processing). The Table & SQL API is covered separately.

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/batch/

https://developer.aliyun.com/article/780123?spm=a2c6h.12873581.0.0.1e3e46ccbYFFrC

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html

1.2   Programming Model

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html

A Flink application consists of three main parts: Source, Transformation, and Sink.
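The same three-part shape appears in every example below. A minimal sketch of the structure, assuming the Flink 1.13 dependencies from section 2.1 (the class name and sample data are illustrative):

package com.song.flink;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Anatomy {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> source = env.fromElements("a", "b", "c"); // Source: where the data comes from
        DataStream<String> upper = source.map(String::toUpperCase);  // Transformation: how it is processed
        upper.print();                                               // Sink: where the results go
        env.execute("anatomy");                                      // nothing runs until execute() is called
    }
}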

2   Project Setup

2.1  pom.xml

<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.13.2</flink.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.20</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.25</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>
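The pom above only declares dependencies. To build the runnable fat jar submitted to YARN in section 3.3.4, a build plugin is also needed; a minimal sketch using maven-shade-plugin (the plugin version is an assumption):

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.4</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

With this in place, mvn clean package produces a single self-contained jar.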

2.2   log4j.properties

log4j.rootLogger=WARN, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n

3   A First Taste of Flink

3.1   Requirement

Implement WordCount with Flink.

3.2   Coding Steps

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html

  1. Set up the environment - env

  2. Prepare the data - source

  3. Process the data - transformation

  4. Emit the results - sink

  5. Trigger execution - execute

The environment can be created in any of the following three ways:

getExecutionEnvironment() // recommended
createLocalEnvironment()
createRemoteEnvironment(String host, int port, String... jarFiles)
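getExecutionEnvironment() adapts to where the program runs: it returns a local environment inside the IDE and the cluster environment when the job is submitted with flink run. A minimal sketch of the three variants (host, port, and jar path are hypothetical):

package com.song.flink;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvDemo {
    public static void main(String[] args) {
        // Recommended: picks a local or cluster environment automatically.
        StreamExecutionEnvironment auto = StreamExecutionEnvironment.getExecutionEnvironment();

        // Explicit local mini-cluster with a parallelism of 4.
        StreamExecutionEnvironment local = StreamExecutionEnvironment.createLocalEnvironment(4);

        // Explicit remote cluster; ships the listed jars to the JobManager.
        StreamExecutionEnvironment remote = StreamExecutionEnvironment.createRemoteEnvironment(
                "jobmanager-host", 8081, "/path/to/WordCount-1.0-SNAPSHOT.jar");
    }
}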

3.3   Implementation

3.3.1   DataSet Version

package com.song.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.operators.Order;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.*;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Requirement: implement WordCount with Flink using the DataSet API.
 * Coding steps:
 * 1. Set up the environment - env
 * 2. Prepare the data - source
 * 3. Process the data - transformation
 * 4. Emit the results - sink
 * 5. Trigger execution - execute (with print(), a DataSet job needs no execute(); a DataStream job does)
 */

public class WordCountWithDataSet {
    public static void main(String[] args) throws Exception {
        //The legacy batch API looks as follows, but it is no longer recommended.
        //1. Set up the environment - env
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //2. Prepare the data - source
        DataSet<String> lineDS = env.fromElements("Hello Hadoop", "Hello Spark", "Hello Flink");

        //3. Process the data - transformation
        //3.1 Split each line on spaces into individual words
        DataSet<String> wordsDS = lineDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> out) throws Exception {
                // line is one line of input
                String[] words = line.split(" ");
                for (String word : words) {
                    // emit each split-off word downstream
                    out.collect(word);
                }
            }
        });

        // 3.2 Map each word to a count of 1: (word, 1)
        DataSet<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });

        // 3.3 Group the data by the word (the key)
        // 0 means group by the tuple field at index 0, i.e. the word
        UnsortedGrouping<Tuple2<String, Integer>> groupedDS = wordAndOneDS.groupBy(0);

        // 3.4 Sum the counts (the value) within each group
        // 1 means aggregate the tuple field at index 1, i.e. the count
        DataSet<Tuple2<String, Integer>> aggResult = groupedDS.sum(1);

        // 3.5 Sort by count, descending
        DataSet<Tuple2<String, Integer>> result = aggResult.sortPartition(1, Order.DESCENDING).setParallelism(1);

        // 4. Emit the results
        result.print();

        // 5. Trigger execution - execute
        // With print(), a DataSet job needs no execute(); a DataStream job does.
        // env.execute(); // execute(), count(), collect() and print() all trigger execution
    }
}

Running the program prints the aggregated pairs in descending count order: (Hello,3) first, followed by the three words with a count of 1.

3.3.2   DataStream Version

package com.song.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Author SongXitang
 * Requirement: implement WordCount with Flink using the DataStream API.
 * Coding steps:
 * 1. Set up the environment - env
 * 2. Prepare the data - source
 * 3. Process the data - transformation
 * 4. Emit the results - sink
 * 5. Trigger execution - execute
 */

public class WordCountWithDataStream {
    public static void main(String[] args) throws Exception {
        // The new unified API supports both stream processing and batch processing.
        // 1. Set up the environment - env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        //env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        //env.setRuntimeMode(RuntimeExecutionMode.BATCH);

        // 2. Prepare the data - source
        DataStream<String> linesDS = env.fromElements("Hello Hadoop", "Hello Spark", "Hello Flink");

        // 3. Process the data - transformation
        DataStream<String> wordsDS = linesDS.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                String[] words = line.split(" ");
                for (String word : words) {
                    collector.collect(word);
                }
            }
        });
        DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map(new MapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> map(String word) throws Exception {
                return Tuple2.of(word, 1);
            }
        });
        //KeyedStream<Tuple2<String, Integer>, Tuple> keyByDS = wordAndOneDS.keyBy(0); // index-based keyBy is deprecated
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = wordAndOneDS.keyBy(t -> t.f0);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = keyByDS.sum(1);

        // 4. Emit the results
        result.print();

        // 5. Trigger execution - execute
        // A DataStream job must call execute()
        env.execute();
    }
}

In STREAMING mode the job prints an updated count for every element, e.g. (Hello,1), (Hello,2), (Hello,3); in BATCH mode, which AUTOMATIC selects here because fromElements is a bounded source, only the final count per word is printed.
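To see the unbounded side of the unified API, the bounded source in step 2 can be swapped for a socket source, with everything else unchanged. A sketch, assuming nc -lk 9999 is running on the same machine:

        // Replaces step 2 of WordCountWithDataStream above; the job then runs
        // until cancelled, printing updated counts as new lines arrive.
        DataStream<String> linesDS = env.socketTextStream("localhost", 9999);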

3.3.3   Lambda Version

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/java_lambdas.html#java-lambda-expressions

package com.song.flink;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

public class WordCountLambda {
    public static void main(String[] args) throws Exception {
        // 1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        // 2.source
        DataStream<String> linesDS = env.fromElements("Hello Hadoop", "Hello Spark", "Hello Flink");

        // 3.transformation
        DataStream<String> wordsDS = linesDS.flatMap((String value, Collector<String> out) ->
                Arrays.stream(value.split(" ")).forEach(out::collect)).returns(Types.STRING);
        //DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map((String value) -> Tuple2.of(value, 1)).returns(Types.TUPLE(Types.STRING, Types.INT));
        DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map((String value) -> Tuple2.of(value, 1),
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
        KeyedStream<Tuple2<String, Integer>, String> keyByDS = wordAndOneDS.keyBy(t -> t.f0);
        DataStream<Tuple2<String, Integer>> result = keyByDS.sum(1);

        // 4.sink
        result.print();

        // 5.execute
        env.execute();
    }
}

The output is identical to the DataStream version above.
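One detail worth noting: the Java compiler erases the generic types of lambdas, so Flink cannot infer the output type of the flatMap and map calls on its own, and it must be supplied explicitly. The class above uses both available forms; side by side as a sketch (same imports and wordsDS as in the class):

        // Variant 1: .returns() with a Types hint (the commented-out line above)
        wordsDS.map((String value) -> Tuple2.of(value, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        // Variant 2: pass the TypeInformation directly to map()
        wordsDS.map((String value) -> Tuple2.of(value, 1),
                TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
        // Without either, the job typically fails with an InvalidTypesException.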

3.3.4   Running on YARN

Note

If writing to HDFS fails with a permission error, either open up the permissions (acceptable only on a test cluster):

hadoop fs -chmod -R 777 /

or set the submitting user in the code:

System.setProperty("HADOOP_USER_NAME", "root");
  1. Adjust the code

    package com.song.flink;

    import org.apache.flink.api.common.RuntimeExecutionMode;
    import org.apache.flink.api.common.typeinfo.TypeHint;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    import java.util.Arrays;

    public class WordCountYarn {
        public static void main(String[] args) throws Exception {
            // Set the user that submits the job to YARN
            System.setProperty("HADOOP_USER_NAME", "song");

            // Parse the command-line arguments
            ParameterTool params = ParameterTool.fromArgs(args);
            String output = null;
            if (params.has("output")) {
                output = params.get("output");
            } else {
                output = "hdfs://nameservice1/data/flink/wordcount/output_" + System.currentTimeMillis();
            }

            // env
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

            // source
            DataStream<String> linesDS = env.fromElements("Hello Hadoop", "Hello Spark", "Hello Flink");

            // transformation
            DataStream<String> wordsDS = linesDS.flatMap((String line, Collector<String> out) -> Arrays.stream(line.split(" "))
                    .forEach(out::collect)).returns(Types.STRING);
            DataStream<Tuple2<String, Integer>> wordAndOneDS = wordsDS.map((String word) -> Tuple2.of(word, 1),
                    TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}));
            KeyedStream<Tuple2<String, Integer>, String> keyByDS = wordAndOneDS.keyBy(t -> t.f0);
            DataStream<Tuple2<String, Integer>> result = keyByDS.sum(1);

            // sink
            result.writeAsText(output).setParallelism(1);

            // execute
            env.execute();
        }
    }
  2. Package the job with mvn clean package (see the maven-shade-plugin sketch in section 2.1)

  3. Inspect the packaged jar

  4. Upload it to the cluster

    [song@cdh68 jars]$ pwd
    /home/song/data/jars
    [song@cdh68 jars]$ ll
    total 16
    -rw-r--r-- 1 song song 15532 Aug 31 16:49 WordCount-1.0-SNAPSHOT.jar
    [song@cdh68 jars]$ chmod 755 WordCount-1.0-SNAPSHOT.jar 
    [song@cdh68 jars]$ ll
    total 16
    -rwxr-xr-x 1 song song 15532 Aug 31 16:49 WordCount-1.0-SNAPSHOT.jar
  5. Submit the job

    https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_execution_mode.html

    [song@cdh68 ~]$ flink run-application -t yarn-application \
    -Dexecution.runtime-mode=BATCH \
    -Djobmanager.memory.process.size=4096m \
    -Dtaskmanager.memory.process.size=16384m \
    -Dtaskmanager.numberOfTaskSlots=4 \
    -c com.song.flink.WordCountYarn \
    /home/song/data/jars/WordCount-1.0-SNAPSHOT.jar \
    --output hdfs://nameservice1/data/flink/wordcount/output
  6. The submitted job shows up in the Flink web UI; once it finishes, the result file can be inspected as shown below.
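A quick way to check the result from the command line: because the job calls writeAsText(output).setParallelism(1), the output is a single file at the path passed via --output, so it can be read directly (path as in the submit command above):

[song@cdh68 ~]$ hadoop fs -cat hdfs://nameservice1/data/flink/wordcount/output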

