java?kafka如何動態(tài)設(shè)置用戶讀寫權(quán)限
更新時(shí)間:2023年08月31日 08:35:14 作者:精英丶阿琦
這篇文章主要介紹了java?kafka如何動態(tài)設(shè)置用戶讀寫權(quán)限問題,具有很好的參考家價(jià)值,希望對大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教
kafka動態(tài)設(shè)置用戶讀寫權(quán)限
我這里cloud Hoxton.SR8 版本
boot 2.3.0.RELEASE版本
直接上代碼了嗷
import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.DescribeAclsResult; import org.apache.kafka.common.acl.AccessControlEntry; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.acl.AclOperation; import org.apache.kafka.common.acl.AclPermissionType; import org.apache.kafka.common.resource.PatternType; import org.apache.kafka.common.resource.ResourcePattern; import org.apache.kafka.common.resource.ResourceType; import org.springframework.kafka.core.KafkaAdmin; public static void main(String[] args) { Map<String, Object> configs = new HashMap<>(); // broker地址,多個(gè)用逗號分割 configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "ip:端口"); configs.put("security.protocol", "SASL_PLAINTEXT"); configs.put("sasl.mechanism", "SCRAM-SHA-512"); // 登錄broker的賬戶 admin是管理員 configs.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"admin\" password=\"admin\";"); KafkaAdmin admin = new KafkaAdmin(configs); AdminClient adminClient = AdminClient.create(admin.getConfigurationProperties()); // principal:User:test2是需要賦予權(quán)限的帳號 // host:主機(jī) (*號即可) // operation:權(quán)限操作 // permissionType:權(quán)限類型 AccessControlEntry ace = new AccessControlEntry("User:test2", "*", AclOperation.WRITE, AclPermissionType.ALLOW); // resourceType:資源類型(topic) // name:topic名稱 // patternType:資源模式類型 ResourcePattern rp = new ResourcePattern(ResourceType.TOPIC, "Z7TEST", PatternType.LITERAL); AclBinding ab = new AclBinding(rp, ace); // 多個(gè)權(quán)限賦予可以傳list List<AclBinding> ablist = Arrays.asList(ab); adminClient.createAcls(ablist); // 可以查看賦予用戶的所有權(quán)限 DescribeAclsResult b = adminClient.describeAcls(AclBindingFilter.ANY); System.out.println(b.values()); adminClient.close(); }
kafka版本
kafka-2.11-2.1.1
- Kafka 1.0.0 后,Kafka 版本命名規(guī)則從 4 位到 3 位
- Kafka版本號是 2.1.1
- 前 2 : 大版本號 (MajorVersion)
- 中 1 : 小版本號或次版本號 (Minor Version)
- 后 1 : 修訂版本號 (Patch)
Kafka 0.7 最早開源版本
- 只提供最基礎(chǔ)的消息隊(duì)列功能,扭頭就跑
Kafka 0.8
- 引入了副本機(jī)制, 成了分布式高可靠消息隊(duì)列解決方案
- 副本備份機(jī)制保障了消息無丟失
- 生產(chǎn)/消費(fèi)用老客戶端 API,要指定 ZooKeeper 的地址 , 而非 Broker的地址
- 生產(chǎn)者 API,默認(rèn)用同步方式發(fā)送消息,吞吐量一般 (異步方式 : 有可能丟失消息)
- 0.8.2.0 后 , 引入新 Producer API (Bug),即 : 指定 Broker 地址的 Producer
- 升到 0.8.2.2 后 , 用老消費(fèi)者 API (較穩(wěn)定)
Kafka 0.9.0.0 后
- 增加基礎(chǔ)的安全認(rèn)證 / 權(quán)限功能
- 用 Java 重寫了新消費(fèi)者 API (Bug)
- 引入了 Kafka Connect 組件 , 實(shí)現(xiàn)高性能的數(shù)據(jù)抽取
- 新 Producer API 較穩(wěn)定
Kafka 0.10.0.0 后
- 引入了 Kafka Streams,正式成為分布式流處理平臺
- 自 0.10.2.2 后,新 Consumer API 較穩(wěn)定
Kafka 0.11.0.0 后
- 引入冪等性 Producer API 以事務(wù) (Transaction) API (Bug)
- 對 Kafka 消息格式做了重構(gòu)
- 建議用 0.11.0.3
不管用哪個(gè)版本,都要保持服務(wù)器端版本和客戶端版本一致
- 后果 : 損失 Kafka 的性能優(yōu)化
總結(jié)
以上為個(gè)人經(jīng)驗(yàn),希望能給大家一個(gè)參考,也希望大家多多支持腳本之家。
相關(guān)文章
Java并發(fā)編程深入理解之Synchronized的使用及底層原理詳解 上
在并發(fā)編程中存在線程安全問題,主要原因有:1.存在共享數(shù)據(jù) 2.多線程共同操作共享數(shù)據(jù)。關(guān)鍵字synchronized可以保證在同一時(shí)刻,只有一個(gè)線程可以執(zhí)行某個(gè)方法或某個(gè)代碼塊,同時(shí)synchronized可以保證一個(gè)線程的變化可見(可見性),即可以代替volatile2021-09-09EasyExcel自定義下拉注解的三種實(shí)現(xiàn)方式總結(jié)
使用EasyExcel設(shè)置下拉數(shù)據(jù)時(shí),每次都要?jiǎng)?chuàng)建一個(gè)SheetWriteHandler組件確實(shí)比較繁瑣,為了優(yōu)化這個(gè)過程,我們可以通過自定義注解來簡化操作,下面就來看看具體實(shí)現(xiàn)方法吧2024-10-10IDEA版使用Java操作Redis數(shù)據(jù)庫的方法
這篇文章主要介紹了IDEA版使用Java操作Redis數(shù)據(jù)庫的方法,首先需要下載jedis.jar包,然后再工程中設(shè)置具體操作步驟跟隨小編一起學(xué)習(xí)下吧2021-08-08java如何實(shí)現(xiàn)自動生成數(shù)據(jù)庫設(shè)計(jì)文檔
以前我們還需要手寫數(shù)據(jù)庫設(shè)計(jì)文檔、現(xiàn)在可以通過引入screw核心包來實(shí)現(xiàn)Java?數(shù)據(jù)庫文檔一鍵生成。本文將具體介紹一下如何通過java自動生成數(shù)據(jù)庫設(shè)計(jì)文檔,需要的朋友可以參考下2021-11-11