1. 前言
Hadoop
是分布式计算系统,在分布式环境中,网络通信模块是其核心模块之一。要学好Hadoop
,需理解其底层通信系统的基本工作原理。Hadoop
提供有体系完整的RPC
框架,实现了对底层网络通信过程的优雅封装。
本文将从RPC
概念说起,一起聊聊Hadoop RPC
的实现细节。
先理解什么是RPC
?
RPC
中的R
是单词Remote
的首字母,P
是Procedure
的首字母,C
是Call
首字母。
翻译过来:远程过程调用。 如果仅是翻译一下,说了等于没有说。
如需彻底理解RPC
,则需理解过程
的 含义:
过程
可以认为是方法
或函数
,甚至可以认为是一个对象
或子程序
。为了简化问题,本文所说过程
指方法
。
在同进程
中方法之间的调用,称为本地调用
。
那么,是否可以认为发生在不同进程之间的调用就是远程调用,广义而言,如果认为远
是指不在同一个进程中,这么说也没有什么不对。
Tips: 狭义而言,远程调用指物理位置不同的计算机内进程之间方法的调用。如分布式、微服务、
B/S
……环境。
不同进程之间如何实现过程的调用?
答案是:使用网络通信模块
实现。
可以称:通过底层网络通信模块
实现的不同进程间的过程调用就是远程调用
。所以说,远程调用是一个广义上的概念。套用一个广告语:不是所有的牛奶都叫特仑苏,但是特仑苏
指的是牛奶。同理,不是所有的远程调用
都叫PRC
,但是RPC
一定是远程调用。
什么样的远程调用才叫RPC
?则需要从远程调用的底层流程说起。
2. 原生网络通信
什么是原生网络通信?
从一个问题开始。
如果现在有一个进程A
,它需要一个业务逻辑功能,发现进程B
里面有。便心念:能否借用B
进程的方法用一用?
想法很美好,但毕竟不是自己家的,所以需要一些方法和措施。
为了便于理解,另举一个现实生活中的例子:比如你想借邻居家的洗衣机洗衣服。想想,你会怎么办?对了,假设你隔壁住的是一个很友好的邻居。
常规的操作流程是不是应该如下:
- 首先,你来到邻居家门前,开始敲门。
- 邻居帮你把门打门。
- 你发出请求:您好,能不能借你家的洗衣机洗几件衣服。
- 中国好邻居说:可以的,你可以把衣服拿过来,我帮你先洗着。
- 你把自己的衣服打包交给邻居。
- 邻居拆开你的包裹,把衣服丢到洗衣机里。
- 邻居把衣服洗完后,把洗好的衣服打包后交给你。
- 最后,你取走衣服后,别忘记说声谢谢。
不同进程之间方法的调用,和你借邻居家洗衣机洗衣服的流程差不多。只是进程之间的敲门
和开门
需要使用计算机语言提供的网络编程 API
。
其流程大致如下:
B
进程先建立一个socket
监听器。B
进程必须是一个非常热心的进程,随时候命等待别的进程的敲门。A
进程向B
进程发生网络连接请求,得到的B
的应答后,两者便建立起了网络连接。类似于你家的邻居开门了。A
把自己的数据(类似于衣服)打包后向B
进程发起处理请求(类似于洗衣请求)。B
进程接收到你的包裹,解开包裹,并把A
的数据交给自己的方法进行处理。B
的方法处理完成后,B
会把处理结果打包,再通过网络通信发送给A
。
可以使用JAVA
语言提供的API
实现如上流程。完整代码如下:
B
程序代码:B
程序是服务提供者,可称B
为服务器组件。
package com.gk.server;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
/*
* 服务提供者
*/
public class B {
/*
* B 中的方法,也是 A 需要的
*/
static String hello(String name) {
return "Hello!" + name;
}
/*
* B端的网络通信
* 本质 B 就是服务器 socket
*/
public static void main(String[] args) throws IOException {
// 监听请求
ServerSocket serverSocket = new ServerSocket(1234);
// 等待网络连接
Socket socket = serverSocket.accept();
// 接受 A 传递过来的数据
InputStream inputStream = socket.getInputStream();
byte buffers[] = new byte[20];
int read = inputStream.read(buffers);
String name = new String(buffers, 0, read);
//调用自己的方法,成全 A 的远程调用
String info = hello(name);
// 把处理结果传递给 A
OutputStream outputStream = socket.getOutputStream();
outputStream.write(info.getBytes());
inputStream.close();
outputStream.close();
socket.close();
serverSocket.close();
}
}
A
程序的代码:A
是需要服务的一端,可称A
为客户端程序。
package com.gk.clien;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;
/*
* 客户端
*/
public class A {
public static void main(String[] args) throws UnknownHostException, IOException {
// 发起网络连接
Socket socket = new Socket("localhost", 1234);
// 准备数据
String name = "rose";
// 把数据传递给 B
OutputStream outputStream = socket.getOutputStream();
outputStream.write(name.getBytes());
// 接受 B 处理好后的数据
InputStream inputStream=socket.getInputStream();
byte[] buffers=new byte[20];
int read= inputStream.read(buffers);
//输出
System.out.println(new String(buffers,0,read));
outputStream.close();
inputStream.close();
socket.close();
}
}
- 测试:先执行
B
程序,再执行A
程序。在A
端可以看到B
端处理后的数据。
本质上,A
和B
的程序结构就是基于通信机制的C/S
结构。
至此,应该会有一个疑问?
如果A
对B
经常会有如此的请求,或者除了A
还有更多的进程需要请求 B
。类似于你经常要借邻居家的洗衣机洗衣服。
会发现每次都要经过敲门、开门等一系列繁琐的流程。而实际上,每一次唯一变化的是要洗的衣服。你如何才能简化这些流程,让借
的过程变的具有艺术性。
现实生活中,你可以请一个代理人。自然,邻居也可以请一个代理人。把自身从繁琐的流程中解脱出来。
Tips: 需要明白,请代理人只是简化了请求者的工作量,并没有减少实际流程。
同理,进程间通信时,也可以请代理人。这里的代理者,只是不是人而是组件。
至此回答什么是原生网络通信?
基于原生API
,老老实实、按部就班地实现网络通信称为原生网络通信。
刚说过,可以使用代理者的模型实现网络通信,其本质就是封装理念。
3. 代理者模式
代理者模式的基本思想:
- 把原生通信系统中的
公共流程
封装在特定的组件中。 - 为
A
和B
分别设计一个代理者。 - 当
A
或其它进程需要B
进程的服务时,只需要把数据传递给代理者组件,然后舒适地等待代理者把B
处理的结果返回给自己。 B
同样由自己的代理组件负责接收A
或其它进程传递过来的数据,并正确调用自己的方法且返回数据处理结果。
于此,A
的高层业务组件和B
的业务服务组件便可从无聊而不得不面对的流程中解脱出来,可全心全意关注自己的高层业务。本质就是基于单一职责思想的解耦操作。
基于代理者思想,现在开始自定义简易版的远程请求框架。
-
首先,
B
程序需要以接口的方式告诉需求者自己能提供什么样的功能。类似于公司发布招聘信息,需要清晰告诉求职者什么样的岗位有什么样的具体要求。然后服务需求者(A)
需要明白B
的岗位要求,并签订严格的劳务合同,明确自己的责任。这里的接口便是协议,约束供需双方的行为规范。
package com.gk.protocol;
/*
* 通信双方共同遵守的行为准则
*/
public interface MyProtocol {
String hello(String name);
}
-
A
程序至少有2
个独立组件:**业务组件:**通俗而言,具体做些事情。
**代理组件:**当业务组件有远程调用请求时,由代理组件去实施。
代理组件
本质是遵循代理设计模式设计出来的组件,这里使用 java
的proxy
类动态生成代理组件。代理组件本身不能提供具体实现,而是封装了网络API
,以此访问指定主机上的功能模块。
先编写代理组件:
package com.gk.clien;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.Socket;
import com.gk.protocol.MyProtocol;
/*
* A 的代理组件,
* 功能,代替服务需求组件访问 B 提供的功能。
* 代理组件必须实现 B 定义的接口以此了解 B 提供的功能。
* 如果对方有什么功能都不知道,代理者是不合格的
*/
public class AProxy implements InvocationHandler {
// 远程计算机的 ip
private String ip;
// 远程计算机的端口
private int port;
public AProxy(String ip, int port) {
this.ip = ip;
this.port = port;
}
/*
* 创建动态代理组件
*/
MyProtocol createProxy() {
//基于 B 程序的接口定义动态创建代理者
MyProtocol myProtocol = (MyProtocol) Proxy.newProxyInstance(AProxy.class.getClassLoader(),
new Class[] { MyProtocol.class }, this);
return myProtocol;
}
/*
* 封装具体的网络请求
*/
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 发起网络连接
Socket socket = new Socket(this.ip, this.port);
// 把数据传递给 B
OutputStream outputStream = socket.getOutputStream();
// 把参数和方法名传递过去。设计一个简单的字符串格式的通信协议,最好使用 json 数据格式
StringBuffer info = new StringBuffer(method + "\t");
for (Object arg : args) {
// 参数之间使用逗号隔开
info.append(arg).append(",");
}
info.deleteCharAt(info.length() - 1);
info.append("]");
outputStream.write(info.toString().getBytes());
// 接受 B 处理好后的数据
InputStream inputStream = socket.getInputStream();
byte[] buffers = new byte[20];
int read = inputStream.read(buffers);
// 转换成字符串
String res=new String(buffers, 0, read);
outputStream.close();
inputStream.close();
socket.close();
return res;
}
}
编写A
的业务组件:
package com.gk.clien;
import com.gk.protocol.MyProtocol;
/*
* A 的业务组件
*/
public class AService {
//依赖 B 接口中定义的功能
private MyProtocol myProtocol;
public AService(MyProtocol myProtocol) {
this.myProtocol = myProtocol;
}
/*
* 业务方法
*/
public void doSomething(String name) {
// 自己的业务
System.out.println("自己能实现的业务");
// 另一部分业务需要远程调用
String res = this.myProtocol.hello(name);
System.out.println("远程业务功能模块处理结果:" + res);
}
}
-
B
也应该有2
个组件。B
向外服务的业务组件。B
的代理者。
编写B
的业务组件: 就是对自己接口定义的实现。
package com.gk.server;
import com.gk.protocol.MyProtocol;
/*
* 需要实现自己定义的接口
*/
public class BService implements MyProtocol {
@Override
public String hello(String name) {
return "Hello!" + name;
}
}
编写 B 的代理者组件: B
的代理者主要是解析A
传递过来的数据,并使用反射方式动态调用业务模块的功能。理论而言,B
应该提供网络连接和网络响应组件。因本文仅是为了讲明白远程调用概念,故突出主要的,忽略次要的。
package com.gk.server;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.ServerSocket;
import java.net.Socket;
import com.gk.protocol.MyProtocol;
public class BProxy {
//真正实现了接口的组件
private MyProtocol myProtocol;
public BProxy(MyProtocol myProtocol) {
this.myProtocol = myProtocol;
}
/*
* 方法中代码有 3 层功能:
* A、网络连接
* B、解析数据
* C、处理数据并返回数据
* 理论而言,为了响应多用户请求,需使用多线程机制,且需把上述三部分功能设计到 3 个组件中
*/
void getRes() throws IOException, NoSuchMethodException, SecurityException, IllegalAccessException,
IllegalArgumentException, InvocationTargetException {
// 监听请求
ServerSocket serverSocket = new ServerSocket(1234);
// 等待网络连接
Socket socket = serverSocket.accept();
// 接受 A 传递过来的数据
InputStream inputStream = socket.getInputStream();
byte buffers[] = new byte[100];
int read = inputStream.read(buffers);
//得到请求数据
String info = new String(buffers, 0, read);
//解析请求数据
String[] strs = info.split("\t");
//方法名
String methodName = strs[0];
//解析参数
String args[] = strs[1].split(",");
Class<?> clz = MyProtocol.class;
// 利用反射机制,根据请求者提供的数据调用相关方法
Method method = clz.getMethod(methodName, new Class[] { String.class });
String res = String.valueOf(method.invoke(this.myProtocol, args));
// 返回给 A
OutputStream outputStream = socket.getOutputStream();
outputStream.write(res.getBytes());
inputStream.close();
outputStream.close();
socket.close();
serverSocket.close();
}
}
-
测试:
B
端的测试代码:
package com.gk.server;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
public class B {
public static void main(String[] args) throws NoSuchMethodException, SecurityException, IllegalAccessException, IllegalArgumentException, InvocationTargetException, IOException {
//调用代理者
BProxy bProxy = new BProxy(new BService());
bProxy.getRes();
}
}
A
端的测试代码:
package com.gk.clien;
public class A {
public static void main(String[] args) {
//代理对象
AProxy aProxy=new AProxy("127.0.0.1",1234);
//业务组件
AService aService=new AService(aProxy.createProxy());
//业务实现
aService.doSomething("world");
}
}
先执行 B 端测试代码,再测试 A 端代码。输出结果:
借助代理者思想,对原生网络通信的封装代码,可以让 A
程序在不了解底层网络通信细节基础上,实现对B
程序中功能模块的访问。每次调用时,只需把数据传递给代理者便可,大大简化了远程调用的过程。
这也是RPC
的目标。那么原生网络通信,以及自定义的远程调用框架,是否可以称为RPC
?
回归到RPC
的概念上。
RPC
本质是一个思想,或一个协议。提供了统一封装原生网络通信的标准,这个标准也称为RPC
协议。在RPC
协议或标准中无论是客户端还是服务端,都有一个叫 stub
的程序,类似于代理者。其访问流程如下:
- 客户程序以本地方式调用系统产生的
Stub
程序; Stub
程序将函数调用信息按照网络通信模块的要求封装成消息包,并交给通信模块发送到远程服务器端。- 远程服务器端接收此消息后,将此消息发送给相应的
Stub
程序; Stub
程序拆封消息,形成被调过程要求的形式,并调用对应函数;- 被调用函数按照所获参数执行,并将结果返回给
Stub
程序; Stub
程序将此结果封装成消息,通过网络通信模块逐级地传送给客户程序
Tips:
RPC
是思想、规范,基于PRC
规范实现的具有远程调用的程序称为RPC
框架。所以PRC
在实现层面上各有差异性。
如此看来,纯原生的网络通信不能算是RPC
,而基于代理者思想的自定义远程访问框架可以算是简陋版的RPC
实现。
Tips:
j2ee
的servlet
规范,本质也是一个远程调用规范,而其接口规范便是http
协议。tomcat
和基于serlvt
规范编写的web
程序就是远程调用的实例。
4. Hadoop RPC
4.1 特点及结构
Hadoop RPC
实际上是分布式计算中C/S(Client/Server)
模型的一个应用实例,对于Hadoop RPC
而言,它具有以下几个特点。
- 透明性:封装底层网络通信,简化高层业务组件的调用需求,目的就让
客户端
调用服务器端子程序时,如同本地调用一样。 - 高性能。
Hadoop
各个系统(如HDFS、YARN、MapReduce
等)均采用了Master/Slave
结构,其中,Master
本质上是一个RPC Server
,负责响应、处理Slave
发送的请求,为了保证Master
的并发处理能力,RPC Server
必须是高性能服务器。 - 可控性。
JDK
中已经自带了一个RPC
框架—RMI(Remote Method Invocation,远程方法调用)
,但RMI
体量过大且不易控制。Hadoop
尽可能重新实现,满足轻量级效果。
Hadoop RPC
由采用四层体系结构设计:
- 序列化层:为了方便跨机器传输数据,
Hadoop
会将各种数据序列化成字节流后在网络中传输。 - 函数调用层:函数调用层本质是使用动态代理实现远程调用。
- 网络传输层:基于
socket
,实现客户端和服务器端的真正意义上的数据交互。 - 服务器端处理层:让服务器具有并发处理能力。
hadoop
采用基于Reactor
设计模式的事件驱动I/O
模型。
4.2 使用 Hadoop RPC
Hadoop 与 RPC
有关的主要功能代码均封装在 RPC
类中:
org.apache.hadoop.ipc.RPC
主要方法介绍:
getProxy/waitForProtocolProxy
:构造客户端代理对象(该对象实现了某个协议),用于向服务器发送RPC
请求。
public static ProtocolProxy <T> public static <T> T getProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr, Configuration conf,
SocketFactory factory) throws IOException{}
public static <T> ProtocolProxy<T> waitForProtocolProxy(Class<T> protocol,
long clientVersion,
InetSocketAddress addr, Configuration conf,
int rpcTimeout,
RetryPolicy connectionRetryPolicy,
long timeout) throws IOException { }
RPC.Builder
:为某个协议(实际上是Java接口)实例构造一个服务器对象,用于处理客户端发送的请求。使用Hadoop RPC
可以定制自己的网络请求模型。
public static Server RPC.Builder (Configuration).build():
hadoop rpc
除了代码设计上的艺术性和优雅性以及结构上的层次性。hadoop rpc
相关功能模块在上文自定义框架中都可以找到对应项。现在使用hadoop rpc
的API
同样实现 hello
功能请求。会发现整个过程和自定义框架中的实现流程大同小异。
- 首先自定义服务端的
PRC
协议,需要继承VersionedProtocol
。
package com.hc.rpc;
import java.io.IOException;
import org.apache.hadoop.ipc.VersionedProtocol;
/*
* 功能定义
*/
interface MyProtocol extends VersionedProtocol {
// 版本号,默认情况下,不同版本号的RPC Client和Server之间不能相互通信
public static final long versionID = 1L;
String hello(String name) throws IOException;
int add(int num1, int num2) throws IOException;
}
- 实现
RPC
协议。Hadoop RPC
协议只是一个接口,需要实现该接口提供实际功能。
package com.hc.rpc;
import java.io.IOException;
import org.apache.hadoop.ipc.ProtocolSignature;
/*
* 功能实现类
*/
public class MyProtocolmpl implements MyProtocol {
// 重载的方法,用于获取自定义的协议版本号,
public long getProtocolVersion(String protocol, long clientVersion) {
return MyProtocol.versionID;
}
// 重载的方法,用于获取协议签名
public ProtocolSignature getProtocolSignature(String protocol, long clientVersion, int hashcode) {
return new ProtocolSignature(MyProtocol.versionID, null);
}
/*
*对外的服务方法
*/
@Override
public String hello(String name) throws IOException {
return "hello" + name;
}
/*
*对外的服务方法
*/
@Override
public int add(int num1, int num2) throws IOException {
return num1 + num2;
}
}
- 构造并启动
RPC Server
。类似于B
程序。使用静态类Builde
r构造一个RPC Server,并调用start()
启动该Server
。
package com.hc.rpc;
import java.io.IOException;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RPC.Server;
/*
* 服务提供者
*/
public class HadoopServer {
public static void main(String[] args) throws HadoopIllegalArgumentException, IOException {
Configuration conf = new Configuration();
/*
* BindAddress和Port分别表示服务器的host和监听端口号。
* NnumHandlers 表示服务器端处理请求的线程数目。
* 到此为止,服务器处理监听状态,等待客户端请求到达。
*/
Server server = new RPC.Builder(conf).setProtocol(MyProtocol.class).setInstance(new MyProtocolmpl())
.setBindAddress("127.0.0.1").setPort(1234).setNumHandlers(5).build();
server.start();
}
}
- 构造
RPC Client
并发送RPC
请求(类似于A
程序)。使用静态方法getProxy
构造客户端代理对象,直接通过代理对象调用远程端的方法,具体如下所示:
package com.hc.rpc;
import java.io.IOException;
import java.net.InetSocketAddress;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ipc.RPC;
public class Client {
public static void main(String[] args) throws IOException {
Configuration conf = new Configuration();
//动态代理组件
MyProtocol proxy = (MyProtocol) RPC.getProxy(MyProtocol.class, MyProtocol.versionID,
new InetSocketAddress("127.0.0.1", 1234), conf);
//远程调用
int result = proxy.add(5, 6);
System.out.println(result);
//远程调用
String res = proxy.hello("world");
System.out.println(res);
}
}
- 测试,先启动服务端程序,再启动客户端程序。
4. 总结
RPC
是对远程访问的一种架构思想。用来简化客户端远程请求模式。
Hadoop rpc
是基于RPC
思想的RPC
构架实例,因此构架用于分布式计算环境中,需要服务器快速、并行地响应多用户的请求,且要保证数据的安全性和健壮性。所以了解其原理以及读懂源代码,可以让使用者在使用hadoop
时更有通透性。