Springboot整合Netty實(shí)現(xiàn)RPC服務(wù)器的示例代碼
一、什么是RPC?
RPC(Remote Procedure Call)遠(yuǎn)程過(guò)程調(diào)用,是一種進(jìn)程間的通信方式,其可以做到像調(diào)用本地方法那樣調(diào)用位于遠(yuǎn)程的計(jì)算機(jī)的服務(wù)。其實(shí)現(xiàn)的原理過(guò)程如下:
- 本地的進(jìn)程通過(guò)接口進(jìn)行本地方法調(diào)用。
- RPC客戶端將調(diào)用的接口名、接口方法、方法參數(shù)等信息利用網(wǎng)絡(luò)通信發(fā)送給RPC服務(wù)器。
- RPC服務(wù)器對(duì)請(qǐng)求進(jìn)行解析,根據(jù)接口名、接口方法、方法參數(shù)等信息找到對(duì)應(yīng)的方法實(shí)現(xiàn),并進(jìn)行本地方法調(diào)用,然后將方法調(diào)用結(jié)果響應(yīng)給RPC客戶端。
二、實(shí)現(xiàn)RPC需要解決那些問(wèn)題?
1. 約定通信協(xié)議格式
RPC分為客戶端與服務(wù)端,就像HTTP一樣,我們需要定義交互的協(xié)議格式。主要包括三個(gè)方面:
- 請(qǐng)求格式
- 響應(yīng)格式
- 網(wǎng)絡(luò)通信時(shí)數(shù)據(jù)的序列化方式
RPC請(qǐng)求
@Data public class RpcRequest { /** * 請(qǐng)求ID 用來(lái)標(biāo)識(shí)本次請(qǐng)求以匹配RPC服務(wù)器的響應(yīng) */ private String requestId; /** * 調(diào)用的類(接口)權(quán)限定名稱 */ private String className; /** * 調(diào)用的方法名 */ private String methodName; /** * 方法參類型列表 */ private Class<?>[] parameterTypes; /** * 方法參數(shù) */ private Object[] parameters; }
RPC響應(yīng)
@Data public class RpcResponse { /** * 響應(yīng)對(duì)應(yīng)的請(qǐng)求ID */ private String requestId; /** * 調(diào)用是否成功的標(biāo)識(shí) */ private boolean success = true; /** * 調(diào)用錯(cuò)誤信息 */ private String errorMessage; /** * 調(diào)用結(jié)果 */ private Object result; }
2. 序列化方式
序列化方式可以使用JDK自帶的序列化方式或者一些第三方的序列化方式,JDK自帶的由于性能較差所以不推薦。我們這里選擇JSON作為序列化協(xié)議,即將請(qǐng)求和響應(yīng)對(duì)象序列化為JSON字符串后發(fā)送到對(duì)端,對(duì)端接收到后反序列為相應(yīng)的對(duì)象,這里采用阿里的 fastjson 作為JSON序列化框架。
3. TCP粘包、拆包
TCP是個(gè)“流”協(xié)議,所謂流,就是沒(méi)有界限的一串?dāng)?shù)據(jù)。大家可以想想河里的流水,是連成一片的,其間并沒(méi)有分界線。TCP底層并不了解上層業(yè)務(wù)數(shù)據(jù)的具體含義,它會(huì)根據(jù)TCP緩沖區(qū)的實(shí)際情況進(jìn)行包的劃分,所以在業(yè)務(wù)上認(rèn)為,一個(gè)完整的包可能會(huì)被TCP拆分成多個(gè)包進(jìn)行發(fā)送,也有可能把多個(gè)小的包封裝成一個(gè)大的數(shù)據(jù)包發(fā)送,這就是所謂的TCP粘包和拆包問(wèn)題。粘包和拆包需要應(yīng)用層程序來(lái)解決。
我們采用在請(qǐng)求和響應(yīng)的頭部保存消息體的長(zhǎng)度的方式解決粘包和拆包問(wèn)題。請(qǐng)求和響應(yīng)的格式如下:
+--------+----------------+ | Length | Content | | 4字節(jié) | Length個(gè)字節(jié) | +--------+----------------+
4. 網(wǎng)絡(luò)通信框架的選擇
出于性能的考慮,RPC一般選擇異步非阻塞的網(wǎng)絡(luò)通信方式,JDK自帶的NIO網(wǎng)絡(luò)編程操作繁雜,Netty是一款基于NIO開發(fā)的網(wǎng)絡(luò)通信框架,其對(duì)java NIO進(jìn)行封裝對(duì)外提供友好的API,并且內(nèi)置了很多開箱即用的組件,如各種編碼解碼器。所以我們采用Netty作為RPC服務(wù)的網(wǎng)絡(luò)通信框架。
三、RPC服務(wù)端
RPC分為客戶端和服務(wù)端,它們有一個(gè)共同的服務(wù)接口API,我們首先定義一個(gè)接口 HelloService
public interface HelloService { String sayHello(String name); }
然后服務(wù)端需要提供該接口的實(shí)現(xiàn)類,然后使用自定義的@RpcService注解標(biāo)注,該注解擴(kuò)展自@Component,被其標(biāo)注的類可以被Spring的容器管理。
@RpcService public class HelloServiceImp implements HelloService { @Override public String sayHello(String name) { return "Hello " + name; } }
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Component public @interface RpcService { }
RPC服務(wù)器類
我們實(shí)現(xiàn)了ApplicationContextAware接口,以便從bean容器中取出@RpcService實(shí)現(xiàn)類,存入我們的map容器中。
@Component @Slf4j public class RpcServer implements ApplicationContextAware, InitializingBean { // RPC服務(wù)實(shí)現(xiàn)容器 private Map<String, Object> rpcServices = new HashMap<>(); @Value("${rpc.server.port}") private int port; @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { Map<String, Object> services = applicationContext.getBeansWithAnnotation(RpcService.class); for (Map.Entry<String, Object> entry : services.entrySet()) { Object bean = entry.getValue(); Class<?>[] interfaces = bean.getClass().getInterfaces(); for (Class<?> inter : interfaces) { rpcServices.put(inter.getName(), bean); } } log.info("加載RPC服務(wù)數(shù)量:{}", rpcServices.size()); } @Override public void afterPropertiesSet() { start(); } private void start(){ new Thread(() -> { EventLoopGroup boss = new NioEventLoopGroup(1); EventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap bootstrap = new ServerBootstrap(); bootstrap.group(boss, worker) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new IdleStateHandler(0, 0, 60)); pipeline.addLast(new JsonDecoder()); pipeline.addLast(new JsonEncoder()); pipeline.addLast(new RpcInboundHandler(rpcServices)); } }) .channel(NioServerSocketChannel.class); ChannelFuture future = bootstrap.bind(port).sync(); log.info("RPC 服務(wù)器啟動(dòng), 監(jiān)聽端口:" + port); future.channel().closeFuture().sync(); }catch (Exception e){ e.printStackTrace(); boss.shutdownGracefully(); worker.shutdownGracefully(); } }).start(); } }
RpcServerInboundHandler 負(fù)責(zé)處理RPC請(qǐng)求
@Slf4j public class RpcServerInboundHandler extends ChannelInboundHandlerAdapter { private Map<String, Object> rpcServices; public RpcServerInboundHandler(Map<String, Object> rpcServices){ this.rpcServices = rpcServices; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.info("客戶端連接成功,{}", ctx.channel().remoteAddress()); } public void channelInactive(ChannelHandlerContext ctx) { log.info("客戶端斷開連接,{}", ctx.channel().remoteAddress()); ctx.channel().close(); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg){ RpcRequest rpcRequest = (RpcRequest) msg; log.info("接收到客戶端請(qǐng)求, 請(qǐng)求接口:{}, 請(qǐng)求方法:{}", rpcRequest.getClassName(), rpcRequest.getMethodName()); RpcResponse response = new RpcResponse(); response.setRequestId(rpcRequest.getRequestId()); Object result = null; try { result = this.handleRequest(rpcRequest); response.setResult(result); } catch (Exception e) { e.printStackTrace(); response.setSuccess(false); response.setErrorMessage(e.getMessage()); } log.info("服務(wù)器響應(yīng):{}", response); ctx.writeAndFlush(response); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { log.info("連接異常"); ctx.channel().close(); super.exceptionCaught(ctx, cause); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; if (event.state()== IdleState.ALL_IDLE){ log.info("客戶端已超過(guò)60秒未讀寫數(shù)據(jù), 關(guān)閉連接.{}",ctx.channel().remoteAddress()); ctx.channel().close(); } }else{ super.userEventTriggered(ctx,evt); } } private Object handleRequest(RpcRequest rpcRequest) throws Exception{ Object bean = rpcServices.get(rpcRequest.getClassName()); if(bean == null){ throw new RuntimeException("未找到對(duì)應(yīng)的服務(wù): " + rpcRequest.getClassName()); } Method method = bean.getClass().getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes()); method.setAccessible(true); return method.invoke(bean, rpcRequest.getParameters()); } }
四、RPC客戶端
/** * RPC遠(yuǎn)程調(diào)用的客戶端 */ @Slf4j @Component public class RpcClient { @Value("${rpc.remote.ip}") private String remoteIp; @Value("${rpc.remote.port}") private int port; private Bootstrap bootstrap; // 儲(chǔ)存調(diào)用結(jié)果 private final Map<String, SynchronousQueue<RpcResponse>> results = new ConcurrentHashMap<>(); public RpcClient(){ } @PostConstruct public void init(){ bootstrap = new Bootstrap().remoteAddress(remoteIp, port); NioEventLoopGroup worker = new NioEventLoopGroup(1); bootstrap.group(worker) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); pipeline.addLast(new IdleStateHandler(0, 0, 10)); pipeline.addLast(new JsonEncoder()); pipeline.addLast(new JsonDecoder()); pipeline.addLast(new RpcClientInboundHandler(results)); } }); } public RpcResponse send(RpcRequest rpcRequest) { RpcResponse rpcResponse = null; rpcRequest.setRequestId(UUID.randomUUID().toString()); Channel channel = null; try { channel = bootstrap.connect().sync().channel(); log.info("連接建立, 發(fā)送請(qǐng)求:{}", rpcRequest); channel.writeAndFlush(rpcRequest); SynchronousQueue<RpcResponse> queue = new SynchronousQueue<>(); results.put(rpcRequest.getRequestId(), queue); // 阻塞等待獲取響應(yīng) rpcResponse = queue.take(); results.remove(rpcRequest.getRequestId()); } catch (InterruptedException e) { e.printStackTrace(); } finally { if(channel != null && channel.isActive()){ channel.close(); } } return rpcResponse; } }
RpcClientInboundHandler負(fù)責(zé)處理服務(wù)端的響應(yīng)
@Slf4j public class RpcClientInboundHandler extends ChannelInboundHandlerAdapter { private Map<String, SynchronousQueue<RpcResponse>> results; public RpcClientInboundHandler(Map<String, SynchronousQueue<RpcResponse>> results){ this.results = results; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { RpcResponse rpcResponse = (RpcResponse) msg; log.info("收到服務(wù)器響應(yīng):{}", rpcResponse); if(!rpcResponse.isSuccess()){ throw new RuntimeException("調(diào)用結(jié)果異常,異常信息:" + rpcResponse.getErrorMessage()); } // 取出結(jié)果容器,將response放進(jìn)queue中 SynchronousQueue<RpcResponse> queue = results.get(rpcResponse.getRequestId()); queue.put(rpcResponse); } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { if (evt instanceof IdleStateEvent){ IdleStateEvent event = (IdleStateEvent)evt; if (event.state() == IdleState.ALL_IDLE){ log.info("發(fā)送心跳包"); RpcRequest request = new RpcRequest(); request.setMethodName("heartBeat"); ctx.channel().writeAndFlush(request); } }else{ super.userEventTriggered(ctx, evt); } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause){ log.info("異常:{}", cause.getMessage()); ctx.channel().close(); } }
接口代理
為了使客戶端像調(diào)用本地方法一樣調(diào)用遠(yuǎn)程服務(wù),我們需要對(duì)接口進(jìn)行動(dòng)態(tài)代理。
代理類實(shí)現(xiàn)
@Component public class RpcProxy implements InvocationHandler { @Autowired private RpcClient rpcClient; @Override public Object invoke(Object proxy, Method method, Object[] args){ RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setClassName(method.getDeclaringClass().getName()); rpcRequest.setMethodName(method.getName()); rpcRequest.setParameters(args); rpcRequest.setParameterTypes(method.getParameterTypes()); RpcResponse rpcResponse = rpcClient.send(rpcRequest); return rpcResponse.getResult(); } }
實(shí)現(xiàn)FactoryBean接口,將生產(chǎn)動(dòng)態(tài)代理類納入 Spring 容器管理。
public class RpcFactoryBean<T> implements FactoryBean<T> { private Class<T> interfaceClass; @Autowired private RpcProxy rpcProxy; public RpcFactoryBean(Class<T> interfaceClass){ this.interfaceClass = interfaceClass; } @Override public T getObject(){ return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class[]{interfaceClass}, rpcProxy); } @Override public Class<?> getObjectType() { return interfaceClass; } }
自定義類路徑掃描器,掃描包下的RPC接口,動(dòng)態(tài)生產(chǎn)代理類,納入 Spring 容器管理
public class RpcScanner extends ClassPathBeanDefinitionScanner { public RpcScanner(BeanDefinitionRegistry registry) { super(registry); } @Override protected Set<BeanDefinitionHolder> doScan(String... basePackages) { Set<BeanDefinitionHolder> beanDefinitionHolders = super.doScan(basePackages); for (BeanDefinitionHolder beanDefinitionHolder : beanDefinitionHolders) { GenericBeanDefinition beanDefinition = (GenericBeanDefinition)beanDefinitionHolder.getBeanDefinition(); beanDefinition.getConstructorArgumentValues().addGenericArgumentValue(beanDefinition.getBeanClassName()); beanDefinition.setBeanClassName(RpcFactoryBean.class.getName()); } return beanDefinitionHolders; } @Override protected boolean isCandidateComponent(MetadataReader metadataReader) throws IOException { return true; } @Override protected boolean isCandidateComponent(AnnotatedBeanDefinition beanDefinition) { return beanDefinition.getMetadata().isInterface() && beanDefinition.getMetadata().isIndependent(); } }
@Component public class RpcBeanDefinitionRegistryPostProcessor implements BeanDefinitionRegistryPostProcessor { @Override public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException { RpcScanner rpcScanner = new RpcScanner(registry); // 傳入RPC接口所在的包名 rpcScanner.scan("com.ygd.rpc.common.service"); } @Override public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException { } }
JSON編解碼器
/** * 將 RpcRequest 編碼成字節(jié)序列發(fā)送 * 消息格式: Length + Content * Length使用int存儲(chǔ),標(biāo)識(shí)消息體的長(zhǎng)度 * * +--------+----------------+ * | Length | Content | * | 4字節(jié) | Length個(gè)字節(jié) | * +--------+----------------+ */ public class JsonEncoder extends MessageToByteEncoder<RpcRequest> { @Override protected void encode(ChannelHandlerContext ctx, RpcRequest rpcRequest, ByteBuf out){ byte[] bytes = JSON.toJSONBytes(rpcRequest); // 將消息體的長(zhǎng)度寫入消息頭部 out.writeInt(bytes.length); // 寫入消息體 out.writeBytes(bytes); } }
/** * 將響應(yīng)消息解碼成 RpcResponse */ public class JsonDecoder extends LengthFieldBasedFrameDecoder { public JsonDecoder(){ super(Integer.MAX_VALUE, 0, 4, 0, 4); } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf in) throws Exception { ByteBuf msg = (ByteBuf) super.decode(ctx, in); byte[] bytes = new byte[msg.readableBytes()]; msg.readBytes(bytes); RpcResponse rpcResponse = JSON.parseObject(bytes, RpcResponse.class); return rpcResponse; } }
測(cè)試
我們編寫一個(gè)Controller進(jìn)行測(cè)試
@RestController @RequestMapping("/hello") public class HelloController { @Autowired private HelloService helloService; @GetMapping("/sayHello") public String hello(String name){ return helloService.sayHello(name); } }
通過(guò) PostMan調(diào)用 controller 接口 http://localhost:9998/hello/sayHello?name=小明
響應(yīng): Hello 小明
總結(jié)
本文實(shí)現(xiàn)了一個(gè)簡(jiǎn)易的、具有基本概念的RPC,主要涉及的知識(shí)點(diǎn)如下:
- 網(wǎng)絡(luò)通信及通信協(xié)議的編碼、解碼
- Java對(duì)象的序列化及反序列化
- 通信鏈路心跳檢測(cè)
- Java反射
- JDK動(dòng)態(tài)代理
項(xiàng)目完整代碼詳見:https://github.com/yinguodong/netty-rpc
到此這篇關(guān)于Springboot整合Netty實(shí)現(xiàn)RPC服務(wù)器的示例代碼的文章就介紹到這了,更多相關(guān)Springboot RPC服務(wù)器內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
IDEA運(yùn)行導(dǎo)入的javaweb項(xiàng)目tomcat正常,但是運(yùn)行失敗404問(wèn)題
這篇文章主要介紹了IDEA運(yùn)行導(dǎo)入的javaweb項(xiàng)目tomcat正常但是運(yùn)行失敗404問(wèn)題,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-07-07Spring Gateway處理微服務(wù)的路由轉(zhuǎn)發(fā)機(jī)制
我們?cè)敿?xì)地介紹了Spring Gateway,這個(gè)基于Spring 5、Spring Boot 2和Project Reactor的API網(wǎng)關(guān),通過(guò)這篇文章,我們可以清晰地看到Spring Gateway的工作原理,以及它的強(qiáng)大之處,感興趣的朋友一起看看吧2024-08-08springboot項(xiàng)目啟動(dòng)的時(shí)候參數(shù)無(wú)效的解決
這篇文章主要介紹了springboot項(xiàng)目啟動(dòng)的時(shí)候參數(shù)無(wú)效的解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助。如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2021-09-09java并發(fā)數(shù)據(jù)包Exchanger線程間的數(shù)據(jù)交換器
這篇文章主要為大家介紹了java并發(fā)數(shù)據(jù)包使用數(shù)據(jù)交換器Exchanger來(lái)進(jìn)行線程之間的數(shù)據(jù)交換。有需要的朋友可以借鑒參考下,希望能夠有所幫助2022-03-03SpringMVC源碼解讀之HandlerMapping - AbstractUrlHandlerMapping系列re
這篇文章主要介紹了SpringMVC源碼解讀之HandlerMapping - AbstractUrlHandlerMapping系列request分發(fā) 的相關(guān)資料,需要的朋友可以參考下2016-02-02Java利用Dijkstra和Floyd分別求取圖的最短路徑
本文主要介紹了圖的最短路徑的概念,并分別利用Dijkstra算法和Floyd算法求取最短路徑,最后提供了基于鄰接矩陣和鄰接表的圖對(duì)兩種算法的Java實(shí)現(xiàn)。需要的可以參考一下2022-01-01sftp和ftp 根據(jù)配置遠(yuǎn)程服務(wù)器地址下載文件到當(dāng)前服務(wù)
這篇文章主要介紹了sftp和ftp 根據(jù)配置遠(yuǎn)程服務(wù)器地址下載文件到當(dāng)前服務(wù)的相關(guān)資料本文給大家介紹的非常詳細(xì),具有參考借鑒價(jià)值,需要的朋友可以參考下2016-10-10Java java.sql.Timestamp時(shí)間戳案例詳解
這篇文章主要介紹了Java java.sql.Timestamp時(shí)間戳案例詳解,本篇文章通過(guò)簡(jiǎn)要的案例,講解了該項(xiàng)技術(shù)的了解與使用,以下就是詳細(xì)內(nèi)容,需要的朋友可以參考下2021-08-08Java?中很好用的數(shù)據(jù)結(jié)構(gòu)(你絕對(duì)沒(méi)用過(guò))
今天跟大家介紹的就是?java.util.EnumMap,也是?java.util?包下面的一個(gè)集合類,同樣的也有對(duì)應(yīng)的的?java.util.EnumSet,對(duì)java數(shù)據(jù)結(jié)構(gòu)相關(guān)知識(shí)感興趣的朋友一起看看吧2022-05-05