博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
深究Java中的RMI底层原理
阅读量:4093 次
发布时间:2019-05-25

本文共 16479 字,大约阅读时间需要 54 分钟。

前言:随着一个系统被用户认可,业务量、请求量不断上升,那么单机系统必然就无法满足了,于是系统就慢慢走向分布式了,随之而来的是系统之间“沟通”的障碍。一般来说,解决系统之间的通信可以有两种方式:即远程调用和消息。RMI(Remote Method Invocation)就是远程调用的一种方式,也是这篇文章主要介绍的。

一、RMI的一个简单示例

这个示例拆分为服务端和客户端,放在两个idea项目中,并且通过了单机和双机两种环境的测试,是真正意义上的分布式应用。

项目结构

服务端应用: Server

主程序:        com.jnu.wwt.entry.Server

服务接口:     com.jnu.wwt.service.IOperation

服务实现:     com.jnu.wwt.service.impl.OperationImpl

客户端应用: Client

主程序:        com.jnu.wwt.entry.Client

服务接口:    com.jnu.wwt.service.IOperation

源码:

Server.java

/** * Created by wwt on 2016/9/14. */public class Server {    public static void main(String args[]) throws Exception{        //以1099作为LocateRegistry接收客户端请求的端口,并注册服务的映射关系        Registry registry=LocateRegistry.createRegistry(1099);        IOperation iOperation=new OperationImpl();        Naming.rebind("rmi://127.0.0.1:1099/Operation",iOperation);        System.out.println("service running...");    }}

IOperation.java(服务端和客户端各需要一份)

/** * 服务端接口必须实现java.rmi.Remote * Created by wwt on 2016/9/14. */public interface IOperation extends Remote{    /**     * 远程接口上的方法必须抛出RemoteException,因为网络通信是不稳定的,不能吃掉异常     * @param a     * @param b     * @return     */    int add(int a, int b) throws RemoteException;}

OperationImpl.java

/** * Created by wwt on 2016/9/14. */public class OperationImpl extends UnicastRemoteObject implements IOperation{    public OperationImpl() throws RemoteException {        super();    }    @Override    public int add(int a, int b) throws RemoteException{        return a+b;    }}

Client.java

/** * Created by wwt on 2016/9/15. */public class Client {    public static void main(String args[]) throws Exception{        IOperation iOperation= (IOperation) Naming.lookup("rmi://127.0.0.1:1099/Operation");        System.out.println(iOperation.add(1,1));    }}

运行结果

先运行Server应用,服务就起来了。然后切换到Client应用,点击运行,Client调用Server的服务,返回结果。

二、RMI做了些什么

现在我们先忘记Java中有RMI这种东西。假设我们需要自己实现上面例子中的效果,怎么办呢?可以想到的步骤是:

  • 编写服务端服务,并将其通过某个服务机的端口暴露出去供客户端调用。
  • 编写客户端程序,客户端通过指定服务所在的主机和端口号、将请求封装并序列化,最终通过网络协议发送到服务端。
  • 服务端解析和反序列化请求,调用服务端上的服务,将结果序列化并返回给客户端。
  • 客户端接收并反序列化服务端返回的结果,反馈给用户。

这是大致的流程,我们不难想到,RMI其实也是帮我们封装了一些细节而通用的部分,比如序列化和反序列化,连接的建立和释放等,下面是RMI的具体流程:

这里涉及到几个新概念:

Stub和Skeleton:这两个的身份是一致的,都是作为代理的存在。客户端的称作Stub,服务端的称作Skeleton。要做到对程序员屏蔽远程方法调用的细节,这两个代理是必不可少的,包括网络连接等细节。

Registry:顾名思义,可以认为Registry是一个“注册所”,提供了服务名到服务的映射。如果没有它,意味着客户端需要记住每个服务所在的端口号,这种设计显然是不优雅的。

三、走进RMI原理之前,先来看看用到的类及其层次结构和主要的方法。

哪里看不懂随时回来看看结构。。。开始了

四、一步步解剖RMI的底层原理

  • 服务端启动Registry服务
Registry registry=LocateRegistry.createRegistry(1099);
从上面这句代码入手,追溯下去,可以发现服务端创建了一个RegistryImpl对象,这里做了一个判断。如果服务端指定的端口号是1099并且系统开启了安全管理器,那么可以在限定的权限集内(listen和accept)绕过系统的安全校验。反之则必须进行安全校验。这里纯粹是为了效率起见。真正做的事情在setUp()方法中,继续看下去。
public RegistryImpl(final int var1) throws RemoteException {    if(var1 == 1099 && System.getSecurityManager() != null) {        try {            AccessController.doPrivileged(new PrivilegedExceptionAction() {                public Void run() throws RemoteException {                    LiveRef var1x = new LiveRef(RegistryImpl.id, var1);                    RegistryImpl.this.setup(new UnicastServerRef(var1x));                    return null;                }            }, (AccessControlContext)null, new Permission[]{
new SocketPermission("localhost:" + var1, "listen,accept")}); } catch (PrivilegedActionException var3) { throw (RemoteException)var3.getException(); } } else { LiveRef var2 = new LiveRef(id, var1); this.setup(new UnicastServerRef(var2)); }}
setUp()方法将指向正在初始化的RegistryImpl对象的远程引用ref(RemoteRef)赋值为传入的UnicastServerRef对象,这里涉及了向上转型。然后继续移交UnicastServerRef的exportObject()方法。
private void setup(UnicastServerRef var1) throws RemoteException {    this.ref = var1;    var1.exportObject(this, (Object)null, true);}
进入UnicastServerRef的exportObject()方法。可以看到,这里首先为传入的RegistryImpl创建一个代理,这个代理我们可以推断出就是后面服务于客户端的RegistryImpl的Stub对象。然后将UnicastServerRef的skel(skeleton)对象设置为当前RegistryImpl对象。最后用skeleton、stub、UnicastServerRef对象、id和一个boolean值构造了一个Target对象,也就是这个Target对象基本上包含了全部的信息。调用UnicastServerRef的ref(LiveRef)变量的exportObject()方法。
public Remote exportObject(Remote var1, Object var2, boolean var3) throws RemoteException {    Class var4 = var1.getClass();    Remote var5;    try {        var5 = Util.createProxy(var4, this.getClientRef(), this.forceStubUse);    } catch (IllegalArgumentException var7) {        throw new ExportException("remote object implements illegal remote interface", var7);    }    if(var5 instanceof RemoteStub) {        this.setSkeleton(var1);    }    Target var6 = new Target(var1, this, var5, this.ref.getObjID(), var3);    this.ref.exportObject(var6);    this.hashToMethod_Map = (Map)hashToMethod_Maps.get(var4);    return var5;}
到上面为止,我们看到的都是一些变量的赋值和创建工作,还没有到连接层,这些引用对象将会被Stub和Skeleton对象使用。接下来就是连接层上的了。追溯LiveRef的exportObject()方法,很容易找到了TCPTransport的exportObject()方法。这个方法做的事情就是将上面构造的Target对象暴露出去。调用TCPTransport的listen()方法,listen()方法创建了一个ServerSocket,并且启动了一条线程等待客户端的请求。接着调用父类Transport的exportObject()将Target对象存放进ObjectTable中。
public void exportObject(Target var1) throws RemoteException {    synchronized(this) {        this.listen();        ++this.exportCount;    }    boolean var2 = false;    boolean var12 = false;    try {        var12 = true;        super.exportObject(var1);        var2 = true;        var12 = false;    } finally {        if(var12) {            if(!var2) {                synchronized(this) {                    this.decrementExportCount();                }            }        }    }    if(!var2) {        synchronized(this) {            this.decrementExportCount();        }    }}
到这里,我们已经将RegistryImpl对象创建并且起了服务等待客户端的请求。
  • 客户端获取服务端Rgistry代理

IOperation iOperation= (IOperation) Naming.lookup("rmi://127.0.0.1:1099/Operation");
从上面的代码看起,容易追溯到LocateRegistry的getRegistry()方法。这个方法做的事情是通过传入的host和port构造RemoteRef对象,并创建了一个本地代理。可以通过Debug功能发现,这个代理对象其实是RegistryImpl_Stub对象。这样客户端便有了服务端的RegistryImpl的代理(取决于ignoreStubClasses变量)。但注意此时这个代理其实还没有和服务端的RegistryImpl对象关联,毕竟是两个VM上面的对象,这里我们也可以猜测,代理和远程的Registry对象之间是通过socket消息来完成的。

public static Registry getRegistry(String host, int port,                                   RMIClientSocketFactory csf)    throws RemoteException{    Registry registry = null;    if (port <= 0)        port = Registry.REGISTRY_PORT;    if (host == null || host.length() == 0) {        // If host is blank (as returned by "file:" URL in 1.0.2 used in        // java.rmi.Naming), try to convert to real local host name so        // that the RegistryImpl's checkAccess will not fail.        try {            host = java.net.InetAddress.getLocalHost().getHostAddress();        } catch (Exception e) {            // If that failed, at least try "" (localhost) anyway...            host = "";        }    }    LiveRef liveRef =        new LiveRef(new ObjID(ObjID.REGISTRY_ID),                    new TCPEndpoint(host, port, csf, null),                    false);    RemoteRef ref =        (csf == null) ? new UnicastRef(liveRef) : new UnicastRef2(liveRef);    return (Registry) Util.createProxy(RegistryImpl.class, ref, false);}

  • 服务端创建服务对象
从OperationImpl的构造函数看起。调用了父类UnicastRemoteObject的构造方法,追溯到UnicastRemoteObject的私有方法exportObject()。这里做了一个判断,判断服务的实现是不是UnicastRemoteObject的子类,如果是,则直接赋值其ref(RemoteRef)对象为传入的UnicastServerRef对象。反之则调用UnicastServerRef的exportObject()方法。这里我们是第一种情况。
private static Remote exportObject(Remote obj, UnicastServerRef sref)    throws RemoteException{    // if obj extends UnicastRemoteObject, set its ref.    if (obj instanceof UnicastRemoteObject) {        ((UnicastRemoteObject) obj).ref = sref;    }    return sref.exportObject(obj, null, false);}

  • 将服务实现绑定到服务端的Registry上,使得客户端只需与Registry交互。
Naming.rebind("rmi://127.0.0.1:1099/Operation",iOperation);
从上面这行代码开始看,容易发现Naming的方法全部都是调用的Registry的方法。这里通过host和port找到我们第一步启动的服务端Registry服务对象,追溯到其rebind()方法,可以看到,其实做的事情很是简单,就是把名字和服务实现存进一个Map里面。
public void rebind(String var1, Remote var2) throws RemoteException, AccessException {    checkAccess("Registry.rebind");    this.bindings.put(var1, var2);}
  • 客户端查找远程服务

接下来就是重头戏了,从下面代码看起。

IOperation iOperation= (IOperation) Naming.lookup("rmi://127.0.0.1:1099/Operation");
追溯下去,获取到远程Registry对象的代理对象之后,调用RegistryImpl_Stub的lookUp()方法。主要代码如下。做的事情是利用上面通过服务端host和port等信息创建的RegistryImpl_stub对象构造RemoteCall调用对象,operations参数中是各个Registry中声明的操作,2指明了是lookUp()操作。接下来分步骤看看...

try {    RemoteCall var2 = super.ref.newCall(this, operations, 2, 4905912898345647071L);    try {        ObjectOutput var3 = var2.getOutputStream();        var3.writeObject(var1);    } catch (IOException var18) {        throw new MarshalException("error marshalling arguments", var18);    }    super.ref.invoke(var2);    Remote var23;    try {        ObjectInput var6 = var2.getInputStream();        var23 = (Remote)var6.readObject();    } catch (IOException var15) {        throw new UnmarshalException("error unmarshalling return", var15);    } catch (ClassNotFoundException var16) {        throw new UnmarshalException("error unmarshalling return", var16);    } finally {        super.ref.done(var2);    }    return var23;}

调用 RegistryImpl_Stub的ref(RemoteRef)对象的newCall()方法,将RegistryImpl_Stub对象传了进去,不要忘了构造它的时候我们将服务器的主机端口等信息传了进去,也就是我们把服务器相关的信息也传进了newCall()方法。newCall()方法做的事情简单来看就是建立了跟远程RegistryImpl的Skeleton对象的连接。(不要忘了上面我们说到过服务端通过TCPTransport的exportObject()方法等待着客户端的请求)

public RemoteCall newCall(RemoteObject var1, Operation[] var2, int var3, long var4) throws RemoteException {    clientRefLog.log(Log.BRIEF, "get connection");    Connection var6 = this.ref.getChannel().newConnection();    try {        clientRefLog.log(Log.VERBOSE, "create call context");        if(clientCallLog.isLoggable(Log.VERBOSE)) {            this.logClientCall(var1, var2[var3]);        }        StreamRemoteCall var7 = new StreamRemoteCall(var6, this.ref.getObjID(), var3, var4);        try {            this.marshalCustomCallData(var7.getOutputStream());        } catch (IOException var9) {            throw new MarshalException("error marshaling custom call data");        }        return var7;    } catch (RemoteException var10) {        this.ref.getChannel().free(var6, false);        throw var10;    }}
连接建立之后自然就是发送请求了。我们知道客户端终究只是拥有Registry对象的代理,而不是真正地位于服务端的Registry对象本身,他们位于不同的虚拟机实例之中,无法直接调用。必然是通过消息进行交互的。看看super.ref.invoke()这里做了什么?容易追溯到StreamRemoteCall的executeCall()方法。看似本地调用,但其实很容易从代码中看出来是通过tcp连接发送消息到服务端。由服务端解析并且处理调用。

try {    if(this.out != null) {        var2 = this.out.getDGCAckHandler();    }    this.releaseOutputStream();    DataInputStream var3 = new DataInputStream(this.conn.getInputStream());    byte var4 = var3.readByte();    if(var4 != 81) {        if(Transport.transportLog.isLoggable(Log.BRIEF)) {            Transport.transportLog.log(Log.BRIEF, "transport return code invalid: " + var4);        }        throw new UnmarshalException("Transport return code invalid");    }    this.getInputStream();    var1 = this.in.readByte();    this.in.readID();}
至此,我们已经将客户端的服务查询请求发出了。

  • 服务端接收客户端的服务查询请求并返回给客户端结果
这里我用的方法是直接断点在服务端的Thread的run()方法中,因为我们知道服务端已经用线程跑起了服务(当然我是先断点在Registry_Impl的lookUp()方法并查找调用栈找到源头的)。一步一步我们找到了Transport的serviceCall()方法,这个方法是关键。瞻仰一下主要的代码,到ObjectTable.getTarget()为止做的事情是从socket流中获取ObjId,并通过ObjId和Transport对象获取Target对象,这里的Target对象已经是服务端的对象。再借由Target的派发器Dispatcher,传入参数服务实现和请求对象RemoteCall,将请求派发给服务端那个真正提供服务的RegistryImpl的lookUp()方法,这就是Skeleton移交给具体实现的过程了,Skeleton负责底层的操作。
try {    ObjID var40;    try {        var40 = ObjID.read(var1.getInputStream());    } catch (IOException var34) {        throw new MarshalException("unable to read objID", var34);    }    Transport var41 = var40.equals(dgcID)?null:this;    Target var5 = ObjectTable.getTarget(new ObjectEndpoint(var40, var41));    final Remote var38;    if(var5 != null && (var38 = var5.getImpl()) != null) {        final Dispatcher var6 = var5.getDispatcher();        var5.incrementCallCount();        boolean var8;        try {            transportLog.log(Log.VERBOSE, "call dispatcher");            final AccessControlContext var7 = var5.getAccessControlContext();            ClassLoader var42 = var5.getContextClassLoader();            Thread var9 = Thread.currentThread();            ClassLoader var10 = var9.getContextClassLoader();            try {                var9.setContextClassLoader(var42);                currentTransport.set(this);                try {                    AccessController.doPrivileged(new PrivilegedExceptionAction() {                        public Void run() throws IOException {                            Transport.this.checkAcceptPermission(var7);                            var6.dispatch(var38, var1);                            return null;                        }                    }, var7);                    return true;                } catch (PrivilegedActionException var32) {                    throw (IOException)var32.getException();                }            } finally {                var9.setContextClassLoader(var10);                currentTransport.set((Object)null);            }        } catch (IOException var35) {            transportLog.log(Log.BRIEF, "exception thrown by dispatcher: ", var35);            var8 = false;        } finally {            var5.decrementCallCount();        }        return var8;    }    throw new NoSuchObjectException("no such object in table");}
看看RegistryImpl的lookUp()实现。做了同步控制,并通过服务名从Map中取出服务对象。返回给客户端。还记得我们在bindings中存放的其实是OperationImpl的真正实现,并非是Stub对象。
public Remote lookup(String var1) throws RemoteException, NotBoundException {    Hashtable var2 = this.bindings;    synchronized(this.bindings) {        Remote var3 = (Remote)this.bindings.get(var1);        if(var3 == null) {            throw new NotBoundException(var1);        } else {            return var3;        }    }}
  • 客户端获取通过lookUp()查询获得的客户端OperationImpl的Stub对象
这里就不多说了。。多说无益。心好累。凭什么服务端返回给客户端的是服务的实现,但是客户端获取到的是Stub对象呢?用同样的断点的方法,我们可以发现问题出在MarshalInputStream的resolveProxyClass()上,里面其实也是创建了一个代理。这就是那个Stub类。
  • 客户端进行真正地远程服务调用
到目前为止,客户端已经有了Stub对象。就可以和服务端进行愉快地交流了。细心的朋友可能发现这个例子中的服务实现OperationImpl继承了UnicastRemoteObject,就像前面说的,它似乎不会像RegistryImpl一样在服务端生成Skeleton对象。(对于非UnicastRemoteObject的则会生成Skeleton没啥争议)。我的理解是必然会进行一些处理生成Skeleton对象。因为Registry只是用来查找服务,最终调用服务还是得要客户端与服务的连接。这个连接必然由Skeleton为我们屏蔽了。
小结:前面我们做了很多工作,大量工作用于起Registry服务和如何查找客户端需要调用的服务。但事实上,这个Registry可以服务于很多的其他服务。一旦客户端和服务端通过Stub和Skeleton建立了socket连接,后面的操作直接通过这个连接完成就结了!

五、看看Skeleton和Stub如何为我们屏蔽底层连接细节

Stub类:

  1. public class Person_Stub implements Person {        
  2.     private Socket socket;        
  3.     public Person_Stub() throws Throwable {        
  4.         // connect to skeleton        
  5.         socket = new Socket("computer_name"9000);        
  6.     }        
  7.     public int getAge() throws Throwable {        
  8.         // pass method name to skeleton        
  9.         ObjectOutputStream outStream =        
  10.             new ObjectOutputStream(socket.getOutputStream());        
  11.         outStream.writeObject("age");        
  12.         outStream.flush();        
  13.         ObjectInputStream inStream =        
  14.             new ObjectInputStream(socket.getInputStream());        
  15.         return inStream.readInt();        
  16.     }        
  17.     public String getName() throws Throwable {        
  18.         // pass method name to skeleton        
  19.         ObjectOutputStream outStream =        
  20.             new ObjectOutputStream(socket.getOutputStream());        
  21.         outStream.writeObject("name");        
  22.         outStream.flush();        
  23.         ObjectInputStream inStream =        
  24.             new ObjectInputStream(socket.getInputStream());        
  25.         return (String)inStream.readObject();        
  26.     }  
  27. }   
可以看到,Stub对象做的事情是建立到服务端Skeleton对象的Socket连接。将客户端的方法调用转换为字符串标识传递给Skeleton对象。并且同步阻塞等待服务端返回结果。

Skeleton类:

  1. public class Person_Skeleton extends Thread {        
  2.     private PersonServer myServer;        
  3.     public Person_Skeleton(PersonServer server) {        
  4.         // get reference of object server        
  5.         this.myServer = server;        
  6.     }        
  7.     public void run() {        
  8.         try {        
  9.             // new socket at port 9000        
  10.             ServerSocket serverSocket = new ServerSocket(9000);        
  11.             // accept stub's request        
  12.             Socket socket = serverSocket.accept();        
  13.             while (socket != null) {        
  14.                 // get stub's request        
  15.                 ObjectInputStream inStream =        
  16.                     new ObjectInputStream(socket.getInputStream());        
  17.                 String method = (String)inStream.readObject();        
  18.                 // check method name        
  19.                 if (method.equals("age")) {        
  20.                     // execute object server's business method        
  21.                     int age = myServer.getAge();        
  22.                     ObjectOutputStream outStream =        
  23.                         new ObjectOutputStream(socket.getOutputStream());        
  24.                     // return result to stub        
  25.                     outStream.writeInt(age);        
  26.                     outStream.flush();        
  27.                 }        
  28.                 if(method.equals("name")) {        
  29.                     // execute object server's business method        
  30.                     String name = myServer.getName();        
  31.                     ObjectOutputStream outStream =        
  32.                         new ObjectOutputStream(socket.getOutputStream());        
  33.                     // return result to stub        
  34.                     outStream.writeObject(name);        
  35.                     outStream.flush();        
  36.                 }        
  37.             }        
  38.         } catch(Throwable t) {        
  39.             t.printStackTrace();        
  40.             System.exit(0);        
  41.         }        
  42.     }             
  43. }  
Skeleton对象做的事情是将服务实现传入构造参数,获取客户端通过socket传过来的方法调用字符串标识,将请求转发到具体的服务上面。获取结果之后返回给客户端。
六、总结:
本来是Java一个很简单的用法,用了将近3天看了这部分的内容,感觉收获还是比较大的。阿里实习的时候,一个师兄曾经说过,看源代码需要看到什么程度?老实说这玩意看起来真的太累了。。总算是看完了。我觉得这是走向分布式的比较重要的一步。由于篇幅的关系,没有涉及到TCP连接的细节。有兴趣的可以看看源码。
“看到你觉得你能说服自己就可以了”。
七、附注:
这里参考了这篇文章以及其引用的各篇文章。
致敬和感谢!
你可能感兴趣的文章
7 年工作经验,面试官竟然还让我写算法题???
查看>>
被 Zoom 逼疯的歪果仁,造出了视频会议机器人,同事已笑疯丨开源
查看>>
上古语言从入门到精通:COBOL 教程登上 GitHub 热榜
查看>>
再见,Eclipse...
查看>>
超全汇总!B 站上有哪些值得学习的 AI 课程...
查看>>
如果你还不了解 RTC,那我强烈建议你看看这个!
查看>>
神器面世:让你快速在 iOS 设备上安装 Windows、Linux 等操作系统!
查看>>
沙雕程序员在无聊的时候,都搞出了哪些好玩的小玩意...
查看>>
太赞了!GitHub 标星 2.4k+,《可解释机器学习》中文版正式开放!
查看>>
程序员用 AI 修复百年前的老北京视频后,火了!
查看>>
漫话:为什么你下载小电影的时候进度总是卡在 99% 就不动了?
查看>>
我去!原来大神都是这样玩转「多线程与高并发」的...
查看>>
当你无聊时,可以玩玩 GitHub 上这个开源项目...
查看>>
B 站爆红的数学视频,竟是用这个 Python 开源项目做的!
查看>>
安利 10 个让你爽到爆的 IDEA 必备插件!
查看>>
自学编程的八大误区!克服它!
查看>>
GitHub 上的一个开源项目,可快速生成一款属于自己的手写字体!
查看>>
早知道这些免费 API,我就可以不用到处爬数据了!
查看>>
Java各种集合类的合并(数组、List、Set、Map)
查看>>
JS中各种数组遍历方式的性能对比
查看>>