詳解Spring Boot對 Apache Pulsar的支持
https://docs.spring.io/spring-boot/docs/3.2.0/reference/htmlsingle/#messaging.pulsar
Apache Pulsar 通過提供 Spring for Apache Pulsar 項目的自動配置而受到支持。
當(dāng)類路徑中存在 org.springframework.pulsar:spring-pulsar
時,Spring Boot 將自動配置并注冊經(jīng)典的(命令式)Spring for Apache Pulsar 組件。當(dāng)類路徑中存在 org.springframework.pulsar:spring-pulsar-reactive
時,Spring Boot 也會對反應(yīng)式組件執(zhí)行相同的操作。
分別有適用于命令式和反應(yīng)式使用的 spring-boot-starter-pulsar
和 spring-boot-starter-pulsar-reactive
“Starters”,可方便地收集依賴項。
連接到Pulsar
當(dāng)使用 Pulsar 啟動器時,Spring Boot 將自動配置并注冊一個 PulsarClient
bean。
默認情況下,應(yīng)用程序嘗試連接到位于 pulsar://localhost:6650
的本地 Pulsar 實例。這可以通過將 spring.pulsar.client.service-url
屬性設(shè)置為不同的值來進行調(diào)整。
注意:該值必須是有效的 Pulsar 協(xié)議 URL。
可以通過指定任何以 spring.pulsar.client.*
開頭的應(yīng)用程序?qū)傩詠砼渲每蛻舳恕?/p>
如果需要更多控制權(quán)來配置 PulsarClient,請考慮注冊一個或多個 PulsarClientBuilderCustomizer
bean。
認證(Authentication)
要連接到需要認證的 Pulsar 集群,需要指定要使用哪個認證插件,通過設(shè)置 pluginClassName
和插件所需的任何參數(shù)??梢詫?shù)設(shè)置為參數(shù)名稱到參數(shù)值的映射。以下示例顯示了如何配置 AuthenticationOAuth2
插件。
spring.pulsar.client.authentication.plugin-class-name=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
spring.pulsar.client.authentication.param[issuerUrl]=https://auth.server.cloud/
spring.pulsar.client.authentication.param[privateKey]=file:///Users/some-key.json
spring.pulsar.client.authentication.param.audience=urn:sn:acme:dev:my-instance
注意:
需要確保在 spring.pulsar.client.authentication.param.*
下定義的名稱與認證插件所期望的名稱完全匹配(通常是駝峰命名法)。Spring Boot 不會嘗試對這些條目進行任何形式的寬松綁定。
例如,如果想為 AuthenticationOAuth2
認證插件配置issuer URL,則必須使用 spring.pulsar.client.authentication.param.issuerUrl
。如果使用其他形式,如 issuerurl
或 issuer-url
,則設(shè)置將不會應(yīng)用于插件。
SSL
默認情況下,Pulsar客戶端以明文形式與Pulsar服務(wù)進行通信。以下部分描述了如何配置Pulsar客戶端以使用TLS加密(SSL)。一個先決條件是Broker也已經(jīng)配置為使用TLS加密。
Spring Boot自動配置目前不支持任何TLS/SSL配置屬性。相反,你可以提供一個PulsarClientBuilderCustomizer
,該定制器會在Pulsar客戶端構(gòu)建器上設(shè)置必要的屬性。Pulsar支持Privacy Enhanced Mail(PEM)和Java KeyStore(JKS)兩種證書格式。
按照以下步驟配置TLS:
- 調(diào)整Pulsar客戶端服務(wù)URL以使用
pulsar+ssl://
scheme 和TLS端口(通常為6651
)。 - 調(diào)整管理客戶端服務(wù)URL以使用
https://
scheme 和TLS Web端口(通常為8443
)。 - 提供客戶端構(gòu)建器定制器,該定制器會在構(gòu)建器上設(shè)置相關(guān)屬性。
以響應(yīng)式方式連接到Pulsar
當(dāng)Reactive自動配置被激活時,Spring Boot將自動配置并注冊一個ReactivePulsarClient
bean。
連接到Pulsar管理界面
Spring for Apache Pulsar的PulsarAdministration
客戶端也實現(xiàn)了自動配置。
默認情況下,應(yīng)用程序嘗試連接到位于http://localhost:8080
的本地Pulsar實例??梢酝ㄟ^將spring.pulsar.admin.service-url
屬性設(shè)置為(http|https)://<host>:<port>
的不同值來調(diào)整此設(shè)置。
如果需要更多控制權(quán)來配置PulsarAdmin
,請考慮注冊一個或多個PulsarAdminBuilderCustomizer
bean。
認證
當(dāng)訪問需要身份驗證的Pulsar集群時,管理客戶端需要與普通Pulsar客戶端相同的安全配置??梢酝ㄟ^將spring.pulsar.client.authentication
替換為spring.pulsar.admin.authentication
來使用上述身份驗證配置。
提示:在啟動時創(chuàng)建主題,請?zhí)砑右粋€類型為PulsarTopic
的bean。如果主題已經(jīng)存在,則該bean將被忽略。
發(fā)送消息
Spring的PulsarTemplate
實現(xiàn)了自動配置,可以使用它來發(fā)送消息,如下所示:
import org.apache.pulsar.client.api.PulsarClientException; import org.springframework.pulsar.core.PulsarTemplate; import org.springframework.stereotype.Component; @Component public class MyBean { private final PulsarTemplate<String> pulsarTemplate; public MyBean(PulsarTemplate<String> pulsarTemplate) { this.pulsarTemplate = pulsarTemplate; } public void someMethod() throws PulsarClientException { this.pulsarTemplate.send("someTopic", "Hello"); } }
PulsarTemplate
依賴于PulsarProducerFactory
來創(chuàng)建底層的Pulsar生產(chǎn)者。Spring Boot的自動配置也提供了這個生產(chǎn)者工廠,默認情況下,它會緩存所創(chuàng)建的生產(chǎn)者。你可以通過指定任何以spring.pulsar.producer.*
和 spring.pulsar.producer.cache.*
為前綴的應(yīng)用屬性來配置生產(chǎn)者工廠和緩存設(shè)置。
如果你需要對生產(chǎn)者工廠的配置進行更多的控制,考慮注冊一個或多個ProducerBuilderCustomizer
bean。這些定制器會應(yīng)用于所有創(chuàng)建的生產(chǎn)者。你也可以在發(fā)送消息時傳入一個ProducerBuilderCustomizer
,只影響當(dāng)前的生產(chǎn)者。
如果你需要對正在發(fā)送的消息進行更多的控制,你可以在發(fā)送消息時傳入一個TypedMessageBuilderCustomizer
。
以響應(yīng)式方式發(fā)送消息
當(dāng)Reactive自動配置被激活時,Spring的ReactivePulsarTemplate
也會實現(xiàn)自動配置,可以使用它來發(fā)送消息,如下所示:
import org.springframework.pulsar.reactive.core.ReactivePulsarTemplate; import org.springframework.stereotype.Component; @Component public class MyBean { private final ReactivePulsarTemplate<String> pulsarTemplate; public MyBean(ReactivePulsarTemplate<String> pulsarTemplate) { this.pulsarTemplate = pulsarTemplate; } public void someMethod() { this.pulsarTemplate.send("someTopic", "Hello").subscribe(); } }
ReactivePulsarTemplate
依賴于ReactivePulsarSenderFactory
來實際創(chuàng)建底層的發(fā)送器。Spring Boot的自動配置也提供了這個發(fā)送器工廠,默認情況下,它會緩存所創(chuàng)建的發(fā)送器。你可以通過指定任何以spring.pulsar.producer.*
和 spring.pulsar.producer.cache.*
為前綴的應(yīng)用屬性來配置發(fā)送器工廠和緩存設(shè)置。
如果你需要對發(fā)送器工廠的配置進行更多的控制,考慮注冊一個或多個ReactiveMessageSenderBuilderCustomizer
bean。這些定制器會應(yīng)用于所有創(chuàng)建的發(fā)送器。你也可以在發(fā)送消息時傳入一個ReactiveMessageSenderBuilderCustomizer
,只影響當(dāng)前的發(fā)送器。
如果你需要對正在發(fā)送的消息進行更多的控制,你可以在發(fā)送消息時傳入一個MessageSpecBuilderCustomizer
。
接收消息
當(dāng)存在Apache Pulsar基礎(chǔ)設(shè)施時,任何bean都可以通過添加@PulsarListener
注解來創(chuàng)建監(jiān)聽器端點。以下組件在someTopic
主題上創(chuàng)建了一個監(jiān)聽器端點:
import org.springframework.pulsar.annotation.PulsarListener; import org.springframework.stereotype.Component; @Component public class MyBean { @PulsarListener(topics = "someTopic") public void processMessage(String content) { // ... } }
Spring Boot的自動配置為PulsarListener
提供了所有必要的組件,如PulsarListenerContainerFactory
和用于構(gòu)建底層Pulsar消費者的消費者工廠。你可以通過指定任何以spring.pulsar.listener.*
和spring.pulsar.consumer.*
為前綴的應(yīng)用屬性來配置這些組件。
如果你需要對消費者工廠的配置進行更多的控制,考慮注冊一個或多個ConsumerBuilderCustomizer
bean。這些定制器會應(yīng)用于工廠創(chuàng)建的所有消費者,因此適用于所有@PulsarListener
實例。你還可以通過設(shè)置@PulsarListener
注解的consumerCustomizer
屬性來定制單個監(jiān)聽器。
以響應(yīng)式方式接收消息
當(dāng)存在Apache Pulsar基礎(chǔ)設(shè)施且Reactive自動配置被激活時,任何bean都可以通過添加@ReactivePulsarListener
注解來創(chuàng)建響應(yīng)式監(jiān)聽器端點。以下組件在someTopic
主題上創(chuàng)建了一個響應(yīng)式監(jiān)聽器端點:
import reactor.core.publisher.Mono; import org.springframework.pulsar.reactive.config.annotation.ReactivePulsarListener; import org.springframework.stereotype.Component; @Component public class MyBean { @ReactivePulsarListener(topics = "someTopic") public Mono<Void> processMessage(String content) { // ... return Mono.empty(); } }
Spring Boot的自動配置為ReactivePulsarListener
提供了所有必要的組件,如ReactivePulsarListenerContainerFactory
和用于構(gòu)建底層響應(yīng)式Pulsar消費者的消費者工廠。你可以通過指定任何以spring.pulsar.listener.
和spring.pulsar.consumer.
為前綴的應(yīng)用屬性來配置這些組件。
如果你需要對消費者工廠的配置進行更多的控制,考慮注冊一個或多個ReactiveMessageConsumerBuilderCustomizer
bean。這些定制器會應(yīng)用于工廠創(chuàng)建的所有消費者,因此適用于所有@ReactivePulsarListener
實例。你還可以通過設(shè)置@ReactivePulsarListener
注解的consumerCustomizer
屬性來定制單個監(jiān)聽器。
讀取消息
Pulsar的讀取器接口使應(yīng)用程序能夠手動管理游標(biāo)。當(dāng)你使用讀取器連接到主題時,你需要指定當(dāng)讀取器連接到主題時從哪個消息開始讀取。
當(dāng)存在Apache Pulsar基礎(chǔ)設(shè)施時,任何bean都可以通過添加@PulsarReader
注解來使用讀取器消費消息。以下組件創(chuàng)建了一個讀取器端點,該端點從someTopic
主題的開頭開始讀取消息:
import org.springframework.pulsar.annotation.PulsarReader; import org.springframework.stereotype.Component; @Component public class MyBean { @PulsarReader(topics = "someTopic", startMessageId = "earliest") public void processMessage(String content) { // ... } }
@PulsarReader
依賴于PulsarReaderFactory
來創(chuàng)建底層的Pulsar讀取器。Spring Boot的自動配置提供了這個讀取器工廠,可以通過設(shè)置任何以spring.pulsar.reader.*
為前綴的應(yīng)用屬性來定制它。
如果你需要對讀取器工廠的配置進行更多的控制,考慮注冊一個或多個ReaderBuilderCustomizer
bean。這些定制器會應(yīng)用于工廠創(chuàng)建的所有讀取器,因此適用于所有@PulsarReader
實例。你還可以通過設(shè)置@PulsarReader
注解的readerCustomizer
屬性來定制單個監(jiān)聽器。
以響應(yīng)式方式讀取消息
當(dāng)存在Apache Pulsar基礎(chǔ)設(shè)施且Reactive自動配置被激活時,Spring會提供ReactivePulsarReaderFactory
,你可以使用它來創(chuàng)建讀取器,以響應(yīng)式的方式讀取消息。以下組件使用提供的工廠創(chuàng)建一個讀取器,并從someTopic
主題中讀取5秒鐘前的一條消息:
import java.time.Instant; import java.util.List; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.reactive.client.api.StartAtSpec; import reactor.core.publisher.Mono; import org.springframework.pulsar.reactive.core.ReactiveMessageReaderBuilderCustomizer; import org.springframework.pulsar.reactive.core.ReactivePulsarReaderFactory; import org.springframework.stereotype.Component; @Component public class MyBean { private final ReactivePulsarReaderFactory<String> pulsarReaderFactory; public MyBean(ReactivePulsarReaderFactory<String> pulsarReaderFactory) { this.pulsarReaderFactory = pulsarReaderFactory; } public void someMethod() { ReactiveMessageReaderBuilderCustomizer<String> readerBuilderCustomizer = (readerBuilder) -> readerBuilder .topic("someTopic") .startAtSpec(StartAtSpec.ofInstant(Instant.now().minusSeconds(5))); Mono<Message<String>> message = this.pulsarReaderFactory .createReader(Schema.STRING, List.of(readerBuilderCustomizer)) .readOne(); // ... } }
Spring Boot的自動配置提供了這個讀取器工廠,可以通過設(shè)置任何以spring.pulsar.reader.*
為前綴的應(yīng)用屬性來定制它。
如果你需要對讀取器工廠的配置進行更多的控制,當(dāng)使用工廠創(chuàng)建讀取器時,考慮傳入一個或多個ReactiveMessageReaderBuilderCustomizer
實例。
如果你需要對讀取器工廠的配置進行更多的控制,考慮注冊一個或多個ReactiveMessageReaderBuilderCustomizer
bean。這些定制器會應(yīng)用于所有創(chuàng)建的讀取器。你也可以在創(chuàng)建讀取器時傳入一個或多個ReactiveMessageReaderBuilderCustomizer
,只將定制應(yīng)用于創(chuàng)建的讀取器。
額外的Pulsar屬性
只有Pulsar支持的屬性子集才能直接通過PulsarProperties
類使用。如果你希望使用額外的屬性來調(diào)整自動配置的組件,而這些屬性不被直接支持,你可以使用前面提到的每個組件支持的定制器。
到此這篇關(guān)于詳解Spring Boot對 Apache Pulsar的支持的文章就介紹到這了,更多相關(guān)Spring Boot Apache Pulsar內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
servlet的url-pattern匹配規(guī)則詳細描述(小結(jié))
在利用servlet或Filter進行url請求的匹配時,很關(guān)鍵的一點就是匹配規(guī)則。這篇文章主要介紹了servlet的url-pattern匹配規(guī)則詳細描述(小結(jié)),非常具有實用價值,需要的朋友可以參考下2018-07-07springboot + rabbitmq 如何實現(xiàn)消息確認機制(踩坑經(jīng)驗)
這篇文章主要介紹了springboot + rabbitmq 如何實現(xiàn)消息確認機制,本文給大家分享小編實際開發(fā)中的一點踩坑經(jīng)驗,內(nèi)容簡單易懂,需要的朋友可以參考下2020-07-07mybatis-plus的selectById(或者selectOne)在根據(jù)主鍵ID查詢實體對象的時候偶爾會出現(xiàn)nul
這篇文章主要介紹了mybatis-plus的selectById(或者selectOne)在根據(jù)主鍵ID查詢實體對象的時候偶爾會出現(xiàn)null的問題記錄,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-09-09Jmeter跨線程組傳值調(diào)用實現(xiàn)圖解
這篇文章主要介紹了Jmeter跨線程組傳值調(diào)用實現(xiàn)圖解,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2020-07-07Spring Cloud OAuth2 實現(xiàn)用戶認證及單點登錄的示例代碼
這篇文章主要介紹了Spring Cloud OAuth2 實現(xiàn)用戶認證及單點登錄的示例代碼,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2019-10-10SparkSQL使用IDEA快速入門DataFrame與DataSet的完美教程
本文給大家介紹使用idea開發(fā)Spark SQL 的詳細過程,本文通過實例代碼給大家介紹的非常詳細,對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友參考下吧2021-08-08Springboot利于第三方服務(wù)進行ip定位獲取省份城市
本文主要介紹了Springboot利于第三方服務(wù)進行ip定位獲取省份城市,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2023-07-07