redis 2m数据读取_Flink读写Redis(二)读取redis数据
自定义flink的RedisSource,实现从redis中读取数据,借鉴了flink-connector-redis_2.11的实现思路,对redis读取操作进行封装,其中flink-connector-redis_2.11的使用和介绍可参考文末连接。项目中需要引入flink-connector-redis_2.11的maven依赖。
抽象redis数据
由于redis有不同的数据类型,所以先定义MyRedisRecord类,封装redis数据类型和数据对象
package com.jike.flink.examples.redis;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
import java.io.Serializable;
public class MyRedisRecord implements Serializable {
private Object data;
private RedisDataType redisDataType;
public MyRedisRecord(Object data, RedisDataType redisDataType) {
this.data = data;
this.redisDataType = redisDataType;
}
public Object getData() {
return data;
}
public void setData(Object data) {
this.data = data;
}
public RedisDataType getRedisDataType() {
return redisDataType;
}
public void setRedisDataType(RedisDataType redisDataType) {
this.redisDataType = redisDataType;
}
}
定义Redis数据读取类
将redis的操作封装起来,首先定义接口类,定义支持的读取操作,例子中这只写了哈希表的get操作,可以增加更多的操作
package com.jike.flink.examples.redis;
import java.io.Serializable;
import java.util.Map;
public interface MyRedisCommandsContainer extends Serializable {
Map hget(String key);
void close();
}定义一个MyRedisCommandsContainer接口实现类,实现对redis的读取操作,主要是调用了Jedis的API,可以支持哨兵模式和直连redis两种模式的连接redis
package com.jike.flink.examples.redis;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSentinelPool;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
public class MyRedisContainer implements MyRedisCommandsContainer,Cloneable{
private static final long serialVersionUID = 1L;
private static final Logger LOG = LoggerFactory.getLogger(MyRedisContainer.class);
private final JedisPool jedisPool;
private final JedisSentinelPool jedisSentinelPool;
public MyRedisContainer(JedisPool jedisPool) {
Preconditions.checkNotNull(jedisPool, "Jedis Pool can not be null");
this.jedisPool = jedisPool;
this.jedisSentinelPool = null;
}
public MyRedisContainer(JedisSentinelPool sentinelPool) {
Preconditions.checkNotNull(sentinelPool, "Jedis Sentinel Pool can not be null");
this.jedisPool = null;
this.jedisSentinelPool = sentinelPool;
}
@Override
public Map hget(String key) {
Jedis jedis = null;
try {
jedis = this.getInstance();
Map map = new HashMap();
Set fieldSet = jedis.hkeys(key);for(String s : fieldSet){
map.put(s,jedis.hget(key,s));
}return map;
} catch (Exception e) {if (LOG.isErrorEnabled()) {
LOG.error("Cannot get Redis message with command HGET to key {} error message {}", new Object[]{key, e.getMessage()});
}
throw e;
} finally {
this.releaseInstance(jedis);
}
}
private Jedis getInstance() {return this.jedisSentinelPool != null ? this.jedisSentinelPool.getResource() : this.jedisPool.getResource();
}
private void releaseInstance(Jedis jedis) {if (jedis != null) {
try {
jedis.close();
} catch (Exception var3) {
LOG.error("Failed to close (return) instance to pool", var3);
}
}
}
public void close() {if (this.jedisPool != null) {
this.jedisPool.close();
}if (this.jedisSentinelPool != null) {
this.jedisSentinelPool.close();
}
}
}定义MyRedisCommandsContainer对象的创建类
该类用来根据不同的配置生成不同的对象,例子中考虑了直连redis和哨兵模式两张情况,后续还可以考虑redis集群的情形
package com.jike.flink.examples.redis;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisSentinelConfig;
import org.apache.flink.util.Preconditions;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisSentinelPool;
public class MyRedisCommandsContainerBuilder {
public MyRedisCommandsContainerBuilder(){
}
public static MyRedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase) {
if (flinkJedisConfigBase instanceof FlinkJedisPoolConfig) {
FlinkJedisPoolConfig flinkJedisPoolConfig = (FlinkJedisPoolConfig)flinkJedisConfigBase;
return build(flinkJedisPoolConfig);
} else if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) {
FlinkJedisSentinelConfig flinkJedisSentinelConfig = (FlinkJedisSentinelConfig)flinkJedisConfigBase;
return build(flinkJedisSentinelConfig);
} else {
throw new IllegalArgumentException("Jedis configuration not found");
}
}
public static MyRedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) {
Preconditions.checkNotNull(jedisPoolConfig, "Redis pool config should not be Null");
GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle());
genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal());
genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle());
JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(), jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(), jedisPoolConfig.getDatabase());
return new MyRedisContainer(jedisPool);
}
public static MyRedisCommandsContainer build(FlinkJedisSentinelConfig jedisSentinelConfig) {
Preconditions.checkNotNull(jedisSentinelConfig, "Redis sentinel config should not be Null");
GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig();
genericObjectPoolConfig.setMaxIdle(jedisSentinelConfig.getMaxIdle());
genericObjectPoolConfig.setMaxTotal(jedisSentinelConfig.getMaxTotal());
genericObjectPoolConfig.setMinIdle(jedisSentinelConfig.getMinIdle());
JedisSentinelPool jedisSentinelPool = new JedisSentinelPool(jedisSentinelConfig.getMasterName(), jedisSentinelConfig.getSentinels(), genericObjectPoolConfig, jedisSentinelConfig.getConnectionTimeout(), jedisSentinelConfig.getSoTimeout(), jedisSentinelConfig.getPassword(), jedisSentinelConfig.getDatabase());
return new MyRedisContainer(jedisSentinelPool);
}
}
redis操作描述类
package com.jike.flink.examples.redis;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
public enum MyRedisCommand {
HGET(RedisDataType.HASH);
private RedisDataType redisDataType;
private MyRedisCommand(RedisDataType redisDataType) {
this.redisDataType = redisDataType;
}
public RedisDataType getRedisDataType() {
return this.redisDataType;
}
}
package com.jike.flink.examples.redis;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
public class MyRedisCommandDescription implements Serializable {
private static final long serialVersionUID = 1L;
private MyRedisCommand redisCommand;
private String additionalKey;
public MyRedisCommandDescription(MyRedisCommand redisCommand, String additionalKey) {
Preconditions.checkNotNull(redisCommand, "Redis command type can not be null");
this.redisCommand = redisCommand;
this.additionalKey = additionalKey;
if ((redisCommand.getRedisDataType() == RedisDataType.HASH || redisCommand.getRedisDataType() == RedisDataType.SORTED_SET) && additionalKey == null) {
throw new IllegalArgumentException("Hash and Sorted Set should have additional key");
}
}
public MyRedisCommandDescription(MyRedisCommand redisCommand) {
this(redisCommand, (String)null);
}
public MyRedisCommand getCommand() {
return this.redisCommand;
}
public String getAdditionalKey() {
return this.additionalKey;
}
}
RedisSource
定义flink redis source的实现,继承RichSourceFunction类,该类构造方法接收两个参数,包括redis配置信息以及要读取的redis数据类型信息;open方法会在source打开时执行,用来完成redis操作类对象的创建;run方法会一直读取redis数据,并根据数据类型调用对应的redis操作,封装成MyRedisRecord对象
package com.jike.flink.examples.redis;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisConfigBase;
import org.apache.flink.util.Preconditions;
public class RedisSource extends RichSourceFunction{
private static final long serialVersionUID = 1L;
private String additionalKey;
private MyRedisCommand redisCommand;
private FlinkJedisConfigBase flinkJedisConfigBase;
private MyRedisCommandsContainer redisCommandsContainer;
private volatile boolean isRunning = true;
public RedisSource(FlinkJedisConfigBase flinkJedisConfigBase, MyRedisCommandDescription redisCommandDescription) {
Preconditions.checkNotNull(flinkJedisConfigBase, "Redis connection pool config should not be null");
Preconditions.checkNotNull(redisCommandDescription, "MyRedisCommandDescription can not be null");
this.flinkJedisConfigBase = flinkJedisConfigBase;
this.redisCommand = redisCommandDescription.getCommand();
this.additionalKey = redisCommandDescription.getAdditionalKey();
}
@Override
public void open(Configuration parameters) throws Exception {
this.redisCommandsContainer = MyRedisCommandsContainerBuilder.build(this.flinkJedisConfigBase);
}
@Override
public void run(SourceContext sourceContext) throws Exception {while (isRunning){
switch(this.redisCommand) {case HGET:
sourceContext.collect(new MyRedisRecord(this.redisCommandsContainer.hget(this.additionalKey), this.redisCommand.getRedisDataType()));break;
default:
throw new IllegalArgumentException("Cannot process such data type: " + this.redisCommand);
}
}
}
@Override
public void cancel() {
isRunning = false;if (this.redisCommandsContainer != null) {
this.redisCommandsContainer.close();
}
}
}使用
利用上述提供的类,计算出现次数最多的单词,其中redis中的哈希表保存个了各个单词的词频 定义MyMapRedisRecordSplitter,实现FlatMapFunction接口,对redis中读取的哈希表进行遍历,以形式输出到下游
package com.jike.flink.examples.redis;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisDataType;
import org.apache.flink.util.Collector;
import java.util.Map;
public class MyMapRedisRecordSplitter implements FlatMapFunction> {
@Override
public void flatMap(MyRedisRecord myRedisRecord, Collector> collector) throws Exception {
assert myRedisRecord.getRedisDataType() == RedisDataType.HASH;
Map map = (Map)myRedisRecord.getData();for(Map.Entry e : map.entrySet()){
collector.collect(new Tuple2<>(e.getKey(),Integer.valueOf(e.getValue())));
}
}
}定义主程序类
package com.jike.flink.examples.redis;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
public class MaxCount{
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("ip").setPort(30420).setPassword("passwd").build();
DataStreamSource source = executionEnvironment.addSource(new RedisSource(conf,new MyRedisCommandDescription(MyRedisCommand.HGET,"flink")));
DataStream> max = source.flatMap(new MyMapRedisRecordSplitter()).timeWindowAll(Time.milliseconds(5000)).maxBy(1);
max.print().setParallelism(1);
executionEnvironment.execute();
}
}redis中的数据及idea中的打印结果


总结
简单实现了一个统一的flink redis source,提供了redis数据读取的功能,可以作为flink自定义数据源的入门学习。
Flink读写Redis(一)-写入Redis
flink-connector-redis源码阅读
本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!
