Spring Boot集群管理工具KafkaAdminClient使用方法解析
原理介紹
在Kafka官網(wǎng)中這么描述AdminClient:The AdminClient API supports managing and inspecting topics, brokers, acls, and other Kafka objects. 具體的KafkaAdminClient包含了一下幾種功能(以Kafka1.0.0版本為準(zhǔn)):
- 創(chuàng)建Topic:createTopics(Collection<NewTopic> newTopics)
- 刪除Topic:deleteTopics(Collection<String> topics)
- 羅列所有Topic:listTopics()
- 查詢Topic:describeTopics(Collection<String> topicNames)
- 查詢集群信息:describeCluster()
- 查詢ACL信息:describeAcls(AclBindingFilter filter)
- 創(chuàng)建ACL信息:createAcls(Collection<AclBinding> acls)
- 刪除ACL信息:deleteAcls(Collection<AclBindingFilter> filters)
- 查詢配置信息:describeConfigs(Collection<ConfigResource> resources)
- 修改配置信息:alterConfigs(Map<ConfigResource, Config> configs)
- 修改副本的日志目錄:alterReplicaLogDirs(Map<TopicPartitionReplica, String> replicaAssignment)
- 查詢節(jié)點(diǎn)的日志目錄信息:describeLogDirs(Collection<Integer> brokers)
- 查詢副本的日志目錄信息:describeReplicaLogDirs(Collection<TopicPartitionReplica> replicas)
- 增加分區(qū):createPartitions(Map<String, NewPartitions> newPartitions)
其內(nèi)部原理是使用Kafka自定義的一套二進(jìn)制協(xié)議來實(shí)現(xiàn),詳細(xì)可以參見Kafka協(xié)議。主要實(shí)現(xiàn)步驟:
客戶端根據(jù)方法的調(diào)用創(chuàng)建相應(yīng)的協(xié)議請求,比如創(chuàng)建Topic的createTopics方法,其內(nèi)部就是發(fā)送CreateTopicRequest請求。
客戶端發(fā)送請求至Kafka Broker。
Kafka Broker處理相應(yīng)的請求并回執(zhí),比如與CreateTopicRequest對應(yīng)的是CreateTopicResponse。
客戶端接收相應(yīng)的回執(zhí)并進(jìn)行解析處理。
和協(xié)議有關(guān)的請求和回執(zhí)的類基本都在org.apache.kafka.common.requests包中,AbstractRequest和AbstractResponse是這些請求和回執(zhí)類的兩個(gè)基本父類。
代碼如下
@Component public class KafkaConfig{ // 配置Kafka public Properties getProps(){ Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); /* props.put("retries", 2); // 重試次數(shù) props.put("batch.size", 16384); // 批量發(fā)送大小 props.put("buffer.memory", 33554432); // 緩存大小,根據(jù)本機(jī)內(nèi)存大小配置 props.put("linger.ms", 1000); // 發(fā)送頻率,滿足任務(wù)一個(gè)條件發(fā)送*/ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); return props; } }
@RestController public class KafkaTopicManager { @Autowired private KafkaConfig kafkaConfig; @GetMapping("createTopic") public void createTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); NewTopic newTopic = new NewTopic("test1",4, (short) 1); Collection<NewTopic> newTopicList = new ArrayList<>(); newTopicList.add(newTopic); adminClient.createTopics(newTopicList); adminClient.close(); } @GetMapping("deleteTopic") public void deleteTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); adminClient.deleteTopics(Arrays.asList("test1")); adminClient.close(); } @GetMapping("listAllTopic") public void listAllTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); ListTopicsResult result = adminClient.listTopics(); KafkaFuture<Set<String>> names = result.names(); try { names.get().forEach((k)->{ System.out.println(k); }); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } adminClient.close(); } @GetMapping("getTopic") public void getTopic(){ AdminClient adminClient = KafkaAdminClient.create(kafkaConfig.getProps()); DescribeTopicsResult describeTopics = adminClient.describeTopics(Arrays.asList("syn-test")); Collection<KafkaFuture<TopicDescription>> values = describeTopics.values().values(); if(values.isEmpty()){ System.out.println("找不到描述信息"); }else{ for (KafkaFuture<TopicDescription> value : values) { System.out.println(value); } } adminClient.close(); } }
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
- Spring?Boot整合Kafka教程詳解
- Spring?Boot?中使用@KafkaListener并發(fā)批量接收消息的完整代碼
- 基于SpringBoot?使用?Flink?收發(fā)Kafka消息的示例詳解
- SpringBoot如何獲取Kafka的Topic列表
- SpringBoot整合kafka遇到的版本不對應(yīng)問題及解決
- SpringBoot+Nacos+Kafka微服務(wù)流編排的簡單實(shí)現(xiàn)
- SpringBoot集成Kafka的步驟
- Springboot集成Kafka實(shí)現(xiàn)producer和consumer的示例代碼
- Spring?Boot?基于?SCRAM?認(rèn)證集成?Kafka?的過程詳解
相關(guān)文章
Java兩種方式實(shí)現(xiàn)動(dòng)態(tài)代理
Java 在 java.lang.reflect 包中有自己的代理支持,該類(Proxy.java)用于動(dòng)態(tài)生成代理類,只需傳入目標(biāo)接口、目標(biāo)接口的類加載器以及 InvocationHandler 便可為目標(biāo)接口生成代理類及代理對象。我們稱這個(gè)Java技術(shù)為:動(dòng)態(tài)代理2020-10-10Jackson處理Optional時(shí)遇到問題的解決與分析
Optional是Java實(shí)現(xiàn)函數(shù)式編程的強(qiáng)勁一步,并且?guī)椭诜妒街袑?shí)現(xiàn),但是Optional的意義顯然不止于此,下面這篇文章主要給大家介紹了關(guān)于Jackson處理Optional時(shí)遇到問題的解決與分析的相關(guān)資料,需要的朋友可以參考下2022-02-02spring boot實(shí)現(xiàn)圖片上傳和下載功能
這篇文章主要為大家詳細(xì)介紹了spring boot實(shí)現(xiàn)圖片上傳和下載功能,具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2018-02-02Springboot?Mybatis使用pageHelper如何實(shí)現(xiàn)分頁查詢
這篇文章主要介紹了Springboot?Mybatis使用pageHelper如何實(shí)現(xiàn)分頁查詢問題,具有很好的參考價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-05-05MyBatisPlus利用Service實(shí)現(xiàn)獲取數(shù)據(jù)列表
這篇文章主要為大家詳細(xì)介紹了怎樣使用 IServer 提供的 list 方法查詢多條數(shù)據(jù),這些方法將根據(jù)查詢條件獲取多條數(shù)據(jù),感興趣的可以了解一下2022-06-06JAVA線程sleep()和wait()詳解及實(shí)例
這篇文章主要介紹了JAVA線程sleep()和wait()詳解及實(shí)例的相關(guān)資料,探討一下sleep()和wait()方法的區(qū)別和實(shí)現(xiàn)機(jī)制,需要的朋友可以參考下2017-05-05