Java中基于DeferredResult的異步服務(wù)詳解
一. 簡介
Servlet3.0提供了基于servlet的異步處理api,Spring MVC只是將這些api進(jìn)行了一系列的封裝,從而實現(xiàn)了DeferredResult。
DeferredResult字面意思是"延遲結(jié)果",它允許Spring MVC收到請求后,立即釋放(歸還)容器線程,以便容器可以接收更多的外部請求,提升吞吐量,與此同時,DeferredResult將陷入阻塞,直到我們主動將結(jié)果set到DeferredResult,最后,DeferredResult會重新申請容器線程,并將本次請求返回給客戶端。
二. 使用
1. 監(jiān)聽器 onTimeout()
當(dāng)deferredResult被創(chuàng)建出來之后,執(zhí)行setResult()之前,這之間的時間超過設(shè)定值時(比如下方案例中設(shè)置為5秒超時),則被判定為超時。
DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L); // 設(shè)置超時事件 deferredResult.onTimeout(() -> { System.out.println("異步線程執(zhí)行超時, 異步線程的名稱: " + Thread.currentThread().getName()); deferredResult.setResult("異步線程執(zhí)行超時"); });
2. 監(jiān)聽器 onError()
當(dāng)onTimeout()或onCompletion()等回調(diào)函數(shù)中的代碼報錯時,則會執(zhí)行監(jiān)聽器onError()的回調(diào)函數(shù)。
PS: DeferredResult之外的代碼報錯不會影響到onError()。
DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L); // 設(shè)置異常事件 deferredResult.onError((throwable) -> { System.out.println("異步請求出現(xiàn)錯誤,異步線程的名稱: " + Thread.currentThread().getName() + "異常: " + throwable); deferredResult.setErrorResult("異步線程執(zhí)行出錯"); });
3. 監(jiān)聽器 onCompletion()
代碼任意位置調(diào)用了同一個DeferredResult的setResult()后,則會被DeferredResult的onCompletion()監(jiān)聽器捕獲到。
Spring會任選一條容器線程來執(zhí)行onCompletion( )中的代碼(由于請求線程已被釋放(歸還),所以此處可能再次由同一條請求線程來處理,也可能由其他線程來處理)。
DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L); // 設(shè)置完成事件 deferredResult.onCompletion(() -> { System.out.println("異步線程執(zhí)行完畢,異步線程的名稱: " + Thread.currentThread().getName()); });
完整的代碼為:
import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @RestController public class DemoController { // 自定義線程池 public static ExecutorService exec = Executors.newCachedThreadPool(); @GetMapping("/demo") public DeferredResult<String> demoResult() { System.out.println("容器線程: " + Thread.currentThread().getName()); // 創(chuàng)建DeferredResult對象,設(shè)置超時時長 20秒 DeferredResult<String> deferredResult = new DeferredResult<String>(5 * 1000L); // 設(shè)置超時事件 deferredResult.onTimeout(() -> { System.out.println("異步線程執(zhí)行超時, 異步線程的名稱: " + Thread.currentThread().getName()); // throw new RuntimeException("超時事件報錯了!"); deferredResult.setResult("異步線程執(zhí)行超時"); }); // 設(shè)置異常事件 deferredResult.onError((throwable) -> { System.out.println("異步請求出現(xiàn)錯誤,異步線程的名稱: " + Thread.currentThread().getName() + "異常: " + throwable); deferredResult.setErrorResult("異步線程執(zhí)行出錯"); }); // 設(shè)置完成事件 deferredResult.onCompletion(() -> { System.out.println("異步線程執(zhí)行完畢,異步線程的名稱: " + Thread.currentThread().getName()); }); exec.execute(() -> { System.out.println("[線程池] 異步線程的名稱: " + Thread.currentThread().getName()); deferredResult.setResult("異步線程執(zhí)行完畢"); }); System.out.println("Servlet thread release"); return deferredResult; } }
三. 拓展
有些業(yè)務(wù)場景下,我們希望新的請求觸發(fā)(激活)之前陷入阻塞的請求,此外可以通過不同的key來區(qū)分不同的請求。
比如apollo在實現(xiàn)時就利用了DeferredResult??蛻舳讼蚍?wù)器端發(fā)送輪詢請求,服務(wù)端收到請求后,會立刻釋放容器線程,并阻塞本次請求,若apollo托管的配置文件沒有發(fā)生任何改變,則輪詢請求會超時(返回304)。當(dāng)有新的配置發(fā)布時,服務(wù)端會調(diào)用DeferredResult setResult()方法,進(jìn)入onCompletion(),并使尚未超時的輪尋請求正常返回(200)。
大概如下:
@SpringBootApplication public class DemoApplication implements WebMvcConfigurer { public static void main(String[] args) { SpringApplication.run(DemoApplication.class, args); } }
import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.Multimaps; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; import java.util.Collection; @RestController public class ApolloController { private final Logger logger = LoggerFactory.getLogger(this.getClass()); //guava中的Multimap,多值map,對map的增強(qiáng),一個key可以保持多個value private Multimap<String, DeferredResult<String>> watchRequests = Multimaps.synchronizedSetMultimap(HashMultimap.create()); //模擬長輪詢 @RequestMapping(value = "/watch/{namespace}", method = RequestMethod.GET, produces = "text/html") public DeferredResult<String> watch(@PathVariable("namespace") String namespace) { logger.info("Request received"); DeferredResult<String> deferredResult = new DeferredResult<>(); //當(dāng)deferredResult完成時(不論是超時還是異常還是正常完成),移除watchRequests中相應(yīng)的watch key deferredResult.onCompletion(new Runnable() { @Override public void run() { System.out.println("remove key:" + namespace); watchRequests.remove(namespace, deferredResult); } }); watchRequests.put(namespace, deferredResult); logger.info("Servlet thread released"); return deferredResult; } //模擬發(fā)布namespace配置 @RequestMapping(value = "/publish/{namespace}", method = RequestMethod.GET, produces = "text/html") public Object publishConfig(@PathVariable("namespace") String namespace) { if (watchRequests.containsKey(namespace)) { Collection<DeferredResult<String>> deferredResults = watchRequests.get(namespace); Long time = System.currentTimeMillis(); //通知所有watch這個namespace變更的長輪訓(xùn)配置變更結(jié)果 for (DeferredResult<String> deferredResult : deferredResults) { deferredResult.setResult(namespace + " changed:" + time); } } return "success"; } }
當(dāng)請求超時的時候會產(chǎn)生AsyncRequestTimeoutException,我們定義一個全局異常捕獲類:
import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.ControllerAdvice; import org.springframework.web.bind.annotation.ExceptionHandler; import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseStatus; import org.springframework.web.context.request.async.AsyncRequestTimeoutException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @ControllerAdvice class GlobalControllerExceptionHandler { protected static final Logger logger = LoggerFactory.getLogger(GlobalControllerExceptionHandler.class); @ResponseStatus(HttpStatus.NOT_MODIFIED)//返回304狀態(tài)碼 @ResponseBody @ExceptionHandler(AsyncRequestTimeoutException.class) //捕獲特定異常 public void handleAsyncRequestTimeoutException(AsyncRequestTimeoutException e, HttpServletRequest request) { System.out.println("handleAsyncRequestTimeoutException"); } }
然后我們通過postman工具發(fā)送請求//localhost:8080/watch/mynamespace,請求會掛起,60秒后,DeferredResult超時,客戶端正常收到了304狀態(tài)碼,表明在這個期間配置沒有變更過。
然后我們在模擬配置變更的情況,再次發(fā)起請求//localhost:8080/watch/mynamespace,等待個10秒鐘(不要超過60秒),然后調(diào)用//localhost:8080/publish/mynamespace,發(fā)布配置變更。這時postman會立刻收到response響應(yīng)結(jié)果:
mynamespace changed:1538880050147
表明在輪訓(xùn)期間有配置變更過。
這里我們用了一個MultiMap來存放所有輪訓(xùn)的請求,Key對應(yīng)的是namespace,value對應(yīng)的是所有watch這個namespace變更的異步請求DeferredResult,需要注意的是:在DeferredResult完成的時候記得移除MultiMap中相應(yīng)的key,避免內(nèi)存溢出請求。
采用這種長輪詢的好處是,相比一直循環(huán)請求服務(wù)器,實例一多的話會對服務(wù)器產(chǎn)生很大的壓力,http長輪詢的方式會在服務(wù)器變更的時候主動推送給客戶端,其他時間客戶端是掛起請求的,這樣同時滿足了性能和實時性。
四. DeferredResult與Callable的區(qū)別
DeferredResult和Callable都可以在Controller層的方法中直接返回,請求收到后,釋放容器線程,在另一個線程中通過異步的方式執(zhí)行任務(wù),最后將請求返回給客戶端。
不同之處在于,使用Callable時,當(dāng)其它線程中的任務(wù)執(zhí)行完畢后,請求會立刻返回給客戶端,而DeferredResult則需要用戶在代碼中手動set值到DeferredResult,否則即便異步線程中的任務(wù)執(zhí)行完畢,DeferredResult仍然不會向客戶端返回任何結(jié)果。
到此這篇關(guān)于Java中基于DeferredResult的異步服務(wù)詳解的文章就介紹到這了,更多相關(guān)基于DeferredResult的異步服務(wù)內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Maven pom的distributionManagement配置方式
文章主要介紹了Maven的distributionManagement配置方式,以及它的作用、配置方法和重要性,distributionManagement用于指定構(gòu)件的發(fā)布位置,包括下載URL、狀態(tài)等,文章還詳細(xì)解釋了如何配置repository和snapshotRepository,以及它們的用途和區(qū)別2025-01-01基于HttpServletRequest 相關(guān)常用方法的應(yīng)用
本篇文章小編為大家介紹,基于HttpServletRequest 相關(guān)常用方法的應(yīng)用,需要的朋友參考下2013-04-04