Kafka常用命令之kafka-console-consumer.sh解讀
kafka-console-consumer.sh解讀
kafka-console-consumer.sh 腳本是一個(gè)簡易的消費(fèi)者控制臺。
該 shell 腳本的功能通過調(diào)用 kafka.tools 包下的 ConsoleConsumer 類,并將提供的命令行參數(shù)全部傳給該類實(shí)現(xiàn)。
- 注意:Kafka 從 2.2 版本開始將 kafka-topic.sh 腳本中的 −−zookeeper 參數(shù)標(biāo)注為 “過時(shí)”,推薦使用 −−bootstrap-server 參數(shù)。
- 若讀者依舊使用的是 2.1 及以下版本,請將下述的 --bootstrap-server 參數(shù)及其值手動替換為 --zookeeper zk1:2181,zk2:2181,zk:2181。
- 一定要注意兩者參數(shù)值所指向的集群地址是不同的。
消息消費(fèi)
bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic topicName
表示從 latest 位移位置開始消費(fèi)該主題的所有分區(qū)消息,即僅消費(fèi)正在寫入的消息。
從開始位置消費(fèi)
bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --from-beginning --topic topicName
表示從指定主題中有效的起始位移位置開始消費(fèi)所有分區(qū)的消息。
顯示key消費(fèi)
bin/kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --property print.key=true --topic topicName
消費(fèi)出的消息結(jié)果將打印出消息體的 key 和 value。
若還需要為你的消息添加其他屬性
請參考下述列表
參數(shù) | 值類型 | 說明 | 有效值 |
---|---|---|---|
--topic | string | 被消費(fèi)的topic | |
--whitelist | string | 正則表達(dá)式,指定要包含以供使用的主題的白名單 | |
--partition | integer | 指定分區(qū) 除非指定’–offset’,否則從分區(qū)結(jié)束(latest)開始消費(fèi) | |
--offset | string | 執(zhí)行消費(fèi)的起始o(jì)ffset位置 默認(rèn)值:latest | latest earliest <offset> |
--consumer-property | string | 將用戶定義的屬性以key=value的形式傳遞給使用者 | |
--consumer.config | string | 消費(fèi)者配置屬性文件 請注意,[consumer-property]優(yōu)先于此配置 | |
--formatter | string | 用于格式化kafka消息以供顯示的類的名稱 默認(rèn)值:kafka.tools.DefaultMessageFormatter | kafka.tools.DefaultMessageFormatter kafka.tools.LoggingMessageFormatter kafka.tools.NoOpMessageFormatter kafka.tools.ChecksumMessageFormatter |
--property | string | 初始化消息格式化程序的屬性 | print.timestamp=true|false print.key=true|false print.value=true|false key.separator=<key.separator> line.separator=<line.separator> key.deserializer=<key.deserializer> value.deserializer=<value.deserializer> |
--from-beginning | 從存在的最早消息開始,而不是從最新消息開始 | ||
--max-messages | integer | 消費(fèi)的最大數(shù)據(jù)量,若不指定,則持續(xù)消費(fèi)下去 | |
--timeout-ms | integer | 在指定時(shí)間間隔內(nèi)沒有消息可用時(shí)退出 | |
--skip-message-on-error | 如果處理消息時(shí)出錯(cuò),請?zhí)^它而不是暫停 | ||
--bootstrap-server | string | 必需(除非使用舊版本的消費(fèi)者),要連接的服務(wù)器 | |
--key-deserializer | string | ||
--value-deserializer | string | ||
--enable-systest-events | 除記錄消費(fèi)的消息外,還記錄消費(fèi)者的生命周期 (用于系統(tǒng)測試) | ||
--isolation-level | string | 設(shè)置為read_committed以過濾掉未提交的事務(wù)性消息 設(shè)置為read_uncommitted以讀取所有消息 默認(rèn)值:read_uncommitted | |
--group | string | 指定消費(fèi)者所屬組的ID | |
--blacklist | string | 要從消費(fèi)中排除的主題黑名單 | |
--csv-reporter-enabled | 如果設(shè)置,將啟用csv metrics報(bào)告器 | ||
--delete-consumer-offsets | 如果指定,則啟動時(shí)刪除zookeeper中的消費(fèi)者信息 | ||
--metrics-dir | string | 輸出csv度量值 需與[csv-reporter-enable]配合使用 | |
--zookeeper | string | 必需(僅當(dāng)使用舊的使用者時(shí))連接zookeeper的字符串。 可以給出多個(gè)URL以允許故障轉(zhuǎn)移 |
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java實(shí)現(xiàn)學(xué)生信息管理系統(tǒng)(借助Array?List)
這篇文章主要為大家詳細(xì)介紹了Java實(shí)現(xiàn)學(xué)生信息管理系統(tǒng),借助Array?List,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-01-01Mybatis Plus使用條件構(gòu)造器增刪改查功能的實(shí)現(xiàn)方法
這篇文章主要介紹了Mybatis-Plus使用條件構(gòu)造器增刪改查,本文通過實(shí)例代碼給大家介紹的非常詳細(xì),對大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2021-05-05Spring?Security?基于URL的權(quán)限判斷源碼解析
這篇文章主要介紹了Spring?Security?基于URL的權(quán)限判斷問題,我們想要實(shí)現(xiàn)自己的基于請求Url的授權(quán)只需自定義一個(gè)?AccessDecisionManager即可,接下來跟隨小編一起看看實(shí)現(xiàn)代碼吧2021-12-12Spring boot+VUE實(shí)現(xiàn)token驗(yàn)證的示例代碼
本文詳細(xì)介紹了使用Vue和SpringBoot實(shí)現(xiàn)token認(rèn)證的方法,包括前后端交互流程、后端依賴導(dǎo)入、token工具類、攔截器、跨域處理、前端路由守衛(wèi)、請求攔截器等內(nèi)容,具有一定的參考價(jià)值,感興趣的可以了解一下2024-10-10簡單實(shí)現(xiàn)Java通訊錄系統(tǒng)
這篇文章主要為大家詳細(xì)介紹了如何簡單實(shí)現(xiàn)Java通訊錄系統(tǒng),文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-02-02