DRS数据同步到KAFKA原理
##一、kafka简单介绍
kafka大家的第一印象是一个消息系统。但是kafka的官网的说法是:
Apache Kafka® is a distributed streaming platform
kafka是一个分布式流处理平台,而流处理平台主要具备以下三种能力:
1.发布和订阅消息流,类似于消息队列或企业消息传递系统。
2.以容错的持久化方式储存消息流。
3.可以在产生消息流的时候,同时进行处理。
而kafka具备以下几个特性:
1.kafka作为一个集群可以运行在一个或者多个服务器上,这些服务器可以跨多个数据中心。
2.kafka集群存储的消息是按照topic(主题)进行分类的。
3.每个消息(也被称为记录)是由一个key,一个value和一个时间戳构成。
kafka对外提供了四种核心API:
1.Producer API,允许应用程序发布消息到kafka集群上的1个或多个的topic。
2.Consumer API,允许应用程序订阅一个或多个topic,并处理这些topic的消息。
3.Streams API,允许应用程序充当一个流处理器,从1个或多个topic消费输入流,并产生一个输出流到1个或多个输出topic,有效地将输入流转换到输出流。
4.Connector API,允许构建运行可重复使用的生产者或消费者,将topic和现有的应用程序或数据系统连接起来。例如,一个关系型数据库的连接器可以捕获到该库下每一个表的变化。
##二、Producer发送数据到kafka的流程
DRS同步到kafka:DRS作为kafka的客户端,利用kafka的Producer API;将源端数据库产生的增量数据写入到目标kafka的topic上。
以mysql到kafka为例,大致流程入下:
1.将源端mysql的binlog日志记录的增量数据作为消息封装成一个Record。
2.经过拦截器,对消息进行过滤。
3.经过序列化器,将消息的key和value进行序列化,当然可以自行定义序列化规则或者自行编写序列化器。
4.消息经过分区器,确定这条消息需要发送到目标topic的分区号。如果在消息里面指定了partion字段,那么就是将消息发送到指定分区。
5.之后消息会封装从成一个一个批次汇总到RecordAccumulator。accumulator可以作为一个缓存,是kafka强大的写入性能原因之一。
6.之后会依赖一个后台唤醒的Sender线程,将数据有序的发送到leader partition所在的broker(kafka集群的每一个服务器都是一个broker)中。
7.在发送消息的过程中,kafka客户端可以从任意一个broker获取到kafka集群的metadata信息,metadata信息里面记录了kafka集群的每个topic的所有partition的信息: leader, fellow, isr, replicas等。
整体的流程如下图所示
三、Producer两个重要参数
1.acks决定了生产者如何在性能与数据可靠之间做取舍,官方源码中描述如下:
public static final String ACKS_CONFIG = "acks";
private static final String ACKS_DOC = "The number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the "
+ " durability of records that are sent. The following settings are allowed: "
+ " <ul>"
+ " <li><code>acks=0</code> If set to zero then the producer will not wait for any acknowledgment from the"
+ " server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be"
+ " made that the server has received the record in this case, and the <code>retries</code> configuration will not"
+ " take effect (as the client won't generally know of any failures). The offset given back for each record will"
+ " always be set to <code>-1</code>."
+ " <li><code>acks=1</code> This will mean the leader will write the record to its local log but will respond"
+ " without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after"
+ " acknowledging the record but before the followers have replicated it then the record will be lost."
+ " <li><code>acks=all</code> This means the leader will wait for the full set of in-sync replicas to"
+ " acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica"
+ " remains alive. This is the strongest available guarantee. This is equivalent to the acks=-1 setting."
+ "</ul>";
对于kafka来说,消息日志是按照topic分类存储的,而对于一个topic来说有partitons分区数,replication-factor副本数。
对于一个topic而言有多个分区,一个分又可以有多个副本。这些副本中,只有一个leader partition。其他都是follower partiton,仅有leader partition可以对外提供服务,follower partiton主要用于冗余备份。
而副本是存放在不同的broker上面的,因此在创建topic的时候,副本数不能大于broker的节点数的。
而acks参数呢,就是和副本有关系。
acks=0:这意味着producer发送数据后,不会等待broker确认,直接发送下一条数据,性能最好
acks=1:为1意味着producer发送数据后,需要等待leader副本确认接收后,才会发送下一条数据,性能次之
acks=-1/all:这个代表的是all,意味着发送的消息写入leader partition后,等到follower从leader拉取到消息后,才会发送下一条数据,性能最差,但可靠性最强
而DRS以可靠性优先,因此设置的acks参数值为all,确保消息写入到所有可用副本后,才进行下一条写入。
2.max.in.flight.requests.per.connection,官方源码描述如下:
public static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION = "max.in.flight.requests.per.connection";
private static final String MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION_DOC = "The maximum number of unacknowledged requests the client will send on a single connection before blocking."
+ " Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of"
+ " message re-ordering due to retries (i.e., if retries are enabled).";
// 在InFlightRequests.java中
/**
* Can we send more requests to this node?
*
* @param node Node in question
* @return true iff we have no requests still being sent to the given node
*/
public boolean canSendMore(String node) {
Deque<NetworkClient.InFlightRequest> queue = requests.get(node);
return queue == null || queue.isEmpty() ||
(queue.peekFirst().send.completed() && queue.size() < this.maxInFlightRequestsPerConnection);
}
// 在Sender.java中
if (guaranteeMessageOrder) {
// Mute all the partitions drained
for (List<ProducerBatch> batchList : batches.values()) {
for (ProducerBatch batch : batchList)
this.accumulator.mutePartition(batch.topicPartition);
}
}
max.in.flight.requests.per.connection表示在单个连接中,最多可以忍受多少个请求处于发送中没有没有响应。kafka源码中这个参数默认是5,可以认为,在一个连接中有5个请求发送出去了,并且Producer都没有收到broker的响应。
如果这个参数大于1,由于有重试机制,可能会存在消息顺序错乱的风险。
如下图,在一个网络连接中将batch封装成不同的request,从batch队列中取出数据,按照顺序封装成不同的request(请求1… 请求5).
如果broker在处理请求2时因为borker节点不可以等因素导致写消息到partition异常了,但是其它请求的数据都正常写入了。此时由于重试机制,Producer会将请求2重新发送。
导致broker写入到leader partition消息顺序错乱。
而DRS为了保证数据写入到kafka是有序的,max.in.flight.requests.per.connection参数设置为1,但是这样降低了kafka的吞吐量。
- 点赞
- 收藏
- 关注作者
评论(0)