kafka運(yùn)維consumer-groups.sh消費(fèi)者組管理
消費(fèi)者組管理 kafka-consumer-groups.sh
1. 查看消費(fèi)者列表--list
sh bin/kafka-consumer-groups.sh --bootstrap-server xxxx:9090 --list
先調(diào)用MetadataRequest拿到所有在線Broker列表 再給每個(gè)Broker發(fā)送ListGroupsRequest請(qǐng)求獲取 消費(fèi)者組數(shù)據(jù)
2. 查看消費(fèi)者組詳情--describe
DescribeGroupsRequest
查看消費(fèi)組詳情--group 或 --all-groups
查看指定消費(fèi)組詳情--group sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --group test2_consumer_group
查看所有消費(fèi)組詳情--all-groups sh bin/kafka-consumer-groups.sh --bootstrap-server xxxxx:9090 --describe --all-groups 查看該消費(fèi)組 消費(fèi)的所有Topic、及所在分區(qū)、最新消費(fèi)offset、Log最新數(shù)據(jù)offset、Lag還未消費(fèi)數(shù)量、消費(fèi)者ID等等信息
查詢消費(fèi)者成員信息--members
所有消費(fèi)組成員信息 sh bin/kafka-consumer-groups.sh --describe --all-groups --members --bootstrap-server xxx:9090
指定消費(fèi)組成員信息 sh bin/kafka-consumer-groups.sh --describe --members --group test2_consumer_group --bootstrap-server xxxx:9090
查詢消費(fèi)者狀態(tài)信息--state
所有消費(fèi)組狀態(tài)信息 sh bin/kafka-consumer-groups.sh --describe --all-groups --state --bootstrap-server xxxx:9090
指定消費(fèi)組狀態(tài)信息 sh bin/kafka-consumer-groups.sh --describe --state --group test2_consumer_group --bootstrap-server xxxxx:9090
3. 刪除消費(fèi)者組--delete
DeleteGroupsRequest
刪除消費(fèi)組--delete
刪除指定消費(fèi)組--group sh bin/kafka-consumer-groups.sh --delete --group test2_consumer_group --bootstrap-server xxxx:9090
刪除所有消費(fèi)組--all-groups sh bin/kafka-consumer-groups.sh --delete --all-groups --bootstrap-server xxxx:9090
PS: 想要?jiǎng)h除消費(fèi)組前提是這個(gè)消費(fèi)組的所有客戶端都停止消費(fèi)/不在線才能夠成功刪除;否則會(huì)報(bào)下面異常
Error: Deletion of some consumer groups failed: * Group 'test2_consumer_group' could not be deleted due to: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.GroupNotEmptyException: The group is not empty.
4. 重置消費(fèi)組的偏移量 --reset-offsets
能夠執(zhí)行成功的一個(gè)前提是 消費(fèi)組這會(huì)是不可用狀態(tài);
下面的示例使用的參數(shù)是: --dry-run ;這個(gè)參數(shù)表示預(yù)執(zhí)行,會(huì)打印出來(lái)將要處理的結(jié)果; 等你想真正執(zhí)行的時(shí)候請(qǐng)換成參數(shù)--excute ;
下面示例 重置模式都是 --to-earliest 重置到最早的;
請(qǐng)根據(jù)需要參考下面 相關(guān)重置Offset的模式 換成其他模式;
重置指定消費(fèi)組的偏移量 --group
重置指定消費(fèi)組的所有Topic的偏移量--all-topic sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --all-topic
重置指定消費(fèi)組的指定Topic的偏移量--topic sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --topic test2
重置所有消費(fèi)組的偏移量 --all-group
重置所有消費(fèi)組的所有Topic的偏移量--all-topic sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --all-topic
重置所有消費(fèi)組中指定Topic的偏移量--topic sh bin/kafka-consumer-groups.sh --reset-offsets --to-earliest --all-group --bootstrap-server xxxx:9090 --dry-run --topic test2
--reset-offsets 后面需要接重置的模式
相關(guān)重置Offset的模式
參數(shù) | 描述 | 例子 |
---|---|---|
--to-earliest : | 重置offset到最開始的那條offset(找到還未被刪除最早的那個(gè)offset) | |
--to-current: | 直接重置offset到當(dāng)前的offset,也就是LOE | |
--to-latest: | 重置到最后一個(gè)offset | |
--to-datetime: | 重置到指定時(shí)間的offset;格式為:YYYY-MM-DDTHH:mm:SS.sss; | --to-datetime "2021-6-26T00:00:00.000" |
--to-offset | 重置到指定的offset,但是通常情況下,匹配到多個(gè)分區(qū),這里是將匹配到的所有分區(qū)都重置到這一個(gè)值; 如果 1.目標(biāo)最大offset<--to-offset, 這個(gè)時(shí)候重置為目標(biāo)最大offset;2.目標(biāo)最小offset>--to-offset ,則重置為最小; 3.否則的話才會(huì)重置為--to-offset的目標(biāo)值; 一般不用這個(gè) | --to-offset 3465 |
--shift-by | 按照偏移量增加或者減少多少個(gè)offset;正的為往前增加;負(fù)的往后退;當(dāng)然這里也是匹配所有的; | --shift-by 100 、--shift-by -100 |
--from-file | 根據(jù)CVS文檔來(lái)重置; 這里下面單獨(dú)講解 |
--to-offset 例子
--to-offset 3465
--from-file著重講解一下
上面其他的一些模式重置的都是匹配到的所有分區(qū); 不能夠每個(gè)分區(qū)重置到不同的offset;不過(guò)**--from-file**可以讓我們更靈活一點(diǎn);
先配置cvs文檔 格式為: Topic:分區(qū)號(hào): 重置目標(biāo)偏移量
test2,0,100 test2,1,200 test2,2,300
執(zhí)行命令
sh bin/kafka-consumer-groups.sh --reset-offsets --group test2_consumer_group --bootstrap-server xxxx:9090 --dry-run --from-file config/reset-offset.csv
5. 刪除偏移量delete-offsets
能夠執(zhí)行成功的一個(gè)前提是 消費(fèi)組這會(huì)是不可用狀態(tài);
偏移量被刪除了之后,Consumer Group下次啟動(dòng)的時(shí)候,會(huì)從頭消費(fèi);
sh bin/kafka-consumer-groups.sh --delete-offsets --group test2_consumer_group2 --bootstrap-server XXXX:9090 --topic test2
相關(guān)可選參數(shù)
參數(shù) | 描述 | 例子 |
---|---|---|
--bootstrap-server | 指定連接到的kafka服務(wù); | --bootstrap-server localhost:9092 |
--list | 列出所有消費(fèi)組名稱 | --list |
--describe | 查詢消費(fèi)者描述信息 | --describe |
--group | 指定消費(fèi)組 | |
--all-groups | 指定所有消費(fèi)組 | |
--members | 查詢消費(fèi)組的成員信息 | |
--state | 查詢消費(fèi)者的狀態(tài)信息 | |
--offsets | 在查詢消費(fèi)組描述信息的時(shí)候,這個(gè)參數(shù)會(huì)列出消息的偏移量信息; 默認(rèn)就會(huì)有這個(gè)參數(shù)的; | |
dry-run | 重置偏移量的時(shí)候,使用這個(gè)參數(shù)可以讓你預(yù)先看到重置情況,這個(gè)時(shí)候還沒(méi)有真正的執(zhí)行,真正執(zhí)行換成--excute;默認(rèn)為dry-run | |
--excute | 真正的執(zhí)行重置偏移量的操作; | |
--to-earliest | 將offset重置到最早 | |
to-latest | 將offset重置到最近 |
以上就是kafka運(yùn)維consumer-groups.sh消費(fèi)者組管理的詳細(xì)內(nèi)容,更多關(guān)于kafka運(yùn)維consumer groups sh的資料請(qǐng)關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java基礎(chǔ)之toString的序列化 匿名對(duì)象 復(fù)雜度精解
序列化即為把內(nèi)存中的對(duì)象轉(zhuǎn)換為字節(jié)寫入文件或通過(guò)網(wǎng)絡(luò)傳輸?shù)竭h(yuǎn)端服務(wù)器,本章節(jié)將帶你了解Java toString的序列化 匿名對(duì)象 復(fù)雜度,需要的朋友可以參考下2021-09-09springboot項(xiàng)目啟動(dòng)類錯(cuò)誤(找不到或無(wú)法加載主類 com.**Application)
本文主要介紹了spring-boot項(xiàng)目啟動(dòng)類錯(cuò)誤(找不到或無(wú)法加載主類 com.**Application),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2024-05-05SpringBoot動(dòng)態(tài)定時(shí)功能實(shí)現(xiàn)方案詳解
在SpringBoot項(xiàng)目中簡(jiǎn)單使用定時(shí)任務(wù),不過(guò)由于要借助cron表達(dá)式且都提前定義好放在配置文件里,不能在項(xiàng)目運(yùn)行中動(dòng)態(tài)修改任務(wù)執(zhí)行時(shí)間,實(shí)在不太靈活?,F(xiàn)在我們就來(lái)實(shí)現(xiàn)可以動(dòng)態(tài)修改cron表達(dá)式的定時(shí)任務(wù),感興趣的可以了解一下2022-11-11java使用單向鏈表解決數(shù)據(jù)存儲(chǔ)自定義排序問(wèn)題
本文主要介紹了java使用單向鏈表解決數(shù)據(jù)存儲(chǔ)自定義排序問(wèn)題,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-03-03在SpringBoot項(xiàng)目中使用Java8函數(shù)式接口的方法示例
在Spring Boot項(xiàng)目中,Java 8 的函數(shù)式接口廣泛用于實(shí)現(xiàn)各種功能,如自定義配置、數(shù)據(jù)處理等,函數(shù)式接口在Spring Boot中非常有用,本文展示了在SpringBoot項(xiàng)目中使用Java8的函數(shù)式接口的方法示例,需要的朋友可以參考下2024-03-03Spring-data-redis操作redis cluster的示例代碼
這篇文章主要介紹了Spring-data-redis操作redis cluster的示例代碼,小編覺(jué)得挺不錯(cuò)的,現(xiàn)在分享給大家,也給大家做個(gè)參考。一起跟隨小編過(guò)來(lái)看看吧2018-10-10