SpringCloud Bus消息總線的實現(xiàn)
好了現(xiàn)在我們接著上一篇的隨筆,繼續(xù)來講。上一篇我們講到,我們如果要去更新所有微服務的配置,在不重啟的情況下去更新配置,只能依靠spring cloud config了,但是,是我們要一個服務一個服務的發(fā)送post請求,
我們能受的了嗎?這比之前的沒配置中心好多了,那么我們如何繼續(xù)避免挨個挨個的向服務發(fā)送Post請求來告知服務,你的配置信息改變了,需要及時修改內存中的配置信息。
這時候我們就不要忘記消息隊列的發(fā)布訂閱模型。讓所有為服務來訂閱這個事件,當這個事件發(fā)生改變了,就可以通知所有微服務去更新它們的內存中的配置信息。這時Bus消息總線就能解決,你只需要在springcloud Config Server端發(fā)出refresh,就可以觸發(fā)所有微服務更新了。
如下架構圖所示:
Spring Cloud Bus除了支持RabbitMQ的自動化配置之外,還支持現(xiàn)在被廣泛應用的Kafka。在本文中,我們將搭建一個Kafka的本地環(huán)境,并通過它來嘗試使用Spring Cloud Bus對Kafka的支持,實現(xiàn)消息總線的功能。
Kafka使用Scala實現(xiàn),被用作LinkedIn的活動流和運營數(shù)據(jù)處理的管道,現(xiàn)在也被諸多互聯(lián)網(wǎng)企業(yè)廣泛地用作為數(shù)據(jù)流管道和消息系統(tǒng)。
Kafak架構圖如下:
Kafka是基于消息發(fā)布/訂閱模式實現(xiàn)的消息系統(tǒng),其主要設計目標如下:
1.消息持久化:以時間復雜度為O(1)的方式提供消息持久化能力,即使對TB級以上數(shù)據(jù)也能保證常數(shù)時間復雜度的訪問性能。
2.高吞吐:在廉價的商用機器上也能支持單機每秒100K條以上的吞吐量
3.分布式:支持消息分區(qū)以及分布式消費,并保證分區(qū)內的消息順序
4.跨平臺:支持不同技術平臺的客戶端(如:Java、PHP、Python等)
5.實時性:支持實時數(shù)據(jù)處理和離線數(shù)據(jù)處理
6.伸縮性:支持水平擴展
Kafka中涉及的一些基本概念:
1.Broker:Kafka集群包含一個或多個服務器,這些服務器被稱為Broker。
2.Topic:邏輯上同Rabbit的Queue隊列相似,每條發(fā)布到Kafka集群的消息都必須有一個Topic。(物理上不同Topic的消息分開存儲,邏輯上一個Topic的消息雖然保存于一個或多個Broker上,但用戶只需指定消息的Topic即可生產(chǎn)或消費數(shù)據(jù)而不必關心數(shù)據(jù)存于何處)
3.Partition:Partition是物理概念上的分區(qū),為了提供系統(tǒng)吞吐率,在物理上每個Topic會分成一個或多個Partition,每個Partition對應一個文件夾(存儲對應分區(qū)的消息內容和索引文件)。
4.Producer:消息生產(chǎn)者,負責生產(chǎn)消息并發(fā)送到Kafka Broker。
5.Consumer:消息消費者,向Kafka Broker讀取消息并處理的客戶端。
6.Consumer Group:每個Consumer屬于一個特定的組(可為每個Consumer指定屬于一個組,若不指定則屬于默認組),組可以用來實現(xiàn)一條消息被組內多個成員消費等功能。
可以從kafka的架構圖看到Kafka是需要Zookeeper支持的,你需要在你的Kafka配置里面指定Zookeeper在哪里,它是通過Zookeeper做一些可靠性的保證,做broker的主從,我們還要知道Kafka的消息是以topic形式作為組織的,Producers發(fā)送topic形式的消息,Consumer是按照組來分的,所以,一組Consumers都會都要同樣的topic形式的消息。在服務端,它還做了一些分片,那么一個Topic可能分布在不同的分片上面,方便我們拓展部署多個機器,Kafka是天生分布式的。這里為了演示,我們只需要用它的默認配置,在windows上做個小Demo即可。
我們這里主要針對Spring Cloud Bus對Kafka的支持,實現(xiàn)消息總線的功能,具體的Kafka,RabbitMQ消息隊列希望自己去找資料來學習一下。有了一些概念的支持后,我們進行一些Demo。
如下:首先新建一個springCloud-config-client1模塊,方便我們進行測試所引入的依賴如下:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-config</artifactId> <version>1.4.0.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-eureka</artifactId> <version>1.3.5.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-kafka</artifactId> <version>1.3.2.RELEASE</version> </dependency>
接著要注意一下,client1的配置文件要改為bootstrap.yml,因為這種配置格式,是優(yōu)先加載的,上一篇隨筆有講過,client1的配置如下:
server: port: 7006 spring: application: name: cloud-config cloud: config: #啟動什么環(huán)境下的配置,dev 表示開發(fā)環(huán)境,這跟你倉庫的文件的后綴有關,比如,倉庫配置文件命名格式是cloud-config-dev.properties,所以profile 就要寫dev profile: dev discovery: enabled: true #這個名字是Config Server端的服務名字,不能瞎寫。 service-id: config-server #注冊中心 eureka: client: service-url: defaultZone: http://localhost:8888/eureka/,http://localhost:8889/eureka/ #是否需要權限拉去,默認是true,如果不false就不允許你去拉取配置中心Server更新的內容 management: security: enabled: false
接著啟動類如下:
@SpringBootApplication @EnableDiscoveryClient public class Client1Application { public static void main(String[] args) { SpringApplication.run(Client1Application.class, args); } }
接著將client中的TestController賦值一份到client1中,代碼如下:
@RestController //這里面的屬性有可能會更新的,git中的配置中心變化的話就要刷新,沒有這個注解內,配置就不能及時更新 @RefreshScope public class TestController { @Value("${name}") private String name; @Value("${age}") private Integer age; @RequestMapping("/test") public String test(){ return this.name+this.age; } }
接著還要在先前的隨筆中的模塊中的Config Server加入如下配置:
#是否需要權限拉去,默認是true,如果不false就不允許你去拉取配置中心Server更新的內容 management: security: enabled: false
接著還要做一點就是,在config-client,config-client1,和config-Server都要引入kafka的依賴,如下:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-bus-kafka</artifactId> <version>1.3.2.RELEASE</version> </dependency>
我們工程準備好了,暫時先放在這里,下面進行Kafka的安裝下載,首先我們去Kafka官網(wǎng)kafka.apache.org/downloads 下來官網(wǎng)推薦的版本,
首先我們進到下載好的Kafka目錄中kafka_2.11-1.1.0\bin\windows 下編輯kafka-run-class.bat如下:
找到這條配置 如下:
可以看到%CLASSPATH%沒有雙引號,
因此用雙引號括起來,不然啟動不起來的,報你JDK沒安裝好,修改后如下:
接著,打開config文件夾中的server.properties配置如下:
可以看到是連接到本地的zookeeper就行了。
接著我們進行先啟動zookeeper,再啟動Kafka,如下:
當看到上面的信息證明啟動Zookeeper啟動成功。、
接下來再開一個CMD啟動Kafka,如下:
看到這些信息說明Kafka啟動成功了
好了,接下來把前面的工程,兩個注冊中心,一個springcloud-config-server,兩個springcloud-config-client,springcloud-config-client1啟動起來,
可以看到springcloudBus是在0分片上,如果兩個config-client啟動都出現(xiàn)上面信息,證明啟動成功了。
好了現(xiàn)在我們進行訪問一下config-server端,如下:
再訪問兩個client,如下:
好了,好戲開始了,現(xiàn)在我們去git倉庫上修改配置中心的文件,將年齡改為24,如下:
接下來,我們我們用refresh刷新配置服務端配置,通知兩個client去更新內存中的配置信息。用postman發(fā)送localhost:7000/bus/refresh,如下:
可以看到?jīng)]有返回什么信息,但是不要擔心,這是成功的通知所有client去更新了內存中的信息了。
接著我們分別重新請求config-server,兩個client,刷新頁面,結果如下:
兩個client如下:
可以看到所有client自動更新內存中的配置信息了。
到目前為止,上面都是刷新說有的配置的信息的,如果我們想刷新某個特定服務的配置信息也是可以的。我們可以指定刷新范圍,如下:
指定刷新范圍
上面的例子中,我們通過向服務實例請求Spring Cloud Bus的/bus/refresh
接口,從而觸發(fā)總線上其他服務實例的/refresh
。但是有些特殊場景下(比如:灰度發(fā)布),我們希望可以刷新微服務中某個具體實例的配置。
Spring Cloud Bus對這種場景也有很好的支持:/bus/refresh
接口還提供了destination
參數(shù),用來定位具體要刷新的應用程序。比如,我們可以請求/bus/refresh?destination=服務名字:9000
,此時總線上的各應用實例會根據(jù)destination
屬性的值來判斷是否為自己的實例名,
若符合才進行配置刷新,若不符合就忽略該消息。
destination
參數(shù)除了可以定位具體的實例之外,還可以用來定位具體的服務。定位服務的原理是通過使用Spring的PathMatecher(路徑匹配)來實現(xiàn),比如:/bus/refresh?destination=customers:**
,該請求會觸發(fā)customers
服務的所有實例進行刷新。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持腳本之家。
相關文章
Java語言----三種循環(huán)語句的區(qū)別介紹
下面小編就為大家?guī)硪黄狫ava語言----三種循環(huán)語句的區(qū)別介紹。小編舉得挺不錯的,現(xiàn)在就分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2016-07-07【spring-boot】快速構建spring-boot微框架的方法
本篇文章主要介紹了【spring-boot】快速構建spring-boot微框架的方法,小編覺得挺不錯的,現(xiàn)在分享給大家,也給大家做個參考。一起跟隨小編過來看看吧2017-12-12詳解J2EE開發(fā)的網(wǎng)站部署到阿里云服務器的方法
這篇文章主要介紹了詳解J2EE開發(fā)的網(wǎng)站部署到阿里云服務器的方法,需要的朋友可以參考下2018-01-01Spring?Cloud?Alibaba使用Nacos作為注冊中心和配置中心
這篇文章主要為大家介紹了Spring?Cloud?Alibaba使用Nacos作為注冊中心和配置中心的示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-06-06