springboot使用kafka推送數(shù)據(jù)到服務(wù)端的操作方法帶認證
遇到的問題
在實際開發(fā)過程中,因為推送數(shù)據(jù)需要用到kafka,為了比較方便與后續(xù)其他需求需要使用kafka,所以開發(fā)的過程中是設(shè)想能寫一個工具類,方便后續(xù)的使用,但是,測試不帶認證的kafka服務(wù)端的時候,發(fā)送是正常的,但是實際情況是,對方的服務(wù)器需要認證,導(dǎo)致遇到連不上對方服務(wù),推送失敗的問題,需要找對方確認對方的認證配置信息。并且在查詢怎么處理的時候,驗證也出現(xiàn)了比較奇葩的情況,那本次文章就簡單寫遇到的問題和驗證結(jié)果。
碰到的天坑
1.度的時候,總是說引入了配置文件就完事,其他的配置不需要再配sasl.jaas.config,但是我實際測試下來不行啊。還是得引入配置文件+配置參數(shù)設(shè)置。
2.第二個基于未來思考的問題是,這個引入的配置文件內(nèi)容,是否可以多個。據(jù)度來度去的結(jié)果說是可以的,按順序會去逐一匹配直到匹配成功,那就是下面配置的KafkaClient配置塊是可以多個,就名稱不一樣就行。但是礙于條件限制,沒試過。??
處理步驟
加載配置
度了很多文章,都提到了需要在服務(wù)啟動的時候引入認證配置文件,設(shè)置屬性 java.security.auth.login.config,度了一下是有兩種方式。
a). 在服務(wù)啟動的時候用參數(shù)引入,命令如下:
java -Djava.security.auth.login.config=(具體地址自己填,因為我配置文件是跟test.jar同級目錄所以無前綴)kafka_jaas_config.config -jar test.jar
b). 在代碼中去引入,如:(這方式我是沒試的)
System.setProperty(“java.security.auth.login.config”, “kafka_jaas_config.config”);
c). 配置文件的內(nèi)容(注意password后面那個該死的分號是要的,配置參數(shù)設(shè)置的時候也是要的)
KafkaClient { org.apache.kafka.common.security.scram.ScramLoginModule required username="user" password="123"; };
設(shè)置認證參數(shù)(如果有的話)
類似要填的參數(shù)是:
security.protocol,sasl.mechanism,sasl.jaas.config(,sasl.username和sasl.password,這兩個我看是高版本直接配置據(jù)說能認證,不需要sasl.jaas.config,但是低版本是需要sasl.jaas.config,所以建議是可以都配上)
such as :
props.put("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"user\" password=\"123\";");
工具類
package platform.cars.utils; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.retry.annotation.Backoff; import org.springframework.retry.annotation.Retryable; import javax.annotation.PreDestroy; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; /** * @Auther: Ms.y */ @Configuration public class KafkaUtil { private static final ConcurrentHashMap<String, KafkaTemplate<String, String>> templateCache = new ConcurrentHashMap<>(); private Map<String, Object> kafkaProducerConfigs(String servers, Map<String, Object> otherConfigs) { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); if (otherConfigs != null){ otherConfigs.forEach(props::put); } return props; } public KafkaTemplate<String, String> getKafkaTemplate(String servers, Map<String, Object> otherConfigs) { return templateCache.computeIfAbsent(servers, bs -> createKafkaTemplate(bs, otherConfigs)); } private KafkaTemplate<String, String> createKafkaTemplate(String servers, Map<String, Object> otherConfigs) { Map<String, Object> configs = kafkaProducerConfigs(servers, otherConfigs); ProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(configs); return new KafkaTemplate<>(producerFactory); } @PreDestroy public void destroy() { for (KafkaTemplate<String, String> template : templateCache.values()) { template.destroy(); } } }
怎么用
1.正常的autowired就行
2.自己查配置還有是否需要認證的信息
3.獲取template去發(fā)送數(shù)據(jù)
4.處理結(jié)果
Map<String,Object> configs = new HashMap<>(); //自己給configs 填值 KafkaTemplate kafkaTemplate = kafkaUtil.getKafkaTemplate("ip:port",configs); kafkaTemplate.send("topic名稱", "消息內(nèi)容");
廢話
我這是因為為了方便加了個緩存隊列,存儲了kafka已經(jīng)連過的服務(wù),不需要的話完全可以自己改造去掉這部分。如果有可以精進的問題可以提啊,歡迎挑刺。
到此這篇關(guān)于springboot使用kafka推送數(shù)據(jù)到服務(wù)端,帶認證的文章就介紹到這了,更多相關(guān)springboot kafka推送數(shù)據(jù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java實現(xiàn)將容器 Map中的內(nèi)容保存到數(shù)組
這篇文章主要介紹了Java實現(xiàn)將容器 Map中的內(nèi)容保存到數(shù)組,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-09-09Spring?Boot緩存實戰(zhàn)之Redis?設(shè)置有效時間和自動刷新緩存功能(時間支持在配置文件中配置)
這篇文章主要介紹了Spring?Boot緩存實戰(zhàn)?Redis?設(shè)置有效時間和自動刷新緩存,時間支持在配置文件中配置,需要的朋友可以參考下2023-05-05