亚洲乱码中文字幕综合,中国熟女仑乱hd,亚洲精品乱拍国产一区二区三区,一本大道卡一卡二卡三乱码全集资源,又粗又黄又硬又爽的免费视频

SpringBoot整合Pulsar的實(shí)現(xiàn)示例

 更新時間:2022年07月01日 16:33:32   作者:小波同學(xué)  
本文主要介紹了SpringBoot整合Pulsar的實(shí)現(xiàn)示例,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧

一、添加pom.xml依賴

<parent>
? ? <groupId>org.springframework.boot</groupId>
? ? <artifactId>spring-boot-starter-parent</artifactId>
? ? <version>2.7.0</version>
</parent>

<dependencies>
? ? <dependency>
? ? ? ? <groupId>org.springframework.boot</groupId>
? ? ? ? <artifactId>spring-boot-starter-web</artifactId>
? ? </dependency>

? ? <dependency>
? ? ? ? <groupId>org.apache.pulsar</groupId>
? ? ? ? <artifactId>pulsar-client</artifactId>
? ? ? ? <version>2.10.0</version>
? ? </dependency>

? ? <dependency>
? ? ? ? <groupId>org.projectlombok</groupId>
? ? ? ? <artifactId>lombok</artifactId>
? ? ? ? <version>1.18.24</version>
? ? ? ? <scope>provided</scope>
? ? </dependency>
</dependencies>

<build>
? ? <plugins>
? ? ? ? <plugin>
? ? ? ? ? ? <groupId>org.apache.maven.plugins</groupId>
? ? ? ? ? ? <artifactId>maven-compiler-plugin</artifactId>
? ? ? ? ? ? <configuration>
? ? ? ? ? ? ? ? <source>8</source>
? ? ? ? ? ? ? ? <target>8</target>
? ? ? ? ? ? </configuration>
? ? ? ? </plugin>
? ? </plugins>
</build> ? ?

二、Pulsar 參數(shù)類

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

import java.util.Map;

/**
?* @Author: huangyibo
?* @Date: 2022/5/28 2:32
?* @Description: Pulsar 參數(shù)類
?*/

@Component
@ConfigurationProperties(prefix = "tdmq.pulsar")
@Data
public class PulsarProperties {

? ? /**
? ? ?* 接入地址
? ? ?*/
? ? private String serviceurl;

? ? /**
? ? ?* 命名空間tdc
? ? ?*/
? ? private String tdcNamespace;

? ? /**
? ? ?* 角色tdc的token
? ? ?*/
? ? private String tdcToken;

? ? /**
? ? ?* 集群name
? ? ?*/
? ? private String cluster;

? ? /**
? ? ?* topicMap
? ? ?*/
? ? private Map<String, String> topicMap;

? ? /**
? ? ?* 訂閱
? ? ?*/
? ? private Map<String, String> subMap;

? ? /**
? ? ?* 開關(guān) on:Consumer可用 ||||| off:Consumer斷路
? ? ?*/
? ? private String onOff;
}

三、Pulsar 配置類

import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
?* @Author: huangyibo
?* @Date: 2022/5/28 2:33
?* @Description: Pulsar 配置類
?*/

@Configuration
@EnableConfigurationProperties(PulsarProperties.class)
public class PulsarConfig {

? ? @Autowired
? ? PulsarProperties pulsarProperties;

? ? @Bean
? ? public PulsarClient getPulsarClient() {

? ? ? ? try {
? ? ? ? ? ? return PulsarClient.builder()
? ? ? ? ? ? ? ? ? ? .authentication(AuthenticationFactory.token(pulsarProperties.getTdcToken()))
? ? ? ? ? ? ? ? ? ? .serviceUrl(pulsarProperties.getServiceurl())
? ? ? ? ? ? ? ? ? ? .build();
? ? ? ? } catch (PulsarClientException e) {
? ? ? ? ? ? System.out.println(e);
? ? ? ? ? ? throw new RuntimeException("初始化Pulsar Client失敗");
? ? ? ? }
? ? }

}

四、不同消費(fèi)數(shù)據(jù)類型的監(jiān)聽器

import com.yibo.pulsar.pojo.User;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.springframework.stereotype.Component;

/**
?* @Author: huangyibo
?* @Date: 2022/5/28 2:37
?* @Description:
?*/

@Component
public class UserMessageListener implements MessageListener<User> {

? ? @Override
? ? public void received(Consumer<User> consumer, Message<User> msg) {
? ? ? ? try {
? ? ? ? ? ? User user = msg.getValue();
? ? ? ? ? ? System.out.println(user);
? ? ? ? ? ? consumer.acknowledge(msg);
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? consumer.negativeAcknowledge(msg);
? ? ? ? }
? ? }
}
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.springframework.stereotype.Component;

/**
?* @Author: huangyibo
?* @Date: 2022/5/28 2:37
?* @Description:
?*/

@Component
public class StringMessageListener implements MessageListener<String> {

? ? @Override
? ? public void received(Consumer<String> consumer, Message<String> msg) {
? ? ? ? try {
? ? ? ? ? ? System.out.println(msg.getValue());
? ? ? ? ? ? consumer.acknowledge(msg);
? ? ? ? } catch (Exception e) {
? ? ? ? ? ? consumer.negativeAcknowledge(msg);
? ? ? ? }
? ? }
}

五、Pulsar的核心服務(wù)類

import com.yibo.pulsar.common.listener.StringMessageListener;
import com.yibo.pulsar.common.listener.UserMessageListener;
import com.yibo.pulsar.pojo.User;
import org.apache.pulsar.client.api.*;
import org.apache.pulsar.client.impl.schema.AvroSchema;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

/**
?* @Author: huangyibo
?* @Date: 2022/5/28 2:35
?* @Description: Pulsar的核心服務(wù)類
?*/

@Component
public class PulsarCommon {

? ? @Autowired
? ? private PulsarProperties pulsarProperties;

? ? @Autowired
? ? private PulsarClient client;

? ? @Autowired
? ? private UserMessageListener userMessageListener;

? ? @Autowired
? ? private StringMessageListener stringMessageListener;


? ? /**
? ? ?* 創(chuàng)建一個生產(chǎn)者?
? ? ?* @param topic ? ? topic name
? ? ?* @param schema ? ?schema方式
? ? ?* @param <T> ? ? ? 泛型
? ? ?* @return ? ? ? ? ?Producer生產(chǎn)者
? ? ?*/
? ? public <T> Producer<T> createProducer(String topic, Schema<T> schema) {

? ? ? ? try {
? ? ? ? ? ? return client.newProducer(schema)
? ? ? ? ? ? ? ? ? ? .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)
? ? ? ? ? ? ? ? ? ? .batchingMaxPublishDelay(10, TimeUnit.MILLISECONDS)
? ? ? ? ? ? ? ? ? ? .sendTimeout(10, TimeUnit.SECONDS)
? ? ? ? ? ? ? ? ? ? .blockIfQueueFull(true)
? ? ? ? ? ? ? ? ? ? .create();
? ? ? ? } catch (PulsarClientException e) {
? ? ? ? ? ? throw new RuntimeException("初始化Pulsar Producer失敗");
? ? ? ? }
? ? }


? ? /**
? ? ?*?
? ? ?* @param topic ? ? ? ? ? ? topic name
? ? ?* @param subscription ? ? ?sub name
? ? ?* @param messageListener ? MessageListener的自定義實(shí)現(xiàn)類
? ? ?* @param schema ? ? ? ? ? ?schema消費(fèi)方式
? ? ?* @param <T> ? ? ? ? ? ? ? 泛型
? ? ?* @return ? ? ? ? ? ? ? ? ?Consumer消費(fèi)者
? ? ?*/
? ? public <T> Consumer<T> createConsumer(String topic, String subscription,
? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ?MessageListener<T> messageListener, Schema<T> schema) {
? ? ? ? try {
? ? ? ? ? ? return client.newConsumer(schema)
? ? ? ? ? ? ? ? ? ? .topic(pulsarProperties.getCluster() + "/" + pulsarProperties.getTdcNamespace() + "/" + topic)
? ? ? ? ? ? ? ? ? ? .subscriptionName(subscription)
? ? ? ? ? ? ? ? ? ? .ackTimeout(10, TimeUnit.SECONDS)
? ? ? ? ? ? ? ? ? ? .subscriptionType(SubscriptionType.Shared)
? ? ? ? ? ? ? ? ? ? .messageListener(messageListener)
? ? ? ? ? ? ? ? ? ? .subscribe();
? ? ? ? } catch (PulsarClientException e) {
? ? ? ? ? ? throw new RuntimeException("初始化Pulsar Consumer失敗");
? ? ? ? }
? ? }

? ??
? ? /**
? ? ?* 異步發(fā)送一條消息
? ? ?* @param message ? ? ? 消息體
? ? ?* @param producer ? ? ?生產(chǎn)者實(shí)例
? ? ?* @param <T> ? ? ? ? ? 消息泛型
? ? ?*/
? ? public <T> void sendAsyncMessage(T message, Producer<T> producer) {
? ? ? ? producer.sendAsync(message).thenAccept(msgId -> {
? ? ? ? });
? ? }
? ??
? ??
? ? /**
? ? ?* 同步發(fā)送一條消息
? ? ?* @param message ? ? ? 消息體
? ? ?* @param producer ? ? ?生產(chǎn)者實(shí)例
? ? ?* @param <T> ? ? ? ? ? 泛型
? ? ?* @throws PulsarClientException
? ? ?*/
? ? public <T> void sendSyncMessage(T message, Producer<T> producer) throws PulsarClientException {
? ? ? ? MessageId send = producer.send(message);
? ? ? ? System.out.println();
? ? ? ? System.out.println();
? ? ? ? System.out.println();
? ? ? ? System.out.println();
? ? ? ? System.out.println(send);
? ? }

? ??
? ? //-----------consumer-----------
? ? @Bean(name = "comment-publish-topic-consumer")
? ? public Consumer<String> getCommentPublishTopicConsumer() {
? ? ? ? return this.createConsumer(pulsarProperties.getTopicMap().get("comment-publish-topic"),
? ? ? ? ? ? ? ? pulsarProperties.getSubMap().get("comment-publish-topic-test"),
? ? ? ? ? ? ? ? stringMessageListener, Schema.STRING);
? ? }


? ? @Bean(name = "reply-publish-topic-consumer")
? ? public Consumer<User> getReplyPublishTopicConsumer() {
? ? ? ? return this.createConsumer(pulsarProperties.getTopicMap().get("reply-publish-topic"),
? ? ? ? ? ? ? ? pulsarProperties.getSubMap().get("reply-publish-topic-test"),
? ? ? ? ? ? ? ? userMessageListener, AvroSchema.of(User.class));
? ? }


? ? //-----------producer-----------
? ? @Bean(name = "comment-publish-topic-producer")
? ? public Producer<String> getCommentPublishTopicProducer() {
? ? ? ? return this.createProducer(pulsarProperties.getTopicMap().get("comment-publish-topic"),Schema.STRING);
? ? }


? ? @Bean(name = "reply-publish-topic-producer")
? ? public Producer<User> getReplyPublishTopicProducer() {
? ? ? ? return this.createProducer(pulsarProperties.getTopicMap().get("reply-publish-topic"), AvroSchema.of(User.class));
? ? }
}

六、Pulsar整合Spring Cloud

后來發(fā)現(xiàn)如上代碼會導(dǎo)致BUG-> 在更新Nacos配置之后 Consumer會掛掉
經(jīng)排查發(fā)現(xiàn)結(jié)果是由于@RefreshScope注解導(dǎo)致,此注解將摧毀Bean,PulsarConsumer和Producer都將被摧毀,只是說Producer將在下?次調(diào)?中完成重啟,Consumer則不能重啟,因?yàn)闆]有調(diào)?,那么怎么解決呢?

就是發(fā)布系列事件以刷新容器

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

/**
?* @Author: huangyibo
?* @Date: 2022/5/28 2:34
?* @Description:
?*/

@Component
@Slf4j
public class RefreshPulsarListener implements ApplicationListener {

? ? @Autowired
? ? ApplicationContext applicationContext;

? ? @Override
? ? public void onApplicationEvent(ApplicationEvent event) {

? ? ? ? if (event.getSource().equals("__refreshAll__")) {
? ? ? ? ? ? log.info("Nacos配置中心配置修改 重啟Pulsar====================================");
? ? ? ? ? ? log.info("重啟PulsarClient,{}", applicationContext.getBean("getPulsarClient"));
? ? ? ? ? ? log.info("重啟PulsarConsumer,{}", applicationContext.getBean("comment-publish-topic-consumer"));
? ? ? ? ? ? log.info("重啟PulsarConsumer,{}", applicationContext.getBean("reply-publish-topic-consumer"));
? ? ? ? }
? ? }

}

參考:

https://wenku.baidu.com/view/4d3337ab6b0203d8ce2f0066f5335a8102d266a7.html

https://gitee.com/zhaoyuxuan66/pulsar-springcloud_boot-demo/tree/master/

https://blog.csdn.net/weixin_56227932/article/details/122897075

http://www.zzvips.com/article/219361.html

https://mp.weixin.qq.com/s/4w0eucDNcrYrsiDXHzLwuQ

到此這篇關(guān)于SpringBoot整合Pulsar的實(shí)現(xiàn)示例的文章就介紹到這了,更多相關(guān)SpringBoot整合Pulsar內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!

相關(guān)文章

  • Java線程實(shí)現(xiàn)的三種方式詳細(xì)解析

    Java線程實(shí)現(xiàn)的三種方式詳細(xì)解析

    這篇文章主要介紹了Java線程實(shí)現(xiàn)的三種方式詳細(xì)解析,Java多線程實(shí)現(xiàn)方式主要有三種,繼承Thread類、實(shí)現(xiàn)Runnable接口、使用ExecutorService、Callable、Future實(shí)現(xiàn)有返回結(jié)果的多線程,需要的朋友可以參考下
    2023-12-12
  • MyBatis整合Redis實(shí)現(xiàn)二級緩存的示例代碼

    MyBatis整合Redis實(shí)現(xiàn)二級緩存的示例代碼

    這篇文章主要介紹了MyBatis整合Redis實(shí)現(xiàn)二級緩存的示例代碼,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友們下面隨著小編來一起學(xué)習(xí)學(xué)習(xí)吧
    2020-08-08
  • Spring與Struts整合之讓Spring管理控制器操作示例

    Spring與Struts整合之讓Spring管理控制器操作示例

    這篇文章主要介紹了Spring與Struts整合之讓Spring管理控制器操作,結(jié)合實(shí)例形式詳細(xì)分析了Spring管理控制器相關(guān)配置、接口實(shí)現(xiàn)與使用技巧,需要的朋友可以參考下
    2020-01-01
  • java設(shè)計(jì)模式-組合模式詳解

    java設(shè)計(jì)模式-組合模式詳解

    這篇文章主要介紹了JAVA設(shè)計(jì)模式之組合模式,簡單說明了組合模式的原理,并結(jié)合實(shí)例分析了java組合模式的具體用法,需要的朋友可以參考下
    2021-07-07
  • Java實(shí)現(xiàn)汽車租賃系統(tǒng)

    Java實(shí)現(xiàn)汽車租賃系統(tǒng)

    這篇文章介紹了Java實(shí)現(xiàn)汽車租賃系統(tǒng)的方法,文中通過示例代碼介紹的非常詳細(xì)。對大家的學(xué)習(xí)或工作具有一定的參考借鑒價值,需要的朋友可以參考下
    2022-03-03
  • springBoot?啟動指定配置文件環(huán)境多種方案(最新推薦)

    springBoot?啟動指定配置文件環(huán)境多種方案(最新推薦)

    springBoot?啟動指定配置文件環(huán)境理論上是有多種方案的,一般都是結(jié)合我們的實(shí)際業(yè)務(wù)選擇不同的方案,比如,有pom.xml文件指定、maven命令行指定、配置文件指定、啟動jar包時指定等方案,今天我們一一分享一下,需要的朋友可以參考下
    2023-09-09
  • springboot整合vue項(xiàng)目(小試牛刀)

    springboot整合vue項(xiàng)目(小試牛刀)

    這篇文章主要介紹了springboot整合vue項(xiàng)目(小試牛刀),小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧
    2018-09-09
  • Java中使用Preconditions來檢查傳入?yún)?shù)介紹

    Java中使用Preconditions來檢查傳入?yún)?shù)介紹

    這篇文章主要介紹了Java中使用Preconditions來檢查傳入?yún)?shù)介紹,本文只是作為一個簡單的用法介紹,需要的朋友可以參考下
    2015-06-06
  • 在RedisTemplate中使用scan代替keys指令操作

    在RedisTemplate中使用scan代替keys指令操作

    這篇文章主要介紹了在RedisTemplate中使用scan代替keys指令操作,具有很好的參考價值,希望對大家有所幫助。一起跟隨小編過來看看吧
    2020-11-11
  • Java多線程的用法詳細(xì)介紹

    Java多線程的用法詳細(xì)介紹

    這篇文章主要介紹了Java多線程的用法詳細(xì)介紹的相關(guān)資料,希望通過本文能幫助到大家,需要的朋友可以參考下
    2017-09-09

最新評論