SpringBoot整合Pulsar的實(shí)現(xiàn)示例
一、添加pom.xml依賴
<parent> ? ? <groupId>org.springframework.boot</groupId> ? ? <artifactId>spring-boot-starter-parent</artifactId> ? ? <version>2.7.0</version> </parent> <dependencies> ? ? <dependency> ? ? ? ? <groupId>org.springframework.boot</groupId> ? ? ? ? <artifactId>spring-boot-starter-web</artifactId> ? ? </dependency> ? ? <dependency> ? ? ? ? <groupId>org.apache.pulsar</groupId> ? ? ? ? <artifactId>pulsar-client</artifactId> ? ? ? ? <version>2.10.0</version> ? ? </dependency> ? ? <dependency> ? ? ? ? <groupId>org.projectlombok</groupId> ? ? ? ? <artifactId>lombok</artifactId> ? ? ? ? <version>1.18.24</version> ? ? ? ? <scope>provided</scope> ? ? </dependency> </dependencies> <build> ? ? <plugins> ? ? ? ? <plugin> ? ? ? ? ? ? <groupId>org.apache.maven.plugins</groupId> ? ? ? ? ? ? <artifactId>maven-compiler-plugin</artifactId> ? ? ? ? ? ? <configuration> ? ? ? ? ? ? ? ? <source>8</source> ? ? ? ? ? ? ? ? <target>8</target> ? ? ? ? ? ? </configuration> ? ? ? ? </plugin> ? ? </plugins> </build> ? ?
二、Pulsar 參數(shù)類
import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.stereotype.Component; import java.util.Map; /** ?* @Author: huangyibo ?* @Date: 2022/5/28 2:32 ?* @Description: Pulsar 參數(shù)類 ?*/ @Component @ConfigurationProperties(prefix = "tdmq.pulsar") @Data public class PulsarProperties { ? ? /** ? ? ?* 接入地址 ? ? ?*/ ? ? private String serviceurl; ? ? /** ? ? ?* 命名空間tdc ? ? ?*/ ? ? private String tdcNamespace; ? ? /** ? ? ?* 角色tdc的token ? ? ?*/ ? ? private String tdcToken; ? ? /** ? ? ?* 集群name ? ? ?*/ ? ? private String cluster; ? ? /** ? ? ?* topicMap ? ? ?*/ ? ? private Map<String, String> topicMap; ? ? /** ? ? ?* 訂閱 ? ? ?*/ ? ? private Map<String, String> subMap; ? ? /** ? ? ?* 開關(guān) on:Consumer可用 ||||| off:Consumer斷路 ? ? ?*/ ? ? private String onOff; }
三、Pulsar 配置類
import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.PulsarClientException; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** ?* @Author: huangyibo ?* @Date: 2022/5/28 2:33 ?* @Description: Pulsar 配置類 ?*/ @Configuration @EnableConfigurationProperties(PulsarProperties.class) public class PulsarConfig { ? ? @Autowired ? ? PulsarProperties pulsarProperties; ? ? @Bean ? ? public PulsarClient getPulsarClient() { ? ? ? ? try { ? ? ? ? ? ? return PulsarClient.builder() ? ? ? ? ? ? ? ? ? ? .authentication(AuthenticationFactory.token(pulsarProperties.getTdcToken())) ? ? ? ? ? ? ? ? ? ? .serviceUrl(pulsarProperties.getServiceurl()) ? ? ? ? ? ? ? ? ? ? .build(); ? ? ? ? } catch (PulsarClientException e) { ? ? ? ? ? ? System.out.println(e); ? ? ? ? ? ? throw new RuntimeException("初始化Pulsar Client失敗"); ? ? ? ? } ? ? } }
四、不同消費(fèi)數(shù)據(jù)類型的監(jiān)聽器
import com.yibo.pulsar.pojo.User; import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageListener; import org.springframework.stereotype.Component; /** ?* @Author: huangyibo ?* @Date: 2022/5/28 2:37 ?* @Description: ?*/ @Component public class UserMessageListener implements MessageListener<User> { ? ? @Override ? ? public void received(Consumer<User> consumer, Message<User> msg) { ? ? ? ? try { ? ? ? ? ? ? User user = msg.getValue(); ? ? ? ? ? ? System.out.println(user); ? ? ? ? ? ? consumer.acknowledge(msg); ? ? ? ? } catch (Exception e) { ? ? ? ? ? ? consumer.negativeAcknowledge(msg); ? ? ? ? } ? ? } } import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.MessageListener; import org.springframework.stereotype.Component; /** ?* @Author: huangyibo ?* @Date: 2022/5/28 2:37 ?* @Description: ?*/ @Component public class StringMessageListener implements MessageListener<String> { ? ? @Override ? ? public void received(Consumer<String> consumer, Message<String> msg) { ? ? ? ? try { ? ? ? ? ? ? System.out.println(msg.getValue()); ? ? ? ? ? ? consumer.acknowledge(msg); ? ? ? ? } catch (Exception e) { ? ? ? ? ? ? consumer.negativeAcknowledge(msg); ? ? ? ? } ? ? } }
五、Pulsar的核心服務(wù)類
import com.yibo.pulsar.common.listener.StringMessageListener; import com.yibo.pulsar.common.listener.UserMessageListener; import com.yibo.pulsar.pojo.User; import org.apache.pulsar.client.api.*; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import java.util.concurrent.TimeUnit; /** ?* @Author: huangyibo ?* @Date: 2022/5/28 2:35 ?* @Description: Pulsar的核心服務(wù)類 ?*/ @Component public class PulsarCommon { ? ? @Autowired ? ? private PulsarProperties pulsarProperties; ? ? @Autowired ? ? private PulsarClient client; ? ? @Autowired ? ? private UserMessageListener userMessageListener; ? ? @Autowired ? ? private StringMessageListener stringMessageListener; ? ? /** ? ? ?* 創(chuàng)建一個生產(chǎn)者? ? ? ?* @param topic ? ? topic name ? ? ?* @param schema ? ?schema方式 ? ? ?* @param <T> ? ? ? 泛型 ? ? ?* @return ? ? ? ? ?Producer生產(chǎn)者 ? ? ?*/ ? ? public <T> Producer<T> createProducer(String topic, Schema<T> schema) { ? ? ? ? try { ? ? ? ? ? ? return client.newProducer(schema) ? ? ? ? ? ? ? ? ? ? .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic) ? ? ? ? ? ? ? ? ? ? .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS) ? ? ? ? ? ? ? ? ? ? .sendTimeout(10, TimeUnit.SECONDS) ? ? ? ? ? ? ? ? ? ? .blockIfQueueFull(true) ? ? ? ? ? ? ? ? ? ? .create(); ? ? ? ? } catch (PulsarClientException e) { ? ? ? ? ? ? throw new RuntimeException("初始化Pulsar Producer失敗"); ? ? ? ? } ? ? } ? ? /** ? ? ?*? ? ? ?* @param topic ? ? ? ? ? ? topic name ? ? ?* @param subscription ? ? ?sub name ? ? ?* @param messageListener ? MessageListener的自定義實(shí)現(xiàn)類 ? ? ?* @param schema ? ? ? ? ? ?schema消費(fèi)方式 ? ? ?* @param <T> ? ? ? ? ? ? ? 泛型 ? ? ?* @return ? ? ? ? ? ? ? ? ?Consumer消費(fèi)者 ? ? ?*/ ? ? public <T> Consumer<T> createConsumer(String topic, String subscription, ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?MessageListener<T> messageListener, Schema<T> schema) { ? ? ? ? try { ? ? ? ? ? ? return client.newConsumer(schema) ? ? ? ? ? ? ? ? ? ? .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic) ? ? ? ? ? ? ? ? ? ? .subscriptionName(subscription) ? ? ? ? ? ? ? ? ? ? .ackTimeout(10, TimeUnit.SECONDS) ? ? ? ? ? ? ? ? ? ? .subscriptionType(SubscriptionType.Shared) ? ? ? ? ? ? ? ? ? ? .messageListener(messageListener) ? ? ? ? ? ? ? ? ? ? .subscribe(); ? ? ? ? } catch (PulsarClientException e) { ? ? ? ? ? ? throw new RuntimeException("初始化Pulsar Consumer失敗"); ? ? ? ? } ? ? } ? ?? ? ? /** ? ? ?* 異步發(fā)送一條消息 ? ? ?* @param message ? ? ? 消息體 ? ? ?* @param producer ? ? ?生產(chǎn)者實(shí)例 ? ? ?* @param <T> ? ? ? ? ? 消息泛型 ? ? ?*/ ? ? public <T> void sendAsyncMessage(T message, Producer<T> producer) { ? ? ? ? producer.sendAsync(message).thenAccept(msgId -> { ? ? ? ? }); ? ? } ? ?? ? ?? ? ? /** ? ? ?* 同步發(fā)送一條消息 ? ? ?* @param message ? ? ? 消息體 ? ? ?* @param producer ? ? ?生產(chǎn)者實(shí)例 ? ? ?* @param <T> ? ? ? ? ? 泛型 ? ? ?* @throws PulsarClientException ? ? ?*/ ? ? public <T> void sendSyncMessage(T message, Producer<T> producer) throws PulsarClientException { ? ? ? ? MessageId send = producer.send(message); ? ? ? ? System.out.println(); ? ? ? ? System.out.println(); ? ? ? ? System.out.println(); ? ? ? ? System.out.println(); ? ? ? ? System.out.println(send); ? ? } ? ?? ? ? //-----------consumer----------- ? ? @Bean(name = "comment-publish-topic-consumer") ? ? public Consumer<String> getCommentPublishTopicConsumer() { ? ? ? ? return this.createConsumer(pulsarProperties.getTopicMap().get("comment-publish-topic"), ? ? ? ? ? ? ? ? pulsarProperties.getSubMap().get("comment-publish-topic-test"), ? ? ? ? ? ? ? ? stringMessageListener, Schema.STRING); ? ? } ? ? @Bean(name = "reply-publish-topic-consumer") ? ? public Consumer<User> getReplyPublishTopicConsumer() { ? ? ? ? return this.createConsumer(pulsarProperties.getTopicMap().get("reply-publish-topic"), ? ? ? ? ? ? ? ? pulsarProperties.getSubMap().get("reply-publish-topic-test"), ? ? ? ? ? ? ? ? userMessageListener, AvroSchema.of(User.class)); ? ? } ? ? //-----------producer----------- ? ? @Bean(name = "comment-publish-topic-producer") ? ? public Producer<String> getCommentPublishTopicProducer() { ? ? ? ? return this.createProducer(pulsarProperties.getTopicMap().get("comment-publish-topic"),Schema.STRING); ? ? } ? ? @Bean(name = "reply-publish-topic-producer") ? ? public Producer<User> getReplyPublishTopicProducer() { ? ? ? ? return this.createProducer(pulsarProperties.getTopicMap().get("reply-publish-topic"), AvroSchema.of(User.class)); ? ? } }
六、Pulsar整合Spring Cloud
后來發(fā)現(xiàn)如上代碼會導(dǎo)致BUG-> 在更新Nacos配置之后 Consumer會掛掉
經(jīng)排查發(fā)現(xiàn)結(jié)果是由于@RefreshScope注解導(dǎo)致,此注解將摧毀Bean,PulsarConsumer和Producer都將被摧毀,只是說Producer將在下?次調(diào)?中完成重啟,Consumer則不能重啟,因?yàn)闆]有調(diào)?,那么怎么解決呢?
就是發(fā)布系列事件以刷新容器
import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationEvent; import org.springframework.context.ApplicationListener; import org.springframework.stereotype.Component; /** ?* @Author: huangyibo ?* @Date: 2022/5/28 2:34 ?* @Description: ?*/ @Component @Slf4j public class RefreshPulsarListener implements ApplicationListener { ? ? @Autowired ? ? ApplicationContext applicationContext; ? ? @Override ? ? public void onApplicationEvent(ApplicationEvent event) { ? ? ? ? if (event.getSource().equals("__refreshAll__")) { ? ? ? ? ? ? log.info("Nacos配置中心配置修改 重啟Pulsar===================================="); ? ? ? ? ? ? log.info("重啟PulsarClient,{}", applicationContext.getBean("getPulsarClient")); ? ? ? ? ? ? log.info("重啟PulsarConsumer,{}", applicationContext.getBean("comment-publish-topic-consumer")); ? ? ? ? ? ? log.info("重啟PulsarConsumer,{}", applicationContext.getBean("reply-publish-topic-consumer")); ? ? ? ? } ? ? } }
參考:
https://wenku.baidu.com/view/4d3337ab6b0203d8ce2f0066f5335a8102d266a7.html
https://gitee.com/zhaoyuxuan66/pulsar-springcloud_boot-demo/tree/master/
https://blog.csdn.net/weixin_56227932/article/details/122897075
http://www.zzvips.com/article/219361.html
https://mp.weixin.qq.com/s/4w0eucDNcrYrsiDXHzLwuQ
到此這篇關(guān)于SpringBoot整合Pulsar的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)SpringBoot整合Pulsar內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java線程實(shí)現(xiàn)的三種方式詳細(xì)解析
這篇文章主要介紹了Java線程實(shí)現(xiàn)的三種方式詳細(xì)解析,Java多線程實(shí)現(xiàn)方式主要有三種,繼承Thread類、實(shí)現(xiàn)Runnable接口、使用ExecutorService、Callable、Future實(shí)現(xiàn)有返回結(jié)果的多線程,需要的朋友可以參考下2023-12-12MyBatis整合Redis實(shí)現(xiàn)二級緩存的示例代碼
這篇文章主要介紹了MyBatis整合Redis實(shí)現(xiàn)二級緩存的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08Spring與Struts整合之讓Spring管理控制器操作示例
這篇文章主要介紹了Spring與Struts整合之讓Spring管理控制器操作,結(jié)合實(shí)例形式詳細(xì)分析了Spring管理控制器相關(guān)配置、接口實(shí)現(xiàn)與使用技巧,需要的朋友可以參考下2020-01-01springBoot?啟動指定配置文件環(huán)境多種方案(最新推薦)
springBoot?啟動指定配置文件環(huán)境理論上是有多種方案的,一般都是結(jié)合我們的實(shí)際業(yè)務(wù)選擇不同的方案,比如,有pom.xml文件指定、maven命令行指定、配置文件指定、啟動jar包時指定等方案,今天我們一一分享一下,需要的朋友可以參考下2023-09-09springboot整合vue項(xiàng)目(小試牛刀)
這篇文章主要介紹了springboot整合vue項(xiàng)目(小試牛刀),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2018-09-09Java中使用Preconditions來檢查傳入?yún)?shù)介紹
這篇文章主要介紹了Java中使用Preconditions來檢查傳入?yún)?shù)介紹,本文只是作為一個簡單的用法介紹,需要的朋友可以參考下2015-06-06在RedisTemplate中使用scan代替keys指令操作
這篇文章主要介紹了在RedisTemplate中使用scan代替keys指令操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧2020-11-11