Flink: Setting Up the Java Development Environment
一、Goal
Set up a Flink development environment for stream processing work.
二、Steps: Java Development Environment
【Reference (official docs): https://ci.apache.org/projects/flink/flink-docs-release-1.7/dev/projectsetup/java_api_quickstart.html】
1. Local environment
(1) Official requirements: Maven 3.0.4 (or higher) and Java 8.x
(2) Local environment: Maven 3.3.9, Java 1.8
2. Create the Java project
(1) Enter the project directory and run the Maven archetype command:
mvn archetype:generate \
-DarchetypeGroupId=org.apache.flink \
-DarchetypeArtifactId=flink-quickstart-java \
-DarchetypeVersion=1.7.2 \
-DarchetypeCatalog=local
Locally:
E:\Tools\WorkspaceforMyeclipse\flink_project>mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java -DarchetypeVersion=1.7.2 -DarchetypeCatalog=local
Note: adding the -DarchetypeCatalog=local parameter speeds up project creation, since Maven skips fetching the remote archetype catalog.
(2) Enter the GAV (groupId/artifactId/version) coordinates when prompted:
Define value for property 'groupId': com.bd.flink
Define value for property 'artifactId': flink-pro
Define value for property 'version' 1.0-SNAPSHOT: : 1.0
Define value for property 'package' com.bd.flink: :
Confirm properties configuration:
groupId: com.bd.flink
artifactId: flink-pro
version: 1.0
package: com.bd.flink
Y: : Y
(3) Check the result
E:\Tools\WorkspaceforMyeclipse\flink_project>tree
Folder PATH listing for volume Local Disk
Volume serial number is 0003-6793
E:.
└─flink-pro
└─src
└─main
├─java
│ └─com
│ └─bd
│ └─flink
└─resources
3. Import into IDEA
File -> Open -> select the project's pom.xml
Then review the project structure.
4. Package the project
(1) Using the Maven command
Enter the project root directory and run:
E:\Tools\WorkspaceforMyeclipse\flink_project\flink-pro>mvn clean package
Packaging output:
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ flink-pro ---
[INFO] Building jar: E:\Tools\WorkspaceforMyeclipse\flink_project\flink-pro\target\flink-pro-1.0.jar
[INFO]
[INFO] --- maven-shade-plugin:3.0.0:shade (default) @ flink-pro ---
[INFO] Excluding org.slf4j:slf4j-api:jar:1.7.15 from the shaded jar.
[INFO] Excluding org.slf4j:slf4j-log4j12:jar:1.7.7 from the shaded jar.
[INFO] Excluding log4j:log4j:jar:1.2.17 from the shaded jar.
[INFO] Replacing original artifact with shaded artifact.
[INFO] Replacing E:\Tools\WorkspaceforMyeclipse\flink_project\flink-pro\target\flink-pro-1.0.jar with E:\Tools\WorkspaceforMyeclipse\flink_project\flink-pro\target\flink-pro-1.0-shaded.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 11.005 s
[INFO] Finished at: 2019-11-30T16:24:20+08:00
[INFO] Final Memory: 25M/184M
[INFO] ------------------------------------------------------------------------
(2) Using IDEA's Maven tool window: View -> Tool Windows -> Maven Projects -> clean + package
5. Java WordCount example
(1) Four steps
Step 1: set up the batch execution environment
Step 2: read the input data
Step 3: implement the business logic (transform operations)
Step 4: execute the program
(2) Code
package com.bd.flink._1130application;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

/**
 * Created by Administrator on 2019/11/30.
 * WordCount implemented with the Flink Java batch API.
 */
public class BatchWCJava {
    public static void main(String[] args) throws Exception {
        String input = "data\\hello.txt";
        // Step 1: set up the batch execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // Step 2: read the input data
        DataSource<String> text = env.readTextFile(input);
        // Step 3: implement the business logic (transform operations)
        text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // Lowercase the line, split on spaces, and emit a (word, 1) pair per token
                String[] tokens = value.toLowerCase().split(" ");
                for (String token : tokens) {
                    if (token.length() > 0) {
                        collector.collect(new Tuple2<>(token, 1));
                    }
                }
            }
        }).groupBy(0).sum(1).print();
        // Step 4: execute the program
        // print(), execute(), count(), and collect() all trigger execution,
        // so an explicit env.execute() is not needed after print().
        // env.execute("Flink Batch Java API Skeleton");
    }
}
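The anonymous FlatMapFunction above is the only custom logic in the job: one input line goes in, and zero or more (word, 1) pairs come out. Its behavior can be checked in isolation with plain Java, no Flink runtime required. The sketch below (class and method names are illustrative, not from the Flink API) mirrors that tokenization step, using Map.Entry in place of Flink's Tuple2:

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class FlatMapSketch {
    // Mirrors the FlatMapFunction: lowercase, split on spaces,
    // drop empty tokens, emit one (word, 1) pair per token.
    static List<Map.Entry<String, Integer>> flatMap(String value) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String token : value.toLowerCase().split(" ")) {
            if (token.length() > 0) {
                out.add(new AbstractMap.SimpleEntry<>(token, 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(flatMap("flink hadoop storm"));
    }
}
```

Note that the length-check matters: consecutive spaces produce empty tokens from split(" "), which would otherwise be counted as words.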
(3) Run result
Contents of flink_project\flink-pro\data\hello.txt:
flink hadoop storm
flume spark streaming
is excellent
Output (tuple order may vary across runs):
(is,1)
(streaming,1)
(excellent,1)
(hadoop,1)
(flink,1)
(flume,1)
(storm,1)
(spark,1)
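The groupBy(0).sum(1) stage simply accumulates the 1s emitted for each word. The expected result above can be reproduced with a plain-Java sketch of that aggregation (a simulation for illustration, not the Flink API):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupSumSketch {
    // Mirrors groupBy(0).sum(1): sum the 1 emitted for each word occurrence.
    static Map<String, Integer> wordCount(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String token : line.toLowerCase().split(" ")) {
                if (token.length() > 0) {
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Same three lines as data\hello.txt
        List<String> lines = Arrays.asList(
                "flink hadoop storm",
                "flume spark streaming",
                "is excellent");
        System.out.println(wordCount(lines));
    }
}
```

Since every word in hello.txt occurs once, each of the eight words maps to 1, matching the printed tuples (though a HashMap, like Flink's print(), gives no ordering guarantee).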