Tim

一枚野生程序员~

  • 主页
  • 分类
  • 标签
  • 归档
  • 关于
所有文章 工具

Tim

一枚野生程序员~

  • 主页
  • 分类
  • 标签
  • 归档
  • 关于

基于Netty实现PRC框架

阅读数:次 2020-07-11
字数统计: 3.4k字   |   阅读时长≈ 14分

RPC全称Remote Procedure Call,即远程过程调用,对于调用者无感知这是一个远程调用。目前流行的开源 RPC 框架有阿里的Dubbo、Google 的 gRPC、Twitter 的Finagle 等。本次RPC框架的设计主要参考的是阿里的Dubbo,这里Netty 基本上是作为架构的技术底层而存在的,主要完成高性能的网络通信,从而实现高效的远程调用。

Dubbo的架构与Spring

其实在之前的文章中《谈谈京东的服务框架》,探讨过Dubbo的组成和架构。

节点 说明
Provider 暴露服务的服务提供方
Consumer 调用远程服务的服务消费方
Registry 服务注册与发现的注册中心
Monitor 统计服务的调用次数和调用时间的监控中心
Container 服务运行容器

另外使用Dubbo最方便的地方在于它可以和Spring非常方便的集成,Dubbo对于配置的优化也是随着Spring一脉相承的,从最早的XML形式到后来的注解方式以及自动装配,都是在不断地简化开发过程来提高开发效率。

Dubbo在Spring框架中的工作流程:

1、Spring的IOC容器启动
2、把服务注册到注册中心(zookeeper软件)中
3、消费者启动时会把它需要用到的服务从注册中心拉取下来
4、提供者的地址发生改变时,注册中心会马上通知消费者
5、根据注册中心中的服务地址直接可以调用提供者了,如果调用了提供者,就会把提供者的地址主动缓存起来
6、监控消费者调用提供者的次数

RPC实现的关键

1、序列化与反序列化

在远程过程调用时,客户端跟服务端是不同的进程,甚至有时候客户端用Java,服务端用C++。这时候就需要客户端把参数先转成一个字节流,传给服务端后,再把字节流转成自己能读取的格式,这个过程叫序列化和反序列化,同理,从服务端返回的值也需要序列化反序列化的过程。在序列化的时候,我们选择Netty自身的对象序列化器。

mark

2、数据网络传输

解决了序列化的问题,那么剩下的就是如何把数据参数传到生产者,网络传输层需要把序列化后的参数字节流传给服务端,然后再把序列化后的调用结果传回客户端,虽然大部分RPC框架都采用了TCP作为传输协议,其实UDP也可以作为传输协议的,基于TCP和UDP我们可以自定义任意规则的协议,加之我们要使用NIO通信方式作为高性能网络服务的前提,于是Netty似乎更符合我们Java程序员的口味,Netty真香!

3、告诉注册中心我要调谁

现在调用参数的序列化和网络传输都已经具备,但是还有个问题,那就是消费者要调用谁的问题,一个函数或者方法,我们可以理解为一个服务,这些服务注册在注册中心上面,只有当消费者告诉注册中心要调用谁,才可以进行远程调用。所以不但要把将要调用的服务的参数传过去,也要把要调用的服务信息传过去。

简易RPC框架的架构

mark

Dubbo 核心模块主要有四个:Registry 注册中心、Provider 服务提供者、Consumer 服务消费者、Monitor监控,为了方便直接砍掉了监控模块,同时把服务提供者模块与注册中心模块写在一起,通过实现自己的简易IOC容器,完成对服务提供者的实例化。

关于使用Netty进行Socket编程的部分可以参考Netty的官网 或者我之前的博客《Netty编码实战与Channel生命周期》,在这里Netty的编码技巧和方式不作为本文的重点。

RPC框架编码实现

首先需要引入的依赖如下(Netty + Lombok):

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.6.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.8</version>
</dependency>

1、Registry与Provider

目录结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
───src
└─main
├─java
│ └─edu
│ └─xpu
│ └─rpc
│ ├─api
│ │ IRpcCalc.java
│ │ IRpcHello.java
│ │
│ ├─core
│ │ InvokerMessage.java
│ │
│ ├─provider
│ │ RpcCalcProvider.java
│ │ RpcHelloProvider.java
│ │
│ └─registry
│ MyRegistryHandler.java
│ RpcRegistry.java
│
└─resources
───pom.xml

IRpcCalc.java与IRpcHello.java是两个Service接口。IRpcCalc.java内容如下,完成模拟业务加、减、乘、除运算

1
2
3
4
5
6
7
8
9
10
11
12
13
public interface IRpcCalc {
// 加
int add(int a, int b);

// 减
int sub(int a, int b);

// 乘
int mul(int a, int b);

// 除
int div(int a, int b);
}

IRpcHello.java,测试服务是否可用:

1
2
3
public interface IRpcHello {
String hello(String name);
}

至此API 模块就定义完成了,非常简单的两个接口。接下来,我们要确定传输规则,也就是传输协议,协议内容当然要自定义,才能体现出Netty 的优势。

设计一个InvokerMessage类,里面包含了服务名称、调用方法、参数列表、参数值,这就是我们自定义协议的协议包:

1
2
3
4
5
6
7
@Data
public class InvokerMessage implements Serializable {
private String className; // 服务名称
private String methodName; // 调用哪个方法
private Class<?>[] params; // 参数列表
private Object[] values; // 参数值
}

通过定义这样的协议类,就能知道我们需要调用哪个服务,服务中的哪个方法,方法需要传递的参数列表(参数类型+参数值),这些信息正确传递过去了才能拿到正确的调用返回值。

接下来创建这两个服务的具体实现类,IRpcHello的实现类如下:

1
2
3
4
5
public class RpcHelloProvider implements IRpcHello {
public String hello(String name) {
return "Hello, " + name + "!";
}
}

IRpcCalc的实现类如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class RpcCalcProvider implements IRpcCalc {
@Override
public int add(int a, int b) {
return a + b;
}

@Override
public int sub(int a, int b) {
return a - b;
}

@Override
public int mul(int a, int b) {
return a * b;
}

@Override
public int div(int a, int b) {
return a / b;
}
}

Registry 注册中心主要功能就是负责将所有Provider的服务名称和服务引用地址注册到一个容器中(这里为了方便直接使用接口类名作为服务名称,前提是假定我们每个服务只有一个实现类),并对外发布。Registry 应该要启动一个对外的服务,很显然应该作为服务端,并提供一个对外可以访问的端口。先启动一个Netty服务,创建RpcRegistry 类,RpcRegistry.java的具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public class RpcRegistry {
private final int port;
public RpcRegistry(int port){
this.port = port;
}

public void start(){
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
try{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
// 处理拆包、粘包的编解码器
pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
pipeline.addLast(new LengthFieldPrepender(4));
// 处理序列化的编解码器
pipeline.addLast("encoder", new ObjectEncoder());
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
// 自己的业务逻辑
pipeline.addLast(new MyRegistryHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true); // 设置长连接

ChannelFuture channelFuture = serverBootstrap.bind(this.port).sync();
System.out.println("RPC Registry start listen at " + this.port);
channelFuture.channel().closeFuture().sync();
} catch (Exception e){
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workGroup.shutdownGracefully();
}
}
public static void main(String[] args) {
new RpcRegistry(8080).start();
}
}

接下来只需要实现我们自己的Handler即可,创建MyRegistryHandler.java,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
public class MyRegistryHandler extends ChannelInboundHandlerAdapter {
// 在注册中心注册服务需要有容器存放
public static ConcurrentHashMap<String, Object> registryMap = new ConcurrentHashMap<>();

// 类名的缓存位置
private static final List<String> classCache = new ArrayList<>();

// 约定,只要是写在provider下所有的类都认为是一个可以对完提供服务的实现类
// edu.xpu.rpc.provider

public MyRegistryHandler(){
scanClass("edu.xpu.rpc.provider");
doRegister();
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Object result = new Object();
// 客户端传过来的调用信息
InvokerMessage request = (InvokerMessage)msg;
// 先判断有没有这个服务
String serverClassName = request.getClassName();
if(registryMap.containsKey(serverClassName)){
// 获取服务对象
Object clazz = registryMap.get(serverClassName);
Method method = clazz.getClass().getMethod(request.getMethodName(), request.getParams());
result = method.invoke(clazz, request.getValues());
System.out.println("request=" + request);
System.out.println("result=" + result);
}
ctx.writeAndFlush(result);
ctx.close();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}


// 实现简易IOC容器
// 扫描出包里面所有的Class
private void scanClass(String packageName){
ClassLoader classLoader = this.getClass().getClassLoader();
URL url = classLoader.getResource(packageName.replaceAll("\\.", "/"));
File dir = new File(url.getFile());
File[] files = dir.listFiles();
for (File file: files){
if(file.isDirectory()){
scanClass(packageName + "." + file.getName());
}else{
// 拿出类名
String className = packageName + "." + file.getName().replace(".class", "").trim();
classCache.add(className);
}
}
}

// 把扫描到的Class实例化,放到Map中
// 注册的服务名称就叫做接口的名字 [约定优于配置]
private void doRegister(){
if(classCache.size() == 0) return;
for (String className: classCache){
try {
Class<?> clazz = Class.forName(className);
// 服务名称
Class<?> anInterface = clazz.getInterfaces()[0];
registryMap.put(anInterface.getName(), clazz.newInstance());
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

在这里还通过反射实现了简易的IOC容器,先递归扫描provider包底下的类,把这些类的对象作为服务对象放到IOC容器中进行管理,由于IOC是一个Map实现的,所以将类名作为服务名称,也就是Key,服务对象作为Value。根据消费者传过来的服务名称,就可以找到对应的服务,到此,Registry和Provider已经全部写完了。

2、consumer

目录结构如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
└─src
├─main
│ ├─java
│ │ └─edu
│ │ └─xpu
│ │ └─rpc
│ │ ├─api
│ │ │ IRpcCalc.java
│ │ │ IRpcHello.java
│ │ │
│ │ ├─consumer
│ │ │ │ RpcConsumer.java
│ │ │ │
│ │ │ └─proxy
│ │ │ RpcProxy.java
│ │ │ RpcProxyHandler.java
│ │ │
│ │ └─core
│ │ InvokerMessage.java
│ │
│ └─resources
└─test
└─java
└─ pom.xml

再看客户端的实现之前,先梳理一下RPC流程。API 模块中的接口只在服务端实现了。因此,客户端调用API 中定义的某一个接口方法时,实际上是要发起一次网络请求去调用服务端的某一个服务。而这个网络请求首先被注册中心接收,由注册中心先确定需要调用的服务的位置,再将请求转发至真实的服务实现,最终调用服务端代码,将返回值通过网络传输给客户端。整个过程对于客户端而言是完全无感知的,就像调用本地方法一样,所以必定要对客户端的API接口做代理,隐藏网络请求的细节。

mark

由上图的流程图可知,要让用户调用无感知,必须创建出代理类来完成网络请求的操作。

RpcProxy.java如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
public class RpcProxy {
public static <T> T create(Class<?> clazz) {
//clazz传进来本身就是interface
MethodProxy proxy = new MethodProxy(clazz);
T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class[]{clazz} , proxy);
return result;
}

private static class MethodProxy implements InvocationHandler {
private Class<?> clazz;

public MethodProxy(Class<?> clazz) {
this.clazz = clazz;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 如果传进来是一个已实现的具体类
if (Object.class.equals(method.getDeclaringClass())) {
try {
return method.invoke(this, args);
} catch (Throwable t) {
t.printStackTrace();
}
// 如果传进来的是一个接口(核心)
} else {
return rpcInvoke(method, args);
}
return null;
}

// 实现接口的核心方法
public Object rpcInvoke(Method method, Object[] args) {
// 传输协议封装
InvokerMessage invokerMessage = new InvokerMessage();
invokerMessage.setClassName(this.clazz.getName());
invokerMessage.setMethodName(method.getName());
invokerMessage.setValues(args);
invokerMessage.setParams(method.getParameterTypes());

final RpcProxyHandler consumerHandler = new RpcProxyHandler();
EventLoopGroup group = new NioEventLoopGroup();

try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
//自定义协议编码器
pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
//对象参数类型编码器
pipeline.addLast("encoder", new ObjectEncoder());
//对象参数类型解码器
pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
pipeline.addLast("handler", consumerHandler);
}
});
ChannelFuture future = bootstrap.connect("localhost", 8080).sync();
future.channel().writeAndFlush(invokerMessage).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
return consumerHandler.getResponse();
}
}
}

我们通过传进来的接口对象,获得了要调用的服务名,服务方法名,参数类型列表,参数列表,这样就把自定义的RPC协议包封装好了,只需要把协议包发出去等待结果返回即可,所以为了接收返回值数据还需要自定义一个接收用的Handler,RpcProxyHandlerdiamante如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public class RpcProxyHandler extends ChannelInboundHandlerAdapter {
private Object result;

public Object getResponse() {
return result;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = msg;
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
System.out.println("client exception is general");
}
}

这样就算是完成了整个流程,下面开始测试一下吧,测试的RpcConsumer.java代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public class RpcConsumer {
public static void main(String[] args) {
// 本机之间的正常调用
// IRpcHello iRpcHello = new RpcHelloProvider();
// iRpcHello.hello("Tom");

// 肯定是用动态代理来实现的
// 传给它接口,返回一个接口的实例,伪代理
IRpcHello rpcHello = RpcProxy.create(IRpcHello.class);
System.out.println(rpcHello.hello("ZouChangLin"));

int a = 10;
int b = 5;
IRpcCalc iRpcCalc = RpcProxy.create(IRpcCalc.class);

System.out.println(String.format("%d + %d = %d", a, b, iRpcCalc.add(a, b)));
System.out.println(String.format("%d - %d = %d ", a, b, iRpcCalc.sub(a, b)));
System.out.println(String.format("%d * %d = %d", a, b, iRpcCalc.mul(a, b)));
System.out.println(String.format("%d / %d = %d", a, b, iRpcCalc.div(a, b)));
}
}

3、效果测试

先开启Registry,运行端口是8080:

mark

开启consumer开始调用

mark

调用完成后可以看到调用结果正确,并且在Registry这边也看到了日志:

mark

可以发现,简易RPC框架顺利完工!

赏

谢谢你请我喝咖啡

支付宝
微信
  • 本文作者: Tim
  • 本文链接: https://zouchanglin.cn/3887185772.html
  • 版权声明: 本博客所有文章除特别声明外,均采用 CC BY-SA 4.0 许可协议。转载请注明出处!
  • RPC
  • 序列化
  • Netty
  • 高性能网络
  • 分布式理论

扫一扫,分享到微信

Jsoup实战(正方教务系统爬取)
Netty编码实战与Channel生命周期
  1. 1. Dubbo的架构与Spring
  2. 2. RPC实现的关键
    1. 2.1. 1、序列化与反序列化
    2. 2.2. 2、数据网络传输
    3. 2.3. 3、告诉注册中心我要调谁
  3. 3. 简易RPC框架的架构
  4. 4. RPC框架编码实现
    1. 4.1. 1、Registry与Provider
    2. 4.2. 2、consumer
    3. 4.3. 3、效果测试
© 2017-2021 Tim
本站总访问量次 | 本站访客数人
  • 所有文章
  • 工具

tag:

  • 生活
  • Android
  • 索引
  • MySQL
  • 组件通信
  • Nginx
  • JavaSE
  • JUC
  • JavaWeb
  • 模板引擎
  • 前端
  • Linux
  • 计算机网络
  • Docker
  • C/C++
  • JVM
  • 上传下载
  • JavaEE
  • SpringCloud
  • Golang
  • Gradle
  • 网络安全
  • 非对称加密
  • IDEA
  • SpringBoot
  • Jenkins
  • 字符串
  • vim
  • 存储
  • 文件下载
  • Mac
  • Windows
  • NIO
  • RPC
  • 集群
  • 微服务
  • SSH
  • 配置中心
  • XML
  • Chrome
  • 压力测试
  • Git
  • 博客
  • 概率论
  • 排序算法
  • 分布式
  • 异常处理
  • 文件系统
  • 哈希
  • openCV
  • 栈
  • 回溯
  • SpringCore
  • 流媒体
  • rtmp
  • 面向对象
  • Vue
  • ElementUI
  • 软件工程
  • 异步
  • 自定义UI
  • ORM框架
  • 模块化
  • 交互式
  • Jsoup
  • Http Client
  • LRUCache
  • RabbitMQ
  • 消息通信
  • 服务解耦
  • 负载均衡
  • 权限
  • 多线程
  • 单例模式
  • Protobuf
  • 序列化
  • Python
  • m3u8
  • 堆
  • 二叉树
  • 自定义View
  • 观察者模式
  • 设计模式
  • 线程池
  • 动态扩容
  • 高可用
  • GC
  • ffmpeg
  • SpringMVC
  • REST
  • Redis
  • 缓存中间件
  • UML
  • Maven
  • Netty
  • 高性能网络
  • IPC通信
  • IO
  • Stream
  • 发布订阅
  • SQLite
  • Hash
  • 集合框架
  • 链表
  • Lambda
  • 汇编语言
  • 组件化
  • Router
  • 开发工具

    缺失模块。
    1、请确保node版本大于6.2
    2、在博客根目录(注意不是yilia-plus根目录)执行以下命令:
    npm i hexo-generator-json-content --save

    3、在根目录_config.yml里添加配置:

      jsonContent:
        meta: false
        pages: false
        posts:
          title: true
          date: true
          path: true
          text: false
          raw: false
          content: false
          slug: false
          updated: false
          comments: false
          link: false
          permalink: false
          excerpt: false
          categories: false
          tags: true
    

  • 思维导图
  • PDF工具
  • 无损放大
  • 代码转图
  • HTTPS证书