Flink Real-Time Computing in Practice (7): Implementing Custom Serialization with Protobuf in Flink

1. Custom Serialization Integration Scheme (Protobuf)

In real-world scenarios, applications often exchange complex transfer objects while demanding high transport performance. This calls for a custom serialization implementation; here we use Protobuf as the example.

Goal: produce to and consume from the same Kafka topic, using Protobuf for serialization and deserialization, and verify that the data can be parsed correctly.

  1. Generate the Java classes from the protobuf definition

    syntax = "proto3";

    option java_package = "com.itcast.flink.connectors.kafka.proto";
    option java_outer_classname = "AccessLogProto";

    // message structure definition
    message AccessLog {
        string ip = 1;
        string time = 2;
        string type = 3;
        string api = 4;
        int32 num = 5;   // numeric, to match the Integer num field of the POJO below
    }

    Generate the Java classes with a batch script:

    @echo off
    for %%i in (proto/*.proto) do (
        d:/TestCode/protoc.exe --proto_path=./proto --java_out=../java ./proto/%%i
        echo generate %%i to java file successfully!
    )

    Note: make sure the paths (the protoc executable, --proto_path and --java_out) are configured correctly.

  2. Implement the custom serialization schema

    Add the POM dependencies (the AccessLog POJO below uses Lombok's @Data, so Lombok is assumed to be on the classpath as well):

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.11.2</version>
        </dependency>
        <dependency>
            <groupId>com.google.protobuf</groupId>
            <artifactId>protobuf-java</artifactId>
            <version>3.8.0</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>5.1.8.RELEASE</version>
        </dependency>
    </dependencies>

    The AccessLog POJO:

    @Data
    public class AccessLog implements Serializable {
        private String ip;
        private String time;
        private String type;
        private String api;
        private Integer num;
    }
    

    CustomSerialSchema:

    import static org.apache.flink.util.Preconditions.checkNotNull;

    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.springframework.beans.BeanUtils;

    import com.itcast.flink.connectors.kafka.proto.AccessLogProto;

    /**
     * Custom serialization schema (Protobuf)
     */
    public class CustomSerialSchema implements DeserializationSchema<AccessLog>, SerializationSchema<AccessLog> {

        private static final long serialVersionUID = 1L;

        private transient Charset charset;

        public CustomSerialSchema() {
            this(StandardCharsets.UTF_8);
        }

        public CustomSerialSchema(Charset charset) {
            this.charset = checkNotNull(charset);
        }

        public Charset getCharset() {
            return charset;
        }

        /**
         * Deserialization: Protobuf bytes -> AccessLog POJO
         * @param message raw Protobuf bytes read from Kafka
         * @return the decoded AccessLog, or null if parsing fails
         */
        @Override
        public AccessLog deserialize(byte[] message) {
            AccessLog accessLog = null;
            try {
                AccessLogProto.AccessLog accessLogProto = AccessLogProto.AccessLog.parseFrom(message);
                accessLog = new AccessLog();
                // copy same-named properties from the Protobuf message to the POJO
                BeanUtils.copyProperties(accessLogProto, accessLog);
                return accessLog;
            } catch (Exception e) {
                e.printStackTrace();
            }
            return accessLog;
        }

        @Override
        public boolean isEndOfStream(AccessLog nextElement) {
            return false;
        }

        /**
         * Serialization: AccessLog POJO -> Protobuf bytes
         * @param element the AccessLog to encode
         * @return the Protobuf-encoded bytes
         */
        @Override
        public byte[] serialize(AccessLog element) {
            AccessLogProto.AccessLog.Builder builder = AccessLogProto.AccessLog.newBuilder();
            // copy same-named properties from the POJO to the Protobuf builder
            BeanUtils.copyProperties(element, builder);
            return builder.build().toByteArray();
        }

        /**
         * Declare the produced type for Flink
         * @return TypeInformation for AccessLog
         */
        @Override
        public TypeInformation<AccessLog> getProducedType() {
            return TypeInformation.of(AccessLog.class);
        }
    }
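
    As a quick sanity check before wiring the schema into Flink, you can round-trip an AccessLog through it and confirm the fields survive the Protobuf encoding. This is a minimal sketch; the test class name and sample values are assumptions, not part of the original article:

    // Hypothetical sanity check: serialize an AccessLog to Protobuf bytes and decode it again.
    public class CustomSerialSchemaTest {
        public static void main(String[] args) {
            CustomSerialSchema schema = new CustomSerialSchema();

            AccessLog input = new AccessLog();
            input.setIp("10.10.20.101");
            input.setTime("1601649380422");
            input.setType("GET");
            input.setApi("getAccount");
            input.setNum(1);

            byte[] bytes = schema.serialize(input);        // POJO -> Protobuf bytes
            AccessLog output = schema.deserialize(bytes);  // Protobuf bytes -> POJO

            System.out.println("api matches: " + input.getApi().equals(output.getApi()));
            System.out.println("num matches: " + input.getNum().equals(output.getNum()));
        }
    }
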
  3. Implement the Kafka message producer with Flink

    public class KafkaSinkApplication {

        public static void main(String[] args) throws Exception {
            // 1. Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // 2. Read the socket data source
            DataStreamSource<String> socketStr = env.socketTextStream("localhost", 9911, "\n");

            // 3. Transform the stream
            SingleOutputStreamOperator<AccessLog> outputStream = socketStr.map(new MapFunction<String, AccessLog>() {
                @Override
                public AccessLog map(String value) throws Exception {
                    System.out.println(value);
                    // split the record on the tab delimiter
                    String[] arrValue = value.split("\t");
                    // assemble the fields into an AccessLog object
                    AccessLog log = new AccessLog();
                    log.setNum(1);
                    for (int i = 0; i < arrValue.length; i++) {
                        if (i == 0) {
                            log.setIp(arrValue[i]);
                        } else if (i == 1) {
                            log.setTime(arrValue[i]);
                        } else if (i == 2) {
                            log.setType(arrValue[i]);
                        } else if (i == 3) {
                            log.setApi(arrValue[i]);
                        }
                    }
                    return log;
                }
            });

            // 4. Kafka producer configuration
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "10.10.20.132:9092");
            FlinkKafkaProducer<AccessLog> kafkaProducer = new FlinkKafkaProducer<>(
                    "flink-serial",              // target topic
                    new CustomSerialSchema(),    // custom serialization schema
                    properties                   // producer config (broker list)
            );

            // 5. Add the Kafka sink
            outputStream.addSink(kafkaProducer);
            socketStr.print().setParallelism(1);

            // 6. Execute the job
            env.execute("job");
        }
    }
    

    Start a Kafka console consumer from the command line to verify that the producer works (the console prints the raw Protobuf bytes, so the output below looks partly garbled):

    [root@flink1 kafka_2.12-1.1.1]# bin/kafka-console-consumer.sh --bootstrap-server  10.10.20.132:9092  --topic flink-serial	
    1601649380422GET"
    getAccount
    1601649381422POSTaddOrder
    1601649382422POST"
    
  4. Implement the Kafka message consumer with Flink

    public class KafkaSourceApplication {

        public static void main(String[] args) throws Exception {
            // 1. Create the execution environment
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // 2. Configure the Kafka connection
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "10.10.20.132:9092");
            properties.setProperty("group.id", "fink_group");

            // 3. Create the Kafka consumer
            FlinkKafkaConsumer<AccessLog> kafkaConsumer = new FlinkKafkaConsumer<>(
                    "flink-serial",              // source topic
                    new CustomSerialSchema(),    // custom deserialization schema
                    properties);

            // 4. Read the Kafka data source
            DataStreamSource<AccessLog> stream = env.addSource(kafkaConsumer);
            stream.print().setParallelism(1);

            // 5. Execute the job
            env.execute("job");
        }
    }
    

    To verify the consumer, send messages through the Flink Kafka producer built in step 3; a test input sketch is shown below.
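
    Since the producer job from step 3 reads its input from a socket on localhost:9911, something must be serving that port while you test; running nc -lk 9911 and typing tab-separated lines works just as well. Below is a minimal Java sketch of such a feeder; the class name, IP addresses and sample values are assumptions for illustration only:

    // Hypothetical test helper: listens on port 9911 and pushes a few
    // tab-separated records to the Flink producer job once it connects.
    import java.io.PrintWriter;
    import java.net.ServerSocket;
    import java.net.Socket;

    public class AccessLogSocketFeeder {
        public static void main(String[] args) throws Exception {
            try (ServerSocket server = new ServerSocket(9911);
                 Socket client = server.accept();                    // wait for socketTextStream to connect
                 PrintWriter out = new PrintWriter(client.getOutputStream(), true)) {
                // fields: ip \t time \t type \t api  (num is hard-coded to 1 in the map function)
                out.println("10.10.20.101\t1601649380422\tGET\tgetAccount");
                out.println("10.10.20.101\t1601649381422\tPOST\taddOrder");
                Thread.sleep(60_000);                                // keep the connection open while the job runs
            }
        }
    }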


This article was created and shared by mirson. For further discussion, join QQ group 19310171 or visit www.softart.cn.

