尚硅谷唯快不破,与 StreamX 强强联合分享最强流批一体框架,重新...
共 22110字,需浏览 45分钟
·
2022-05-06 13:35
StreamX是一款非常优秀的Flink&Spark极速开发框架与一站式大数据流处理平台。开发者在使用StreamX时只需编写简单的配置文件,即可实现多个开箱即用功能的复用;通过简洁易用的Web页面,即可实现开发任务的编译、部署与管理监控。
在2021年11月18日,StreamX成功入选开源中国和Gitee联合评选的的2021年度最有价值的开源项目。
gvp2021StreamX项目的初衷就是,让Flink&Spark的开发变得更简单。我们在使用StreamX进行开发工作的时候,可以降低学习成本与开发难度,大大简化了Flink任务的日常操作和维护工作,让开发者可以只关心最核心的业务。通过规范项目的配置,鼓励函数式编程,提供了一系列开箱即用的Connectors,标准化了配置、开发、测试、部署、监控、运维的整个过程,最终目标是为了打造一个一站式的大数据流处理平台,提供流批一体、湖仓一体的解决方案。
StreamX 遵循 Apache-2.0 开源协议,将会是个长期更新的活跃项目,自开源以来就受到很多同行的关注和认可,其中不乏业内大佬。
目前在没有任何形式推广的情况下,靠口碑在短短半年时间内已经累计 500+ star,已陆续有来自金融,数据分析,车联网,智能广告,地产等公司的朋友在使用或进行二次开发,也不乏来自一线大厂的朋友在研究使用。
来到官网可以查看项目的最新进展:
Streamx 官网: http://streamxhub.com
Streamx Github: https://github.com/streamxhub/streamx
Streamx Gitee: https://gitee.com/streamxhub/streamx
尚硅谷本着非精品不用的原则,与StreamX创作团队达成合作,针对StreamX的最新发布的1.2.2版本,尚硅谷大数据团队研发了配套的视频资料和使用教程,来提供给各位开发者学习使用!
1.1 什么是Streamx
StreamX项目的初衷是:让Flink开发更简单, 使用StreamX开发,可以极大降低学习成本和开发门槛,让开发者只用关心最核心的业务。
StreamX规范了项目的配置,鼓励函数式编程,定义了最佳的编程方式,提供了一系列开箱即用的Connectors,标准化了配置、开发、测试、部署、监控、运维的整个过程,提供scala和java两套api,其最终目的是打造一个一站式大数据平台,流批一体,湖仓一体的解决方案。
1.2 Streamx的特色
- 开发脚手架
- 多版本Flink支持(1.12+)
- 一系列开箱即用的connectors
- 支持项目编译功能(maven 编译)
- 在线参数配置
- 支持Applicaion 模式,Yarn-Per-Job模式启动
- 快捷的日常操作(任务启动、停止、savepoint,从savepoint恢复)
- 支持火焰图
- 支持notebook(在线任务开发)
- 项目配置和依赖版本化管理
- 支持任务备份、回滚(配置回滚)
- 在线管理依赖(maven pom)和自定义jar
- 自定义udf、连接器等支持
- Flink SQL WebIDE
- 支持catalog、hive
- 任务运行失败发送告警邮件
- 支持失败重启重试
- 从任务开发阶段到部署管理全链路支持
- ...
1.3 正在使用Streamx的公司
正在使用Streamx的公司第2章 Streamx架构StreamX的架构图
streamx-core定位是一个开发时框架
,关注编码开发,规范了配置文件,按照约定优于配置的方式进行开发,提供了一个开发时 RunTime Content和一系列开箱即用的Connector,扩展了DataStream相关的方法,融合了DataStream和Flink SQL api,简化繁琐的操作,聚焦业务本身,提高开发效率和开发体验。
streamx-pump的定位是一个数据抽取的组件
,类似于flinkx,基于streamx-core中提供的各种connector开发。pump是抽水机,水泵的意思,目的是打造一个方便快捷,开箱即用的大数据实时数据抽取和迁移组件,并且集成到streamx-console中,解决实时数据源获取问题,目前在规划中。
streamx-console是一个综合实时数据平台
,低代码(Low Code)平台,可以较好的管理Flink任务,集成了项目编译、发布、参数配置、启动、savepoint,火焰图(flame graph),Flink SQL,监控等诸多功能于一体,大大简化了Flink任务的日常操作和维护,融合了诸多最佳实践。“旧时王谢堂前燕,飞入寻常百姓家”,StreamX让大公司有能力研发使用的项目,现在人人可以使用,其最终目标是打造成一个实时数仓,流批一体的一站式大数据解决方案。
第3章 安装部署Streamx
本教程基于社区于2022年3月24日发布的最新版本1.2.2编写。
3.1 环境要求
StreamX的环境要求3.2 准备环境
3.2.1 linux系统
选择Centos7.5
3.2.2 jdk
选择1.8
3.2.3 maven
选择3.8.5
cd /opt/software
wget https://dlcdn.apache.org/maven/maven-3/3.8.5/binaries/apache-maven-3.8.5-bin.tar.gz --no-check-certificate
tar -zxvf apache-maven-3.8.5-bin.tar.gz -C /opt/module
sudo ln -s /opt/module/apache-maven-3.8.5/bin/mvn /usr/bin/mvn
wget https://gitee.com/lzc2025/maven_setting/raw/master/settings.xml -O /opt/module/apache-maven-3.8.5/conf/settings.xml
3.2.4 Node.js
console前端部分采用vue开发,需要nodejs环境,下载安装最新的nodejs即可。
sudo yum install -y nodejs
查看nodejs版本。
node --version
v16.13.2
3.2.5 MySQL
选择5.7.16
3.2.6 flink
选择1.13.6 需要配置FLINK_HOME环境变量。
export FLINK_HOME=/opt/module/flink-1.13.6
3.2.7 hadoop
选择3.1.3
需要配置相应的环境变量:
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export HADOOP_CLASSPATH=`hadoop classpath`
除了正常的配置外, 需要在core-site.xml
中添加如下配置:
<property>
<name>dfs.client.datanode-restart.timeoutname>
<value>30value>
property>
3.3 安装部署Streamx
3.3.1 下载Streamx
cd /opt/software
wget https://github.com/streamxhub/streamx/releases/download/v1.2.2/streamx-console-service-
1.2.2-bin.tar.gz
3.3.2 解压安装包
tar -zxvf streamx-console-service-1.2.2-bin.tar.gz -C /opt/module
3.3.3 部署Streamx平台
① MySQL中创建数据库
CREATE DATABASE `streamx` CHARACTER SET utf8 COLLATE utf8_general_ci;
② 初始化表
use streamx;
source /opt/module/streamx-console-service-1.2.2/script/final.sql
③ 配置连接信息
vim /opt/module/streamx-console-service-1.2.2/conf/application.yml
...
# 配置默认数据源
primary: primary
datasource:
# 数据源-1,名称为 primary
primary:
username: root
password: aaaaaa
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://hadoop162:3306/streamx?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=GMT%2B8
...
streamx:
# HADOOP_USER_NAME
hadoop-user-name: atguigu
# 本地的工作空间,用于存放项目源码,构建的目录等.
workspace:
local: /opt/module/streamx_workspace
注意:其中,数据库用户名密码及JDBC连接URL需要修改为自己数据库的相关属性,hadoop-user-name及workspace需要根据自己的实际情况进行修改。其他情况可以暂时使用默认配置。
④ 启动Server
/opt/module/streamx-console-service-1.2.2/bin/startup.sh
启动成功之后使用jps可以看到如下进程。
进程查看⑤ 浏览器登录系统
默认端口是10000,访问链接:http://hadoop162:10000/
Web UI登录界面默认用户名:admin
默认密码:streamx
3.3.4 系统配置
打开StreamX Web UI界面配置Flink HOME。
找到Setting中的Flink HOME标签页配置Flink HOME第4章 部署Flink stream应用4.1创建Flink项目
创建项目4.2在pom.xml
文件中添加Flink相关依赖
<properties>
<flink.version>1.13.6flink.version>
<scala.binary.version>2.11scala.binary.version>
<slf4j.version>1.7.30slf4j.version>
properties>
<dependencies>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-javaartifactId>
<version>${flink.version}version>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-streaming-java_${scala.binary.version}artifactId>
<version>${flink.version}version>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-clients_${scala.binary.version}artifactId>
<version>${flink.version}version>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-runtime-web_${scala.binary.version}artifactId>
<version>${flink.version}version>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.slf4jgroupId>
<artifactId>slf4j-apiartifactId>
<version>${slf4j.version}version>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.slf4jgroupId>
<artifactId>slf4j-log4j12artifactId>
<version>${slf4j.version}version>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.apache.logging.log4jgroupId>
<artifactId>log4j-to-slf4jartifactId>
<version>2.14.0version>
<scope>providedscope>
dependency>
dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.pluginsgroupId>
<artifactId>maven-shade-pluginartifactId>
<version>3.2.4version>
<executions>
<execution>
<phase>packagephase>
<goals>
<goal>shadegoal>
goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305exclude>
<exclude>org.slf4j:*exclude>
<exclude>log4j:*exclude>
excludes>
artifactSet>
<filters>
<filter>
<artifact>*:*artifact>
<excludes>
<exclude>META-INF/*.SFexclude>
<exclude>META-INF/*.DSAexclude>
<exclude>META-INF/*.RSAexclude>
excludes>
filter>
filters>
<transformers combine.children="append">
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
transformer>
transformers>
configuration>
execution>
executions>
plugin>
plugins>
build>
4.3 示例代码
package com.atguigu.bigdata;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* @Author lzc
* @Date 2022/3/24 15:28
*/
public class UnboundedWC {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
env
.socketTextStream("hadoop162", 9999)
.flatMap(new FlatMapFunction
() {
public void flatMap(String line,
Collector
out) throws Exception {for (String word : line.split(" ")) {
out.collect(word);
}
}
})
.map(new MapFunction
>() {
public Tuple2
map(String word) throws Exception {return Tuple2.of(word, 1l);
}
})
.keyBy(new KeySelector
, String>() {
public String getKey(Tuple2
t) throws Exception {return t.f0; // t._1
}
})
.sum(1)
.print();
env.execute();
}
}
4.4 代码推送到gitee
在Streamx平台部署应用的时候要求代码最好部署在git平台,比如github或gitee。作为国内用户我们选择比较稳定的gitee。
我的项目推送地址:https://gitee.com/lzc2025/atguigu-streamx.git
4.5 配置项目
页面选择Add New对项目进行配置4.6 编译项目
编译前:
编译前界面第一次编译需要的时间比较久, 因为需要下载许多的依赖。
编译成功后:
编译后界面4.7 提交应用
4.7.1 创建应用
4.7.2 配置应用
配置应用完成配置并提交4.7.3 上线应用
上线应用4.7.4 启动应用
启动应用启动应用界面情况注意先启动socket:
nc -lk 9999
4.8 Yarn平台确认执行结果
Yarn平台运行情况输入一些单词,查看运行结果(略)。
第5章 部署Flink SQL应用
执行下面的SQL语句:
create table t1 (
id string,
ts bigint,
vc int
) with(
'connector' = 'kafka',
'topic' = 't1',
'properties.bootstrap.servers' = 'hadoop162:9029,hadoop163:9092,hadoop164:9092',
'properties.group.id' = 'atguigu',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv'
);
create table t2 (
id string,
ts bigint,
vc int
) with(
'connector' = 'print'
);
insert into t2 select * from t1;
配置kafka连接器的依赖
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-kafka_2.11artifactId>
<version>1.13.6version>
dependency>
5.1 创建application
创建并提交应用5.2 运行Application
运行Application运行Application5.3 查看执行结果
去TaskManager上查看执行的结果。
查看执行结果查看执行结果第6章 使用Streamx API开发应用6.1 项目配置
配置在StreamX中是非常重要的概念。
针对参数设置的问题,在StreamX中提出统一程序配置的概念,把程序的一系列参数从开发到部署阶段按照特定的格式配置到application.yml
里,抽象出一个通用的配置模板,按照这种规定的格式将上述配置的各项参数在配置文件里定义出来,在程序启动的时候将这个项目配置传入到程序中即可完成环境的初始化工作,在任务启动的时候也会自动识别启动时的参数,于是就有了配置文件这一概念。
针对Flink SQL作业在代码里写SQL的问题,StreamX针对Flink SQL作业做了更高层级封装和抽象,开发者只需要将SQL按照一定的规范要求定义到sql.yaml
文件中,在程序启动时将该SQL文件传入到主程序中, 就会自动按照要求加载执行SQL,于是就有了SQL文件的概念。
这里提到的两个术语:
- 配置文件:把程序的一系列参数从开发到部署阶段按照特定的格式配置到文件里,这个有特定作用的文件就是项目的配置文件。
- SQL文件:Flink SQL任务中将提取出来的SQL放到
sql.yaml
中,这个有特定作用的文件就是项目的SQL文件。
6.1.1 配置文件
在StreamX中,DataStream作业和Flink SQL作业配置文件是通用的,换言之,这个配置文件既能定义DataStream的各项配置,也能定义Flink SQL的各项配置(Flink SQL作业中配置文件是可选的),配置文件的格式必须是yaml格式,必须得符合yaml的格式规范。
一个典型的配置文件如下:
flink:
deployment:
option:
target: application
detached:
shutdownOnAttachedExit:
zookeeperNamespace:
jobmanager:
property: #@see: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html
$internal.application.main: com.streamxhub.streamx.flink.quickstart.datastream.QuickStartApp
pipeline.name: QuickStart App(Flink 1.13)
taskmanager.numberOfTaskSlots: 1
parallelism.default: 2
jobmanager.memory:
flink.size:
heap.size:
jvm-metaspace.size:
jvm-overhead.max:
off-heap.size:
process.size:
taskmanager.memory:
flink.size:
framework.heap.size:
framework.off-heap.size:
managed.size:
process.size:
task.heap.size:
task.off-heap.size:
jvm-metaspace.size:
jvm-overhead.max:
jvm-overhead.min:
managed.fraction: 0.4
checkpoints:
enable: true
interval: 30000
mode: EXACTLY_ONCE
timeout: 300000
unaligned: true
watermark:
interval: 10000
# 状态后端
state:
backend:
value: rocksdb # 保存类型,在flink1.13中只有('rocksdb','hashmap')
checkpoints.num-retained: 1
# 重启策略
restart-strategy:
value: fixed-delay #重启策略[(fixed-delay|failure-rate|none)共3个可配置的策略]
fixed-delay:
attempts: 3
delay: 5000
failure-rate:
max-failures-per-interval:
failure-rate-interval:
delay:
# table
table:
planner: blink # (blink|old|any)
mode: streaming #(batch|streaming)
# kafka source
kafka.source:
bootstrap.servers: kfk1:9092,kfk2:9092,kfk3:9092
topic: test_user
group.id: user_01
auto.offset.reset: earliest
# mysql
jdbc:
driverClassName: com.mysql.cj.jdbc.Driver
jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
username: root
password: 123456
参数具体说明参考官网:https://www.streamxhub.com/zh-CN/docs/development/config
6.1.2 SQL文件
SQL文件必须是yaml格式的文件,需要遵循yaml文件的定义规则,具体内部SQL格式的定义非常简单,如下:
sql: |
CREATE TABLE datagen (
f_sequence INT,
f_random INT,
f_random_str STRING,
ts AS localtimestamp,
WATERMARK FOR ts AS ts
) WITH (
'connector' = 'datagen',
-- optional options --
'rows-per-second'='5',
'fields.f_sequence.kind'='sequence',
'fields.f_sequence.start'='1',
'fields.f_sequence.end'='1000',
'fields.f_random.min'='1',
'fields.f_random.max'='1000',
'fields.f_random_str.length'='10'
);
CREATE TABLE print_table (
f_sequence INT,
f_random INT,
f_random_str STRING
) WITH (
'connector' = 'print'
);
INSERT INTO print_table select f_sequence,f_random,f_random_str from datagen;
6.2 项目架构
项目结构说明:
assembly.xml
是部署之后, 启动应用使用, 开发者不用关注logback.xml
是日志配置文件application.yml
就是项目配置文件(核心配置)
flink:
deployment:
option:
target: application
detached:
shutdownOnAttachedExit:
zookeeperNamespace:
jobmanager:
property: #@see: https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/config.html
$internal.application.main: com.atguigu.streamx.StreamxKafkaDemo
yarn.application.name: stramx_kafka_demo
taskmanager.numberOfTaskSlots: 1
parallelism.default: 2
jobmanager.memory:
flink.size:
heap.size:
jvm-metaspace.size:
jvm-overhead.max:
off-heap.size:
process.size:
taskmanager.memory:
flink.size:
framework.heap.size:
framework.off-heap.size:
managed.size:
process.size:
task.heap.size:
task.off-heap.size:
jvm-metaspace.size:
jvm-overhead.max:
jvm-overhead.min:
managed.fraction: 0.4
checkpoints:
enable: false
interval: 30000
mode: EXACTLY_ONCE
timeout: 300000
unaligned: true
watermark:
interval: 10000
# 状态后端
state:
backend:
value: hashmap # 保存类型,在flink1.13中只有('rocksdb','hashmap')
checkpoints.num-retained: 1
# 重启策略
restart-strategy:
value: fixed-delay #重启策略[(fixed-delay|failure-rate|none)共3个可配置的策略]
fixed-delay:
attempts: 3
delay: 5000
failure-rate:
max-failures-per-interval:
failure-rate-interval:
delay:
# table
table:
planner: blink # (blink|old|any)
mode: streaming #(batch|streaming)
# kafka source 配置
kafka.source:
bootstrap.servers: hadoop162:9092,hadoop163:9092,hadoop164:9092
topic: s1
group.id: atguigu
auto.offset.reset: latest
assembly.xml
是部署配置文件,一般不需要修改
<assembly>
<id>binid>
<formats>
<format>tar.gzformat>
formats>
<fileSets>
<fileSet>
<directory>assembly/bindirectory>
<outputDirectory>binoutputDirectory>
<fileMode>0755fileMode>
fileSet>
<fileSet>
<directory>${project.build.directory}directory>
<outputDirectory>liboutputDirectory>
<fileMode>0755fileMode>
<includes>
<include>*.jarinclude>
includes>
<excludes>
<exclude>original-*.jarexclude>
excludes>
fileSet>
<fileSet>
<directory>assembly/confdirectory>
<outputDirectory>confoutputDirectory>
<fileMode>0755fileMode>
fileSet>
<fileSet>
<directory>assembly/logsdirectory>
<outputDirectory>logsoutputDirectory>
<fileMode>0755fileMode>
fileSet>
<fileSet>
<directory>assembly/tempdirectory>
<outputDirectory>tempoutputDirectory>
<fileMode>0755fileMode>
fileSet>
fileSets>
assembly>
- 其他配置可以从gitee项目下载: https://gitee.com/lzc2025/streamx_demo.git
6.3 Streamx API开发流式应用
创建项目streamx_demo
6.3.1 导入maven依赖
<properties>
<flink.version>1.13.6flink.version>
<scala.binary.version>2.11scala.binary.version>
<streamx.flink.version>1.13streamx.flink.version>
<streamx.version>1.2.2streamx.version>
properties>
<dependencies>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-javaartifactId>
<version>${flink.version}version>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-scala_${scala.binary.version}artifactId>
<version>${flink.version}version>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-streaming-java_${scala.binary.version}artifactId>
<version>${flink.version}version>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-streaming-scala_${scala.binary.version}artifactId>
<version>${flink.version}version>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-clients_${scala.binary.version}artifactId>
<version>${flink.version}version>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-runtime-web_${scala.binary.version}artifactId>
<version>${flink.version}version>
<scope>providedscope>
dependency>
<dependency>
<groupId>com.streamxhub.streamxgroupId>
<artifactId>streamx-flink-coreartifactId>
<version>${streamx.version}version>
dependency>
<dependency>
<groupId>com.streamxhub.streamxgroupId>
<artifactId>streamx-flink-shims_flink-${streamx.flink.version}artifactId>
<version>${streamx.version}version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}artifactId>
<version>${flink.version}version>
dependency>
dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.pluginsgroupId>
<artifactId>maven-shade-pluginartifactId>
<version>3.2.4version>
<executions>
<execution>
<id>shade-flinkid>
<phase>nonephase>
execution>
<execution>
<phase>packagephase>
<goals>
<goal>shadegoal>
goals>
<configuration>
<createDependencyReducedPom>falsecreateDependencyReducedPom>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shadingexclude>
<exclude>com.google.code.findbugs:jsr305exclude>
<exclude>org.slf4j:*exclude>
<exclude>log4j:*exclude>
excludes>
artifactSet>
<filters>
<filter>
<artifact>*:*artifact>
<excludes>
<exclude>META-INF/*.SFexclude>
<exclude>META-INF/*.DSAexclude>
<exclude>META-INF/*.RSAexclude>
excludes>
filter>
filters>
configuration>
execution>
executions>
plugin>
<plugin>
<groupId>org.apache.maven.pluginsgroupId>
<artifactId>maven-assembly-pluginartifactId>
<version>3.1.1version>
<executions>
<execution>
<id>distro-assemblyid>
<phase>packagephase>
<goals>
<goal>singlegoal>
goals>
execution>
executions>
<configuration>
<appendAssemblyId>falseappendAssemblyId>
<descriptors>
<descriptor>assembly.xmldescriptor>
descriptors>
configuration>
plugin>
<plugin>
<groupId>org.codehaus.mojogroupId>
<artifactId>build-helper-maven-pluginartifactId>
plugin>
<plugin>
<groupId>org.apache.maven.pluginsgroupId>
<artifactId>maven-compiler-pluginartifactId>
plugin>
<plugin>
<groupId>net.alchim31.mavengroupId>
<artifactId>scala-maven-pluginartifactId>
plugin>
plugins>
build>
6.3.2 Kafka Connector的使用
6.3.2.1 添加依赖
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-connector-kafka_${scala.binary.version}artifactId>
<version>${flink.version}version>
dependency>
6.3.2.2 基础消费
① 配置
kafka.source:
bootstrap.servers: hadoop162:9092,hadoop163:9092,hadoop164:9092
topic: s1
group.id: atguigu
auto.offset.reset: latest
② 代码
// 配置
StreamEnvConfig javaConfig = new StreamEnvConfig(args, null);
// 创建StreamingContext对象, 是一个核心类
StreamingContext ctx = new StreamingContext(javaConfig);
// 消费kafka数据
new KafkaSource
(ctx) .getDataStream()
.map(new MapFunction
, String>() {
public String map(KafkaRecord
value) throws Exception {return value.value();
}
})
.print();
// 启动任务
ctx.start();
6.3.2.3 消费多个topic
① 配置
kafka.source:
bootstrap.servers: hadoop162:9092,hadoop163:9092,hadoop164:9092
# 定义可能需要消费到的topic
topic: s1,s2,s3
group.id: atguigu
auto.offset.reset: latest
② 代码
new KafkaSource
(ctx) .topic("s1") // 消费一个topic
.getDataStream()
.map(record -> record.value())
.print("one");
new KafkaSource
(ctx) .topic("s1","s2") // 消费一组topic
.getDataStream()
.map(record -> record.value())
.print("two");
6.3.2.4 消费多个Kafka集群数据
① 配置
kafka.source:
kafka1:
# 配置第一个Kafka集群
bootstrap.servers: hadoop162:9092,hadoop163:9092,hadoop164:9092
topic: s1
group.id: atguigu1
auto.offset.reset: latest
enable.auto.commit: true
kafka2:
# 配置第二个kafka集群
bootstrap.servers: hadoop162:9092,hadoop163:9092,hadoop164:9092
topic: s2
group.id: atguigu2
auto.offset.reset: latest
enable.auto.commit: true
② 代码
new KafkaSource
(ctx) .alias("kafka1") // 指定要消费的Kafka集群
.getDataStream()
.map(record -> record.value())
.print("one");
new KafkaSource
(ctx) .alias("kafka2") // 指定要消费的Kafka集群
.getDataStream()
.map(record -> record.value())
.print("two");
6.3.2.5 其他一些高级用法参考官网
https://www.streamxhub.com/zh-CN/docs/connector/Kafka-Connector
6.3.3 Jdbc Connector的使用
6.3.3.1 导入依赖
<dependency>
<groupId>mysqlgroupId>
<artifactId>mysql-connector-javaartifactId>
<version>5.1.49version>
dependency>
6.3.3.2 配置
jdbc:
driverClassName: com.mysql.jdbc.Driver
jdbcUrl: jdbc:mysql://hadoop162:3306/test?useSSL=false&allowPublicKeyRetrieval=true
username: root
password: aaaaaa
6.3.3.3 从MySQL读取数据
Streamx支持从MySQL读取数据,但是仍然建议使用Flink CDC。
6.3.3.4 向MySQL写入数据
从kafka读取数据然后写入到MySQL中。
SingleOutputStreamOperator
source = new KafkaSource (ctx) .alias("kafka1")
.getDataStream()
.map(record -> {
String[] data = record.value().split(",");
return new WaterSensor(data[0], Long.valueOf(data[1]), Integer.valueOf(data[2]));
});
new JdbcSink
(ctx) .sql(new SQLFromFunction
() { // 抽取SQL语句
public String from(WaterSensor ws) {
return String.format("insert into sensor(id, ts, vc)values('%s', %d, %d)",
ws.getId(),
ws.getTs(),
ws.getVc()
);
}
})
.sink(source);
6.3.4 更多的开箱即用的Connector使用
参考官方文档: https://www.streamxhub.com/zh-CN/docs/intro/
6.3.5 打包到Linux,提交到yarn执行
① 把streamx_demo-1.0-SNAPSHOT.tar.gz
上传到linux,解压
tar -zxvf streamx_demo-1.0-SNAPSHOT.tar.gz
解压后结构如下。
解压后结构② 在PATH中配置flink环境变量
export PATH=$FLINK_HOME:$PATH
③ 打开配置文件配置要执行的主类和在yarn执行的时的job名
vim application.yaml
$internal.application.main: com.atguigu.streamx.StreamxKafkaDemo
yarn.application.name: stramx_kafka_demo
④ 向yarn平台提交任务
bin/startup.sh
⑤ 去yarn查看任务执行情况
6.3.6 使用Streamx平台提交应用
另外也可以使用Streamx平台提交应用。
6.3.6.1 创建Project
创建Project然后编译。
6.3.6.2 创建Application
使用刚才Project创建一个Application。
创建Application6.3.6.3 去yarn查看执行结果
6.4 Streamx API开发SQL应用
6.4.1 导入依赖
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}artifactId>
<version>${flink.version}version>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-streaming-scala_${scala.binary.version}artifactId>
<version>${flink.version}version>
<scope>providedscope>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-csvartifactId>
<version>${flink.version}version>
dependency>
<dependency>
<groupId>org.apache.flinkgroupId>
<artifactId>flink-jsonartifactId>
<version>${flink.version}version>
dependency>
6.4.2 在application.yml
中添加table相关配置
# table
table:
planner: blink # (blink|old|any)
mode: streaming #(batch|streaming)
6.4.3 添加sql.yml
文件
在conf目录下添加sql.yml
文件。
first: |
create table s1 (
id string,
ts bigint,
vc int
) with(
'connector' = 'kafka',
'topic' = 's1',
'properties.bootstrap.servers' = 'hadoop162:9029,hadoop163:9092,hadoop164:9092',
'properties.group.id' = 'atguigu',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv'
);
create table s2 (
id string,
ts bigint,
vc int
) with(
'connector' = 'print'
);
insert into s2 select * from s1;
6.4.4 创建app类
package com.atguigu.streamx;
import com.streamxhub.streamx.flink.core.TableContext;
import com.streamxhub.streamx.flink.core.TableEnvConfig;
public class StreamXSqlDemo {
public static void main(String[] args) {
TableEnvConfig tableEnvConfig = new TableEnvConfig(args, null);
TableContext ctx = new TableContext(tableEnvConfig);
ctx.sql("first");
}
}
6.4.5 执行查看结果
添加程序参数:
--sql C:\Users\lzc\Desktop\class_code\prepare\streamx_demo\assembly\conf\sql.yml --conf C:\Users\lzc\Desktop\class_code\prepare\streamx_demo\assembly\conf\application.yml
idea中添加程序运行参数6.4.6 Linux下执行查看结果
打包到linux执行, 步骤与流式应用类似。