DRS如何实现Mysql->Kafka的数据解析
DRS如何实现Mysql->Kafka的数据解析
数据复制服务(Data Replication Service,简称DRS)是一种易用、稳定、高效、用于数据库在线迁移和数据库实时同步的云服务,除了支持mysql,postgres,sqlserver,MongoDB和oracle等常用数据库之间同步外,还可以通过订阅mysql的增量日志写到kafka,由客户自主决定写到其他的数据库、数据仓库和大数据,从而实现数据在企业间自由流通,满足企业管理者所需要的信息。
实现概览
通过DRS的数据同步功能可以方便从mysql的增量数据同步到kafka,图解如下:
DRS通过mysql的binlog获取增量日志,并对增量日志进行解析,过滤并封装成Record数据写到kafka;客户可以通过ECS订阅kafka数据并解析Record数据,再根据需要写到其他数据库、数据仓库和大数据,图解如下:
下面将具体介绍解析Record数据的整个流程。
解析流程
如何封装成Record数据就成为一个技术难题,既要支持Record的字段可以任意扩充,又要支持高效读取,还要支持不同版本数据可以兼容获取,这里就需要用到Avro技术了。Avro是一个数据序列化系统,用于支持大批量数据交换的应用,Avro支持两种序列化编码方式:二进制编码和JSON编码。使用二进制编码会高效序列化,并且序列化后得到的结果会比较小;而JSON一般用于调试系统或是基于WEB的应用;DRS写到kafka的就是二进制编码,可以高效进行传输和获取。
Record解析主要有2个步骤:
-
使用原生的Kafka consumer从数据订阅通道中获取增量数据;
-
将获取的增量数据执行反序列化,并从中获取前镜像、后镜像和其他属性。
写到kafka的Record数据实际上是使用AVRO对象信息。Avro支持两种序列化编码方式:二进制编码和JSON编码。使用二进制编码会高效序列化,并且序列化后得到的结果会比较小;而JSON一般用于调试系统或是基于WEB的应用;DRS写到kafka的就是二进制编码,可以高效进行传输和获取。
Avro可以允许我们根据模式的定义而生成相应的类,一旦我们定义好相关的类,程序中就不需要直接使用模式了。可以用avro-tools jar包根据record.avsc生成相关类的java代码,参考命令如下:
java -jar avro-tools-1.8.2.jar compile schema D:\drs-avro\src\main\resources\record.avsc D:\drs-avro\src\main\java\
命令执行后会在D:\drs-avro\src\main\java\根据record.avsc文件生成一个Record.java类和相关字段类型的java代码,如下图所示:
把这些代码加到应用程序代码目录,可以直接引用,相关伪代码如下:
1. 从kafka中订阅数据
Properties props = new Properties();props.put("bootstrap.servers", host);props.put("isolation.level","read_committed");props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<String, byte[]>(props);consumer.subscribe(Collections.singletonList(topic));AvroObject avroObject = new AvroObject();while (true) {
ConsumerRecords<String, byte[]> records = consumer.poll(100);
for (ConsumerRecord<String, byte[]> record : records) {
if (record != null && record.value() != null) {
//从kafka获取的数据进行反序列化生成Record类
Record rec = avroObject.getData(record.value());
if (rec != null) {
//解析Record对象信息
avroObject.showRecord(rec);
}
}
}}
2. AvroObject中反序列化
import org.apache.avro.io.Decoder;import org.apache.avro.io.DecoderFactory;import org.apache.avro.specific.SpecificDatumReader;public Record getData(byte[] data) {
try {
SpecificDatumReader<Record> reader = new SpecificDatumReader<Record>(Record.class);
Decoder decoder = DecoderFactory.get().binaryDecoder(data, null);
Record record = reader.read(null, decoder);
return record;
} catch (Exception e) {
System.out.println(e);
return null;
}}
3. AvroObject中解析Record信息
public void showRecord(Record rec) {
System.out.print("version=" + rec.getVersion()); //版本号
System.out.print(",seqno=" + rec.getSeqno()); //日志流水号
System.out.print(",tableName=" + rec.getTableName()); //数据库和表名
System.out.print(",operation=" + rec.getOperation()); //操作类型:INSERT,DELETE,UPDATE,DDL
System.out.println(",columnCount=" + rec.getColumnCount()); //字段个数
int columnCount = (int) (long) rec.getColumnCount();
List<Field> fields = rec.getFields(); //字段列表,包括字段名称和字段类型
List<Object> value = rec.getAfterImages(); //记录后镜像
List<Object> beforeValue = rec.getBeforeImages(); ////记录前镜像
//打印每个字段的名称,类型和前后镜像值
for (int col = 0; col < columnCount; col++) {
Field field = fields.get(col);
System.out.print((col + 1) + " fieldName=" + field.getName());
int type = field.getDataTypeNumber();
String typeNmae = typeMap.get(String.valueOf(type));
String aValue = getValue(type, value.get(col));
if (beforeValue != null && beforeValue.size() > 0) {
String bValue = getValue(type, beforeValue.get(col));
if (!aValue.equals(bValue)) {
aValue = aValue + ",beforeValue=" + bValue;
}
}
System.out.println(",type=" + type + "(" + typeN + "),value=" + aValue);
}}
- 点赞
- 收藏
- 关注作者
评论(0)