A Deep Dive into Kafka's Consumer Group Coordinator (GroupCoordinator)
The coordinator lifecycle
- What the coordinator is
- How the coordinator works
- The coordinator's rebalance mechanism
Creating the GroupCoordinator
When a Kafka broker starts, it automatically creates and starts a GroupCoordinator.
A few of the properties passed to the GroupCoordinator constructor are worth introducing:
```scala
new GroupCoordinator(
  config.brokerId,
  groupConfig,
  offsetConfig,
  groupMetadataManager,
  heartbeatPurgatory,
  joinPurgatory,
  time,
  metrics
)
```
offsetConfig settings
```scala
private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig(
  maxMetadataSize = config.offsetMetadataMaxSize,
  loadBufferSize = config.offsetsLoadBufferSize,
  offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L,
  offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
  offsetsTopicNumPartitions = config.offsetsTopicPartitions,
  offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
  offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
  offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
  offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
  offsetCommitRequiredAcks = config.offsetCommitRequiredAcks
)
```
(The descriptions and defaults below reflect Kafka 2.x-era broker defaults.)

Property | Description | Default
---|---|---
offset.metadata.max.bytes | Maximum metadata size accepted with an offset commit | 4096
offsets.load.buffer.size | Batch size (in bytes) for reads from the offsets topic when loading offsets into the cache | 5242880
offsets.retention.minutes | How long committed offsets are retained once a group becomes empty | 10080 (1440 before Kafka 2.0)
offsets.retention.check.interval.ms | How often to scan for expired offsets | 600000
offsets.topic.num.partitions | Number of partitions of __consumer_offsets | 50
offsets.commit.timeout.ms | Timeout for an offset commit to be replicated | 5000
offsets.topic.segment.bytes | Segment size of the offsets topic | 104857600
offsets.topic.replication.factor | Replication factor of the offsets topic | 3
offsets.topic.compression.codec | Compression codec for records in the offsets topic | 0 (no compression)
offsets.commit.required.acks | Acks required before an offset commit is accepted | -1 (all replicas)
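The minutes-to-milliseconds conversion in the offsetConfig mapping above can be mirrored in a small sketch. Note this is a hypothetical class written for illustration, not Kafka's API; only the property names and the `* 60 * 1000` conversion come from the source above.

```java
import java.util.Properties;

// Hypothetical mirror of the broker's offsetConfig mapping.
// Field names follow the Scala code above; defaults follow the table above.
public class OffsetConfigSketch {
    final long offsetsRetentionMs;
    final int loadBufferSize;

    OffsetConfigSketch(Properties props) {
        // offsets.retention.minutes is configured in minutes but used in ms,
        // hence the * 60 * 1000 seen in offsetConfig(...)
        long minutes = Long.parseLong(props.getProperty("offsets.retention.minutes", "10080"));
        this.offsetsRetentionMs = minutes * 60L * 1000L;
        this.loadBufferSize = Integer.parseInt(props.getProperty("offsets.load.buffer.size", "5242880"));
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("offsets.retention.minutes", "1440"); // one day
        OffsetConfigSketch cfg = new OffsetConfigSketch(props);
        System.out.println(cfg.offsetsRetentionMs); // 1440 * 60 * 1000
    }
}
```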
groupConfig settings
Property | Description | Default
---|---|---
group.min.session.timeout.ms | Lower bound on a member's session timeout | 6000
group.max.session.timeout.ms | Upper bound on a member's session timeout | 1800000
group.initial.rebalance.delay.ms | Delay before the first rebalance of a newly created group, letting more members join | 3000
group.max.size | Maximum number of members in a single group | Int.MaxValue
groupMetadataManager
The class that manages consumer group metadata.
heartbeatPurgatory
A delayed-operation purgatory for member heartbeats; expired heartbeats are checked roughly once per second.
joinPurgatory
A delayed-operation purgatory for join/rebalance operations (e.g. a DelayedJoin that waits for all members to rejoin the group).
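The essence of a purgatory is that each pending operation either completes early (a heartbeat arrives, a member rejoins) or expires at a deadline. A toy model of the heartbeat case, using the JDK's DelayQueue rather than Kafka's DelayedOperationPurgatory:

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Toy model of a heartbeat purgatory: each member's session is a delayed
// operation that would be completed by an incoming heartbeat, and expires
// (evicting the member) if none arrives before the session timeout.
public class HeartbeatPurgatorySketch {
    static class DelayedHeartbeat implements Delayed {
        final String memberId;
        final long deadlineMs;
        volatile boolean completed = false;

        DelayedHeartbeat(String memberId, long sessionTimeoutMs) {
            this.memberId = memberId;
            this.deadlineMs = System.currentTimeMillis() + sessionTimeoutMs;
        }
        public long getDelay(TimeUnit unit) {
            return unit.convert(deadlineMs - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedHeartbeat> purgatory = new DelayQueue<>();
        purgatory.add(new DelayedHeartbeat("consumer-1", 100));
        // No heartbeat arrives within the session timeout, so the operation expires.
        DelayedHeartbeat expired = purgatory.take(); // blocks until the deadline passes
        System.out.println(expired.completed ? "completed" : "expired:" + expired.memberId);
    }
}
```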
Starting the GroupCoordinator
```scala
def startup(enableMetadataExpiration: Boolean = true): Unit = {
  info("Starting up.")
  groupManager.startup(enableMetadataExpiration)
  isActive.set(true)
  info("Startup complete.")
}
```
這個啟動對于GroupCoordinator來說只是給屬性isActive
標(biāo)記為了true, 但是同時(shí)呢也調(diào)用了GroupMetadataManager.startup
Periodic cleanup of group metadata
GroupMetadataManager.startup schedules a recurring task named delete-expired-group-metadata that cleans up expired group metadata. By default it runs every 600000 ms; this interval comes from offsets.retention.check.interval.ms rather than being hardcoded.
TODO: GroupMetadataManager#cleanupGroupMetadata
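The core of that cleanup step can be modeled as filtering committed offsets whose retention window has elapsed. This is a simplified illustration, not Kafka's cleanupGroupMetadata implementation:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of expired-offset cleanup: drop committed offsets whose
// commit timestamp is older than the retention window.
public class OffsetCleanupSketch {
    public static void main(String[] args) {
        long nowMs = 1_000_000L;
        long retentionMs = 600_000L; // cf. offsets.retention.minutes, converted to ms
        Map<String, Long> commitTimestamps = new HashMap<>();
        commitTimestamps.put("group-a/topic-0", 300_000L); // older than retention -> expired
        commitTimestamps.put("group-b/topic-0", 900_000L); // recent -> kept
        commitTimestamps.entrySet().removeIf(e -> nowMs - e.getValue() > retentionMs);
        System.out.println(commitTimestamps.keySet());
    }
}
```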
GroupCoordinator onElection
When the leader of a partition of the internal topic __consumer_offsets changes (for example, a LeaderAndIsr request reveals a leader switch), GroupCoordinator#onElection is invoked. It hands the work to a single-threaded scheduler dedicated to loading and unloading the offset-metadata cache; the thread name is prefixed with group-metadata-manager-, and one task is scheduled per partition.
The task ultimately executes GroupMetadataManager#doLoadGroupsAndOffsets.
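The scheduling shape described above (a single dedicated thread, one load task per partition) can be sketched with a plain executor. This is an illustrative toy, not the broker's KafkaScheduler:

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

// Toy model of the load path: a single-threaded executor whose thread name
// mimics the broker's prefix, with one load task submitted per partition.
// A single thread guarantees partitions are loaded one at a time, in order.
public class GroupMetadataLoaderSketch {
    public static void main(String[] args) throws InterruptedException {
        ThreadFactory factory = r -> new Thread(r, "group-metadata-manager-0");
        ExecutorService scheduler = Executors.newSingleThreadExecutor(factory);
        for (int partition : List.of(0, 1, 2)) {
            final int p = partition;
            scheduler.submit(() ->
                System.out.println("loading offsets and group metadata from __consumer_offsets-" + p));
        }
        scheduler.shutdown();
        scheduler.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```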
The keys in __consumer_offsets carry two message types:
- key version 0: consumer group offset-commit info -> value version 0: [offset, metadata, timestamp]
- key version 1: consumer group offset-commit info -> value version 1: [offset, metadata, commit_timestamp, expire_timestamp]
- key version 2: consumer group metadata -> value versions 0-3: [protocol_type, generation, protocol, leader, ...] (full schemas listed below)
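Records in __consumer_offsets begin with a version number in the key, which steers how the rest of the record is decoded. A toy dispatcher over the key versions listed above (the real parsing lives in GroupMetadataManager; this sketch only shows the version-prefix idea):

```java
import java.nio.ByteBuffer;

// Toy model of key-version dispatch for __consumer_offsets records:
// versions 0/1 are offset-commit keys, version 2 is a group-metadata key.
public class KeyVersionSketch {
    static String kindOf(ByteBuffer key) {
        short version = key.getShort();
        if (version == 0 || version == 1) return "offset commit";
        if (version == 2) return "group metadata";
        return "unknown";
    }

    public static void main(String[] args) {
        ByteBuffer key = ByteBuffer.allocate(2).putShort((short) 2);
        key.flip(); // prepare the buffer for reading
        System.out.println(kindOf(key));
    }
}
```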
The schemaForGroupValue at each version (0 through 3) looks as follows:
Version-0
```
{
  protocol_type: STRING,
  generation: INT32,
  protocol: NULLABLE_STRING,
  leader: NULLABLE_STRING,
  members: ARRAY({
    member_id: STRING,
    client_id: STRING,
    client_host: STRING,
    session_timeout: INT32,
    subscription: BYTES,
    assignment: BYTES
  })
}
```
Version-1
```
{
  protocol_type: STRING,
  generation: INT32,
  protocol: NULLABLE_STRING,
  leader: NULLABLE_STRING,
  members: ARRAY({
    member_id: STRING,
    client_id: STRING,
    client_host: STRING,
    rebalance_timeout: INT32,
    session_timeout: INT32,
    subscription: BYTES,
    assignment: BYTES
  })
}
```
Version-2
```
{
  protocol_type: STRING,
  generation: INT32,
  protocol: NULLABLE_STRING,
  leader: NULLABLE_STRING,
  current_state_timestamp: INT64,
  members: ARRAY({
    member_id: STRING,
    client_id: STRING,
    client_host: STRING,
    rebalance_timeout: INT32,
    session_timeout: INT32,
    subscription: BYTES,
    assignment: BYTES
  })
}
```
Version-3
```
{
  protocol_type: STRING,
  generation: INT32,
  protocol: NULLABLE_STRING,
  leader: NULLABLE_STRING,
  current_state_timestamp: INT64,
  members: ARRAY({
    member_id: STRING,
    group_instance_id: NULLABLE_STRING,
    client_id: STRING,
    client_host: STRING,
    rebalance_timeout: INT32,
    session_timeout: INT32,
    subscription: BYTES,
    assignment: BYTES
  })
}
```
The value Schema for each version is defined as follows:
```scala
private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
  new Field(PROTOCOL_TYPE_KEY, STRING),
  new Field(GENERATION_KEY, INT32),
  new Field(PROTOCOL_KEY, NULLABLE_STRING),
  new Field(LEADER_KEY, NULLABLE_STRING),
  new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))

private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
  new Field(PROTOCOL_TYPE_KEY, STRING),
  new Field(GENERATION_KEY, INT32),
  new Field(PROTOCOL_KEY, NULLABLE_STRING),
  new Field(LEADER_KEY, NULLABLE_STRING),
  new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))

private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
  new Field(PROTOCOL_TYPE_KEY, STRING),
  new Field(GENERATION_KEY, INT32),
  new Field(PROTOCOL_KEY, NULLABLE_STRING),
  new Field(LEADER_KEY, NULLABLE_STRING),
  new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
  new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))

private val GROUP_METADATA_VALUE_SCHEMA_V3 = new Schema(
  new Field(PROTOCOL_TYPE_KEY, STRING),
  new Field(GENERATION_KEY, INT32),
  new Field(PROTOCOL_KEY, NULLABLE_STRING),
  new Field(LEADER_KEY, NULLABLE_STRING),
  new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
  new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V3)))
```
GroupCoordinator onResignation
The counterpart of onElection: when this broker loses leadership of a __consumer_offsets partition, GroupCoordinator#onResignation is invoked to unload that partition's cached group metadata and offsets.
That concludes this deep dive into Kafka's GroupCoordinator.