AsyncHttpClient RequestFilter: Reading the Request Filtering Source Code
Preface
This article takes a look at AsyncHttpClient's RequestFilter.
RequestFilter
org/asynchttpclient/filter/RequestFilter.java
/**
 * A Filter interface that gets invoked before making an actual request.
 */
public interface RequestFilter {

  /**
   * An {@link org.asynchttpclient.AsyncHttpClient} will invoke {@link RequestFilter#filter} and will use the
   * returned {@link FilterContext#getRequest()} and {@link FilterContext#getAsyncHandler()} to continue the request
   * processing.
   *
   * @param ctx a {@link FilterContext}
   * @param <T> the handler result type
   * @return {@link FilterContext}. The {@link FilterContext} instance may not the same as the original one.
   * @throws FilterException to interrupt the filter processing.
   */
  <T> FilterContext<T> filter(FilterContext<T> ctx) throws FilterException;
}
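RequestFilter declares a single generic method, filter. It takes a FilterContext and returns a FilterContext, which may be a different instance from the one passed in. Throwing FilterException interrupts filter processing.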
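To make the contract concrete, here is a minimal JDK-only sketch. SketchContext, SketchRequestFilter and AddHeaderFilter are hypothetical stand-ins invented for illustration (they are not library types), and the request is reduced to a map of headers. The sketch shows a filter that returns a new context instead of mutating the one it receives, which the Javadoc above explicitly allows.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for FilterContext<T>: an immutable pair of request headers and handler.
final class SketchContext<T> {
    final Map<String, String> headers;
    final T handler;

    SketchContext(Map<String, String> headers, T handler) {
        this.headers = Collections.unmodifiableMap(new LinkedHashMap<>(headers));
        this.handler = handler;
    }
}

// Hypothetical stand-in for RequestFilter, keeping the same generic-method shape.
interface SketchRequestFilter {
    <T> SketchContext<T> filter(SketchContext<T> ctx) throws Exception;
}

// A filter that decorates the request by adding one header.
// Because filter is a generic method, a lambda expression cannot implement it; a class is needed.
final class AddHeaderFilter implements SketchRequestFilter {
    private final String name;
    private final String value;

    AddHeaderFilter(String name, String value) {
        this.name = name;
        this.value = value;
    }

    @Override
    public <T> SketchContext<T> filter(SketchContext<T> ctx) {
        Map<String, String> copy = new LinkedHashMap<>(ctx.headers);
        copy.put(name, value);
        // Return a fresh context; the original one is left untouched.
        return new SketchContext<>(copy, ctx.handler);
    }
}
```

Returning a fresh context keeps each filter free of side effects on its input, so the caller can simply continue with whatever context comes back.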
ThrottleRequestFilter
org/asynchttpclient/filter/ThrottleRequestFilter.java
/**
 * A {@link org.asynchttpclient.filter.RequestFilter} throttles requests and block when the number of permits is reached,
 * waiting for the response to arrives before executing the next request.
 */
public class ThrottleRequestFilter implements RequestFilter {
  private static final Logger logger = LoggerFactory.getLogger(ThrottleRequestFilter.class);
  private final Semaphore available;
  private final int maxWait;

  public ThrottleRequestFilter(int maxConnections) {
    this(maxConnections, Integer.MAX_VALUE);
  }

  public ThrottleRequestFilter(int maxConnections, int maxWait) {
    this(maxConnections, maxWait, false);
  }

  public ThrottleRequestFilter(int maxConnections, int maxWait, boolean fair) {
    this.maxWait = maxWait;
    available = new Semaphore(maxConnections, fair);
  }

  /**
   * {@inheritDoc}
   */
  @Override
  public <T> FilterContext<T> filter(FilterContext<T> ctx) throws FilterException {
    try {
      if (logger.isDebugEnabled()) {
        logger.debug("Current Throttling Status {}", available.availablePermits());
      }
      if (!available.tryAcquire(maxWait, TimeUnit.MILLISECONDS)) {
        throw new FilterException(String.format("No slot available for processing Request %s with AsyncHandler %s",
                ctx.getRequest(), ctx.getAsyncHandler()));
      }
    } catch (InterruptedException e) {
      throw new FilterException(String.format("Interrupted Request %s with AsyncHandler %s",
              ctx.getRequest(), ctx.getAsyncHandler()));
    }

    return new FilterContext.FilterContextBuilder<>(ctx)
            .asyncHandler(ReleasePermitOnComplete.wrap(ctx.getAsyncHandler(), available))
            .build();
  }
}
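ThrottleRequestFilter implements RequestFilter and uses a Semaphore to throttle requests. Its filter method calls available.tryAcquire(maxWait, TimeUnit.MILLISECONDS); if no permit is obtained within maxWait milliseconds (Integer.MAX_VALUE when only maxConnections is given), or the waiting thread is interrupted, it throws FilterException. If a permit is obtained, it wraps the asyncHandler with ReleasePermitOnComplete.wrap(ctx.getAsyncHandler(), available) so that the permit is released when the request finishes.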
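The permit handling can be tried out in isolation. The following is a JDK-only sketch, not the library class: ThrottleSketch, tryEnter and exit are hypothetical names, and the sketch returns false where the real filter throws FilterException.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// JDK-only sketch of the permit handling; ThrottleSketch is a hypothetical name.
final class ThrottleSketch {
    private final Semaphore available;
    private final int maxWaitMillis;

    ThrottleSketch(int maxConnections, int maxWaitMillis) {
        this.available = new Semaphore(maxConnections);
        this.maxWaitMillis = maxWaitMillis;
    }

    // Returns true if a permit was obtained within maxWaitMillis.
    // Assumption of this sketch: timeout and interruption are both reported as false,
    // whereas the real filter throws FilterException in both cases.
    boolean tryEnter() {
        try {
            return available.tryAcquire(maxWaitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }

    // Gives the permit back; the real filter does this from the wrapped handler.
    void exit() {
        available.release();
    }
}
```

With a single permit, a second tryEnter() call blocks for up to maxWaitMillis and then fails, and it succeeds again once exit() has returned the permit.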
ReleasePermitOnComplete
org/asynchttpclient/filter/ReleasePermitOnComplete.java
/**
 * Wrapper for {@link AsyncHandler}s to release a permit on {@link AsyncHandler#onCompleted()}. This is done via a dynamic proxy to preserve all interfaces of the wrapped handler.
 */
public class ReleasePermitOnComplete {

  /**
   * Wrap handler to release the permit of the semaphore on {@link AsyncHandler#onCompleted()}.
   *
   * @param handler   the handler to be wrapped
   * @param available the Semaphore to be released when the wrapped handler is completed
   * @param <T>       the handler result type
   * @return the wrapped handler
   */
  @SuppressWarnings("unchecked")
  public static <T> AsyncHandler<T> wrap(final AsyncHandler<T> handler, final Semaphore available) {
    Class<?> handlerClass = handler.getClass();
    ClassLoader classLoader = handlerClass.getClassLoader();
    Class<?>[] interfaces = allInterfaces(handlerClass);

    return (AsyncHandler<T>) Proxy.newProxyInstance(classLoader, interfaces, (proxy, method, args) -> {
      try {
        return method.invoke(handler, args);
      } finally {
        switch (method.getName()) {
          case "onCompleted":
          case "onThrowable":
            available.release();
          default:
        }
      }
    });
  }

  //......
}
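ReleasePermitOnComplete.wrap creates a dynamic proxy around the original handler. Every call is delegated to the handler, and in the finally block available.release() is executed when the invoked method is onCompleted or onThrowable.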
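The proxy technique can be reproduced with the JDK alone. In the sketch below, SketchHandler is a hypothetical, much smaller stand-in for AsyncHandler, and ReleaseOnTerminalCallback is an illustrative name. Unlike the original, which proxies every interface of the handler class, the sketch proxies a single interface for brevity.

```java
import java.lang.reflect.Proxy;
import java.util.concurrent.Semaphore;

// Hypothetical, much smaller stand-in for AsyncHandler<T>.
interface SketchHandler<T> {
    void onBodyPart(String part);

    T onCompleted();

    void onThrowable(Throwable t);
}

final class ReleaseOnTerminalCallback {

    // Wraps the handler in a dynamic proxy that releases one permit
    // after onCompleted or onThrowable has been delegated.
    @SuppressWarnings("unchecked")
    static <T> SketchHandler<T> wrap(SketchHandler<T> handler, Semaphore available) {
        return (SketchHandler<T>) Proxy.newProxyInstance(
                SketchHandler.class.getClassLoader(),
                new Class<?>[] {SketchHandler.class}, // assumption: a single interface is enough here
                (proxy, method, args) -> {
                    try {
                        // Delegate every call to the wrapped handler.
                        return method.invoke(handler, args);
                    } finally {
                        String name = method.getName();
                        if (name.equals("onCompleted") || name.equals("onThrowable")) {
                            available.release();
                        }
                    }
                });
    }
}
```

Because the release sits in finally, the permit is returned even when the delegated callback itself throws; intermediate callbacks such as onBodyPart pass through without touching the semaphore.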
preProcessRequest
org/asynchttpclient/DefaultAsyncHttpClient.java
/**
 * Configure and execute the associated {@link RequestFilter}. This class
 * may decorate the {@link Request} and {@link AsyncHandler}
 *
 * @param fc {@link FilterContext}
 * @return {@link FilterContext}
 */
private <T> FilterContext<T> preProcessRequest(FilterContext<T> fc) throws FilterException {
  for (RequestFilter asyncFilter : config.getRequestFilters()) {
    fc = asyncFilter.filter(fc);
    assertNotNull(fc, "filterContext");
  }

  Request request = fc.getRequest();
  if (fc.getAsyncHandler() instanceof ResumableAsyncHandler) {
    request = ResumableAsyncHandler.class.cast(fc.getAsyncHandler()).adjustRequestRange(request);
  }

  if (request.getRangeOffset() != 0) {
    RequestBuilder builder = new RequestBuilder(request);
    builder.setHeader("Range", "bytes=" + request.getRangeOffset() + "-");
    request = builder.build();
  }
  fc = new FilterContext.FilterContextBuilder<>(fc).request(request).build();
  return fc;
}
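DefaultAsyncHttpClient's preProcessRequest iterates over config.getRequestFilters() and calls asyncFilter.filter(fc) on each filter in turn, passing the FilterContext returned by one filter to the next and asserting that it is not null. Afterwards it lets a ResumableAsyncHandler adjust the request range, and adds a Range header when the range offset is non-zero.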
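The two steps of preProcessRequest can be sketched with the JDK only. PreProcessSketch, applyAll and rangeHeader are hypothetical names; UnaryOperator stands in for RequestFilter, and Objects.requireNonNull stands in for the library's assertNotNull.

```java
import java.util.List;
import java.util.Objects;
import java.util.function.UnaryOperator;

// JDK-only sketch; all names here are hypothetical.
final class PreProcessSketch {

    // Threads one context through every filter in order, like the for-loop in preProcessRequest.
    // Each filter receives the context returned by the previous one.
    static <C> C applyAll(List<UnaryOperator<C>> filters, C ctx) {
        for (UnaryOperator<C> filter : filters) {
            ctx = Objects.requireNonNull(filter.apply(ctx), "filterContext");
        }
        return ctx;
    }

    // Builds the Range header value used for a non-zero offset.
    // Returns null when the offset is 0, meaning no header is added.
    static String rangeHeader(long rangeOffset) {
        return rangeOffset != 0 ? "bytes=" + rangeOffset + "-" : null;
    }
}
```

Since each filter works on the output of the previous one, the order in which filters are registered determines the final request and handler.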
executeRequest
org/asynchttpclient/DefaultAsyncHttpClient.java
public <T> ListenableFuture<T> executeRequest(Request request, AsyncHandler<T> handler) {
  if (config.getCookieStore() != null) {
    try {
      List<Cookie> cookies = config.getCookieStore().get(request.getUri());
      if (!cookies.isEmpty()) {
        RequestBuilder requestBuilder = new RequestBuilder(request);
        for (Cookie cookie : cookies) {
          requestBuilder.addOrReplaceCookie(cookie);
        }
        request = requestBuilder.build();
      }
    } catch (Exception e) {
      handler.onThrowable(e);
      return new ListenableFuture.CompletedFailure<>("Failed to set cookies of request", e);
    }
  }

  if (noRequestFilters) {
    return execute(request, handler);
  } else {
    FilterContext<T> fc = new FilterContext.FilterContextBuilder<T>().asyncHandler(handler).request(request).build();
    try {
      fc = preProcessRequest(fc);
    } catch (Exception e) {
      handler.onThrowable(e);
      return new ListenableFuture.CompletedFailure<>("preProcessRequest failed", e);
    }

    return execute(fc.getRequest(), fc.getAsyncHandler());
  }
}
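executeRequest calls preProcessRequest only when noRequestFilters is false; otherwise it goes straight to execute. If preProcessRequest throws, the exception is passed to handler.onThrowable and a ListenableFuture.CompletedFailure is returned, so a rejected request surfaces through the handler and the future rather than as an exception thrown to the caller.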
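This control flow is easy to mirror with JDK types. In the sketch below, ExecuteSketch and Step are hypothetical names, a String stands in for the request, a Consumer stands in for handler.onThrowable, and CompletableFuture stands in for ListenableFuture; passing null for the pre-processing step plays the role of noRequestFilters being true.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// JDK-only sketch of the executeRequest control flow; all names are hypothetical.
final class ExecuteSketch {

    // Stand-in for preProcessRequest: may rewrite the request or throw.
    interface Step {
        String apply(String request) throws Exception;
    }

    static CompletableFuture<String> executeRequest(
            String request, Step preProcess, Consumer<Throwable> onThrowable) {
        if (preProcess == null) { // corresponds to noRequestFilters == true
            return execute(request);
        }
        try {
            request = preProcess.apply(request);
        } catch (Exception e) {
            onThrowable.accept(e); // the handler is notified first
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(e); // then the caller receives a failed future
            return failed;
        }
        return execute(request);
    }

    // Stand-in for the actual network call.
    private static CompletableFuture<String> execute(String request) {
        return CompletableFuture.completedFuture("sent:" + request);
    }
}
```

A caller that relies on throttling therefore has to inspect the returned future or implement onThrowable; wrapping the call in try/catch alone would not observe a rejected request.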
Summary
AsyncHttpClient's RequestFilter defines the filter method. One of its implementations is ThrottleRequestFilter, which uses a semaphore to throttle requests.
DefaultAsyncHttpClient's executeRequest calls preProcessRequest when noRequestFilters is false, and preProcessRequest iterates over config.getRequestFilters(), calling asyncFilter.filter(fc) on each filter in turn.