SpringCloud?Gateway讀取Request?Body方式
Gateway讀取Request Body
我們使用SpringCloud Gateway做微服務(wù)網(wǎng)關(guān)的時候,經(jīng)常需要在過濾器Filter中讀取到Post請求中的Body內(nèi)容進行日志記錄、簽名驗證、權(quán)限驗證等操作。我們知道,Request的Body是只能讀取一次的,如果直接通過在Filter中讀取,而不封裝回去回導(dǎo)致后面的服務(wù)無法讀取數(shù)據(jù)。
SpringCloud Gateway內(nèi)部提供了一個斷言工廠類ReadBodyPredicateFactory,這個類實現(xiàn)了讀取Request的Body內(nèi)容并放入緩存,我們可以通過從緩存中獲取body內(nèi)容來實現(xiàn)我們的目的。
分析ReadBodyPredicateFactory
public AsyncPredicate<ServerWebExchange> applyAsync(ReadBodyPredicateFactory.Config config) { ? ? ? ? return (exchange) -> { ? ? ? ? ? ? Class inClass = config.getInClass(); ? ? ? ? ? ? Object cachedBody = exchange.getAttribute("cachedRequestBodyObject"); ? ? ? ? ? ? if (cachedBody != null) { ? ? ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? ? ? boolean test = config.predicate.test(cachedBody); ? ? ? ? ? ? ? ? ? ? exchange.getAttributes().put("read_body_predicate_test_attribute", test); ? ? ? ? ? ? ? ? ? ? return Mono.just(test); ? ? ? ? ? ? ? ? } catch (ClassCastException var7) { ? ? ? ? ? ? ? ? ? ? if (LOGGER.isDebugEnabled()) { ? ? ? ? ? ? ? ? ? ? ? ? LOGGER.debug("Predicate test failed because class in predicate does not match the cached body object", var7); ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? return Mono.just(false); ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } else { ? ? ? ? ? ? ? ? return DataBufferUtils.join(exchange.getRequest().getBody()).flatMap((dataBuffer) -> { ? ? ? ? ? ? ? ? ? ? DataBufferUtils.retain(dataBuffer); ? ? ? ? ? ? ? ? ? ? final Flux<DataBuffer> cachedFlux = Flux.defer(() -> { ? ? ? ? ? ? ? ? ? ? ? ? return Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount())); ? ? ? ? ? ? ? ? ? ? }); ? ? ? ? ? ? ? ? ? ? ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator(exchange.getRequest()) { ? ? ? ? ? ? ? ? ? ? ? ? public Flux<DataBuffer> getBody() { ? ? ? ? ? ? ? ? ? ? ? ? ? ? return cachedFlux; ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? }; ? ? ? ? ? ? ? ? ? ? return ServerRequest.create(exchange.mutate().request(mutatedRequest).build(), messageReaders).bodyToMono(inClass).doOnNext((objectValue) -> { ? ? ? ? ? ? ? ? ? ? ? ? exchange.getAttributes().put("cachedRequestBodyObject", objectValue); ? ? ? ? ? ? ? ? ? ? ? ? exchange.getAttributes().put("cachedRequestBody", cachedFlux); ? ? ? ? ? ? ? ? ? ? }).map((objectValue) -> { ? ? ? ? ? ? ? ? ? ? ? ? return config.predicate.test(objectValue); ? ? ? ? ? ? ? ? ? ? }); ? ? ? ? ? ? ? ? }); ? ? ? ? ? ? } ? ? ? ? }; ? ? }
通過查看ReadBodyPredicateFactory內(nèi)部實現(xiàn),我們可以看到,該工廠類將request body內(nèi)容讀取后存放在 exchange的cachedRequestBodyObject中。
那么我們可以通過代碼:
exchange.getAttribute(“cachedRequestBodyObject”); //將body內(nèi)容取出來
知道如何取body內(nèi)容后,我們只需將該工廠類注冊到y(tǒng)ml配置文件中的predicates,然后從Filter中獲取即可。
配置ReadBodyPredicateFactory
查看ReadBodyPredicateFactory關(guān)于配置的代碼:
public <T> ReadBodyPredicateFactory.Config setPredicate(Class<T> inClass, Predicate<T> predicate) { ? ? ? ? ? ? this.setInClass(inClass); ? ? ? ? ? ? this.predicate = predicate; ? ? ? ? ? ? return this; ? ? ? ? }
配置該工廠類需要兩個參數(shù):
inClass
:接收body內(nèi)容的對象Class,我們用字符串接收,配置String即可。Predicate
:Predicate的接口實現(xiàn)類,我們自定義一個Predicate的實現(xiàn)類即可。
自定義Predicate實現(xiàn),并注冊Bean。
? ? /** ? ? ?* 用于readBody斷言,可配置到y(tǒng)ml ? ? ?* @return ? ? ?*/ ? ? @Bean ? ? public Predicate bodyPredicate(){ ? ? ? ? return new Predicate() { ? ? ? ? ? ? @Override ? ? ? ? ? ? public boolean test(Object o) { ? ? ? ? ? ? ? ? return true; ? ? ? ? ? ? } ? ? ? ? }; ? ? }
兩個參數(shù)都有了,直接在yml中配置:
? ? ? ? predicates: ? ? ? ? - Path=/card/api/** ? ? ? ? - name: ReadBodyPredicateFactory #使用ReadBodyPredicateFactory斷言,將body讀入緩存 ? ? ? ? ? args: ? ? ? ? ? ? inClass: '#{T(String)}' ? ? ? ? ? ? predicate: '#{@bodyPredicate}' #注入實現(xiàn)predicate接口類
編寫自定義GatewayFilterFactory
編寫自己的過濾器工廠類,讀取緩存的body內(nèi)容,并支持在配置文件中配置。
public class ReadBodyGatewayFilterFactory ? ? ? ? extends AbstractGatewayFilterFactory<ReadBodyGatewayFilterFactory.Config> { ? ? private Logger logger = LoggerFactory.getLogger(ReadBodyGatewayFilterFactory.class); ? ? private static final String CACHE_REQUEST_BODY_OBJECT_KEY = "cachedRequestBodyObject"; ? ? public ReadBodyGatewayFilterFactory() { ? ? ? ? super(Config.class); ? ? } ? ? @Override ? ? public GatewayFilter apply(Config config) { ? ? ? ? return ((exchange, chain) -> { ? ? ? ? ? ? //利用ReadBodyPredicateFactory斷言,會將body讀入exchange的cachedRequestBodyObject中 ? ? ? ? ? ? Object requestBody = exchange.getAttribute(CACHE_REQUEST_BODY_OBJECT_KEY); ? ? ? ? ? ? logger.info("request body is:{}", requestBody); ? ? ? ? ? ? return chain.filter(exchange); ? ? ? ? }); ? ? } ? ? @Override ? ? public List<String> shortcutFieldOrder() { ? ? ? ? return Arrays.asList("withParams");//將參數(shù)放入 ? ? } ? ? public static class Config { ? ? ? ? private boolean withParams;//接收配置的參數(shù)值,可以隨便寫 ? ? ? ? public boolean isWithParams() { ? ? ? ? ? ? return withParams; ? ? ? ? } ? ? ? ? public void setWithParams(boolean withParams) { ? ? ? ? ? ? this.withParams = withParams; ? ? ? ? } ? ? } }
將ReadBodyGatewayFilterFactory工程類在容器中注入。
? ? ?/** ? ? ?* 注入ReadBody過濾器 ? ? ?* @return ? ? ?*/ ? ? @Bean ? ? public ReadBodyGatewayFilterFactory readBodyGatewayFilterFactory() { ? ? ? ? return new ReadBodyGatewayFilterFactory(); ? ? }
到此,我們的Filter類也可以在yml配置文件中直接配置使用了。
完整的yml配置
? ? ? - id: body_route #讀取post中的body路由 ? ? ? ? order: 5 ? ? ? ? uri: lb://API-CARD ? ? ? ? filters: ? ? ? ? - ReadBody=true #使用自定義的過濾器工廠類,讀取request body內(nèi)容 ? ? ? ? predicates: ? ? ? ? - Path=/card/api/** ? ? ? ? - name: ReadBodyPredicateFactory #使用ReadBodyPredicateFactory斷言,將body讀入緩存 ? ? ? ? ? args: ? ? ? ? ? ? inClass: '#{T(String)}' ? ? ? ? ? ? predicate: '#{@bodyPredicate}' #注入實現(xiàn)predicate接口類
OK,以上是通過ReadBodyPredicateFactory這個類讀取到request body內(nèi)容。
另外springcloud gateway內(nèi)部還提供了ModifyRequestBodyGatewayFilterFactory類用于修改body內(nèi)容,既然能修改,自然也能獲取body,大家可自行去研究。
Gateway自定義filter獲取body的數(shù)據(jù)為空
最近在使用SpringCloud Gateway進行網(wǎng)關(guān)的開發(fā),我使用的版本是:SpringBoot的2.3.4.RELEASE+SpringCloud的Hoxton.SR8,在自定義過濾器時需要獲取ServerHttpRequest中body的數(shù)據(jù),發(fā)現(xiàn)一直無法獲取到數(shù)據(jù),經(jīng)過各種百度、谷歌,再加上自己的實踐,終于找到解決方案:
首先創(chuàng)建一個全局過濾器把body中的數(shù)據(jù)緩存起來
package com.cloudpath.gateway.portal.filter; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.gateway.filter.GatewayFilterChain; import org.springframework.cloud.gateway.filter.GlobalFilter; import org.springframework.core.Ordered; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.http.server.reactive.ServerHttpRequestDecorator; import org.springframework.stereotype.Component; import org.springframework.web.server.ServerWebExchange; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** ?* @author mazhen ?* @className CacheBodyGlobalFilter ?* @Description 把body中的數(shù)據(jù)緩存起來 ?* @date 2020/10/28 18:02 ?*/ @Slf4j @Component public class CacheBodyGlobalFilter implements Ordered, GlobalFilter { ? ? // ?public static final String CACHE_REQUEST_BODY_OBJECT_KEY = "cachedRequestBodyObject"; ? ? @Override ? ? public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) { ? ? ? ? if (exchange.getRequest().getHeaders().getContentType() == null) { ? ? ? ? ? ? return chain.filter(exchange); ? ? ? ? } else { ? ? ? ? ? ? return DataBufferUtils.join(exchange.getRequest().getBody()) ? ? ? ? ? ? ? ? ? ? .flatMap(dataBuffer -> { ? ? ? ? ? ? ? ? ? ? ? ? DataBufferUtils.retain(dataBuffer); ? ? ? ? ? ? ? ? ? ? ? ? Flux<DataBuffer> cachedFlux = Flux ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? .defer(() -> Flux.just(dataBuffer.slice(0, dataBuffer.readableByteCount()))); ? ? ? ? ? ? ? ? ? ? ? ? ServerHttpRequest mutatedRequest = new ServerHttpRequestDecorator( ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? exchange.getRequest()) { ? ? ? ? ? ? ? ? ? ? ? ? ? ? @Override ? ? ? ? ? ? ? ? ? ? ? ? ? ? public Flux<DataBuffer> getBody() { ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? ? return cachedFlux; ? ? ? ? ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? ? ? ? ? ? ? }; ? ? ? ? ? ? ? ? ? ? ? ? //exchange.getAttributes().put(CACHE_REQUEST_BODY_OBJECT_KEY, cachedFlux); ? ? ? ? ? ? ? ? ? ? ? ? return chain.filter(exchange.mutate().request(mutatedRequest).build()); ? ? ? ? ? ? ? ? ? ? }); ? ? ? ? } ? ? } ? ? @Override ? ? public int getOrder() { ? ? ? ? return Ordered.HIGHEST_PRECEDENCE; ? ? } }
CacheBodyGlobalFilter這個全局過濾器的目的就是把原有的request請求中的body內(nèi)容讀出來,并且使用ServerHttpRequestDecorator這個請求裝飾器對request進行包裝,重寫getBody方法,并把包裝后的請求放到過濾器鏈中傳遞下去。這樣后面的過濾器中再使用exchange.getRequest().getBody()來獲取body時,實際上就是調(diào)用的重載后的getBody方法,獲取的最先已經(jīng)緩存了的body數(shù)據(jù)。這樣就能夠?qū)崿F(xiàn)body的多次讀取了。
值得一提的是,這個過濾器的order設(shè)置的是Ordered.HIGHEST_PRECEDENCE,即最高優(yōu)先級的過濾器。優(yōu)先級設(shè)置這么高的原因是某些系統(tǒng)內(nèi)置的過濾器可能也會去讀body,這樣就會導(dǎo)致我們自定義過濾器中獲取body的時候報body只能讀取一次這樣的錯誤如下:
java.lang.IllegalStateException: Only one connection receive subscriber allowed.
at reactor.ipc.netty.channel.FluxReceive.startReceiver(FluxReceive.java:279)
at reactor.ipc.netty.channel.FluxReceive.lambda$subscribe$2(FluxReceive.java:129)
at
所以,必須把CacheBodyGlobalFilter的優(yōu)先級設(shè)到最高。
在自定義的過濾器中嘗試獲取body中的數(shù)據(jù)
package com.cloudpath.iam.gateway.customerfilter; import com.cloudpath.iam.gateway.utils.FilterRequestResponseUtil; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.gateway.filter.GatewayFilter; import org.springframework.cloud.gateway.filter.factory.AbstractGatewayFilterFactory; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.http.server.reactive.ServerHttpRequest; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import java.util.Arrays; import java.util.List; /** ?* @author by mazhen ?* @Classname TestGatewayFilterFactory ?* @Description 自定義過濾器獲取body中的數(shù)據(jù) ?* @Date 2020/10/27 14:38 ?*/ @Component @Slf4j public class TestGatewayFilterFactory extends AbstractGatewayFilterFactory<TestGatewayFilterFactory.Config> { ? ? @Override ? ? public List<String> shortcutFieldOrder() { ? ? ? ? return Arrays.asList("enabled"); ? ? } ? ? public TestGatewayFilterFactory() { ? ? ? ? super(Config.class); ? ? ? ? log.info("Loaded TestGatewayFilterFactory"); ? ? } ? ? @Override ? ? public GatewayFilter apply(Config config) { ? ? ? ? return (exchange, chain) -> { ? ? ? ? ? ? if (!config.isEnabled()) { ? ? ? ? ? ? ? ? return chain.filter(exchange); ? ? ? ? ? ? } ? ? ? ? ? ? if (null != exchange) { ? ? ? ? ? ? ? ? ServerHttpRequest httpRequest = exchange.getRequest(); ? ? ? ? ? ? ? ? ? ? try { ? ? ? ? ? ? ? ? ? ? ? ? Flux<DataBuffer> dataBufferFlux = httpRequest.getBody(); ? ? ? ? ? ? ? ? ? ? ? ? //獲取body中的數(shù)據(jù) ? ? ? ? ? ? ? ? ? ? ? ? String body = FilterRequestResponseUtil.resolveBodyFromRequest(dataBufferFlux); ? ? ? ? ? ? ? ? ? ? ? ? log.info("body:{}",body); ? ? ? ? ? ? ? ? ? ? } catch (Exception e) { ? ? ? ? ? ? ? ? ? ? ? ? log.error("異常:",e); ? ? ? ? ? ? ? ? ? ? ? ? return chain.filter(exchange); ? ? ? ? ? ? ? ? ? ? } ? ? ? ? ? ? } ? ? ? ? ? ? return chain.filter(exchange); ? ? ? ? }; ? ? } ? ? public static class Config { ? ? ? ? /** ? ? ? ? ?* 控制是否開啟統(tǒng)計 ? ? ? ? ?*/ ? ? ? ? private boolean enabled; ? ? ? ? public Config() { ? ? ? ? } ? ? ? ? public boolean isEnabled() { ? ? ? ? ? ? return enabled; ? ? ? ? } ? ? ? ? public void setEnabled(boolean enabled) { ? ? ? ? ? ? this.enabled = enabled; ? ? ? ? } ? ? } }
解析body的工具類
package com.cloudpath.iam.gateway.utils; import org.springframework.core.io.buffer.DataBuffer; import org.springframework.core.io.buffer.DataBufferUtils; import reactor.core.publisher.Flux; import java.nio.CharBuffer; import java.nio.charset.StandardCharsets; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; import java.util.regex.Pattern; /** ?* @author mazhen ?* @className FilterHeadersUtil ?* @Description 過濾器請求/響應(yīng)工具類 ?* @date 2020/10/29 9:31 ?*/ public final class FilterRequestResponseUtil { ? ? /** ? ? ?* spring cloud gateway 獲取post請求的body體 ? ? ?* @param body ? ? ?* @return ? ? ?*/ ? ? public static String resolveBodyFromRequest( Flux<DataBuffer> body){ ? ? ? ? AtomicReference<String> bodyRef = new AtomicReference<>(); ? ? ? ? // 緩存讀取的request body信息 ? ? ? ? body.subscribe(dataBuffer -> { ? ? ? ? ? ? CharBuffer charBuffer = StandardCharsets.UTF_8.decode(dataBuffer.asByteBuffer()); ? ? ? ? ? ? DataBufferUtils.release(dataBuffer); ? ? ? ? ? ? bodyRef.set(charBuffer.toString()); ? ? ? ? }); ? ? ? ? //獲取request body ? ? ? ? return bodyRef.get(); ? ? } ? ? /** ? ? ?* 讀取body內(nèi)容 ? ? ?* @param body ? ? ?* @return ? ? ?*/ ? ? public static String resolveBodyFromRequest2( Flux<DataBuffer> body){ ? ? ? ? StringBuilder sb = new StringBuilder(); ? ? ? ? body.subscribe(buffer -> { ? ? ? ? ? ? byte[] bytes = new byte[buffer.readableByteCount()]; ? ? ? ? ? ? buffer.read(bytes); ? ? ? ? ? ? DataBufferUtils.release(buffer); ? ? ? ? ? ? String bodyString = new String(bytes, StandardCharsets.UTF_8); ? ? ? ? ? ? sb.append(bodyString); ? ? ? ? }); ? ? ? ? return formatStr(sb.toString()); ? ? } ? ? /** ? ? ?* 去掉空格,換行和制表符 ? ? ?* @param str ? ? ?* @return ? ? ?*/ ? ? private static String formatStr(String str){ ? ? ? ? if (str != null && str.length() > 0) { ? ? ? ? ? ? Pattern p = Pattern.compile("\\s*|\t|\r|\n"); ? ? ? ? ? ? Matcher m = p.matcher(str); ? ? ? ? ? ? return m.replaceAll(""); ? ? ? ? } ? ? ? ? return str; ? ? } }
解析body的內(nèi)容,網(wǎng)上普遍是上面的兩種方式,親測resolveBodyFromRequest方法解析body中的數(shù)據(jù),沒有1024字節(jié)的限制。
ps:我傳的參數(shù)有1萬多字節(jié)。。。。。。。
大家可以按需所選。
以上為個人經(jīng)驗,希望能給大家一個參考,也希望大家多多支持腳本之家。
相關(guān)文章
去掉 IDEA 中 mybatis配置文件的局部背景顏色(圖解)
這篇文章通過圖文并茂的形式給大家介紹了去掉IntelliJ IDEA 中 mybatis配置文件的局部背景顏色及mybatis 對應(yīng)的 xml 文件警告的方法圖解,需要的朋友可以參考下2018-09-09大數(shù)據(jù)Kafka:消息隊列和Kafka基本介紹
本文對消息隊列的應(yīng)用場景,優(yōu)缺點,消息隊列的兩種方式,常見的消息隊列產(chǎn)品以及Kafka的特點和應(yīng)用場景做了詳細的講解,需要的朋友可以參考下,希望可以對大家有所幫助2021-08-08