14. Kafka Consumer: SyncGroup (Part 2)
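In the previous section we saw that after the client receives the JoinGroup response, the consumer leader computes the partition assignment for the group and then sends a SyncGroupRequest to the server. This section starts from the SyncGroupRequest protocol.
Sample SyncGroupRequest and SyncGroupResponse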
- Request sent by the consumer leader
SyncGroupRequestData(groupId='mykafka-group_4', generationId=3, memberId='mykafka-group_4_1-c3c31f6b-7229-49a2-b7cc-8a122976bb5e', groupInstanceId=null, protocolType='consumer', protocolName='sticky', assignments=[SyncGroupRequestAssignment(memberId='mykafka-group_4_2-d074e246-fe60-4f06-8cde-0f8c5d8fdd00', assignment=[0, 1, 0, 0, 0, 1, 0, 7, 116, 111, 112, 105, 99, 95, 49, 0, 0, 0, 1, 0, 0, 0, 1, -1, -1, -1, -1]), SyncGroupRequestAssignment(memberId='mykafka-group_4_1-c3c31f6b-7229-49a2-b7cc-8a122976bb5e', assignment=[0, 1, 0, 0, 0, 1, 0, 7, 116, 111, 112, 105, 99, 95, 49, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 2, -1, -1, -1, -1])])
- Response received by the consumer leader
SyncGroupResponseData(throttleTimeMs=0, errorCode=0, protocolType='consumer', protocolName='sticky', assignment=[0, 1, 0, 0, 0, 1, 0, 7, 116, 111, 112, 105, 99, 95, 49, 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 2, -1, -1, -1, -1])
- Request sent by a consumer follower
Sending follower SyncGroup to coordinator:SyncGroupRequestData(groupId='mykafka-group_4', generationId=3, memberId='mykafka-group_4_2-d074e246-fe60-4f06-8cde-0f8c5d8fdd00', groupInstanceId=null, protocolType='consumer', protocolName='sticky', assignments=[])
- Response received by a consumer follower
SyncGroupResponseData(throttleTimeMs=0, errorCode=0, protocolType='consumer', protocolName='sticky', assignment=[0, 1, 0, 0, 0, 1, 0, 7, 116, 111, 112, 105, 99, 95, 49, 0, 0, 0, 1, 0, 0, 0, 1, -1, -1, -1, -1])
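As the samples show, the leader's request carries an extra assignments field that stays empty in the follower's request: it holds the assignment the leader computed with the assignment strategy chosen by the group vote. The responses, on the other hand, have the same shape for leader and follower, and each member receives only its own assignment. The fields of the SyncGroupRequest and SyncGroupResponse protocols are shown in Figure 1 and Figure 2.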
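To make the byte arrays above readable, here is a minimal decoder sketch. It assumes the non-flexible layout these samples use: an int16 version, an int32-counted array of topics (each an int16-length UTF-8 name followed by an int32-counted array of int32 partition numbers), then an int32-length user-data field where -1 means null. The class name AssignmentDecoder is hypothetical; the real client decodes this with ConsumerProtocol.deserializeAssignment.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: decodes the consumer assignment bytes carried in SyncGroup,
// assuming the non-flexible layout seen in the samples above.
final class AssignmentDecoder {
    static Map<String, List<Integer>> decode(byte[] raw) {
        ByteBuffer buf = ByteBuffer.wrap(raw);      // big-endian, like the Kafka wire format
        short version = buf.getShort();             // 1 in the samples above
        if (version < 0) {
            throw new IllegalArgumentException("bad version " + version);
        }
        int topicCount = buf.getInt();
        Map<String, List<Integer>> result = new LinkedHashMap<>();
        for (int i = 0; i < topicCount; i++) {
            byte[] name = new byte[buf.getShort()]; // int16 length prefix, then UTF-8 bytes
            buf.get(name);
            int partitionCount = buf.getInt();
            List<Integer> partitions = new ArrayList<>(partitionCount);
            for (int j = 0; j < partitionCount; j++) {
                partitions.add(buf.getInt());
            }
            result.put(new String(name, StandardCharsets.UTF_8), partitions);
        }
        int userDataLength = buf.getInt();          // -1 encodes null user data
        if (userDataLength > 0) {
            buf.position(buf.position() + userDataLength); // skip user data, not needed here
        }
        return result;
    }
}
```

Decoding the two arrays gives {topic_1=[1]} for member mykafka-group_4_2 and {topic_1=[0, 2]} for the leader mykafka-group_4_1, which matches what each member received in its SyncGroupResponse.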


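Server-side handling of SyncGroupRequest
The entry point is again the familiar kafka.server.KafkaApis#handleSyncGroupRequest, which hands the request to the group coordinator (kafka.coordinator.group.GroupCoordinator). The code first validates the request data and the group state: the assignment is only processed while the group is in CompletingRebalance, and when the group is Stable the coordinator simply returns the current assignment.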
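Consider this: every consumer sends a SyncGroupRequest to the group coordinator, but the leader has to compute the assignment before sending its request, so it usually arrives a little later than the others, and only the leader's request carries the assignment. How does Kafka make sure the other members also receive their assignment? The answer is in the code below. Each member stores an awaitingSyncCallback; when the leader's request is handled, the coordinator first persists the group metadata, then completes the other members' pending responses, and finally transitions the group to Stable.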
group.currentState match {
  case Empty =>
    responseCallback(SyncGroupResult(Errors.UNKNOWN_MEMBER_ID))

  case PreparingRebalance =>
    responseCallback(SyncGroupResult(Errors.REBALANCE_IN_PROGRESS))

  case CompletingRebalance =>
    group.get(memberId).awaitingSyncCallback = responseCallback

    // if this is the leader, then we can attempt to persist state and transition to stable
    if (group.isLeader(memberId)) {
      info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")

      // fill any missing members with an empty assignment
      val missing = group.allMembers -- groupAssignment.keySet
      val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap

      if (missing.nonEmpty) {
        warn(s"Setting empty assignments for members $missing of ${group.groupId} for generation ${group.generationId}")
      }

      // Persist the group assignment (written to the log)
      groupManager.storeGroup(group, assignment, (error: Errors) => {
        group.inLock {
          // another member may have joined the group while we were awaiting this callback,
          // so we must ensure we are still in the CompletingRebalance state and the same generation
          // when it gets invoked. if we have transitioned to another state, then do nothing
          if (group.is(CompletingRebalance) && generationId == group.generationId) {
            if (error != Errors.NONE) {
              resetAndPropagateAssignmentError(group, error)
              maybePrepareRebalance(group, s"error when storing group assignment during SyncGroup (member: $memberId)")
            } else {
              // Complete the pending SyncGroup callbacks of the other members
              setAndPropagateAssignment(group, assignment)
              // Transition the group to Stable
              group.transitionTo(Stable)
            }
          }
        }
      })
      groupCompletedRebalanceSensor.record()
    }

  case Stable =>
    // if the group is stable, we just return the current assignment
    val memberMetadata = group.get(memberId)
    responseCallback(SyncGroupResult(group.protocolType, group.protocolName, memberMetadata.assignment, Errors.NONE))
    completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))

  case Dead =>
    throw new IllegalStateException(s"Reached unexpected condition for Dead group ${group.groupId}")
}
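The comment inside the storeGroup callback marks the key concurrency point: the write is asynchronous, and by the time it finishes a new member may already have started another rebalance. The Java sketch below models only that guard. GroupModel, its State enum and onStoreComplete are hypothetical names, a synchronized method stands in for group.inLock, and the persistence itself is not modeled. It also shows why the generation check is needed in addition to the state check: after a fast second join phase the group is back in CompletingRebalance, but for a newer generation.

```java
// Hypothetical, simplified model of the state/generation guard in the storeGroup callback.
final class GroupModel {
    enum State { PREPARING_REBALANCE, COMPLETING_REBALANCE, STABLE }

    private State state = State.COMPLETING_REBALANCE;
    private int generationId;

    GroupModel(int generationId) {
        this.generationId = generationId;
    }

    // Another member joins while the write is in flight: a new rebalance starts.
    synchronized void memberJoined() {
        state = State.PREPARING_REBALANCE;
    }

    // The new join phase finishes: next generation, waiting for SyncGroup again.
    synchronized void completeJoinPhase() {
        generationId++;
        state = State.COMPLETING_REBALANCE;
    }

    // Called when the asynchronous store finishes; mirrors group.inLock { ... }.
    synchronized void onStoreComplete(int generationAtRequest, boolean storeOk) {
        // Ignore stale callbacks that belong to an earlier generation or state.
        if (state != State.COMPLETING_REBALANCE || generationAtRequest != generationId) {
            return;
        }
        if (storeOk) {
            state = State.STABLE;              // roughly setAndPropagateAssignment + transitionTo(Stable)
        } else {
            state = State.PREPARING_REBALANCE; // roughly resetAndPropagateAssignmentError + maybePrepareRebalance
        }
    }

    synchronized State state() {
        return state;
    }

    synchronized int generationId() {
        return generationId;
    }
}
```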
The code of setAndPropagateAssignment is shown below; the other members' pending callbacks are completed inside propagateAssignment.
private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]): Unit = {
  assert(group.is(CompletingRebalance))
  group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
  propagateAssignment(group, Errors.NONE)
}

private def propagateAssignment(group: GroupMetadata, error: Errors): Unit = {
  val (protocolType, protocolName) = if (error == Errors.NONE)
    (group.protocolType, group.protocolName)
  else
    (None, None)
  for (member <- group.allMemberMetadata) {
    if (member.assignment.isEmpty && error == Errors.NONE) {
      warn(s"Sending empty assignment to member ${member.memberId} of ${group.groupId} for generation ${group.generationId} with no errors")
    }

    // Complete the member's pending SyncGroup response with its assignment, then reset its heartbeat
    if (group.maybeInvokeSyncCallback(member, SyncGroupResult(protocolType, protocolName, member.assignment, error))) {
      // reset the session timeout for members after propagating the member's assignment.
      // This is because if any member's session expired while we were still awaiting either
      // the leader sync group or the storage callback, its expiration will be ignored and no
      // future heartbeat expectations will not be scheduled.
      completeAndScheduleNextHeartbeatExpiration(group, member)
    }
  }
}
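Putting the pieces together, the Java sketch below models how follower SyncGroup requests are parked until the leader's assignment arrives. SyncBarrier, onSyncGroup and the Consumer<byte[]> callbacks are hypothetical stand-ins for awaitingSyncCallback and propagateAssignment; persistence, error codes and heartbeats are left out. Once the leader has synced, the model behaves like the Stable branch and answers late members immediately.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical model of the coordinator's SyncGroup "barrier" during CompletingRebalance.
final class SyncBarrier {
    private final Set<String> members;
    private final String leaderId;
    // memberId -> parked response callback (like MemberMetadata.awaitingSyncCallback)
    private final Map<String, Consumer<byte[]>> awaiting = new HashMap<>();
    // Set once the leader has synced; afterwards the group behaves as "Stable".
    private Map<String, byte[]> finalAssignment;

    SyncBarrier(Set<String> members, String leaderId) {
        this.members = members;
        this.leaderId = leaderId;
    }

    synchronized void onSyncGroup(String memberId, Map<String, byte[]> leaderAssignments,
                                  Consumer<byte[]> responseCallback) {
        if (finalAssignment != null) {
            // Stable: just return the current assignment.
            responseCallback.accept(finalAssignment.getOrDefault(memberId, new byte[0]));
            return;
        }
        awaiting.put(memberId, responseCallback); // every member parks its callback first
        if (!memberId.equals(leaderId)) {
            return;                               // followers wait for the leader
        }
        // Fill any member missing from the leader's assignment with an empty one.
        Map<String, byte[]> full = new HashMap<>(leaderAssignments);
        for (String m : members) {
            full.putIfAbsent(m, new byte[0]);
        }
        finalAssignment = full;
        // Propagate: answer every member that has already sent SyncGroup, one time only.
        for (String m : members) {
            Consumer<byte[]> cb = awaiting.remove(m);
            if (cb != null) {
                cb.accept(full.get(m));
            }
        }
    }
}
```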
Client-side handling of SyncGroupResponse
Analyzing how SyncGroupResponse is handled
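Many write-ups describe joining a group as three steps: findCoordinator, joinGroup and syncGroup. Our source analysis shows, however, that joinGroup and syncGroup are not fully independent. This touches on the design of Kafka's networking module, which we only analyze briefly here.
- First, recall that the syncGroup request is sent from onJoinLeader or onJoinFollower: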
private RequestFuture<ByteBuffer> onJoinLeader(JoinGroupResponse joinResponse) {
    try {
        // ... (omitted)
        return sendSyncGroupRequest(requestBuilder);
    } catch (RuntimeException e) {
        return RequestFuture.failure(e);
    }
}
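- onJoinLeader and onJoinFollower, in turn, are called from the handle method of JoinGroupResponseHandler. As the name suggests, JoinGroupResponseHandler processes the JoinGroup response; findCoordinator, syncGroup and the other requests have similar handler classes. In the code below, once the JoinGroupResponse is parsed successfully, the handler calls onJoinLeader or onJoinFollower and then .chain(future). This is a chained call: after JoinGroup returns successfully, the client sends the syncGroup request; when the SyncGroupResponse arrives, the handle method of SyncGroupResponseHandler processes it; and if SyncGroup also succeeds, the RequestFuture of the JoinGroup request is completed successfully as well, which fires the listeners registered on it. The listener logic is in the second code block below. Finally, back where initiateJoinGroup was called, the thread running consumer.poll handles the post-join logic; see the third code block below.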
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
    @Override
    public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
        Errors error = joinResponse.error();
        if (error == Errors.NONE) {
            // ... (omitted)
            AbstractCoordinator.this.generation = new Generation(joinResponse.data().generationId(),
                    joinResponse.data().memberId(), joinResponse.data().protocolName());
            if (joinResponse.isLeader()) {
                onJoinLeader(joinResponse).chain(future);
            } else {
                onJoinFollower().chain(future);
            }
            // ... (omitted)
        }
        // ... (omitted)
    }
}
//org.apache.kafka.clients.consumer.internals.AbstractCoordinator#initiateJoinGroup
private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
    if (joinFuture == null) {
        // ... (omitted)
        if (lastRebalanceStartMs == -1L)
            lastRebalanceStartMs = time.milliseconds();
        joinFuture = sendJoinGroupRequest();
        joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
            @Override
            public void onSuccess(ByteBuffer value) {
                // handle join completion in the callback so that the callback will be invoked
                // even if the consumer is woken up before finishing the rebalance
                synchronized (AbstractCoordinator.this) {
                    if (generation != Generation.NO_GENERATION) {
                        log.info("Successfully joined group with generation {}", generation.generationId);
                        state = MemberState.STABLE;
                        rejoinNeeded = false;
                        // record rebalance latency
                        lastRebalanceEndMs = time.milliseconds();
                        sensors.successfulRebalanceSensor.record(lastRebalanceEndMs - lastRebalanceStartMs);
                        lastRebalanceStartMs = -1L;

                        if (heartbeatThread != null)
                            heartbeatThread.enable();
                    } else {
                        log.info("Generation data was cleared by heartbeat thread. Rejoin failed.");
                        recordRebalanceFailure();
                    }
                }
            }
            // ... (omitted)
        });
    }
    return joinFuture;
}
//org.apache.kafka.clients.consumer.internals.AbstractCoordinator#joinGroupIfNeeded
boolean joinGroupIfNeeded(final Timer timer) {
    // rejoinNeededOrPending() compares against the snapshot taken after a successful join,
    // so the loop keeps retrying until the join has succeeded
    while (rejoinNeededOrPending()) {
        // ... (omitted)
        // 2.2 send the request
        final RequestFuture<ByteBuffer> future = initiateJoinGroup();
        // ... (omitted)
        // future is the one completed through the JoinGroupResponseHandler callback; at this point
        // the callback has already succeeded, so this is post-processing after it
        if (future.succeeded()) {
            Generation generationSnapshot;
            // This is why we lock on AbstractCoordinator: the heartbeat thread may reset generation
            synchronized (AbstractCoordinator.this) {
                generationSnapshot = this.generation;
            }
            if (generationSnapshot != Generation.NO_GENERATION) {
                // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
                ByteBuffer memberAssignment = future.value().duplicate();
                // 2.3 handling after successfully joining the group
                onJoinComplete(generationSnapshot.generationId, generationSnapshot.memberId, generationSnapshot.protocolName, memberAssignment);
                resetJoinGroupFuture();
                // Why set needsJoinPrepare = true directly instead of calling resetStateAndRejoin as below?
                // Because resetStateAndRejoin additionally resets state to the not-joined state
                needsJoinPrepare = true;
            } else {
                // ... (omitted)
            }
        } else {
            // ... (omitted)
        }
    }
    return true;
}
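The .chain(future) call is what ties JoinGroup and SyncGroup together. Below is a minimal, single-threaded Java sketch of the idea, using a hypothetical MiniFuture class rather than Kafka's RequestFuture. Kafka's RequestFuture.chain follows the same pattern: it adds a listener that forwards success or failure to the target future. In the sketch, the outer join future only completes when the chained sync future does, so listeners on the join future (like the one in initiateJoinGroup) fire only after SyncGroup succeeds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, single-threaded stand-in for Kafka's RequestFuture.
final class MiniFuture<T> {
    private T value;
    private RuntimeException error;
    private boolean done;
    private final List<Consumer<MiniFuture<T>>> listeners = new ArrayList<>();

    void complete(T v) {
        if (done) throw new IllegalStateException("already completed");
        value = v;
        done = true;
        listeners.forEach(l -> l.accept(this));
    }

    void raise(RuntimeException e) {
        if (done) throw new IllegalStateException("already completed");
        error = e;
        done = true;
        listeners.forEach(l -> l.accept(this));
    }

    // Runs the listener now if already done, otherwise when the future completes.
    void addListener(Consumer<MiniFuture<T>> listener) {
        if (done) listener.accept(this);
        else listeners.add(listener);
    }

    // Forward this future's outcome to `target`, the same idea as RequestFuture.chain.
    void chain(MiniFuture<T> target) {
        addListener(f -> {
            if (f.error == null) target.complete(f.value);
            else target.raise(f.error);
        });
    }

    boolean succeeded() {
        return done && error == null;
    }

    T value() {
        return value;
    }
}
```

In the usage below, syncFuture plays the role of the value returned by onJoinLeader, and joinFuture the role of the future passed into JoinGroupResponseHandler#handle.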
Handling the received partition assignment
This step takes the assignment from the response and updates the consumer's local state.
//org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#onJoinComplete
protected void onJoinComplete(int generation,
                              String memberId,
                              String assignmentStrategy,
                              ByteBuffer assignmentBuffer) {
    log.debug("Executing onJoinComplete with generation {} and memberId {}", generation, memberId);

    // If this member is not the leader, clear assignmentSnapshot (the snapshot of the computed assignment)
    if (!isLeader)
        assignmentSnapshot = null;

    // Throws if the strategy selected by the group vote is not one this consumer has configured
    ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);
    if (assignor == null)
        throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);

    // Give the assignor a chance to update internal state based on the received assignment
    groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, rebalanceConfig.groupInstanceId);

    Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());

    // should at least encode the short version
    if (assignmentBuffer.remaining() < 2)
        throw new IllegalStateException("There is insufficient bytes available to read assignment from the sync-group response (" +
            "actual byte size " + assignmentBuffer.remaining() + ") , this is not expected; " +
            "it is possible that the leader's assign function is buggy and did not return any assignment for this member, " +
            "or because static member is configured and the protocol is buggy hence did not get the assignment for this member");

    // Deserialize the assignment produced by the rebalance
    Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);
    log.info("Adding newly assigned partitions: {}", Utils.join(assignment.partitions(), ", "));

    Set<TopicPartition> assignedPartitions = new HashSet<>(assignment.partitions());

    // Check that the assignment matches the current subscription
    if (!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) {
        log.warn("We received an assignment {} that doesn't match our current subscription {}; it is likely " +
            "that the subscription has changed since we joined the group. Will try re-join the group with current subscription",
            assignment.partitions(), subscriptions.prettyString());

        requestRejoin();

        return;
    }

    final AtomicReference<Exception> firstException = new AtomicReference<>(null);
    Set<TopicPartition> addedPartitions = new HashSet<>(assignedPartitions);
    // Newly added partitions = assigned - previously owned
    addedPartitions.removeAll(ownedPartitions);

    // Special handling for the COOPERATIVE protocol (e.g. CooperativeStickyAssignor)
    if (protocol == RebalanceProtocol.COOPERATIVE) {
        Set<TopicPartition> revokedPartitions = new HashSet<>(ownedPartitions);
        revokedPartitions.removeAll(assignedPartitions);

        log.info("Updating assignment with\n" +
                "now assigned partitions: {}\n" +
                "compare with previously owned partitions: {}\n" +
                "newly added partitions: {}\n" +
                "revoked partitions: {}\n",
            Utils.join(assignedPartitions, ", "),
            Utils.join(ownedPartitions, ", "),
            Utils.join(addedPartitions, ", "),
            Utils.join(revokedPartitions, ", ")
        );

        // Why rejoin when revokedPartitions is not empty? revokedPartitions are the partitions
        // this member must give up; see the discussion after this code
        if (!revokedPartitions.isEmpty()) {
            // revoke partitions that were previously owned but no longer assigned;
            // note that we should only change the assignment (or update the assignor's state)
            // AFTER we've triggered the revoke callback
            firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));

            // if revoked any partitions, need to re-join the group afterwards
            log.debug("Need to revoke partitions {} and re-join the group", revokedPartitions);
            requestRejoin();
        }
    }

    // The leader may have assigned partitions which match our subscription pattern, but which
    // were not explicitly requested, so we update the joined subscription here.
    maybeUpdateJoinedSubscription(assignedPartitions);

    // Catch any exception here to make sure we could complete the user callback.
    // Invoke the assignor's onAssignment callback
    try {
        assignor.onAssignment(assignment, groupMetadata);
    } catch (Exception e) {
        firstException.compareAndSet(null, e);
    }

    // Reschedule the auto commit starting from now
    if (autoCommitEnabled)
        this.nextAutoCommitTimer.updateAndReset(autoCommitIntervalMs);

    subscriptions.assignFromSubscribed(assignedPartitions);

    // add partitions that were not previously owned but are now assigned
    firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));

    if (firstException.get() != null)
        throw new KafkaException("User rebalance callback throws an error", firstException.get());
}
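The partition bookkeeping in onJoinComplete boils down to two set differences. The sketch below isolates it, using plain strings such as "topic_1-0" instead of TopicPartition; AssignmentDiff and its fields are hypothetical names. Under EAGER, the owned set is already empty at this point because onJoinPrepare revoked everything, so only the COOPERATIVE branch can produce revoked partitions.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical summary of what onJoinComplete computes from owned vs. newly assigned partitions.
final class AssignmentDiff {
    final Set<String> added;
    final Set<String> revoked;
    final boolean rejoinNeeded;

    AssignmentDiff(Set<String> owned, Set<String> assigned, boolean cooperative) {
        added = new HashSet<>(assigned);
        added.removeAll(owned);            // assigned now but not owned before
        revoked = new HashSet<>();
        if (cooperative) {                 // only the COOPERATIVE branch computes this
            revoked.addAll(owned);
            revoked.removeAll(assigned);   // owned before but no longer assigned
        }
        rejoinNeeded = !revoked.isEmpty(); // mirrors requestRejoin() in that branch
    }
}
```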
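There is one more detail worth noting here: the special handling for the COOPERATIVE protocol (as used by CooperativeStickyAssignor). If the set of revoked partitions is not empty, the consumer has to rejoin the group. Why? The answer lies in onJoinPrepare, which runs before the JoinGroup request is sent (its code is listed after the scenario below). Under EAGER, onJoinPrepare revokes all owned partitions before rejoining; under COOPERATIVE, it revokes only the partitions of topics the consumer no longer subscribes to and keeps the rest in its in-memory assignment. Because the consumer keeps its partitions while the rebalance runs, a partition that has to move to another consumer can only be handed over after its current owner revokes it in onJoinComplete, and that takes one more rebalance.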
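Consider the following scenario:
- topic_1 has partitions 0, 1, 2
- there are three consumers: consumer1, consumer2, consumer3
- Start consumer1 and consumer2 first. Once the assignment completes, consumer1 owns two partitions and consumer2 owns one. Then start consumer3. consumer1 and consumer2 send JoinGroup again; because their subscribed topic has not changed, onJoinPrepare revokes nothing. The target assignment is now one partition per consumer, so when consumer1 reaches onJoinComplete it finds a partition it must give up, revokes it, and triggers another rejoin. I verified that this does cause two rebalances. The point of this design is to let consumers keep consuming during the rebalance.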
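The scenario can be traced with a toy model. The Java sketch below is hypothetical and deliberately simplified, not the real CooperativeStickyAssignor algorithm: it assumes the balanced target assignment is given up front, assumes a partition is withheld from its new owner while another consumer still owns it, and treats each loop iteration as one full JoinGroup/SyncGroup round in which every consumer runs the onJoinComplete logic.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model of a cooperative rebalance; NOT the real CooperativeStickyAssignor.
final class CooperativeSim {
    // Runs rounds until no consumer has to revoke anything; returns the number of rounds.
    // `owned` is updated in place; `target` is the assumed balanced end state.
    static int run(Map<String, Set<Integer>> owned, Map<String, Set<Integer>> target) {
        int rounds = 0;
        boolean rejoin = true;
        while (rejoin && rounds < 10) {    // guard against an inconsistent target
            rounds++;
            rejoin = false;
            Map<String, Set<Integer>> assignment = new HashMap<>();
            for (String c : target.keySet()) {
                Set<Integer> give = new HashSet<>(target.get(c));
                // Withhold partitions another consumer still owns (not revoked yet).
                for (Map.Entry<String, Set<Integer>> o : owned.entrySet()) {
                    if (!o.getKey().equals(c)) {
                        give.removeAll(o.getValue());
                    }
                }
                assignment.put(c, give);
            }
            for (String c : target.keySet()) {
                Set<Integer> revoked = new HashSet<>(owned.getOrDefault(c, Set.of()));
                revoked.removeAll(assignment.get(c));
                if (!revoked.isEmpty()) {
                    rejoin = true;         // onJoinComplete -> requestRejoin()
                }
                owned.put(c, new HashSet<>(assignment.get(c)));
            }
        }
        return rounds;
    }
}
```

With consumer1 owning {0, 1}, consumer2 owning {2} and a target of one partition each, the first round only takes partition 1 away from consumer1 (partitions 0 and 2 keep being consumed), and consumer3 receives partition 1 in the second round.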
//org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#onJoinPrepare
protected void onJoinPrepare(int generation, String memberId) {
    // ... (omitted)
    // Before rejoining, decide which partitions to revoke according to the rebalance protocol
    switch (protocol) {
        case EAGER:
            // revoke all partitions
            revokedPartitions = new HashSet<>(subscriptions.assignedPartitions());
            exception = invokePartitionsRevoked(revokedPartitions);
            subscriptions.assignFromSubscribed(Collections.emptySet());

            break;

        case COOPERATIVE:
            // only revoke those partitions that are not in the subscription any more.
            Set<TopicPartition> ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());
            revokedPartitions = ownedPartitions.stream()
                .filter(tp -> !subscriptions.subscription().contains(tp.topic()))
                .collect(Collectors.toSet());

            if (!revokedPartitions.isEmpty()) {
                exception = invokePartitionsRevoked(revokedPartitions);

                ownedPartitions.removeAll(revokedPartitions);
                subscriptions.assignFromSubscribed(ownedPartitions);
            }

            break;
    }
    // ... (omitted)
}
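The difference between the two protocols in onJoinPrepare reduces to which owned partitions are revoked before rejoining. A minimal sketch, assuming partitions are represented as hypothetical strings of the form "topic-partition" (for example "topic_1-0") instead of Kafka's TopicPartition, and that JoinPreparePolicy and toRevoke are illustrative names:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of the revoke decision in onJoinPrepare.
// Partitions are strings like "topic_1-0": topic name, '-', partition number.
final class JoinPreparePolicy {
    enum Protocol { EAGER, COOPERATIVE }

    static String topicOf(String tp) {
        return tp.substring(0, tp.lastIndexOf('-'));
    }

    // Returns the partitions to revoke before sending JoinGroup.
    static Set<String> toRevoke(Protocol protocol, Set<String> owned, Set<String> subscribedTopics) {
        switch (protocol) {
            case EAGER:
                return new HashSet<>(owned);   // give up everything
            case COOPERATIVE:
                return owned.stream()          // only partitions of unsubscribed topics
                        .filter(tp -> !subscribedTopics.contains(topicOf(tp)))
                        .collect(Collectors.toSet());
            default:
                throw new IllegalArgumentException("unknown protocol " + protocol);
        }
    }
}
```

With an unchanged subscription, as in the scenario above, the cooperative branch returns an empty set, which is why consumer1 keeps consuming its partitions during the first rebalance.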
Summary of the full consumer flow
The flow chart is shown below:

