10个类手写实现 RPC 通信框架原理

共 9299字,需浏览 19分钟

 ·

2020-08-27 03:48

链接:autumn200.com/2020/06/21/write-rpc/

     

   正文   

什么是rpc

RPC:remote procedure call Protocol 远程过程调用 调用远程服务,就像调用本地的服务一样,不用关心调用细节,就像调用本机的服务一样的

RPC原理

实现RPC通信的程序包括5个部分:rpc-client、客户端proxy、socket、服务端proxy、rpc-server

request

  • 客户端:当rpc-client发起远程调用时,实际上是通过客户端代理 将要调用的接口、方法、参数、参数类型进行序列化,然后通过socket实时将封装调用参数的实例发送到服务端。
  • 服务端:socket接收到客户端传来的信息进行反序列化,然后通过这些信息委派到具体的实现对象

response

  • 服务端:目标方法执行完成后,将执行结果返回给socket
  • 客户端:socket接收到结果后,返回给rpc-client,调用结束

应用到的技术

  • java
  • spring
  • 序列化
  • socket
  • 反射
  • 动态代理

项目GitHub地址

https://github.com/autumnqfeng/write_rpc

服务端项目

项目结构

rpc-server项目包含2个子项目:order-api、order-provider

  • order-api中存放请求接口与RpcRequest(类名、方法名、参数的实体类)

  • order-provider为请求接口实现、socket、proxy相关类

order-api

order-provider

服务注册

要想实现动态调用ServiceImpl,关键就需要将service类管理起来,那问题来了,我们如何管理这些服务类呢?

我们可以参照spring中的@Service注解,通过自定义注解的方式来做到服务注册,我们定义一个注解@RpcRemoteService作用在ServiceImpl类上,将标记此注解的类名、方法名保存到Map中,以此来定位到具体实现类。

@RpcRemoteService注解

/**
 * 服务端服务发现注解
 *
 * @author: ***
 * @date: 2020/6/21 16:21
 */

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcRemoteService {
}

服务注册类InitialMerdiator

在spring容器初始化完成之后,扫描到@RpcRemoteService标记的类,并保存到Mediator.ROUTING中。

/**
 * 初始化中间代理层对象
 *
 * @author: ***
 * @date: 2020/6/21 16:33
 */

@Component
public class InitialMerdiator implements BeanPostProcessor {

    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        //加了服务发布标记的bean进行远程发布
        if (bean.getClass().isAnnotationPresent(RpcRemoteService.class)) {
            Method[] methods = bean.getClass().getDeclaredMethods();
            for (Method method : methods) {
                String routingKey = bean.getClass().getInterfaces()[0].getName() + "." + method.getName();
                BeanMethod beanMethod = new BeanMethod();
                beanMethod.setBean(bean);
                beanMethod.setMethod(method);
                Mediator.ROUTING.put(routingKey, beanMethod);
            }
        }
        return bean;
    }
}

socket监听

socket监听客户端请求

socket启动类SocketServer

spring容器加载完成之后,启动socket

/**
 * spring 容器启动完成之后,会发布一个ContextRefreshedEven
 * 容器启动后启动socket监听
 *
 * @author: ***
 * @date: 2020/6/21 16:51
 */

@Component
public class SocketServer implements ApplicationListener<ContextRefreshedEvent{
    private final ExecutorService executorService= Executors.newCachedThreadPool();

    @Override
    public void onApplicationEvent(ContextRefreshedEvent contextRefreshedEvent) {
        ServerSocket serverSocket=null;
        try {
            serverSocket = new ServerSocket(8888);
            while (true) {
                Socket accept = serverSocket.accept();
                // 线程池处理socket
    executorService.execute(new ProcessorHandler(accept));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

socket处理类ProcessorHandler

处理监听到的每个socket

public class ProcessorHandler implements Runnable {

    private Socket socket;

    public ProcessorHandler(Socket socket) {
        this.socket = socket;
    }

    @Override
    public void run() {
        ObjectInputStream inputStream = null;
        ObjectOutputStream outputStream = null;
        try {
            inputStream = new ObjectInputStream(socket.getInputStream());
            // 反序列化
            RpcRequest rpcRequest = (RpcRequest) inputStream.readObject();

            // 中间代理执行目标方法
            Mediator mediator = Mediator.getInstance();
            Object response = mediator.processor(rpcRequest);
            System.out.println("服务端的执行结果:"+response);

            outputStream = new ObjectOutputStream(socket.getOutputStream());
            outputStream.writeObject(response);
            outputStream.flush();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            closeStream(inputStream, outputStream);
        }
    }

    private void closeStream(ObjectInputStream inputStream, ObjectOutputStream outputStream) {
        // 关闭流
        if(inputStream!=null){
            try {
                inputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (outputStream!=null){
            try {
                outputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

服务端代理

Mediator

/**
 * 服务端socket与目标方法的中间代理层
 *
 * @author: ***
 * @date: 2020/6/21 16:25
 */

public class Mediator {

    /** 用来存储发布的服务的实例(服务调用的路由) */
    public static Map ROUTING = new ConcurrentHashMap<>();

    /** 单例模式创建该代理层实例 */
    private volatile static Mediator instance;

    private Mediator() {
    }

    public static Mediator getInstance() {
        if (instance == null) {
            synchronized (Mediator.class) {
                if (instance == null) {
                    instance = new Mediator();
                }
            }
        }
        return instance;
    }

    public Object processor(RpcRequest rpcRequest) {
        // 路由key
        String routingKey = rpcRequest.getClassName() + "." + rpcRequest.getMethodName();
        BeanMethod beanMethod = ROUTING.get(routingKey);
        if (beanMethod == null) {
            return null;
        }
        // 执行目标方法
        Object bean = beanMethod.getBean();
        Method method = beanMethod.getMethod();
        try {
            return method.invoke(bean, rpcRequest.getArgs());
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }
}

BeanMethod

/**
 * 中间层反射调用时,存储目标方法、目标类的实体
 *
 * @author: ***
 * @date: 2020/6/21 16:43
 */

public class BeanMethod {

    private Object bean;

    private Method method;

    // setter、getter略
}

客户端项目

项目结构

服务发现

服务发现我们同样使用注解来做,这就需要参照Spring中@Autowired的原理实现,我们自定义@RpcReference注解,定义在字段上,将接口实现的代理类注入到该字段上。

@RpcReference注解

/**
 * 服务注入注解
 *
 * @author: ***
 * @date: 2020/6/20 22:41
 */

@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
@Component
public @interface RpcReference {
}

服务发现类ReferenceInvokeProxy

在spring容器初始化之前,扫描bean中所有@RpcReference注解标记的字段。

/**
 * 远程动态调用service代理
 *
 * @author: ***
 * @date: 2020/6/20 22:44
 */

@Component
public class ReferenceInvokeProxy implements BeanPostProcessor {

    @Autowired
    private RemoteInvocationHandler invocationHandler;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
        Field[] fields = bean.getClass().getDeclaredFields();
        for (Field field : fields) {
            if (field.isAnnotationPresent(RpcReference.class)) {
                field.setAccessible(true);
                // 针对这个加了RpcReference注解的字段,设置为一个代理的值
                Object proxy = Proxy.newProxyInstance(field.getType().getClassLoader(), new Class[]{field.getType()}, invocationHandler);
                try {
                    // 相当于针对加了RpcReference的注解,设置了一个代理,这个代理的实现是invocationHandler
                    field.set(bean, proxy);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                }
            }
        }
        return bean;
    }
}

客户端代理

客户端动态代理InvocationHandler实现类RemoteInvocationHandler

将目标方法名、目标类名、参数信息封装到RpcRequest,然后交给socket发送到服务端。

/**
 * @author: ***
 * @date: 2020/6/20 22:51
 */

@Component
public class RemoteInvocationHandler implements InvocationHandler {

    @Autowired
    private RpcNetTransport rpcNetTransport;

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setClassName(method.getDeclaringClass().getName());
        rpcRequest.setMethodName(method.getName());
        rpcRequest.setTypes(method.getParameterTypes());
        rpcRequest.setArgs(args);
        return rpcNetTransport.send(rpcRequest);
    }
}

客户端socket

网络传输RpcNetTransport

/**
 * rpc socket 网络传输
 *
 * @author: ***
 * @date: 2020/6/20 22:59
 */

@Component
public class RpcNetTransport {
    @Value("${rpc.host}")
    private String host;
    @Value("${rpc.port}")
    private int port;


    public Object send(RpcRequest rpcRequest) {
        ObjectOutputStream outputStream = null;
        ObjectInputStream inputStream = null;
        try {
            Socket socket = new Socket(host, port);
            // 发送目标方法信息
            outputStream = new ObjectOutputStream(socket.getOutputStream());
            outputStream.writeObject(rpcRequest);
            outputStream.flush();
   // 接收返回值
            inputStream = new ObjectInputStream(socket.getInputStream());
            return inputStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        } finally {
            closeStream(inputStream, outputStream);
        }
        return null;
    }

    private void closeStream(ObjectInputStream inputStream, ObjectOutputStream outputStream) {
        // 关闭流
        if(inputStream!=null){
            try {
                inputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (outputStream!=null){
            try {
                outputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
浏览 19
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报