基于Netty实现PRC框架
RPC全称Remote Procedure Call,即远程过程调用,对于调用者无感知这是一个远程调用。目前流行的开源 RPC 框架有阿里的Dubbo、Google 的 gRPC、Twitter 的Finagle 等。本次RPC框架的设计主要参考的是阿里的Dubbo,这里Netty 基本上是作为架构的技术底层而存在的,主要完成高性能的网络通信,从而实现高效的远程调用。
Dubbo的架构与Spring
其实在之前的文章中 《谈谈京东的服务框架》 ,探讨过Dubbo的组成和架构。
节点 | 说明 |
---|---|
Provider | 暴露服务的服务提供方 |
Consumer | 调用远程服务的服务消费方 |
Registry | 服务注册与发现的注册中心 |
Monitor | 统计服务的调用次数和调用时间的监控中心 |
Container | 服务运行容器 |
另外使用Dubbo最方便的地方在于它可以和Spring非常方便的集成,Dubbo对于配置的优化也是随着Spring一脉相承的,从最早的XML形式到后来的注解方式以及自动装配,都是在不断地简化开发过程来提高开发效率。
Dubbo在Spring框架中的工作流程:
1、Spring的IOC容器启动 2、把服务注册到注册中心(zookeeper软件)中 3、消费者启动时会把它需要用到的服务从注册中心拉取下来 4、提供者的地址发生改变时,注册中心会马上通知消费者 5、根据注册中心中的服务地址直接可以调用提供者了,如果调用了提供者,就会把提供者的地址主动缓存起来 6、监控消费者调用提供者的次数
RPC实现的关键
1、序列化与反序列化
在远程过程调用时,客户端跟服务端是不同的进程,甚至有时候客户端用Java,服务端用C++。这时候就需要客户端把参数先转成一个字节流,传给服务端后,再把字节流转成自己能读取的格式,这个过程叫序列化和反序列化,同理,从服务端返回的值也需要序列化反序列化的过程。在序列化的时候,我们选择Netty自身的对象序列化器。
2、数据网络传输
解决了序列化的问题,那么剩下的就是如何把数据参数传到生产者,网络传输层需要把序列化后的参数字节流传给服务端,然后再把序列化后的调用结果传回客户端,虽然大部分RPC框架都采用了TCP作为传输协议,其实UDP也可以作为传输协议的,基于TCP和UDP我们可以自定义任意规则的协议,加之我们要使用NIO通信方式作为高性能网络服务的前提,于是Netty似乎更符合我们Java程序员的口味,Netty真香!
3、告诉注册中心我要调谁
现在调用参数的序列化和网络传输都已经具备,但是还有个问题,那就是消费者要调用谁的问题,一个函数或者方法,我们可以理解为一个服务,这些服务注册在注册中心上面,只有当消费者告诉注册中心要调用谁,才可以进行远程调用。所以不但要把将要调用的服务的参数传过去,也要把要调用的服务信息传过去。
简易RPC框架的架构
Dubbo 核心模块主要有四个:Registry 注册中心、Provider 服务提供者、Consumer 服务消费者、Monitor监控,为了方便直接砍掉了监控模块,同时把服务提供者模块与注册中心模块写在一起,通过实现自己的简易IOC容器,完成对服务提供者的实例化。
关于使用Netty进行Socket编程的部分可以参考Netty的 官网 或者我之前的博客 《Netty编码实战与Channel生命周期》 ,在这里Netty的编码技巧和方式不作为本文的重点。
RPC框架编码实现
首先需要引入的依赖如下(Netty + Lombok):
1<dependency>
2 <groupId>io.netty</groupId>
3 <artifactId>netty-all</artifactId>
4 <version>4.1.6.Final</version>
5</dependency>
6<dependency>
7 <groupId>org.projectlombok</groupId>
8 <artifactId>lombok</artifactId>
9 <version>1.16.8</version>
10</dependency>
1、Registry与Provider
目录结构如下:
1───src
2 └─main
3 ├─java
4 │ └─edu
5 │ └─xpu
6 │ └─rpc
7 │ ├─api
8 │ │ IRpcCalc.java
9 │ │ IRpcHello.java
10 │ │
11 │ ├─core
12 │ │ InvokerMessage.java
13 │ │
14 │ ├─provider
15 │ │ RpcCalcProvider.java
16 │ │ RpcHelloProvider.java
17 │ │
18 │ └─registry
19 │ MyRegistryHandler.java
20 │ RpcRegistry.java
21 │
22 └─resources
23───pom.xml
IRpcCalc.java与IRpcHello.java是两个Service接口。IRpcCalc.java内容如下,完成模拟业务加、减、乘、除运算
1public interface IRpcCalc {
2 // 加
3 int add(int a, int b);
4
5 // 减
6 int sub(int a, int b);
7
8 // 乘
9 int mul(int a, int b);
10
11 // 除
12 int div(int a, int b);
13}
IRpcHello.java,测试服务是否可用:
至此API 模块就定义完成了,非常简单的两个接口。接下来,我们要确定传输规则,也就是传输协议,协议内容当然要自定义,才能体现出Netty 的优势。
设计一个InvokerMessage类,里面包含了服务名称、调用方法、参数列表、参数值,这就是我们自定义协议的协议包:
1@Data
2public class InvokerMessage implements Serializable {
3 private String className; // 服务名称
4 private String methodName; // 调用哪个方法
5 private Class<?>[] params; // 参数列表
6 private Object[] values; // 参数值
7}
通过定义这样的协议类,就能知道我们需要调用哪个服务,服务中的哪个方法,方法需要传递的参数列表(参数类型+参数值),这些信息正确传递过去了才能拿到正确的调用返回值。
接下来创建这两个服务的具体实现类,IRpcHello的实现类如下:
1public class RpcHelloProvider implements IRpcHello {
2 public String hello(String name) {
3 return "Hello, " + name + "!";
4 }
5}
IRpcCalc的实现类如下:
1public class RpcCalcProvider implements IRpcCalc {
2 @Override
3 public int add(int a, int b) {
4 return a + b;
5 }
6
7 @Override
8 public int sub(int a, int b) {
9 return a - b;
10 }
11
12 @Override
13 public int mul(int a, int b) {
14 return a * b;
15 }
16
17 @Override
18 public int div(int a, int b) {
19 return a / b;
20 }
21}
Registry 注册中心主要功能就是负责将所有Provider的服务名称和服务引用地址注册到一个容器中(这里为了方便直接使用接口类名作为服务名称,前提是假定我们每个服务只有一个实现类),并对外发布。Registry 应该要启动一个对外的服务,很显然应该作为服务端,并提供一个对外可以访问的端口。先启动一个Netty服务,创建RpcRegistry 类,RpcRegistry.java的具体代码如下:
1public class RpcRegistry {
2 private final int port;
3 public RpcRegistry(int port){
4 this.port = port;
5 }
6
7 public void start(){
8 NioEventLoopGroup bossGroup = new NioEventLoopGroup();
9 NioEventLoopGroup workGroup = new NioEventLoopGroup();
10 try{
11 ServerBootstrap serverBootstrap = new ServerBootstrap();
12 serverBootstrap.group(bossGroup, workGroup)
13 .channel(NioServerSocketChannel.class)
14 .childHandler(new ChannelInitializer<SocketChannel>() {
15 protected void initChannel(SocketChannel socketChannel) throws Exception {
16 ChannelPipeline pipeline = socketChannel.pipeline();
17 // 处理拆包、粘包的编解码器
18 pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
19 pipeline.addLast(new LengthFieldPrepender(4));
20 // 处理序列化的编解码器
21 pipeline.addLast("encoder", new ObjectEncoder());
22 pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
23 // 自己的业务逻辑
24 pipeline.addLast(new MyRegistryHandler());
25 }
26 })
27 .option(ChannelOption.SO_BACKLOG, 128)
28 .childOption(ChannelOption.SO_KEEPALIVE, true); // 设置长连接
29
30 ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();
31 System.out.println("RPC Registry start listen at " + this.port);
32 channelFuture.channel().closeFuture().sync();
33 } catch (Exception e){
34 e.printStackTrace();
35 } finally {
36 bossGroup.shutdownGracefully();
37 workGroup.shutdownGracefully();
38 }
39 }
40 public static void main(String[] args) {
41 new RpcRegistry(8080).start();
42 }
43}
接下来只需要实现我们自己的Handler即可,创建MyRegistryHandler.java,内容如下:
1public class MyRegistryHandler extends ChannelInboundHandlerAdapter {
2 // 在注册中心注册服务需要有容器存放
3 public static ConcurrentHashMap<String, Object> registryMap = new ConcurrentHashMap<>();
4
5 // 类名的缓存位置
6 private static final List<String> classCache = new ArrayList<>();
7
8 // 约定,只要是写在provider下所有的类都认为是一个可以对完提供服务的实现类
9 // edu.xpu.rpc.provider
10
11 public MyRegistryHandler(){
12 scanClass("edu.xpu.rpc.provider");
13 doRegister();
14 }
15
16 @Override
17 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
18 Object result = new Object();
19 // 客户端传过来的调用信息
20 InvokerMessage request = (InvokerMessage)msg;
21 // 先判断有没有这个服务
22 String serverClassName = request.getClassName();
23 if(registryMap.containsKey(serverClassName)){
24 // 获取服务对象
25 Object clazz = registryMap.get(serverClassName);
26 Method method = clazz.getClass().getMethod(request.getMethodName(), request.getParams());
27 result = method.invoke(clazz, request.getValues());
28 System.out.println("request=" + request);
29 System.out.println("result=" + result);
30 }
31 ctx.writeAndFlush(result);
32 ctx.close();
33 }
34
35 @Override
36 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
37 cause.printStackTrace();
38 ctx.close();
39 }
40
41
42 // 实现简易IOC容器
43 // 扫描出包里面所有的Class
44 private void scanClass(String packageName){
45 ClassLoader classLoader = this.getClass().getClassLoader();
46 URL url = classLoader.getResource(packageName.replaceAll("\\.", "/"));
47 File dir = new File(url.getFile());
48 File[] files = dir.listFiles();
49 for (File file: files){
50 if(file.isDirectory()){
51 scanClass(packageName + "." + file.getName());
52 }else{
53 // 拿出类名
54 String className = packageName + "." + file.getName().replace(".class", "").trim();
55 classCache.add(className);
56 }
57 }
58 }
59
60 // 把扫描到的Class实例化,放到Map中
61 // 注册的服务名称就叫做接口的名字 [约定优于配置]
62 private void doRegister(){
63 if(classCache.size() == 0) return;
64 for (String className: classCache){
65 try {
66 Class<?> clazz = Class.forName(className);
67 // 服务名称
68 Class<?> anInterface = clazz.getInterfaces()[0];
69 registryMap.put(anInterface.getName(), clazz.newInstance());
70 } catch (Exception e) {
71 e.printStackTrace();
72 }
73 }
74 }
75}
在这里还通过反射实现了简易的IOC容器,先递归扫描provider包底下的类,把这些类的对象作为服务对象放到IOC容器中进行管理,由于IOC是一个Map实现的,所以将类名作为服务名称,也就是Key,服务对象作为Value。根据消费者传过来的服务名称,就可以找到对应的服务,到此,Registry和Provider已经全部写完了。
2、consumer
目录结构如下:
1└─src
2 ├─main
3 │ ├─java
4 │ │ └─edu
5 │ │ └─xpu
6 │ │ └─rpc
7 │ │ ├─api
8 │ │ │ IRpcCalc.java
9 │ │ │ IRpcHello.java
10 │ │ │
11 │ │ ├─consumer
12 │ │ │ │ RpcConsumer.java
13 │ │ │ │
14 │ │ │ └─proxy
15 │ │ │ RpcProxy.java
16 │ │ │ RpcProxyHandler.java
17 │ │ │
18 │ │ └─core
19 │ │ InvokerMessage.java
20 │ │
21 │ └─resources
22 └─test
23 └─java
24└─ pom.xml
再看客户端的实现之前,先梳理一下RPC流程。API 模块中的接口只在服务端实现了。因此,客户端调用API 中定义的某一个接口方法时,实际上是要发起一次网络请求去调用服务端的某一个服务。而这个网络请求首先被注册中心接收,由注册中心先确定需要调用的服务的位置,再将请求转发至真实的服务实现,最终调用服务端代码,将返回值通过网络传输给客户端。整个过程对于客户端而言是完全无感知的,就像调用本地方法一样,所以必定要对客户端的API接口做代理,隐藏网络请求的细节。
由上图的流程图可知,要让用户调用无感知,必须创建出代理类来完成网络请求的操作。
RpcProxy.java如下:
1public class RpcProxy {
2 public static <T> T create(Class<?> clazz) {
3 //clazz传进来本身就是interface
4 MethodProxy proxy = new MethodProxy(clazz);
5 T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz} , proxy);
6 return result;
7 }
8
9 private static class MethodProxy implements InvocationHandler {
10 private Class<?> clazz;
11
12 public MethodProxy(Class<?> clazz) {
13 this.clazz = clazz;
14 }
15 public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
16 // 如果传进来是一个已实现的具体类
17 if (Object.class.equals(method.getDeclaringClass())) {
18 try {
19 return method.invoke(this, args);
20 } catch (Throwable t) {
21 t.printStackTrace();
22 }
23 // 如果传进来的是一个接口(核心)
24 } else {
25 return rpcInvoke(method, args);
26 }
27 return null;
28 }
29
30 // 实现接口的核心方法
31 public Object rpcInvoke(Method method, Object[] args) {
32 // 传输协议封装
33 InvokerMessage invokerMessage = new InvokerMessage();
34 invokerMessage.setClassName(this.clazz.getName());
35 invokerMessage.setMethodName(method.getName());
36 invokerMessage.setValues(args);
37 invokerMessage.setParams(method.getParameterTypes());
38
39 final RpcProxyHandler consumerHandler = new RpcProxyHandler();
40 EventLoopGroup group = new NioEventLoopGroup();
41
42 try {
43 Bootstrap bootstrap = new Bootstrap();
44 bootstrap.group(group)
45 .channel(NioSocketChannel.class)
46 .option(ChannelOption.TCP_NODELAY, true)
47 .handler(new ChannelInitializer<SocketChannel>() {
48 @Override
49 public void initChannel(SocketChannel ch) throws Exception {
50 ChannelPipeline pipeline = ch.pipeline();
51 pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
52 //自定义协议编码器
53 pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
54 //对象参数类型编码器
55 pipeline.addLast("encoder", new ObjectEncoder());
56 //对象参数类型解码器
57 pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
58 pipeline.addLast("handler", consumerHandler);
59 }
60 });
61 ChannelFuture future = bootstrap.connect("localhost", 8080).sync();
62 future.channel().writeAndFlush(invokerMessage).sync();
63 future.channel().closeFuture().sync();
64 } catch (Exception e) {
65 e.printStackTrace();
66 } finally {
67 group.shutdownGracefully();
68 }
69 return consumerHandler.getResponse();
70 }
71 }
72}
我们通过传进来的接口对象,获得了要调用的服务名,服务方法名,参数类型列表,参数列表,这样就把自定义的RPC协议包封装好了,只需要把协议包发出去等待结果返回即可,所以为了接收返回值数据还需要自定义一个接收用的Handler,RpcProxyHandlerdiamante如下:
1public class RpcProxyHandler extends ChannelInboundHandlerAdapter {
2 private Object result;
3
4 public Object getResponse() {
5 return result;
6 }
7
8 @Override
9 public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
10 result = msg;
11 }
12
13 @Override
14 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
15 System.out.println("client exception is general");
16 }
17}
这样就算是完成了整个流程,下面开始测试一下吧,测试的RpcConsumer.java代码如下:
1public class RpcConsumer {
2 public static void main(String[] args) {
3 // 本机之间的正常调用
4 // IRpcHello iRpcHello = new RpcHelloProvider();
5 // iRpcHello.hello("Tom");
6
7 // 肯定是用动态代理来实现的
8 // 传给它接口,返回一个接口的实例,伪代理
9 IRpcHello rpcHello = RpcProxy.create(IRpcHello.class);
10 System.out.println(rpcHello.hello("ZouChangLin"));
11
12 int a = 10;
13 int b = 5;
14 IRpcCalc iRpcCalc = RpcProxy.create(IRpcCalc.class);
15
16 System.out.println(String.format("%d + %d = %d", a, b, iRpcCalc.add(a, b)));
17 System.out.println(String.format("%d - %d = %d ", a, b, iRpcCalc.sub(a, b)));
18 System.out.println(String.format("%d * %d = %d", a, b, iRpcCalc.mul(a, b)));
19 System.out.println(String.format("%d / %d = %d", a, b, iRpcCalc.div(a, b)));
20 }
21}
3、效果测试
先开启Registry,运行端口是8080:
开启consumer开始调用
调用完成后可以看到调用结果正确,并且在Registry这边也看到了日志:
可以发现,简易RPC框架顺利完工!