詳解Spring Cloud中Hystrix的請(qǐng)求合并
在微服務(wù)架構(gòu)中,我們將一個(gè)項(xiàng)目拆分成很多個(gè)獨(dú)立的模塊,這些獨(dú)立的模塊通過(guò)遠(yuǎn)程調(diào)用來(lái)互相配合工作,但是,在高并發(fā)情況下,通信次數(shù)的增加會(huì)導(dǎo)致總的通信時(shí)間增加,同時(shí),線程池的資源也是有限的,高并發(fā)環(huán)境會(huì)導(dǎo)致有大量的線程處于等待狀態(tài),進(jìn)而導(dǎo)致響應(yīng)延遲,為了解決這些問(wèn)題,我們需要來(lái)了解Hystrix的請(qǐng)求合并。
Hystrix中的請(qǐng)求合并,就是利用一個(gè)合并處理器,將對(duì)同一個(gè)服務(wù)發(fā)起的連續(xù)請(qǐng)求合并成一個(gè)請(qǐng)求進(jìn)行處理(這些連續(xù)請(qǐng)求的時(shí)間窗默認(rèn)為10ms),在這個(gè)過(guò)程中涉及到的一個(gè)核心類就是HystrixCollapser,OK,接下來(lái)我們就來(lái)看看如何實(shí)現(xiàn)Hystrix的請(qǐng)求合并。
服務(wù)提供者接口
我需在在服務(wù)提供者中提供兩個(gè)接口供服務(wù)消費(fèi)者調(diào)用,如下:
@RequestMapping("/getbook6") public List<Book> book6(String ids) { System.out.println("ids>>>>>>>>>>>>>>>>>>>>>" + ids); ArrayList<Book> books = new ArrayList<>(); books.add(new Book("《李自成》", 55, "姚雪垠", "人民文學(xué)出版社")); books.add(new Book("中國(guó)文學(xué)簡(jiǎn)史", 33, "林庚", "清華大學(xué)出版社")); books.add(new Book("文學(xué)改良芻議", 33, "胡適", "無(wú)")); books.add(new Book("ids", 22, "helloworld", "haha")); return books; } @RequestMapping("/getbook6/{id}") public Book book61(@PathVariable Integer id) { Book book = new Book("《李自成》2", 55, "姚雪垠2", "人民文學(xué)出版社2"); return book; }
第一個(gè)接口是一個(gè)批處理接口,第二個(gè)接口是一個(gè)處理單個(gè)請(qǐng)求的接口。在批處理接口中,服務(wù)消費(fèi)者傳來(lái)的ids參數(shù)格式是1,2,3,4…這種格式,正常情況下我們需要根據(jù)ids查詢到對(duì)應(yīng)的數(shù)據(jù),然后組裝成一個(gè)集合返回,我這里為了處理方便,不管什么樣的請(qǐng)求統(tǒng)統(tǒng)都返回一樣的數(shù)據(jù)集;處理單個(gè)請(qǐng)求的接口就比較簡(jiǎn)單了,不再贅述。
服務(wù)消費(fèi)者
OK,服務(wù)提供者處理好之后,接下來(lái)我們來(lái)看看服務(wù)消費(fèi)者要怎么處理。
BookService
首先在BookService中添加兩個(gè)方法用來(lái)調(diào)用服務(wù)提供者提供的接口,如下:
public Book test8(Long id) { return restTemplate.getForObject("http://HELLO-SERVICE/getbook6/{1}", Book.class, id); } public List<Book> test9(List<Long> ids) { System.out.println("test9---------"+ids+"Thread.currentThread().getName():" + Thread.currentThread().getName()); Book[] books = restTemplate.getForObject("http://HELLO-SERVICE/getbook6?ids={1}", Book[].class, StringUtils.join(ids, ",")); return Arrays.asList(books); }
test8用來(lái)調(diào)用提供單個(gè)id的接口,test9用來(lái)調(diào)用批處理的接口,在test9中,我將test9執(zhí)行時(shí)所處的線程打印出來(lái),方便我們觀察執(zhí)行結(jié)果,另外,在RestTemplate中,如果返回值是一個(gè)集合,我們得先用一個(gè)數(shù)組接收,然后再轉(zhuǎn)為集合(或許也有其他辦法,小伙伴們有更好的建議可以提)。
BookBatchCommand
OK,BookService中的方法準(zhǔn)備好了后,我們就可以來(lái)創(chuàng)建一個(gè)BookBatchCommand,這是一個(gè)批處理命令,如下:
public class BookBatchCommand extends HystrixCommand<List<Book>> { private List<Long> ids; private BookService bookService; public BookBatchCommand(List<Long> ids, BookService bookService) { super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("CollapsingGroup")) .andCommandKey(HystrixCommandKey.Factory.asKey("CollapsingKey"))); this.ids = ids; this.bookService = bookService; } @Override protected List<Book> run() throws Exception { return bookService.test9(ids); } }
這個(gè)類實(shí)際上和我們?cè)谏掀┛椭薪榻B的類差不多,都是繼承自HystrixCommand,用來(lái)處理合并之后的請(qǐng)求,在run方法中調(diào)用BookService中的test9方法。
BookCollapseCommand
接下來(lái)我們需要?jiǎng)?chuàng)建BookCollapseCommand繼承自HystrixCollapser來(lái)實(shí)現(xiàn)請(qǐng)求合并。如下:
public class BookCollapseCommand extends HystrixCollapser<List<Book>, Book, Long> { private BookService bookService; private Long id; public BookCollapseCommand(BookService bookService, Long id) { super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("bookCollapseCommand")).andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100))); this.bookService = bookService; this.id = id; } @Override public Long getRequestArgument() { return id; } @Override protected HystrixCommand<List<Book>> createCommand(Collection<CollapsedRequest<Book, Long>> collapsedRequests) { List<Long> ids = new ArrayList<>(collapsedRequests.size()); ids.addAll(collapsedRequests.stream().map(CollapsedRequest::getArgument).collect(Collectors.toList())); BookBatchCommand bookBatchCommand = new BookBatchCommand(ids, bookService); return bookBatchCommand; } @Override protected void mapResponseToRequests(List<Book> batchResponse, Collection<CollapsedRequest<Book, Long>> collapsedRequests) { System.out.println("mapResponseToRequests"); int count = 0; for (CollapsedRequest<Book, Long> collapsedRequest : collapsedRequests) { Book book = batchResponse.get(count++); collapsedRequest.setResponse(book); } } }
關(guān)于這個(gè)類,我說(shuō)如下幾點(diǎn):
1.首先在構(gòu)造方法中,我們?cè)O(shè)置了請(qǐng)求時(shí)間窗為100ms,即請(qǐng)求時(shí)間間隔在100ms之內(nèi)的請(qǐng)求會(huì)被合并為一個(gè)請(qǐng)求。
2.createCommand方法主要用來(lái)合并請(qǐng)求,在這里獲取到各個(gè)單個(gè)請(qǐng)求的id,將這些單個(gè)的id放到一個(gè)集合中,然后再創(chuàng)建出一個(gè)BookBatchCommand對(duì)象,用該對(duì)象去發(fā)起一個(gè)批量請(qǐng)求。
3.mapResponseToRequests方法主要用來(lái)為每個(gè)請(qǐng)求設(shè)置請(qǐng)求結(jié)果。該方法的第一個(gè)參數(shù)batchResponse表示批處理請(qǐng)求的結(jié)果,第二個(gè)參數(shù)collapsedRequests則代表了每一個(gè)被合并的請(qǐng)求,然后我們通過(guò)遍歷batchResponse來(lái)為collapsedRequests設(shè)置請(qǐng)求結(jié)果。
OK,所有的這些操作完成后,我們就可以來(lái)測(cè)試?yán)病?/p>
測(cè)試
我們?cè)诜?wù)消費(fèi)者端創(chuàng)建訪問(wèn)接口,來(lái)測(cè)試合并請(qǐng)求,測(cè)試接口如下:
@RequestMapping("/test7") @ResponseBody public void test7() throws ExecutionException, InterruptedException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); BookCollapseCommand bc1 = new BookCollapseCommand(bookService, 1l); BookCollapseCommand bc2 = new BookCollapseCommand(bookService, 2l); BookCollapseCommand bc3 = new BookCollapseCommand(bookService, 3l); BookCollapseCommand bc4 = new BookCollapseCommand(bookService, 4l); Future<Book> q1 = bc1.queue(); Future<Book> q2 = bc2.queue(); Future<Book> q3 = bc3.queue(); Book book1 = q1.get(); Book book2 = q2.get(); Book book3 = q3.get(); Thread.sleep(3000); Future<Book> q4 = bc4.queue(); Book book4 = q4.get(); System.out.println("book1>>>"+book1); System.out.println("book2>>>"+book2); System.out.println("book3>>>"+book3); System.out.println("book4>>>"+book4); context.close(); }
關(guān)于這個(gè)測(cè)試接口我說(shuō)如下兩點(diǎn):
1.首先要初始化HystrixRequestContext
2.創(chuàng)建BookCollapseCommand類的實(shí)例來(lái)發(fā)起請(qǐng)求,先發(fā)送3個(gè)請(qǐng)求,然后睡眠3秒鐘,再發(fā)起1個(gè)請(qǐng)求,這樣,前3個(gè)請(qǐng)求就會(huì)被合并為一個(gè)請(qǐng)求,第四個(gè)請(qǐng)求因?yàn)殚g隔的時(shí)間比較久,所以不會(huì)被合并,而是單獨(dú)創(chuàng)建一個(gè)線程去處理。
OK,我們來(lái)看看執(zhí)行結(jié)果,如下:
通過(guò)注解實(shí)現(xiàn)請(qǐng)求合并
OK,上面這種請(qǐng)求合并方式寫(xiě)起來(lái)稍微有一點(diǎn)麻煩,我們可以使用注解來(lái)更優(yōu)雅的實(shí)現(xiàn)這一功能。首先在BookService中添加兩個(gè)方法,如下:
@HystrixCollapser(batchMethod = "test11",collapserProperties = {@HystrixProperty(name ="timerDelayInMilliseconds",value = "100")}) public Future<Book> test10(Long id) { return null; } @HystrixCommand public List<Book> test11(List<Long> ids) { System.out.println("test9---------"+ids+"Thread.currentThread().getName():" + Thread.currentThread().getName()); Book[] books = restTemplate.getForObject("http://HELLO-SERVICE/getbook6?ids={1}", Book[].class, StringUtils.join(ids, ",")); return Arrays.asList(books); }
在test10方法上添加@HystrixCollapser注解實(shí)現(xiàn)請(qǐng)求合并,用batchMethod屬性指明請(qǐng)求合并后的處理方法,collapserProperties屬性指定其他屬性。
OK,在BookService中寫(xiě)好之后,直接調(diào)用就可以了,如下:
@RequestMapping("/test8") @ResponseBody public void test8() throws ExecutionException, InterruptedException { HystrixRequestContext context = HystrixRequestContext.initializeContext(); Future<Book> f1 = bookService.test10(1l); Future<Book> f2 = bookService.test10(2l); Future<Book> f3 = bookService.test10(3l); Book b1 = f1.get(); Book b2 = f2.get(); Book b3 = f3.get(); Thread.sleep(3000); Future<Book> f4 = bookService.test10(4l); Book b4 = f4.get(); System.out.println("b1>>>"+b1); System.out.println("b2>>>"+b2); System.out.println("b3>>>"+b3); System.out.println("b4>>>"+b4); context.close(); }
和前面的一樣,前三個(gè)請(qǐng)求會(huì)進(jìn)行合并,第四個(gè)請(qǐng)求會(huì)單獨(dú)執(zhí)行,OK,執(zhí)行結(jié)果如下:
總結(jié)
請(qǐng)求合并的優(yōu)點(diǎn)小伙伴們已經(jīng)看到了,多個(gè)請(qǐng)求被合并為一個(gè)請(qǐng)求進(jìn)行一次性處理,可以有效節(jié)省網(wǎng)絡(luò)帶寬和線程池資源,但是,有優(yōu)點(diǎn)必然也有缺點(diǎn),設(shè)置請(qǐng)求合并之后,本來(lái)一個(gè)請(qǐng)求可能5ms就搞定了,但是現(xiàn)在必須再等10ms看看還有沒(méi)有其他的請(qǐng)求一起的,這樣一個(gè)請(qǐng)求的耗時(shí)就從5ms增加到15ms了,不過(guò),如果我們要發(fā)起的命令本身就是一個(gè)高延遲的命令,那么這個(gè)時(shí)候就可以使用請(qǐng)求合并了,因?yàn)檫@個(gè)時(shí)候時(shí)間窗的時(shí)間消耗就顯得微不足道了,另外高并發(fā)也是請(qǐng)求合并的一個(gè)非常重要的場(chǎng)景。
Ok,我們的請(qǐng)求合并就說(shuō)到這里,有問(wèn)題歡迎小伙伴們留言討論。希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
SpringBoot結(jié)合ElasticSearch實(shí)現(xiàn)模糊查詢的項(xiàng)目實(shí)踐
本文主要介紹了SpringBoot結(jié)合ElasticSearch實(shí)現(xiàn)模糊查詢的項(xiàng)目實(shí)踐,主要實(shí)現(xiàn)模糊查詢、批量CRUD、排序、分頁(yè)和高亮功能,具有一定的參考價(jià)值,感興趣的可以了解一下2024-03-03springBoot+mybaties后端多層架構(gòu)的實(shí)現(xiàn)示例
本文主要介紹了springBoot+mybaties后端多層架構(gòu)的實(shí)現(xiàn)示例,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2022-07-07SpringCloud Gateway鑒權(quán)和跨域解決方案
網(wǎng)關(guān)是介于客戶端和服務(wù)器端之間的中間層,所有的外部請(qǐng)求都會(huì)先經(jīng)過(guò) 網(wǎng)關(guān)這一層,也就是說(shuō),API 的實(shí)現(xiàn)方面更多的考慮業(yè)務(wù)邏輯,而安全、性能、監(jiān)控可以交由 網(wǎng)關(guān)來(lái)做,這樣既提高業(yè)務(wù)靈活性又不缺安全性,本文給大家介紹SpringCloud Gateway鑒權(quán)和跨域解決方案,一起看看吧2023-11-11Java使用POI將多個(gè)Sheet合并為一個(gè)Sheet
這篇文章主要為大家詳細(xì)介紹了Java使用POI將多個(gè)Sheet合并為一個(gè)Sheet,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2022-04-04restemplate請(qǐng)求亂碼之content-encoding=“gzip“示例詳解
RestTemplate從Spring3.0開(kāi)始支持的一個(gè)HTTP請(qǐng)求工具,它提供了常見(jiàn)的REST請(qǐng)求方案的模板,及一些通用的請(qǐng)求執(zhí)行方法 exchange 以及 execute,接下來(lái)通過(guò)本文給大家介紹restemplate請(qǐng)求亂碼之content-encoding=“gzip“,需要的朋友可以參考下2024-03-03