聊聊 Kafka: Consumer 源码解析之 Consumer 如何加入 Consumer Group

共 34045字,需浏览 69分钟

 ·

2021-12-13 09:48

一、前言

今天这一篇我们来说一下 Consumer 是如何加入 Consumer Group 的,我们前面有一篇 Kafka 的架构文章有说到,Consumer 有消费组(Consumer Group)的概念,而 Producer 没有生产组的概念。所以说 Consumer 侧会比 Producer 侧复杂点,除了消费者有消费组的概念,还需要维护管理 offset 偏移量、重复消费等问题。

与消费组相关的两个组件,一个是消费者客户端的 ConsumerCoordinator,一个是 Kafka Broker 服务端的 GroupCoordinator。ConsumerCoordinator 负责与 GroupCoordinator 通信,Broker 启动的时候,都会启动一个 GroupCoordinator 实例,而一个集群中,会有多个 Broker,那么如何确定一个新的 Consumer 加入 Consumer Group 后,到底和哪个 Broker 上的 GroupCoordinator 进行交互呢?

别急,聪明的程序员肯定是有办法的,我们还是先来说一下 GroupCoordinator 吧。

二、GroupCoordinator

别问,问就是有相应的算法和策略。那我们就来看下是啥算法和策略实现 Consumer 正确找到 GroupCoordinator 的,这就和 Kafka 内部的 Topic __consumer_offsets 有关系了。

2.1 __consumer_offsets

__consumer_offsets 这个内部 Topic,专门用来存储 Consumer Group 消费的情况,默认情况下有 50 个 partition,每个 partition 默认三个副本。如下图所示:


2.2 Consumer 如何找到 GroupCoordinator 的?

每个 Consumer Group 都有其对应的 GroupCoordinator,当一个新的 Consumer 要寻找和它交互的 GroupCoordinator 时,需要先对它的 GroupId 进行 hash,然后取模 __consumer_offsets 的 partition 数量,最后得到的值就是对应 partition,那么这个 partition 的 leader 所在的 broker 即为这个 Consumer Group 要交互的 GroupCoordinator 所在的节点。获取 partition 公式如下:

abs(GroupId.hashCode()) % NumPartitions

举个例子,假设一个 GroupId 计算出来的 hashCode 是 8,之后取模 50 得到 8。那么 partition-8 的 leader 所在的 broker 就是我们要找的那个节点。这个 Consumer Group 后面都会直接和该 broker 上的 GroupCoordinator 交互。

三、Group 状态变更

说 Consumer 加入 Consumer Group 流程之前,老周觉得有必要先说一下 Consumer Group 的状态变更。

3.1 消费端

在协调器 AbstractCoordinator 中的内部类 MemberState 中我们可以看到协调器的四种状态,分别是未注册、重分配后没收到响应、重分配后收到响应但还没有收到分配、稳定状态。


上述消费端的四种状态的转换如下图所示:

3.2 服务端

对于 Kafka 服务端的组则有五种状态 Empty、PreparingRebalance、CompletingRebalance、Stable、Dead。他们的状态转换如下图所示:


四、Consumer 加入 Consumer Group 流程

说 Consumer 如何加入 Consumer Group 之前,我们还是先来回顾下上一篇消息消费的测试案例。


核心方法是 poll() 方法,我们这里简单提一下,后面我们会详细介绍 Consumer 关于 poll 的网络模型。


Consumer 如何加入 Consumer Group 的,我们得来看啥时候与 GroupCoordinator 交互通信的,不难发现在消息拉取请求做准备 updateAssignmentMetadataIfNeeded() 这个方法里。

然后关于对 ConsumerCoordinator 的处理都集中在 coordinator.poll() 方法中。

我们来跟一下这两个方法:

// org.apache.kafka.clients.consumer.KafkaConsumer#updateAssignmentMetadataIfNeeded(org.apache.kafka.common.utils.Timer, boolean)
boolean updateAssignmentMetadataIfNeeded(final Timer timer, final boolean waitForJoinGroup) {
    if (coordinator != null && !coordinator.poll(timer, waitForJoinGroup)) {
        return false;
    }

    return updateFetchPositions(timer);
}

// org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(org.apache.kafka.common.utils.Timer, boolean)
public boolean poll(Timer timer, boolean waitForJoinGroup) {
    maybeUpdateSubscriptionMetadata();

    // 执行已完成(异步提交)的 offset 提交请求的回调函数。
    invokeCompletedOffsetCommitCallbacks();

    // 队列负载算法为自动分配(即 Kafka 根据消费者个数与分区数动态负载分区)
    if (subscriptions.hasAutoAssignedPartitions()) {
        if (protocol == null) {
            throw new IllegalStateException("User configured " + ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG +
                " to empty while trying to subscribe for group protocol to auto assign partitions");
        }
        // Always update the heartbeat last poll time so that the heartbeat thread does not leave the
        // group proactively due to application inactivity even if (say) the coordinator cannot be found.
        // 维护与 broker 端的心跳请求,确保不会被“踢出”消费组。
        // 检查心跳线程运行是否正常,如果心跳线程失败则抛出异常,反之则更新 poll 调用时间。
        pollHeartbeat(timer.currentTimeMs());
        // 如果不存在协调器和协调器已断开连接,则返回 false,结束本次拉取。如果协调器就绪,则继续往下走。
        if (coordinatorUnknown() && !ensureCoordinatorReady(timer)) {
            return false;
        }

        // 判断是否需要触发重平衡,即消费组内的所有消费者重新分配 topic 中的分区信息。
        if (rejoinNeededOrPending()) {
            // due to a race condition between the initial metadata fetch and the initial rebalance,
            // we need to ensure that the metadata is fresh before joining initially. This ensures
            // that we have matched the pattern against the cluster's topics at least once before joining.
            if (subscriptions.hasPatternSubscription()) {
                // For consumer group that uses pattern-based subscription, after a topic is created,
                // any consumer that discovers the topic after metadata refresh can trigger rebalance
                // across the entire consumer group. Multiple rebalances can be triggered after one topic
                // creation if consumers refresh metadata at vastly different times. We can significantly
                // reduce the number of rebalances caused by single topic creation by asking consumer to
                // refresh metadata before re-joining the group as long as the refresh backoff time has
                // passed.
                if (this.metadata.timeToAllowUpdate(timer.currentTimeMs()) == 0) {
                    // 更新元数据
                    this.metadata.requestUpdate();
                }

                if (!client.ensureFreshMetadata(timer)) {
                    return false;
                }

                maybeUpdateSubscriptionMetadata();
            }

            // if not wait for join group, we would just use a timer of 0
            // 发送 join-group、sync-group 请求,加入 group 并获取其 assign 的 TopicPartition list。
            if (!ensureActiveGroup(waitForJoinGroup ? timer : time.timer(0L))) {
                // since we may use a different timer in the callee, we'd still need
                // to update the original timer's current time after the call
                timer.update(time.milliseconds());

                return false;
            }
        }
    } else {
        // For manually assigned partitions, if there are no ready nodes, await metadata.
        // If connections to all nodes fail, wakeups triggered while attempting to send fetch
        // requests result in polls returning immediately, causing a tight loop of polls. Without
        // the wakeup, poll() with no channels would block for the timeout, delaying re-connection.
        // awaitMetadataUpdate() initiates new connections with configured backoff and avoids the busy loop.
        // When group management is used, metadata wait is already performed for this scenario as
        // coordinator is unknown, hence this check is not required.
        // 用户手动为消费组指定负载的队列的相关处理逻辑
        // 如果需要更新元数据,并且还没有分区准备好,则同步阻塞等待元数据更新完毕。
        if (metadata.updateRequested() && !client.hasReadyNodes(timer.currentTimeMs())) {
            client.awaitMetadataUpdate(timer);
        }
    }

    // 如果开启了自动提交消费进度,并且已到下一次提交时间,则提交。
    maybeAutoCommitOffsetsAsync(timer.currentTimeMs());
    return true;
}

org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#poll(org.apache.kafka.common.utils.Timer, boolean) 方法中,具体可以分为以下几个步骤:

  • 检测心跳线程运行是否正常 (需要定时向 GroupCoordinator 发送心跳,在建立连接之后,建立连接之前不会做任何事情)

  • 如果不存在协调器和协调器已断开连接,则返回 false,结束本次拉取。如果 coordinator 未知,就初始化 ConsumerCoordinator (在 ensureCoordinatorReady() 中实现)

  • 判断是否需要触发重平衡,即消费组内的所有消费者重新分配 topic 中的分区信息。

  • 通过 ensureActiveGroup() 发送 join-group、sync-group 请求,加入 group 并获取其 assign 的 TopicPartition list。

  • 如果需要更新元数据,并且还没有分区准备好,则同步阻塞等待元数据更新完毕。

  • 如果开启了自动提交消费进度,并且已到下一次提交时间,则提交。

其中,有几个地方需要详细介绍,那就是 ensureCoordinatorReady() 方法、rejoinNeededOrPending() 方法和 ensureActiveGroup() 方法。

五、ensureCoordinatorReady()

这个方法的作用是:选择一个连接数最小的 broker,向其发送 GroupCoordinator 请求,并建立相应的 TCP 连接。

  • 方法调用流程是:ensureCoordinatorReady() -> lookupCoordinator() -> sendFindCoordinatorRequest()。

  • 如果 client 获取到 Server response,那么就会与 GroupCoordinator 建立连接。

5.1 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#ensureCoordinatorReady

5.2 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#lookupCoordinator

5.3 org.apache.kafka.clients.consumer.internals.AbstractCoordinator#sendFindCoordinatorRequest

5.4 小结

  • 选择一个连接最小的节点,发送 FindCoordinator request 请求,并对 response 进行处理。

  • FindCoordinatorRequest 这个请求会使用 group id 通过 ConsumerNetworkClient.send() 来查找对应的 GroupCoordinator 节点。(当然 ConsumerNetworkClient.send() 也是采用的 Java NIO 的机制,我们前面的文章有说到过)

  • 如果正确获取 GroupCoordinator 时(会返回其对应的 node id、host 和 port 信息),建立连接,并更新心跳时间。

六、rejoinNeededOrPending()


关于 rejoin, 下列几种情况会触发再均衡 reblance 操作

  • 新的消费者加入消费组 (第一次进行消费也属于这种情况)

  • 消费者宕机下线 (长时间未发送心跳包)

  • 消费者主动退出消费组,比如调用 unsubscrible() 方法取消对主题的订阅

  • 消费组对应的 GroupCoordinator 节点发生了变化

  • 消费组内所订阅的任一主题或者主题的分区数量发生了变化

七、ensureActiveGroup()

现在我们已经知道了 GroupCoordinator 节点,并建立了连接。ensureActiveGroup() 这个方法的主要作用是向 GroupCoordinator 发送 join-group、sync-group 请求,获取 assign 的 TopicPartition list。

  • 方法调用流程是:ensureActiveGroup() -> ensureCoordinatorReady() -> startHeartbeatThreadIfNeeded() -> joinGroupIfNeeded()

  • joinGroupIfNeeded() 方法中最重要的方法是 initiateJoinGroup(),它的调用流程是 sendJoinGroupRequest() -> JoinGroupResponseHandler.handle() -> onJoinLeader()、onJoinFollower() -> sendSyncGroupRequest()

/**
 * 确保 Group 是 active,并且加入该 group。
 * Ensure the group is active (i.e., joined and synced)
 *
 * @param timer Timer bounding how long this method can block
 * @throws KafkaException if the callback throws exception
 * @return true iff the group is active
 */

boolean ensureActiveGroup(final Timer timer) {
    // always ensure that the coordinator is ready because we may have been disconnected
    // when sending heartbeats and does not necessarily require us to rejoin the group.
    // 确保 GroupCoordinator 已经连接
    if (!ensureCoordinatorReady(timer)) {
        return false;
    }

    // 启动心跳发送线程(并不一定发送心跳,满足条件后才会发送心跳)
    startHeartbeatThreadIfNeeded();
    // 发送 JoinGroup 请求,并对返回的信息进行处理。
    return joinGroupIfNeeded(timer);
}

7.1 joinGroupIfNeeded()

join-group 的请求是在 joinGroupIfNeeded() 中实现的。

/**
 * Joins the group without starting the heartbeat thread.
 *
 * If this function returns true, the state must always be in STABLE and heartbeat enabled.
 * If this function returns false, the state can be in one of the following:
 *  * UNJOINED: got error response but times out before being able to re-join, heartbeat disabled
 *  * PREPARING_REBALANCE: not yet received join-group response before timeout, heartbeat disabled
 *  * COMPLETING_REBALANCE: not yet received sync-group response before timeout, hearbeat enabled
 *
 * Visible for testing.
 *
 * @param timer Timer bounding how long this method can block
 * @throws KafkaException if the callback throws exception
 * @return true iff the operation succeeded
 */

boolean joinGroupIfNeeded(final Timer timer) {
    while (rejoinNeededOrPending()) {
        if (!ensureCoordinatorReady(timer)) {
            return false;
        }

        // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
        // time if the client is woken up before a pending rebalance completes. This must be called
        // on each iteration of the loop because an event requiring a rebalance (such as a metadata
        // refresh which changes the matched subscription set) can occur while another rebalance is
        // still in progress.
        // 触发 onJoinPrepare, 包括 offset commit 和 rebalance listener。
        if (needsJoinPrepare) {
            // need to set the flag before calling onJoinPrepare since the user callback may throw
            // exception, in which case upon retry we should not retry onJoinPrepare either.
            needsJoinPrepare = false;
            onJoinPrepare(generation.generationId, generation.memberId);
        }

        // 初始化 JoinGroup 请求,并发送该请求。
        final RequestFuture future = initiateJoinGroup();
        client.poll(future, timer);
        if (!future.isDone()) {
            // we ran out of time
            return false;
        }

        if (future.succeeded()) {
            Generation generationSnapshot;
            MemberState stateSnapshot;

            // Generation data maybe concurrently cleared by Heartbeat thread.
            // Can't use synchronized for {@code onJoinComplete}, because it can be long enough
            // and shouldn't block heartbeat thread.
            // See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment}
            synchronized (AbstractCoordinator.this) {
                generationSnapshot = this.generation;
                stateSnapshot = this.state;
            }

            if (!generationSnapshot.equals(Generation.NO_GENERATION) && stateSnapshot == MemberState.STABLE) {
                // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
                ByteBuffer memberAssignment = future.value().duplicate();

                // 发送完成
                onJoinComplete(generationSnapshot.generationId, generationSnapshot.memberId, generationSnapshot.protocolName, memberAssignment);

                // Generally speaking we should always resetJoinGroupFuture once the future is done, but here
                // we can only reset the join group future after the completion callback returns. This ensures
                // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
                // And because of that we should explicitly trigger resetJoinGroupFuture in other conditions below.
                // 重置 joinGroupFuture 为空
                resetJoinGroupFuture();
                needsJoinPrepare = true;
            } else {
                log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " +
                     "the rebalance callback is triggered, marking this rebalance as failed and retry",
                     generationSnapshot, stateSnapshot);
                resetStateAndRejoin();
                resetJoinGroupFuture();
            }
        } else {
            final RuntimeException exception = future.exception();

            // we do not need to log error for memberId required,
            // since it is not really an error and is transient
            if (!(exception instanceof MemberIdRequiredException)) {
                log.info("Rebalance failed.", exception);
            }

            resetJoinGroupFuture();
            if (exception instanceof UnknownMemberIdException ||
                exception instanceof RebalanceInProgressException ||
                exception instanceof IllegalGenerationException ||
                exception instanceof MemberIdRequiredException)
                continue;
            else if (!future.isRetriable())
                throw exception;

            resetStateAndRejoin();
            timer.sleep(rebalanceConfig.retryBackoffMs);
        }
    }
    return true;
}

7.2 initiateJoinGroup()

joinGroupIfNeeded() 方法中最重要的方法是 initiateJoinGroup(),我们来看下:

private synchronized RequestFuture initiateJoinGroup() {
    // we store the join future in case we are woken up by the user after beginning the
    // rebalance in the call to poll below. This ensures that we do not mistakenly attempt
    // to rejoin before the pending rebalance has completed.
    if (joinFuture == null) {
        // 状态标记为 rebalance
        state = MemberState.PREPARING_REBALANCE;
        // a rebalance can be triggered consecutively if the previous one failed,
        // in this case we would not update the start time.
        if (lastRebalanceStartMs == -1L)
            lastRebalanceStartMs = time.milliseconds();
        // 发送 JoinGroup 请求
        joinFuture = sendJoinGroupRequest();
        joinFuture.addListener(new RequestFutureListener() {
            @Override
            public void onSuccess(ByteBuffer value) {
                // do nothing since all the handler logic are in SyncGroupResponseHandler already
            }

            @Override
            public void onFailure(RuntimeException e) {
                // we handle failures below after the request finishes. if the join completes
                // after having been woken up, the exception is ignored and we will rejoin;
                // this can be triggered when either join or sync request failed
                synchronized (AbstractCoordinator.this) {
                    sensors.failedRebalanceSensor.record();
                }
            }
        });
    }
    return joinFuture;
}

7.3 sendJoinGroupRequest() :join-group 请求

继续跟 sendJoinGroupRequest() 方法

/**
 *  发送 JoinGroup 请求并返回 the assignment for the next generation(这个是在 JoinGroupResponseHandler 中做的)
 *
 * Join the group and return the assignment for the next generation. This function handles both
 * JoinGroup and SyncGroup, delegating to {@link #performAssignment(String, String, List)} if
 * elected leader by the coordinator.
 *
 * NOTE: This is visible only for testing
 *
 * @return A request future which wraps the assignment returned from the group leader
 */

RequestFuture sendJoinGroupRequest() {
    if (coordinatorUnknown())
        return RequestFuture.coordinatorNotAvailable();

    // send a join group request to the coordinator
    log.info("(Re-)joining group");
    JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
            new JoinGroupRequestData()
                    .setGroupId(rebalanceConfig.groupId)
                    .setSessionTimeoutMs(this.rebalanceConfig.sessionTimeoutMs)
                    .setMemberId(this.generation.memberId)
                    .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                    .setProtocolType(protocolType())
                    .setProtocols(metadata())
                    .setRebalanceTimeoutMs(this.rebalanceConfig.rebalanceTimeoutMs)
    );

    log.debug("Sending JoinGroup ({}) to coordinator {}", requestBuilder, this.coordinator);

    // Note that we override the request timeout using the rebalance timeout since that is the
    // maximum time that it may block on the coordinator. We add an extra 5 seconds for small delays.
    int joinGroupTimeoutMs = Math.max(client.defaultRequestTimeoutMs(),
        rebalanceConfig.rebalanceTimeoutMs + JOIN_GROUP_TIMEOUT_LAPSE);
    return client.send(coordinator, requestBuilder, joinGroupTimeoutMs)
            .compose(new JoinGroupResponseHandler(generation));
}

// 处理 JoinGroup response 的 handler(同步 group 信息)
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponseByteBuffer{
    private JoinGroupResponseHandler(final Generation generation) {
        super(generation);
    }

    @Override
    public void handle(JoinGroupResponse joinResponse, RequestFuture future) {
        Errors error = joinResponse.error();
        if (error == Errors.NONE) {
            if (isProtocolTypeInconsistent(joinResponse.data().protocolType())) {
                log.error("JoinGroup failed: Inconsistent Protocol Type, received {} but expected {}",
                    joinResponse.data().protocolType(), protocolType());
                future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
            } else {
                log.debug("Received successful JoinGroup response: {}", joinResponse);
                sensors.joinSensor.record(response.requestLatencyMs());

                synchronized (AbstractCoordinator.this) {
                    // 如果此时 Consumer 的状态不是 rebalacing,就引起异常。
                    if (state != MemberState.PREPARING_REBALANCE) {
                        // if the consumer was woken up before a rebalance completes, we may have already left
                        // the group. In this case, we do not want to continue with the sync group.
                        future.raise(new UnjoinedGroupException());
                    } else {
                        state = MemberState.COMPLETING_REBALANCE;

                        // we only need to enable heartbeat thread whenever we transit to
                        // COMPLETING_REBALANCE state since we always transit from this state to STABLE
                        if (heartbeatThread != null)
                            heartbeatThread.enable();

                        AbstractCoordinator.this.generation = new Generation(
                            joinResponse.data().generationId(),
                            joinResponse.data().memberId(), joinResponse.data().protocolName());

                        log.info("Successfully joined group with generation {}", AbstractCoordinator.this.generation);

                        // join group 成功,下面需要进行 sync-group,获取分配的 tp 列表。
                        if (joinResponse.isLeader()) {
                            // 当 consumer 客户端为 leader 时,对 group 下的所有实例进行分配,将 assign 的结果发送到 GroupCoordinator。
                            onJoinLeader(joinResponse).chain(future);
                        } else {
                            // 当 consumer 为 follower 时,从 GroupCoordinator 拉取分配结果。
                            onJoinFollower().chain(future);
                        }
                    }
                }
            }
        } else if (error == Errors.COORDINATOR_LOAD_IN_PROGRESS) {
            log.info("JoinGroup failed: Coordinator {} is loading the group.", coordinator());
            // backoff and retry
            future.raise(error);
        } else if (error == Errors.UNKNOWN_MEMBER_ID) {
            log.info("JoinGroup failed: {} Need to re-join the group. Sent generation was {}",
                     error.message(), sentGeneration);
            // only need to reset the member id if generation has not been changed,
            // then retry immediately
            if (generationUnchanged())
                resetGenerationOnResponseError(ApiKeys.JOIN_GROUP, error);

            future.raise(error);
        } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                || error == Errors.NOT_COORDINATOR) {
            // re-discover the coordinator and retry with backoff
            markCoordinatorUnknown(error);
            log.info("JoinGroup failed: {} Marking coordinator unknown. Sent generation was {}",
                      error.message(), sentGeneration);
            future.raise(error);
        } else if (error == Errors.FENCED_INSTANCE_ID) {
            // for join-group request, even if the generation has changed we would not expect the instance id
            // gets fenced, and hence we always treat this as a fatal error
            log.error("JoinGroup failed: The group instance id {} has been fenced by another instance. " +
                          "Sent generation was {}", rebalanceConfig.groupInstanceId, sentGeneration);
            future.raise(error);
        } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL
                || error == Errors.INVALID_SESSION_TIMEOUT
                || error == Errors.INVALID_GROUP_ID
                || error == Errors.GROUP_AUTHORIZATION_FAILED
                || error == Errors.GROUP_MAX_SIZE_REACHED) {
            // log the error and re-throw the exception
            log.error("JoinGroup failed due to fatal error: {}", error.message());
            if (error == Errors.GROUP_MAX_SIZE_REACHED) {
                future.raise(new GroupMaxSizeReachedException("Consumer group " + rebalanceConfig.groupId +
                        " already has the configured maximum number of members."));
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
            } else {
                future.raise(error);
            }
        } else if (error == Errors.UNSUPPORTED_VERSION) {
            log.error("JoinGroup failed due to unsupported version error. Please unset field group.instance.id " +
                      "and retry to see if the problem resolves");
            future.raise(error);
        } else if (error == Errors.MEMBER_ID_REQUIRED) {
            // Broker requires a concrete member id to be allowed to join the group. Update member id
            // and send another join group request in next cycle.
            String memberId = joinResponse.data().memberId();
            log.debug("JoinGroup failed due to non-fatal error: {} Will set the member id as {} and then rejoin. " +
                          "Sent generation was  {}", error, memberId, sentGeneration);
            synchronized (AbstractCoordinator.this) {
                AbstractCoordinator.this.generation = new Generation(OffsetCommitRequest.DEFAULT_GENERATION_ID, memberId, null);
            }
            future.raise(error);
        } else {
            // unexpected error, throw the exception
            log.error("JoinGroup failed due to unexpected error: {}", error.message());
            future.raise(new KafkaException("Unexpected error in join group response: " + error.message()));
        }
    }
}

sendJoinGroupRequest():向 GroupCoordinator 发送 join-group 请求

  • 如果 group 是新的 group.id,那么此时 group 初始化的状态为 Empty

  • 当 GroupCoordinator 接收到 consumer 的 join-group 请求后,由于此时这个 group 的 member 列表还是空(group 是新建的,每个 consumer 实例被称为这个 group 的一个 member),第一个加入的 member 将被选为 leader,也就是说,对于一个新的 consumer group 而言,当第一个 consumer 实例加入后将会被选为 leader。

  • 如果 GroupCoordinator 接收到 leader 发送 join-group 请求,将会触发 rebalance,group 的状态变为 PreparingRebalance

  • 此时,GroupCoordinator 将会等待一定的时间,如果在一定时间内,接收到 join-group 请求的 consumer 将被认为是依然存活的,此时 group 会变为 AwaitSync 状态,并且 GroupCoordinator 会向这个 group 的所有 member 返回其 response。

  • consumer 在接收到 GroupCoordinator 的 response 后,如果这个 consumer 是 group 的 leader,那么这个 consumer 将会负责为整个 group assign partition 订阅安排(默认是按 range 的策略,目前也可选 RoundRobin),然后 leader 将分配后的信息以 sendSyncGroupRequest() 请求的方式发给 GroupCoordinator,而作为 follower 的 consumer 实例会发送一个空列表。

  • GroupCoordinator 在接收到 leader 发来的请求后,会将 assign 的结果返回给所有已经发送 sync-group 请求的 consumer 实例,并且 group 的状态将会转变为 Stable,如果后续再收到 sync-group 请求,由于 group 的状态已经是 Stable,将会直接返回其分配结果。

7.4 sendSyncGroupRequest() :sync-group 请求

sync-group 发送请求核心代码如下:

// 当 consumer 客户端为 leader 时,对 group 下的所有实例进行分配,将 assign 的结果发送到 GroupCoordinator。
private RequestFuture onJoinLeader(JoinGroupResponse joinResponse) {
    try {
        // perform the leader synchronization and send back the assignment for the group
        // 进行 assign 操作
        Map groupAssignment = performAssignment(joinResponse.data().leader(), joinResponse.data().protocolName(),
                joinResponse.data().members());

        List groupAssignmentList = new ArrayList<>();
        for (Map.Entry assignment : groupAssignment.entrySet()) {
            groupAssignmentList.add(new SyncGroupRequestData.SyncGroupRequestAssignment()
                    .setMemberId(assignment.getKey())
                    .setAssignment(Utils.toArray(assignment.getValue()))
            );
        }

        SyncGroupRequest.Builder requestBuilder =
                new SyncGroupRequest.Builder(
                        new SyncGroupRequestData()
                                .setGroupId(rebalanceConfig.groupId)
                                .setMemberId(generation.memberId)
                                .setProtocolType(protocolType())
                                .setProtocolName(generation.protocolName)
                                .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                                .setGenerationId(generation.generationId)
                                .setAssignments(groupAssignmentList)
                );
        log.debug("Sending leader SyncGroup to coordinator {} at generation {}: {}"this.coordinator, this.generation, requestBuilder);
        // 发送 sync-group 请求
        return sendSyncGroupRequest(requestBuilder);
    } catch (RuntimeException e) {
        return RequestFuture.failure(e);
    }
}

// 当 consumer 为 follower 时,从 GroupCoordinator 拉取分配结果。
private RequestFuture onJoinFollower() {
    // send follower's sync group with an empty assignment
    SyncGroupRequest.Builder requestBuilder =
            new SyncGroupRequest.Builder(
                    new SyncGroupRequestData()
                            .setGroupId(rebalanceConfig.groupId)
                            .setMemberId(generation.memberId)
                            .setProtocolType(protocolType())
                            .setProtocolName(generation.protocolName)
                            .setGroupInstanceId(this.rebalanceConfig.groupInstanceId.orElse(null))
                            .setGenerationId(generation.generationId)
                            .setAssignments(Collections.emptyList())
            );
    log.debug("Sending follower SyncGroup to coordinator {} at generation {}: {}"this.coordinator, this.generation, requestBuilder);
    return sendSyncGroupRequest(requestBuilder);
}

// 发送 SyncGroup 请求,获取对 partition 分配的安排。
private RequestFuture sendSyncGroupRequest(SyncGroupRequest.Builder requestBuilder) {
    if (coordinatorUnknown())
        return RequestFuture.coordinatorNotAvailable();
    return client.send(coordinator, requestBuilder)
            .compose(new SyncGroupResponseHandler(generation));
}

private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponseByteBuffer{
    private SyncGroupResponseHandler(final Generation generation) {
        super(generation);
    }

    @Override
    public void handle(SyncGroupResponse syncResponse,
                       RequestFuture future)
 
{
        Errors error = syncResponse.error();
        // 同步成功
        if (error == Errors.NONE) {
            if (isProtocolTypeInconsistent(syncResponse.data.protocolType())) {
                log.error("SyncGroup failed due to inconsistent Protocol Type, received {} but expected {}",
                    syncResponse.data.protocolType(), protocolType());
                future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
            } else {
                log.debug("Received successful SyncGroup response: {}", syncResponse);
                sensors.syncSensor.record(response.requestLatencyMs());

                synchronized (AbstractCoordinator.this) {
                    if (!generation.equals(Generation.NO_GENERATION) && state == MemberState.COMPLETING_REBALANCE) {
                        // check protocol name only if the generation is not reset
                        final String protocolName = syncResponse.data.protocolName();
                        final boolean protocolNameInconsistent = protocolName != null &&
                            !protocolName.equals(generation.protocolName);

                        if (protocolNameInconsistent) {
                            log.error("SyncGroup failed due to inconsistent Protocol Name, received {} but expected {}",
                                protocolName, generation.protocolName);

                            future.raise(Errors.INCONSISTENT_GROUP_PROTOCOL);
                        } else {
                            log.info("Successfully synced group in generation {}", generation);
                            state = MemberState.STABLE;
                            rejoinNeeded = false;
                            // record rebalance latency
                            lastRebalanceEndMs = time.milliseconds();
                            sensors.successfulRebalanceSensor.record(lastRebalanceEndMs - lastRebalanceStartMs);
                            lastRebalanceStartMs = -1L;

                            future.complete(ByteBuffer.wrap(syncResponse.data.assignment()));
                        }
                    } else {
                        log.info("Generation data was cleared by heartbeat thread to {} and state is now {} before " +
                            "receiving SyncGroup response, marking this rebalance as failed and retry",
                            generation, state);
                        // use ILLEGAL_GENERATION error code to let it retry immediately
                        future.raise(Errors.ILLEGAL_GENERATION);
                    }
                }
            }
        } else {
            // join 的标志位设置为 true
            requestRejoin();

            if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(GroupAuthorizationException.forGroupId(rebalanceConfig.groupId));
            } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                log.info("SyncGroup failed: The group began another rebalance. Need to re-join the group. " +
                             "Sent generation was {}", sentGeneration);
                future.raise(error);
            } else if (error == Errors.FENCED_INSTANCE_ID) {
                // for sync-group request, even if the generation has changed we would not expect the instance id
                // gets fenced, and hence we always treat this as a fatal error
                log.error("SyncGroup failed: The group instance id {} has been fenced by another instance. " +
                    "Sent generation was {}", rebalanceConfig.groupInstanceId, sentGeneration);
                future.raise(error);
            } else if (error == Errors.UNKNOWN_MEMBER_ID
                    || error == Errors.ILLEGAL_GENERATION) {
                log.info("SyncGroup failed: {} Need to re-join the group. Sent generation was {}",
                        error.message(), sentGeneration);
                if (generationUnchanged())
                    resetGenerationOnResponseError(ApiKeys.SYNC_GROUP, error);

                future.raise(error);
            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                    || error == Errors.NOT_COORDINATOR) {
                log.info("SyncGroup failed: {} Marking coordinator unknown. Sent generation was {}",
                         error.message(), sentGeneration);
                markCoordinatorUnknown(error);
                future.raise(error);
            } else {
                future.raise(new KafkaException("Unexpected error from SyncGroup: " + error.message()));
            }
        }
    }
}

7.5 onJoinComplete()

经过上面的步骤,一个 consumer 实例就已经加入 group 成功了,加入 group 成功后,将会触发ConsumerCoordinator 的 onJoinComplete() 方法,其作用就是:更新订阅的 tp 列表以及更新其对应的 metadata。

// 加入 group 成功
@Override
protected void onJoinComplete(int generation,
                              String memberId,
                              String assignmentStrategy,
                              ByteBuffer assignmentBuffer)
 
{
    log.debug("Executing onJoinComplete with generation {} and memberId {}", generation, memberId);

    // Only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
    if (!isLeader)
        assignmentSnapshot = null;

    ConsumerPartitionAssignor assignor = lookupAssignor(assignmentStrategy);
    if (assignor == null)
        throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);

    // Give the assignor a chance to update internal state based on the received assignment
    groupMetadata = new ConsumerGroupMetadata(rebalanceConfig.groupId, generation, memberId, rebalanceConfig.groupInstanceId);

    // 更新订阅的 tp list
    Set ownedPartitions = new HashSet<>(subscriptions.assignedPartitions());

    // should at least encode the short version
    if (assignmentBuffer.remaining() < 2)
        throw new IllegalStateException("There are insufficient bytes available to read assignment from the sync-group response (" +
            "actual byte size " + assignmentBuffer.remaining() + ") , this is not expected; " +
            "it is possible that the leader's assign function is buggy and did not return any assignment for this member, " +
            "or because static member is configured and the protocol is buggy hence did not get the assignment for this member");

    Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);

    Set assignedPartitions = new HashSet<>(assignment.partitions());

    if (!subscriptions.checkAssignmentMatchedSubscription(assignedPartitions)) {
        log.warn("We received an assignment {} that doesn't match our current subscription {}; it is likely " +
            "that the subscription has changed since we joined the group. Will try re-join the group with current subscription",
            assignment.partitions(), subscriptions.prettyString());

        requestRejoin();

        return;
    }

    final AtomicReference firstException = new AtomicReference<>(null);
    Set addedPartitions = new HashSet<>(assignedPartitions);
    addedPartitions.removeAll(ownedPartitions);

    if (protocol == RebalanceProtocol.COOPERATIVE) {
        Set revokedPartitions = new HashSet<>(ownedPartitions);
        revokedPartitions.removeAll(assignedPartitions);

        log.info("Updating assignment with\n" +
                "\tAssigned partitions:                       {}\n" +
                "\tCurrent owned partitions:                  {}\n" +
                "\tAdded partitions (assigned - owned):       {}\n" +
                "\tRevoked partitions (owned - assigned):     {}\n",
            assignedPartitions,
            ownedPartitions,
            addedPartitions,
            revokedPartitions
        );

        if (!revokedPartitions.isEmpty()) {
            // Revoke partitions that were previously owned but no longer assigned;
            // note that we should only change the assignment (or update the assignor's state)
            // AFTER we've triggered  the revoke callback
            firstException.compareAndSet(null, invokePartitionsRevoked(revokedPartitions));

            // If revoked any partitions, need to re-join the group afterwards
            log.info("Need to revoke partitions {} and re-join the group", revokedPartitions);
            requestRejoin();
        }
    }

    // The leader may have assigned partitions which match our subscription pattern, but which
    // were not explicitly requested, so we update the joined subscription here.
    // leader 可能已经分配了与我们的订阅模式匹配的分区,但没有明确请求,所以我们在这里更新加入的订阅。
    maybeUpdateJoinedSubscription(assignedPartitions);

    // Catch any exception here to make sure we could complete the user callback.
    firstException.compareAndSet(null, invokeOnAssignment(assignor, assignment));

    // Reschedule the auto commit starting from now
    // 如果自动提交开启,则更新自动提交间隔时间。
    if (autoCommitEnabled)
        this.nextAutoCommitTimer.updateAndReset(autoCommitIntervalMs);

    subscriptions.assignFromSubscribed(assignedPartitions);

    // Add partitions that were not previously owned but are now assigned
    firstException.compareAndSet(null, invokePartitionsAssigned(addedPartitions));

    if (firstException.get() != null) {
        if (firstException.get() instanceof KafkaException) {
            throw (KafkaException) firstException.get();
        } else {
            throw new KafkaException("User rebalance callback throws an error", firstException.get());
        }
    }
}

至此,一个 consumer 实例算是真正上意义上加入 group 成功。然后消费者就进入正常工作状态,同时消费者也通过向 GroupCoordinator 发送心跳来维持它们与消费者的从属关系以及它们对分区的所有权关系。只要以正常的间隔发送心跳,就被认为是活跃的,但是如果 GroupCoordinator 没有响应,那么就会发送 LeaveGroup 请求退出消费组。


Java

浏览 174
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报