亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

Kafka消費(fèi)客戶端協(xié)調(diào)器GroupCoordinator詳解

 更新時(shí)間:2022年10月18日 10:21:27   作者:石臻臻的雜貨鋪  
這篇文章主要為大家介紹了Kafka消費(fèi)客戶端協(xié)調(diào)器GroupCoordinator使用示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪

協(xié)調(diào)器的生命周期

  • 什么是協(xié)調(diào)器
  • 協(xié)調(diào)器工作原理
  • 協(xié)調(diào)器的Rebalance機(jī)制

GroupCoordinator的創(chuàng)建

在Kafka啟動的時(shí)候, 會自動創(chuàng)建并啟動GroupCoordinator

這個GroupCoordinator對象創(chuàng)建的時(shí)候傳入的幾個屬性需要介紹一下

    new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time, metrics)

offsetConfig相關(guān)配置

  private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig(
    maxMetadataSize = config.offsetMetadataMaxSize,
    loadBufferSize = config.offsetsLoadBufferSize,
    offsetsRetentionMs = config.offsetsRetentionMinutes * 60L * 1000L,
    offsetsRetentionCheckIntervalMs = config.offsetsRetentionCheckIntervalMs,
    offsetsTopicNumPartitions = config.offsetsTopicPartitions,
    offsetsTopicSegmentBytes = config.offsetsTopicSegmentBytes,
    offsetsTopicReplicationFactor = config.offsetsTopicReplicationFactor,
    offsetsTopicCompressionCodec = config.offsetsTopicCompressionCodec,
    offsetCommitTimeoutMs = config.offsetCommitTimeoutMs,
    offsetCommitRequiredAcks = config.offsetCommitRequiredAcks
  )
屬性介紹默認(rèn)值
offset.metadata.max.bytes  
offsets.load.buffer.size  
offsets.retention.minutes  
offsets.retention.check.interval.ms  
offsets.topic.num.partitions  
offsets.commit.timeout.ms  
offsets.topic.segment.bytes  
offsets.topic.replication.factor  
offsets.topic.compression.codec  
offsets.commit.timeout.ms  
offsets.commit.required.acks  

groupConfig相關(guān)配置

屬性介紹默認(rèn)值
group.min.session.timeout.ms  
group.max.session.timeout.ms  
group.initial.rebalance.delay.ms  
group.max.size  
group.initial.rebalance.delay.ms  

groupMetadataManager

組元信息管理類

heartbeatPurgatory

心跳監(jiān)測操作,每一秒執(zhí)行一次

joinPurgatory

GroupCoordinator的啟動

  def startup(enableMetadataExpiration: Boolean = true): Unit = {
    info("Starting up.")
    groupManager.startup(enableMetadataExpiration)
    isActive.set(true)
    info("Startup complete.")
  }

這個啟動對于GroupCoordinator來說只是給屬性isActive標(biāo)記為了true, 但是同時(shí)呢也調(diào)用了GroupMetadataManager.startup

定時(shí)清理Group元信息

這個Group元信息管理類呢啟動了一個定時(shí)任務(wù), 名字為:delete-expired-group-metadata

每隔600000ms的時(shí)候就執(zhí)行一下 清理過期組元信息的操作, 這個600000ms時(shí)間是代碼寫死的。

TODO:GroupMetadataManager#cleanupGroupMetadata

GroupCoordinator OnElection

當(dāng)內(nèi)部topic __consumer_offsets 有分區(qū)的Leader變更的時(shí)候,比如觸發(fā)了 LeaderAndIsr的請求, 發(fā)現(xiàn)分區(qū)Leader進(jìn)行了切換。

那么就會執(zhí)行 GroupCoordinator#OnElection 的接口, 這個接口會把任務(wù)丟個一個單線程的調(diào)度程序, 專門處理offset元數(shù)據(jù)緩存加載和卸載的。線程名稱前綴為group-metadata-manager- ,一個分區(qū)一個任務(wù)

最終執(zhí)行的任務(wù)內(nèi)容是:GroupMetadataManager#doLoadGroupsAndOffsets

__consumer_offsets 的key有兩種消息類型

key version 0: 消費(fèi)組消費(fèi)偏移量信息 -> value version 0: [offset, metadata, timestamp]

key version 1: 消費(fèi)組消費(fèi)偏移量信息-> value version 1: [offset, metadata, commit_timestamp, expire_timestamp]

key version 2: 消費(fèi)組的元信息 -> value version 0: [protocol_type, generation, protocol, leader,

例如 version:3 的schemaForGroupValue

Version-0

{
	protocol_type: STRING,
	generation: INT32,
	protocol: NULLABLE_STRING,
	leader: NULLABLE_STRING,
	members: ARRAY({
		member_id: STRING,
		client_id: STRING,
		client_host: STRING,
		session_timeout: INT32,
		subscription: BYTES,
		assignment: BYTES
	})
}

Version-1

{
	protocol_type: STRING,
	generation: INT32,
	protocol: NULLABLE_STRING,
	leader: NULLABLE_STRING,
	members: ARRAY({
		member_id: STRING,
		client_id: STRING,
		client_host: STRING,
		rebalance_timeout: INT32,
		session_timeout: INT32,
		subscription: BYTES,
		assignment: BYTES
	})
}

Version-2

{
	protocol_type: STRING,
	generation: INT32,
	protocol: NULLABLE_STRING,
	leader: NULLABLE_STRING,
	current_state_timestamp: INT64,
	members: ARRAY({
		member_id: STRING,
		client_id: STRING,
		client_host: STRING,
		rebalance_timeout: INT32,
		session_timeout: INT32,
		subscription: BYTES,
		assignment: BYTES
	})
}

Version-3

{
	protocol_type: STRING,
	generation: INT32,
	protocol: NULLABLE_STRING,
	leader: NULLABLE_STRING,
	current_state_timestamp: INT64,
	members: ARRAY({
		member_id: STRING,
		group_instance_id: NULLABLE_STRING,
		client_id: STRING,
		client_host: STRING,
		rebalance_timeout: INT32,
		session_timeout: INT32,
		subscription: BYTES,
		assignment: BYTES
	})
}

Value每個版本的 Scheme如下

  private val GROUP_METADATA_VALUE_SCHEMA_V0 = new Schema(
    new Field(PROTOCOL_TYPE_KEY, STRING),
    new Field(GENERATION_KEY, INT32),
    new Field(PROTOCOL_KEY, NULLABLE_STRING),
    new Field(LEADER_KEY, NULLABLE_STRING),
    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V0)))
  private val GROUP_METADATA_VALUE_SCHEMA_V1 = new Schema(
    new Field(PROTOCOL_TYPE_KEY, STRING),
    new Field(GENERATION_KEY, INT32),
    new Field(PROTOCOL_KEY, NULLABLE_STRING),
    new Field(LEADER_KEY, NULLABLE_STRING),
    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V1)))
  private val GROUP_METADATA_VALUE_SCHEMA_V2 = new Schema(
    new Field(PROTOCOL_TYPE_KEY, STRING),
    new Field(GENERATION_KEY, INT32),
    new Field(PROTOCOL_KEY, NULLABLE_STRING),
    new Field(LEADER_KEY, NULLABLE_STRING),
    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V2)))
  private val GROUP_METADATA_VALUE_SCHEMA_V3 = new Schema(
    new Field(PROTOCOL_TYPE_KEY, STRING),
    new Field(GENERATION_KEY, INT32),
    new Field(PROTOCOL_KEY, NULLABLE_STRING),
    new Field(LEADER_KEY, NULLABLE_STRING),
    new Field(CURRENT_STATE_TIMESTAMP_KEY, INT64),
    new Field(MEMBERS_KEY, new ArrayOf(MEMBER_METADATA_V3)))

GroupCoordinator onResignation

以上就是Kafka消費(fèi)客戶端協(xié)調(diào)器GroupCoordinator詳解的詳細(xì)內(nèi)容,更多關(guān)于Kafka GroupCoordinator的資料請關(guān)注腳本之家其它相關(guān)文章!

  • 使用SpringBoot進(jìn)行身份驗(yàn)證和授權(quán)的示例詳解

    使用SpringBoot進(jìn)行身份驗(yàn)證和授權(quán)的示例詳解

    在廣闊的 Web 開發(fā)世界中,身份驗(yàn)證是每個數(shù)字領(lǐng)域的守護(hù)者,在本教程中,我們將了解如何以本機(jī)方式保護(hù)、驗(yàn)證和授權(quán) Spring-Boot 應(yīng)用程序的用戶,并遵循框架的良好實(shí)踐,希望對大家有所幫助
    2023-11-11
  • IntelliJ IDEA 安裝及初次使用圖文教程(2020.3.2社區(qū)版)

    IntelliJ IDEA 安裝及初次使用圖文教程(2020.3.2社區(qū)版)

    這篇文章主要介紹了IntelliJ IDEA 安裝及初次使用(2020.3.2社區(qū)版),本文通過圖文并茂的形式給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下
    2021-03-03
  • JPA中EntityListeners注解的使用詳解

    JPA中EntityListeners注解的使用詳解

    這篇文章主要介紹了JPA中EntityListeners注解的使用詳解,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2019-01-01
  • idea引入外部jar包的方法實(shí)現(xiàn)

    idea引入外部jar包的方法實(shí)現(xiàn)

    本文主要介紹了idea引入外部jar包的方法實(shí)現(xiàn),文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2023-06-06
  • Struts2學(xué)習(xí)教程之輸入校驗(yàn)示例詳解

    Struts2學(xué)習(xí)教程之輸入校驗(yàn)示例詳解

    這篇文章主要給大家介紹了關(guān)于Struts2學(xué)習(xí)教程之輸入校驗(yàn)的相關(guān)資料,文中通過示例介紹的非常詳細(xì),對大家學(xué)習(xí)或者使用struts2具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2018-05-05
  • java程序員必須知道的4個書寫代碼技巧

    java程序員必須知道的4個書寫代碼技巧

    本篇文章主要給大家講述了作為JAVA程序員如何能寫出高效的代碼以及運(yùn)行效率更高的代碼,一起學(xué)習(xí)分享下吧。
    2017-12-12
  • Java實(shí)現(xiàn)并發(fā)執(zhí)行定時(shí)任務(wù)并手動控制開始結(jié)束

    Java實(shí)現(xiàn)并發(fā)執(zhí)行定時(shí)任務(wù)并手動控制開始結(jié)束

    這篇文章主要介紹了Java實(shí)現(xiàn)并發(fā)執(zhí)行定時(shí)任務(wù)并手動控制開始結(jié)束,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教
    2024-05-05
  • Ubuntu快速安裝eclipse

    Ubuntu快速安裝eclipse

    這篇文章主要為大家詳細(xì)介紹了Ubuntu快速安裝eclipse的簡單教程,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下
    2017-04-04
  • 使用Mybatis-plus實(shí)現(xiàn)對數(shù)據(jù)庫表的內(nèi)部字段進(jìn)行比較

    使用Mybatis-plus實(shí)現(xiàn)對數(shù)據(jù)庫表的內(nèi)部字段進(jìn)行比較

    這篇文章主要介紹了使用Mybatis-plus實(shí)現(xiàn)對數(shù)據(jù)庫表的內(nèi)部字段進(jìn)行比較方式,具有很好的參考價(jià)值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教
    2023-07-07
  • 最新評論