实操 | Flink1.12.1通过Table API / Flink SQL读取HBase2.4.0

程序源代码

共 12858字,需浏览 26分钟

 ·

2021-04-23 21:09

点击上方蓝色字体,选择“设为星标”
回复“资源”获取更多资源

昨天群里有人问 Flink 1.12 读取Hbase的问题,于是看到这篇文章分享给大家。本文作者Ashiamd。

1. 环境

废话不多说,这里用到的环境如下(不确定是否都必要,但是至少我是这个环境)

  • zookeeper 3.6.2

  • Hbase 2.4.0

  • Flink 1.12.1

2. HBase表

# Create table 'u_m_01' with a single column family 'u_m_r'.
create 'u_m_01' , 'u_m_r'

# Insert test rows. Rowkey encodes "user,movie"; column u_m_r:r holds the rating.
put 'u_m_01', 'a,A', 'u_m_r:r' , '1'
put 'u_m_01', 'a,B', 'u_m_r:r' , '3'
put 'u_m_01', 'b,B', 'u_m_r:r' , '3'
put 'u_m_01', 'b,C', 'u_m_r:r' , '4'
put 'u_m_01', 'c,A', 'u_m_r:r' , '2'
put 'u_m_01', 'c,C', 'u_m_r:r' , '5'
put 'u_m_01', 'c,D', 'u_m_r:r' , '1'
put 'u_m_01', 'd,B', 'u_m_r:r' , '5'
put 'u_m_01', 'd,D', 'u_m_r:r' , '2'
put 'u_m_01', 'e,A', 'u_m_r:r' , '3'
put 'u_m_01', 'e,B', 'u_m_r:r' , '2'
put 'u_m_01', 'f,A', 'u_m_r:r' , '1'
put 'u_m_01', 'f,B', 'u_m_r:r' , '2'
put 'u_m_01', 'f,D', 'u_m_r:r' , '3'
put 'u_m_01', 'g,C', 'u_m_r:r' , '1'
put 'u_m_01', 'g,D', 'u_m_r:r' , '4'
put 'u_m_01', 'h,A', 'u_m_r:r' , '1'
put 'u_m_01', 'h,B', 'u_m_r:r' , '2'
put 'u_m_01', 'h,C', 'u_m_r:r' , '4'
put 'u_m_01', 'h,D', 'u_m_r:r' , '5'

3. pom依赖

  • jdk1.8

  • Flink1.12.1 使用的pom依赖如下(有些是多余的)

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink-hive-hbase</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.12.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hive.version>3.1.2</hive.version>
        <mysql.version>8.0.19</mysql.version>
        <hbase.version>2.4.0</hbase.version>
    </properties>

    <dependencies>

        <!-- Flink core -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- HBase: Flink SQL connector ('hbase-2.2') plus HBase client/server libs -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hbase-2.2_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-server</artifactId>
            <version>${hbase.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>${hbase.version}</version>
        </dependency>

        <!-- Table API: only the Blink planner is needed, since the code calls
             EnvironmentSettings.useBlinkPlanner(). The legacy flink-table-planner
             artifact was removed here; shipping both planners on one classpath
             can cause class conflicts in Flink 1.12. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- CSV format (used by some Table API formats) -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Optional dependencies, not needed for the HBase example; uncomment as required.

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>${mysql.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.18</version>
        </dependency>
        -->

    </dependencies>
</project>

4. Flink-Java代码

用到的pojo类

package entity;
import java.io.Serializable;

/**
 * Simple serializable POJO holding one (user, movie, rating) triple parsed
 * from an HBase row: the rowkey "user,movie" plus the u_m_r:r column value.
 */
public class UserMovie implements Serializable {

    private static final long serialVersionUID = 256158274329337559L;

    // User id: the part of the rowkey before the comma.
    private String userId;

    // Movie id: the part of the rowkey after the comma.
    private String movieId;

    // Rating value from column u_m_r:r. NOTE: the field keeps the original
    // "ratting" spelling because the getter/setter names are part of the API.
    private Double ratting;

    /** No-arg constructor required for serialization frameworks. */
    public UserMovie() {
    }

    /** Convenience constructor populating all three fields. */
    public UserMovie(String userId, String movieId, Double ratting) {
        this.userId = userId;
        this.movieId = movieId;
        this.ratting = ratting;
    }

    public static long getSerialVersionUID() {
        return serialVersionUID;
    }

    public String getUserId() {
        return userId;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public String getMovieId() {
        return movieId;
    }

    public void setMovieId(String movieId) {
        this.movieId = movieId;
    }

    public Double getRatting() {
        return ratting;
    }

    public void setRatting(Double ratting) {
        this.ratting = ratting;
    }

    @Override
    public String toString() {
        // Output format is relied on by the article's printed results; keep it stable.
        return "UserMovie{"
                + "userId='" + userId + '\''
                + ", movieId='" + movieId + '\''
                + ", ratting=" + ratting
                + '}';
    }
}

实际测试代码

package hbase;

import com.nimbusds.jose.util.IntegerUtils;
import entity.UserMovie;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.StringUtils;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Reads the HBase table 'default:u_m_01' through the Flink SQL HBase connector
 * (batch mode, Blink planner) and materializes the rows as UserMovie POJOs.
 *
 * Fixes vs. the original: TableResult.collect() returns a ONE-SHOT iterator,
 * so calling executeResult.print() first drained it and left userMovieList
 * empty. Only one consumer runs now, and the iterator is closed via
 * try-with-resources. The unused batch ExecutionEnvironment was also dropped.
 */
public class HBaseTest_01 {
public static void main(String[] args) throws Exception {
// Batch-mode table environment; a pure Table API job needs no
// DataStream/DataSet environment.
EnvironmentSettings settings = EnvironmentSettings.newInstance()
.useBlinkPlanner()
.inBatchMode()
.build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// Register the HBase table. All columns are declared STRING: the cells were
// written via the HBase shell as strings, so INT would fail to deserialize.
tableEnv.executeSql(
"CREATE TABLE u_m (" +
" rowkey STRING," +
" u_m_r ROW<r STRING>," +
" PRIMARY KEY (rowkey) NOT ENFORCED" +
" ) WITH (" +
" 'connector' = 'hbase-2.2' ," +
" 'table-name' = 'default:u_m_01' ," +
" 'zookeeper.quorum' = '127.0.0.1:2181'" +
" )");

// Full scan of the table.
Table table = tableEnv.sqlQuery("SELECT * FROM u_m");

List<UserMovie> userMovieList = new ArrayList<>();

// collect() yields a one-shot CloseableIterator: do NOT also call
// TableResult.print(), or this loop would see no rows. try-with-resources
// guarantees the iterator (and its underlying job resources) is closed.
try (CloseableIterator<Row> rows = table.execute().collect()) {
rows.forEachRemaining(row -> {
// field 0 is the rowkey "user,movie"; field 1 is the u_m_r row's rating.
String[] userMovie = String.valueOf(row.getField(0)).split(",");
Double ratting = Double.valueOf(String.valueOf(row.getField(1)));
userMovieList.add(new UserMovie(userMovie[0], userMovie[1], ratting));
});
}

System.out.println("................");

for (UserMovie um : userMovieList) {
System.out.println(um);
}
}
}

5. 输出

  1. 没有注释掉executeResult.print();这行代码时

+--------------------------------+--------------------------------+
| rowkey | u_m_r |
+--------------------------------+--------------------------------+
| a,A | 1 |
| a,B | 3 |
| b,B | 3 |
| b,C | 4 |
| c,A | 2 |
| c,C | 5 |
| c,D | 1 |
| d,B | 5 |
| d,D | 2 |
| e,A | 3 |
| e,B | 2 |
| f,A | 1 |
| f,B | 2 |
| f,D | 3 |
| g,C | 1 |
| g,D | 4 |
| h,A | 1 |
| h,B | 2 |
| h,C | 4 |
| h,D | 5 |
+--------------------------------+--------------------------------+
20 rows in set
................
  2. 注释掉executeResult.print();这行代码时

................
UserMovie{userId='a', movieId='A', ratting=1.0}
UserMovie{userId='a', movieId='B', ratting=3.0}
UserMovie{userId='b', movieId='B', ratting=3.0}
UserMovie{userId='b', movieId='C', ratting=4.0}
UserMovie{userId='c', movieId='A', ratting=2.0}
UserMovie{userId='c', movieId='C', ratting=5.0}
UserMovie{userId='c', movieId='D', ratting=1.0}
UserMovie{userId='d', movieId='B', ratting=5.0}
UserMovie{userId='d', movieId='D', ratting=2.0}
UserMovie{userId='e', movieId='A', ratting=3.0}
UserMovie{userId='e', movieId='B', ratting=2.0}
UserMovie{userId='f', movieId='A', ratting=1.0}
UserMovie{userId='f', movieId='B', ratting=2.0}
UserMovie{userId='f', movieId='D', ratting=3.0}
UserMovie{userId='g', movieId='C', ratting=1.0}
UserMovie{userId='g', movieId='D', ratting=4.0}
UserMovie{userId='h', movieId='A', ratting=1.0}
UserMovie{userId='h', movieId='B', ratting=2.0}
UserMovie{userId='h', movieId='C', ratting=4.0}
UserMovie{userId='h', movieId='D', ratting=5.0}

注意

这里我们在Flink SQL里面定义HBase的Table时,指定的字段都是用的STRING类型。虽然评分本来应该是INT,但是用INT的时候报错了(HBase shell写入的是字符串字节),改成STRING就ok了。



数据湖架构、战略和分析的8大错误认知

一致性哈希及其在Greenplum中的应用

一万五千字详解HTTP协议


欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧!
浏览 24
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报