原理+实战之Flink-Exactly-Once Kafka2Redis一致性实践
文章目录
前言
一、Redis的事务性
二、编写RedisUtil
三、编写RedisExactlySink
四、编写主测类,实现单词统计并且写入Redis
五、测试过程以及图示
5.1启动redis,查看数据库 5.2启动kafka,创建生产者产生数据 5.3启动主程序,并且kafka输入数据
六、过程中遇到的BUG解决
6.1 “Could not get a resource since the pool is exhausted” 6.2 “ERR EXEC without MULTI” 6.3 “Committing one of transactions failed, logging first encountered failure”
总结
前言
我们把Kafka写入Redis的场景举例,实现自定义的两阶段提交,下面介绍我们整个实现的步骤,并观察结果。
一、Redis的事务性
redis实现事务可以通过jedis来使用multi()方法获得当前创建的一个事务,对于redis来说,事务并不具有隔离性和原子性,他更看起来是相当于一连串的命令按照顺序进入队列之后再顺序执行,当其中有失败的时候,依然是可以提交的,但是我们也可以discard,也就是取消事务。
二、编写RedisUtil
代码如下(示例):
public class RedisUtil {
public static JedisPool jedisPool=null;
public static JedisPoolConfig jedisPoolConfig;
//确保拿到的jedis连接是唯一的,从而完成事务 不加入序列化
private final transient Jedis jedis;
private transient Transaction jedisTransaction;
//JedisPool配置类提前加载
static {
jedisPoolConfig=new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(100); //最大可用连接数
jedisPoolConfig.setBlockWhenExhausted(true); //连接耗尽是否等待
jedisPoolConfig.setMaxWaitMillis(2000); //等待时间
jedisPoolConfig.setMaxIdle(5); //最大闲置连接数
jedisPoolConfig.setMinIdle(5); //最小闲置连接数
jedisPoolConfig.setTestOnBorrow(false); //取连接的时候进行一下测试 pingpong
}
//无参构造
public RedisUtil() throws IOException {
InputStream in = RedisUtil.class.getClassLoader().getResourceAsStream("redis.properties");
Properties properties = new Properties();
properties.load(in);
String port = properties.getProperty("redis.port");
String timeout = properties.getProperty("redis.timeout");
jedisPool = new JedisPool(jedisPoolConfig, properties.getProperty("redis.host"), Integer.parseInt(port), Integer.parseInt(timeout));
//System.out.println("开辟连接池");
jedis = jedisPool.getResource();
jedis.auth("root");
}
//获取jedis
public Transaction getTransaction(){
if(this.jedisTransaction==null) {
jedisTransaction = this.jedis.multi();
System.out.println("========"+jedisTransaction);
System.out.println(jedisTransaction);
}
return this.jedisTransaction;
}
public Jedis getjedis(){
return this.jedis;
}
public void setjedisTransactionIsNull(){
this.jedisTransaction=null;
}
}
三、编写RedisExactlySink
代码如下(示例):
public class RedisExactlySink<T> extends TwoPhaseCommitSinkFunction<T, RedisUtil,Void> {
//定义redis hash表名
public static final String REDIS_HASH_MAP="WordAndWordCount";
public static RedisUtil redisUtil;
static {
try {
redisUtil = new RedisUtil();
} catch (IOException e) {
e.printStackTrace();
}
}
//继承父类的构造参数,因为要初始化父类的内容
public RedisExactlySink(){
super(new KryoSerializer<>(RedisUtil.class,new ExecutionConfig()), VoidSerializer.INSTANCE);
}
@Override
protected void invoke(RedisUtil transaction, T value, Context context) throws Exception {
Transaction jedis = transaction.getTransaction();//拿到事务连接
System.out.println(jedis);
Class<?> aClass = value.getClass();//获取class
Field[] fields = aClass.getDeclaredFields();//获取属性字段
fields[0].setAccessible(true);
fields[1].setAccessible(true);
Object object1=fields[0].get(value);
if(object1.toString().equals("error")){
throw new RuntimeException("主动触发异常!!");
}
Object object2=fields[1].get(value);
System.out.println("写入redis HashMap ");
jedis.hset(REDIS_HASH_MAP,object1.toString(),object2.toString());
}
@Override
protected RedisUtil beginTransaction() throws Exception {
return redisUtil;
}
@Override
protected void preCommit(RedisUtil transaction) throws Exception {
System.out.println("正在执行预提交!!!");
}
@Override
protected void commit(RedisUtil transaction) {
Transaction jedistransaction = transaction.getTransaction();
System.out.println(jedistransaction);
try {
System.out.println("事务提交");
jedistransaction.exec();
redisUtil.setjedisTransactionIsNull();
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("jedis close!!!");
}
}
@Override
protected void abort(RedisUtil transaction) {
Transaction jedistransaction = transaction.getTransaction();
System.out.println(jedistransaction);
try {
System.out.println("取消事务");
jedistransaction.discard();
redisUtil.setjedisTransactionIsNull();
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("jedis close!!!");
}
}
}
可以看到这里我们依然是继承了TwoPhaseCommitSinkFunction的方法,这里不做详细介绍。
四、编写主测类,实现单词统计并且写入Redis
public static void main(String[] args) throws Exception {
//1.获取流式执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//2.设置并行度以及jedis的序列化
env.setParallelism(1);
env.getConfig().addDefaultKryoSerializer(Jedis.class, TBaseSerializer.class);
//3.设置CheckPoint以及StateBackend
CkAndStateBacked.setCheckPointAndStateBackend(env,"FS");
//4.获取Kafka输入流
InputStream in = KafkaToRedis.class.getClassLoader().getResourceAsStream("kafka.properties");
ParameterTool parameterTool=ParameterTool.fromPropertiesFile(in);
SimpleStringSchema simpleStringSchema = new SimpleStringSchema();
Class<? extends SimpleStringSchema> aClass = simpleStringSchema.getClass();
DataStream<String> kafkaDataStream = KafkaUtil.getKafkaDataStream(parameterTool, aClass, env);
//5.map包装数据为value,1
SingleOutputStreamOperator<Tuple2<String, Integer>> mapStream = kafkaDataStream.map(new MapFunction<String, Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> map(String value) throws Exception {
return new Tuple2<>(value, 1);
}
});
//6.mapStream进行keyby并且聚合
SingleOutputStreamOperator<Tuple2<String, Integer>> reduceStream = mapStream.keyBy(data -> data.f0)
.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
@Override
public Tuple2<String, Integer> reduce(Tuple2<String, Integer> value1, Tuple2<String, Integer> value2) throws Exception {
return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
}
});
//7.reduceStream包装成POJO类
SingleOutputStreamOperator<Pojo> pojoStream = reduceStream.map(data -> {
Pojo pojo = new Pojo(data.f0, data.f1);
return pojo;
});
//8.pojoStream输出到redis,这里以Hash表的形式类似
// WordAndWordCount java 1 python 1
pojoStream.addSink(new RedisExactlySink<Pojo>());
//9.任务执行
env.execute();
}
五、测试过程以及图示
5.1启动redis,查看数据库
5.2启动kafka,创建生产者产生数据
5.3启动主程序,并且kafka输入数据
可以看到,我们这里设置的CheckPoint的时间间隔是20秒做一次,也就是每次CheckPoint,间隔内的这一段都相当于是同一段事务,要么成功,要么失败。之后我们输入了error,主动触发了异常,如图并没有写进去。
六、过程中遇到的BUG解决
6.1 “Could not get a resource since the pool is exhausted”
这里意思是因为资源池资源耗尽,无法从中获取到redis连接,对应这一步
jedis = jedisPool.getResource();
但是我这里并没有考虑并发,因此通过查询发现是
jedisPoolConfig.setTestOnBorrow(false); //取连接的时候进行一下测试 pingpong
如果设置为true就需要将redis和你的程序放到同一台机器上或者同一局域网上面或者关闭该模式。所以这里我们只需要将true改为false后解决这个问题。
6.2 “ERR EXEC without MULTI”
这里意思是,我们获取到jedis连接并没有开启事务,然后我们却执行了exec(),或者discard,这里是因为对于20秒内的CheckPoint的间隔内,其实我们是把jedis固定了然后,我们并且调用了如图的方法
//获取jedis
public Transaction getTransaction(){
if(this.jedisTransaction==null) {
jedisTransaction = this.jedis.multi();
System.out.println("========"+jedisTransaction);
System.out.println(jedisTransaction);
}
return this.jedisTransaction;
}
这里当下次CheckPoint的时候,也就是下一个20秒之间,从这里获取连接就有问题了,因为我们拿到的还是jedis创建的jedisTransaction ,还是上一次的 事务连接,但是,我们上次CheckPoint结束之前,成功的化其实已经执行了
jedistransaction.exec();
这个步骤看源码会发现,他已经把当前连接里面的事务标志位,置为false
this.inTransaction = false;
然而我们拿到的还是上次这个已经没有开启事务的连接,从而也意味着只有第一次可以成功,后面就失败了,解决的方法就是再每次我们coomit或者abort的时候,我们紧接着调用一个将此事务jedis连接置空的方法,保证下次拿到的事务jedis连接是一个新的连接就ok了
@Override
protected void commit(RedisUtil transaction) {
Transaction jedistransaction = transaction.getTransaction();
System.out.println(jedistransaction);
try {
System.out.println("事务提交");
jedistransaction.exec();
//这里调用一个方法,重置了事务连接
redisUtil.setjedisTransactionIsNull();
} catch (Exception e) {
e.printStackTrace();
} finally {
System.out.println("jedis close!!!");
}
}
调用的是如下方法。
public void setjedisTransactionIsNull(){
this.jedisTransaction=null;
}
6.3 “Committing one of transactions failed, logging first encountered failure”
这个意思是提交其中一个事务失败,日志记录第一次遇到失败,这里的原因跟踪发现报错位置再之前我每次提交或者abort的时候,每次finally里面我都会写close();
finally {
System.out.println("jedis close!!!");
jedistransaction.close();
}
后来发现,其实没必要再这里关闭的,对于事务连接不需要做这种操作,因此都去掉之后就一切便正常了!
总结
以上,就是Flink-Exactly-once系列实践-KafkaToRedis的大体实现了,包括整个代码以及验证,以及过程中遇到的BUG解决,有不足的地方还请见谅!