DRS如何实现Mysql->Kafka的数据解析

举报
DRS技术快客 发表于 2020/05/30 15:24:28 2020/05/30
【摘要】 DRS如何实现Mysql->Kafka的数据解析数据复制服务(Data Replication Service,简称DRS)是一种易用、稳定、高效、用于数据库在线迁移和数据库实时同步的云服务,除了支持mysql,postgres,sqlserver,MongoDB和oracle等常用数据库之间同步外,还可以通过订阅mysql的增量日志写到kafka,由客户自主决定写到其他的数据库、数据仓库和...

DRS如何实现Mysql->Kafka的数据解析

数据复制服务(Data Replication Service,简称DRS)是一种易用、稳定、高效、用于数据库在线迁移和数据库实时同步的云服务,除了支持mysql,postgres,sqlserver,MongoDB和oracle等常用数据库之间同步外,还可以通过订阅mysql的增量日志写到kafka,由客户自主决定写到其他的数据库、数据仓库和大数据,从而实现数据在企业间自由流通,满足企业管理者所需要的信息。

实现概览

通过DRS的数据同步功能可以方便从mysql的增量数据同步到kafka,图解如下:
数据解析流动.PNG
DRS通过mysql的binlog获取增量日志,并对增量日志进行解析,过滤并封装成Record数据写到kafka;客户可以通过ECS订阅kafka数据并解析Record数据,再根据需要写到其他数据库、数据仓库和大数据,图解如下:
数据回放流动.PNG
下面将具体介绍解析Record数据的整个流程。

解析流程

如何封装成Record数据就成为一个技术难题,既要支持Record的字段可以任意扩充,又要支持高效读取,还要支持不同版本数据可以兼容获取,这里就需要用到Avro技术了。Avro是一个数据序列化系统,用于支持大批量数据交换的应用,Avro支持两种序列化编码方式:二进制编码和JSON编码。使用二进制编码会高效序列化,并且序列化后得到的结果会比较小;而JSON一般用于调试系统或是基于WEB的应用;DRS写到kafka的就是二进制编码,可以高效进行传输和获取。

Record解析主要有2个步骤:

  1. 使用原生的Kafka consumer从数据订阅通道中获取增量数据;

  2. 将获取的增量数据执行反序列化,并从中获取前镜像、后镜像和其他属性。

写到kafka的Record数据实际上是使用AVRO对象信息。Avro支持两种序列化编码方式:二进制编码和JSON编码。使用二进制编码会高效序列化,并且序列化后得到的结果会比较小;而JSON一般用于调试系统或是基于WEB的应用;DRS写到kafka的就是二进制编码,可以高效进行传输和获取。

Avro可以允许我们根据模式的定义而生成相应的类,一旦我们定义好相关的类,程序中就不需要直接使用模式了。可以用avro-tools jar包根据record.avsc生成相关类的java代码,参考命令如下:

java -jar avro-tools-1.8.2.jar compile schema D:\drs-avro\src\main\resources\record.avsc D:\drs-avro\src\main\java\

命令执行后会在D:\drs-avro\src\main\java\根据record.avsc文件生成一个Record.java类和相关字段类型的java代码,如下图所示:

展示.png

把这些代码加到应用程序代码目录,可以直接引用,相关伪代码如下:

1. 从kafka中订阅数据

Properties props = new Properties();props.put("bootstrap.servers", host);props.put("isolation.level","read_committed");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);consumer.subscribe(Collections.singletonList(topic));AvroObject avroObject = new AvroObject();while (true) {
    ConsumerRecords<String, byte[]> records = consumer.poll(100);
    for (ConsumerRecord<String, byte[]> record : records) {
        if (record != null && record.value() != null) {
            //从kafka获取的数据进行反序列化生成Record类
            Record rec = avroObject.getData(record.value());
            if (rec != null) {
                //解析Record对象信息
                avroObject.showRecord(rec);
            }
        }
    }}

2. AvroObject中反序列化

import org.apache.avro.io.Decoder;import org.apache.avro.io.DecoderFactory;import org.apache.avro.specific.SpecificDatumReader;public Record getData(byte[] data) {

    try {
        SpecificDatumReader<Record> reader = new SpecificDatumReader<Record>(Record.class);
        Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
        Record record = reader.read(null, decoder);
        return record;
    } catch (Exception e) {
        System.out.println(e);
        return null;
    }}

3. AvroObject中解析Record信息

public void showRecord(Record rec) {
    System.out.print("version=" + rec.getVersion());    //版本号
    System.out.print(",seqno=" + rec.getSeqno());       //日志流水号
    System.out.print(",tableName=" + rec.getTableName());   //数据库和表名
    System.out.print(",operation=" + rec.getOperation());   //操作类型:INSERT,DELETE,UPDATE,DDL
    System.out.println(",columnCount=" + rec.getColumnCount()); //字段个数
    int columnCount = (int) (long) rec.getColumnCount();
    List<Field> fields = rec.getFields();   //字段列表,包括字段名称和字段类型
    List<Object> value = rec.getAfterImages();  //记录后镜像
    List<Object> beforeValue = rec.getBeforeImages();   ////记录前镜像

    //打印每个字段的名称,类型和前后镜像值
    for (int col = 0; col < columnCount; col++) {
        Field field = fields.get(col);
        System.out.print((col + 1) + " fieldName=" + field.getName());
        int type = field.getDataTypeNumber();
        String typeNmae = typeMap.get(String.valueOf(type));
        String aValue = getValue(type, value.get(col));
        if (beforeValue != null && beforeValue.size() > 0) {
            String bValue = getValue(type, beforeValue.get(col));
            if (!aValue.equals(bValue)) {
                aValue = aValue  + ",beforeValue=" + bValue;
            }
        }
        System.out.println(",type=" + type + "(" + typeN + "),value=" + aValue);
    }}


【版权声明】本文为华为云社区用户原创内容,未经允许不得转载,如需转载请自行联系原作者进行授权。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@huaweicloud.com
  • 点赞
  • 收藏
  • 关注作者

评论(0

0/1000
抱歉,系统识别当前为高风险访问,暂不支持该操作

全部回复

上滑加载中

设置昵称

在此一键设置昵称,即可参与社区互动!

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。

*长度不超过10个汉字或20个英文字符,设置后3个月内不可修改。