真的够可以的,基于Netty实现了RPC框架
创始人
2024-02-19 01:49:28
0

RPC全称Remote Procedure Call,即远程过程调用,对于调用者无感知这是一个远程调用功能。目前流行的开源RPC 框架有阿里的Dubbo、Google 的 gRPC、Twitter 的Finagle 等。本次RPC框架的设计主要参考的是阿里的Dubbo,这里Netty 基本上是作为架构的技术底层而存在的,主要完成高性能的网络通信,从而实现高效的远程调用。

Dubbo的架构与Spring

其实在之前的文章中《谈谈京东的服务框架》,探讨过Dubbo的组成和架构。

 

 

另外使用Dubbo最方便的地方在于它可以和Spring非常方便的集成,Dubbo对于配置的优化也是随着Spring一脉相承的,从最早的XML形式到后来的注解方式以及自动装配,都是在不断地简化开发过程来提高开发效率。

Dubbo在Spring框架中的工作流程:

1、Spring的IOC容器启动

2、把服务注册到注册中心(zookeeper软件)中

3、消费者启动时会把它需要用到的服务从注册中心拉取下来

4、提供者的地址发生改变时,注册中心会马上通知消费者

5、根据注册中心中的服务地址直接就可以调用提供者了,如果调用了提供者,就会把提供者的地址主动缓存起来

6、监控消费者调用提供者的次数

RPC实现的关键

1、序列化与反序列化

在远程过程调用时,客户端跟服务端是不同的进程,甚至有时候客户端用Java,服务端用C++。这时候就需要客户端把参数先转成一个字节流,传给服务端后,再把字节流转成自己能读取的格式,这个过程叫序列化和反序列化,同理,从服务端返回的值也需要序列化反序列化的过程。在序列化的时候,我们选择Netty自身的对象序列化器。

 

2、数据网络传输

解决了序列化的问题,那么剩下的就是如何把数据参数传到生产者,网络传输层需要把序列化后的参数字节流传给服务端,然后再把序列化后的调用结果传回客户端,虽然大部分RPC框架都采用了TCP作为传输协议,其实UDP也可以作为传输协议的,基于TCP和UDP我们可以自定义任意规则的协议,加之我们要使用NIO通信方式作为高性能网络服务的前提,于是Netty似乎更符合我们Java程序员的口味,Netty真香!

3、告诉注册中心我要调谁

现在调用参数的序列化和网络传输都已经具备,但是还有个问题,那就是消费者要调用谁的问题,一个函数或者方法,我们可以理解为一个服务,这些服务注册在注册中心上面,只有当消费者告诉注册中心要调用谁,才可以进行远程调用。所以不但要把将要调用的服务的参数传过去,也要把要调用的服务信息传过去。

简易RPC框架的架构

 

Dubbo 核心模块主要有四个:Registry 注册中心、Provider 服务提供者、Consumer 服务消费者、Monitor监控,为了方便直接砍掉了监控模块,同时把服务提供者模块与注册中心模块写在一起,通过实现自己的简易IOC容器,完成对服务提供者的实例化。

关于使用Netty进行Socket编程的部分可以参考Netty的官网 或者我之前的博客《Netty编码实战与Channel生命周期》,在这里Netty的编码技巧和方式不作为本文的重点。

RPC框架编码实现

首先需要引入的依赖如下(Netty + Lombok):

io.nettynetty-all4.1.6.Final

org.projectlomboklombok1.16.8

1、Registry与Provider

目录结构如下:

───src└─main├─java│  └─edu│      └─xpu│          └─rpc│              ├─api│              │      IRpcCalc.java│              │      IRpcHello.java│              ││              ├─core│              │      InvokerMessage.java│              ││              ├─provider│              │      RpcCalcProvider.java│              │      RpcHelloProvider.java│              ││              └─registry│                      MyRegistryHandler.java│                      RpcRegistry.java│└─resources
───pom.xml

IRpcCalc.java与IRpcHello.java是两个Service接口。IRpcCalc.java内容如下,完成模拟业务加、减、乘、除运算

public interface IRpcCalc {// 加int add(int a, int b);// 减int sub(int a, int b);// 乘int mul(int a, int b);// 除int div(int a, int b);
}

IRpcHello.java,测试服务是否可用:

public interface IRpcHello {String hello(String name);
}

至此API 模块就定义完成了,非常简单的两个接口。接下来,我们要确定传输规则,也就是传输协议,协议内容当然要自定义,才能体现出Netty 的优势。

设计一个InvokerMessage类,里面包含了服务名称、调用方法、参数列表、参数值,这就是我们自定义协议的协议包:

@Data
public class InvokerMessage implements Serializable {private String className; // 服务名称private String methodName; // 调用哪个方法private Class[] params; // 参数列表private Object[] values; // 参数值
}

通过定义这样的协议类,就能知道我们需要调用哪个服务,服务中的哪个方法,方法需要传递的参数列表(参数类型+参数值),这些信息正确传递过去了才能拿到正确的调用返回值。

接下来创建这两个服务的具体实现类,IRpcHello的实现类如下:

public class RpcHelloProvider implements IRpcHello {public String hello(String name) {return "Hello, " + name + "!";}
}

IRpcCalc的实现类如下:

public class RpcCalcProvider implements IRpcCalc {@Overridepublic int add(int a, int b) {return a + b;}@Overridepublic int sub(int a, int b) {return a - b;}@Overridepublic int mul(int a, int b) {return a * b;}@Overridepublic int div(int a, int b) {return a / b;}
}

Registry 注册中心主要功能就是负责将所有Provider的服务名称和服务引用地址注册到一个容器中(这里为了方便直接使用接口类名作为服务名称,前提是假定我们每个服务只有一个实现类),并对外发布。Registry 应该要启动一个对外的服务,很显然应该作为服务端,并提供一个对外可以访问的端口。先启动一个Netty服务,创建RpcRegistry 类,RpcRegistry.java的具体代码如下:

public class RpcRegistry {private final int port;public RpcRegistry(int port){this.port = port;}public void start(){NioEventLoopGroup bossGroup = new NioEventLoopGroup();NioEventLoopGroup workGroup = new NioEventLoopGroup();try{ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer() {protected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();// 处理拆包、粘包的编解码器pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));pipeline.addLast(new LengthFieldPrepender(4));// 处理序列化的编解码器pipeline.addLast("encoder", new ObjectEncoder());pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));// 自己的业务逻辑pipeline.addLast(new MyRegistryHandler());}}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true); // 设置长连接ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();System.out.println("RPC Registry start listen at " + this.port);channelFuture.channel().closeFuture().sync();} catch (Exception e){e.printStackTrace();} finally {bossGroup.shutdownGracefully();workGroup.shutdownGracefully();}}public static void main(String[] args) {new RpcRegistry(8080).start();}
}

接下来只需要实现我们自己的Handler即可,创建MyRegistryHandler.java,内容如下:

public class MyRegistryHandler extends ChannelInboundHandlerAdapter {// 在注册中心注册服务需要有容器存放public static ConcurrentHashMap registryMap = new ConcurrentHashMap<>();// 类名的缓存位置private static final List classCache = new ArrayList<>();// 约定,只要是写在provider下所有的类都认为是一个可以对完提供服务的实现类// edu.xpu.rpc.providerpublic MyRegistryHandler(){scanClass("edu.xpu.rpc.provider");doRegister();}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {Object result = new Object();// 客户端传过来的调用信息InvokerMessage request = (InvokerMessage)msg;// 先判断有没有这个服务String serverClassName = request.getClassName();if(registryMap.containsKey(serverClassName)){// 获取服务对象Object clazz = registryMap.get(serverClassName);Method method = clazz.getClass().getMethod(request.getMethodName(), request.getParams());result = method.invoke(clazz, request.getValues());System.out.println("request=" + request);System.out.println("result=" + result);}ctx.writeAndFlush(result);ctx.close();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {cause.printStackTrace();ctx.close();}// 实现简易IOC容器// 扫描出包里面所有的Classprivate void scanClass(String packageName){ClassLoader classLoader = this.getClass().getClassLoader();URL url = classLoader.getResource(packageName.replaceAll("\\.", "/"));File dir = new File(url.getFile());File[] files = dir.listFiles();for (File file: files){if(file.isDirectory()){scanClass(packageName + "." + file.getName());}else{// 拿出类名String className = packageName + "." + file.getName().replace(".class", "").trim();classCache.add(className);}}}// 把扫描到的Class实例化,放到Map中// 注册的服务名称就叫做接口的名字 [约定优于配置]private void doRegister(){if(classCache.size() == 0) return;for (String className: classCache){try {Class clazz = Class.forName(className);// 服务名称Class anInterface = clazz.getInterfaces()[0];registryMap.put(anInterface.getName(), clazz.newInstance());} catch (Exception e) {e.printStackTrace();}}}
}

在这里还通过反射实现了简易的IOC容器,先递归扫描provider包底下的类,把这些类的对象作为服务对象放到IOC容器中进行管理,由于IOC是一个Map实现的,所以将类名作为服务名称,也就是Key,服务对象作为Value。根据消费者传过来的服务名称,就可以找到对应的服务,到此,Registry和Provider已经全部写完了。

2、consumer

目录结构如下:

└─src├─main│  ├─java│  │  └─edu│  │      └─xpu│  │          └─rpc│  │              ├─api│  │              │      IRpcCalc.java│  │              │      IRpcHello.java│  │              ││  │              ├─consumer│  │              │  │  RpcConsumer.java│  │              │  ││  │              │  └─proxy│  │              │          RpcProxy.java│  │              │          RpcProxyHandler.java│  │              ││  │              └─core│  │                      InvokerMessage.java│  ││  └─resources└─test└─java
└─ pom.xml

在看客户端的实现之前,先梳理一下RPC流程。API 模块中的接口只在服务端实现了。因此,客户端调用API 中定义的某一个接口方法时,实际上是要发起一次网络请求去调用服务端的某一个服务。而这个网络请求首先被注册中心接收,由注册中心先确定需要调用的服务的位置,再将请求转发至真实的服务实现,最终调用服务端代码,将返回值通过网络传输给客户端。整个过程对于客户端而言是完全无感知的,就像调用本地方法一样,所以必定要对客户端的API接口做代理,隐藏网络请求的细节。

 

由上图的流程图可知,要让用户调用无感知,必须创建出代理类来完成网络请求的操作。

RpcProxy.java如下:

public class RpcProxy {public static  T create(Class clazz) {//clazz传进来本身就是interfaceMethodProxy proxy = new MethodProxy(clazz);T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz} , proxy);return result;}private static class MethodProxy implements InvocationHandler {private Class clazz;public MethodProxy(Class clazz) {this.clazz = clazz;}public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// 如果传进来是一个已实现的具体类if (Object.class.equals(method.getDeclaringClass())) {try {return method.invoke(this, args);} catch (Throwable t) {t.printStackTrace();}// 如果传进来的是一个接口(核心)} else {return rpcInvoke(method, args);}return null;}// 实现接口的核心方法public Object rpcInvoke(Method method, Object[] args) {// 传输协议封装InvokerMessage invokerMessage = new InvokerMessage();invokerMessage.setClassName(this.clazz.getName());invokerMessage.setMethodName(method.getName());invokerMessage.setValues(args);invokerMessage.setParams(method.getParameterTypes());final RpcProxyHandler consumerHandler = new RpcProxyHandler();EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap bootstrap = new Bootstrap();bootstrap.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline pipeline = ch.pipeline();pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));//自定义协议编码器pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));//对象参数类型编码器pipeline.addLast("encoder", new ObjectEncoder());//对象参数类型解码器pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));pipeline.addLast("handler", consumerHandler);}});ChannelFuture future = bootstrap.connect("localhost", 8080).sync();future.channel().writeAndFlush(invokerMessage).sync();future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();} finally {group.shutdownGracefully();}return consumerHandler.getResponse();}}
}

我们通过传进来的接口对象,获得了要调用的服务名,服务方法名,参数类型列表,参数列表,这样就把自定义的RPC协议包封装好了,只需要把协议包发出去等待结果返回即可,所以为了接收返回值数据还需要自定义一个接收用的Handler,RpcProxyHandlerdiamante如下:

public class RpcProxyHandler extends ChannelInboundHandlerAdapter {private Object result;public Object getResponse() {return result;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {result = msg;}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {System.out.println("client exception is general");}
}

这样就算是完成了整个流程,下面开始测试一下吧,测试的RpcConsumer.java代码如下:

public class RpcConsumer {public static void main(String[] args) {// 本机之间的正常调用// IRpcHello iRpcHello = new RpcHelloProvider();// iRpcHello.hello("Tom");// 肯定是用动态代理来实现的// 传给它接口,返回一个接口的实例,伪代理IRpcHello rpcHello = RpcProxy.create(IRpcHello.class);System.out.println(rpcHello.hello("ZouChangLin"));int a = 10;int b = 5;IRpcCalc iRpcCalc = RpcProxy.create(IRpcCalc.class);System.out.println(String.format("%d + %d = %d", a, b, iRpcCalc.add(a, b)));System.out.println(String.format("%d - %d = %d ", a, b, iRpcCalc.sub(a, b)));System.out.println(String.format("%d * %d = %d", a, b, iRpcCalc.mul(a, b)));System.out.println(String.format("%d / %d = %d", a, b, iRpcCalc.div(a, b)));}
}

3、效果测试

先开启Registry,运行端口是8080:

 

开启consumer开始调用

 

调用完成后可以看到调用结果正确,并且在Registry这边也看到了日志:

 

可以发现,简易RPC框架顺利完工!

相关内容

热门资讯

喜欢穿一身黑的男生性格(喜欢穿... 今天百科达人给各位分享喜欢穿一身黑的男生性格的知识,其中也会对喜欢穿一身黑衣服的男人人好相处吗进行解...
发春是什么意思(思春和发春是什... 本篇文章极速百科给大家谈谈发春是什么意思,以及思春和发春是什么意思对应的知识点,希望对各位有所帮助,...
网络用语zl是什么意思(zl是... 今天给各位分享网络用语zl是什么意思的知识,其中也会对zl是啥意思是什么网络用语进行解释,如果能碰巧...
为什么酷狗音乐自己唱的歌不能下... 本篇文章极速百科小编给大家谈谈为什么酷狗音乐自己唱的歌不能下载到本地?,以及为什么酷狗下载的歌曲不是...
家里可以做假山养金鱼吗(假山能... 今天百科达人给各位分享家里可以做假山养金鱼吗的知识,其中也会对假山能放鱼缸里吗进行解释,如果能碰巧解...
华为下载未安装的文件去哪找(华... 今天百科达人给各位分享华为下载未安装的文件去哪找的知识,其中也会对华为下载未安装的文件去哪找到进行解...
四分五裂是什么生肖什么动物(四... 本篇文章极速百科小编给大家谈谈四分五裂是什么生肖什么动物,以及四分五裂打一生肖是什么对应的知识点,希...
怎么往应用助手里添加应用(应用... 今天百科达人给各位分享怎么往应用助手里添加应用的知识,其中也会对应用助手怎么添加微信进行解释,如果能...
苏州离哪个飞机场近(苏州离哪个... 本篇文章极速百科小编给大家谈谈苏州离哪个飞机场近,以及苏州离哪个飞机场近点对应的知识点,希望对各位有...
客厅放八骏马摆件可以吗(家里摆... 今天给各位分享客厅放八骏马摆件可以吗的知识,其中也会对家里摆八骏马摆件好吗进行解释,如果能碰巧解决你...