长文详解:DUBBO源码使用了哪些设计模式JAVA前线关注共 48818字,需浏览 98分钟 ·2021-01-07 21:01 JAVA前线 欢迎大家关注公众号「JAVA前线」查看更多精彩分享,主要包括源码分析、实际应用、架构思维、职场分享、产品思考等等,同时也非常欢迎大家加我微信「java_front」一起交流学习0 文章概述DUBBO作为RPC领域优秀开源的框架在业界十分流行,本文我们阅读其源码并对其使用到的设计模式进行分析。需要说明的是本文所说的设计模式更加广义,不仅包括标准意义上23种设计模式,还有一些常见经过检验的代码模式例如双重检查锁模式、多线程保护性暂停模式等等。1 模板方法模板方法模式定义一个操作中的算法骨架,一般使用抽象类定义算法骨架。抽象类同时定义一些抽象方法,这些抽象方法延迟到子类实现,这样子类不仅遵守了算法骨架约定,也实现了自己的算法。既保证了规约也兼顾灵活性。这就是用抽象构建框架,用实现扩展细节。DUBBO源码中有一个非常重要的核心概念Invoker,我们可以理解为执行器或者说一个可执行对象,能够根据方法的名称、参数得到相应执行结果,这个特性体现了代理模式我们后面章节再说,本章节我们先分析其中的模板方法模式。public abstract class AbstractInvoker<T> implements Invoker<T> { @Override public Result invoke(Invocation inv) throws RpcException { RpcInvocation invocation = (RpcInvocation) inv; invocation.setInvoker(this); if (attachment != null && attachment.size() > 0) { invocation.addAttachmentsIfAbsent(attachment); } Map contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { invocation.addAttachments(contextAttachments); } if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) { invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString()); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); try { return doInvoke(invocation); } catch (InvocationTargetException e) { Throwable te = e.getTargetException(); if (te == null) { return new RpcResult(e); } else { if (te instanceof RpcException) { ((RpcException) te).setCode(RpcException.BIZ_EXCEPTION); } return new RpcResult(te); } } catch (RpcException e) { if (e.isBiz()) { return new RpcResult(e); } else { throw e; } } catch (Throwable e) { return new RpcResult(e); } } protected abstract Result doInvoke(Invocation invocation) throws Throwable;}AbstractInvoker作为抽象父类定义了invoke方法这个方法骨架,并且定义了doInvoke抽象方法供子类扩展,例如子类InjvmInvoker、DubboInvoker各自实现了doInvoke方法。InjvmInvoker是本地引用,调用时直接从本地暴露生产者容器获取生产者Exporter对象即可。class InjvmInvoker<T> extends AbstractInvoker<T> { @Override public Result doInvoke(Invocation invocation) throws Throwable { Exporter exporter = InjvmProtocol.getExporter(exporterMap, getUrl()); if (exporter == null) { throw new RpcException("Service [" + key + "] not found."); } RpcContext.getContext().setRemoteAddress(Constants.LOCALHOST_VALUE, 0); return exporter.getInvoker().invoke(invocation); }}DubboInvoker相对复杂一些,需要考虑同步异步调用方式,配置优先级,远程通信等等。public class DubboInvoker<T> extends AbstractInvoker<T> { @Override protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv); boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); // 超时时间方法级别配置优先级最高 int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) { ResponseFuture future = currentClient.request(inv, timeout); FutureAdapter futureAdapter = new FutureAdapter<>(future); RpcContext.getContext().setFuture(futureAdapter); Result result; if (isAsyncFuture) { result = new AsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); } else { result = new SimpleAsyncRpcResult(futureAdapter, futureAdapter.getResultFuture(), false); } return result; } else { RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } }}2 动态代理代理模式核心是为一个目标对象提供一个代理,以控制对这个对象的访问,我们可以通过代理对象访问目标对象,这样可以增强目标对象功能。代理模式分为静态代理与动态代理,动态代理又分为JDK代理和Cglib代理,JDK代理只能代理实现类接口的目标对象,但是Cglib没有这种要求。2.1 JDK动态代理动态代理本质是通过生成字节码的方式将代理对象织入目标对象,本文以JDK动态代理为例。第一步定义业务方法,即被代理的目标对象:public interface StudentJDKService { public void addStudent(String name); public void updateStudent(String name);}public class StudentJDKServiceImpl implements StudentJDKService { @Override public void addStudent(String name) { System.out.println("add student=" + name); } @Override public void updateStudent(String name) { System.out.println("update student=" + name); }}第二步定义一个事务代理对象:public class TransactionInvocationHandler implements InvocationHandler { private Object target; public TransactionInvocationHandler(Object target) { this.target = target; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { System.out.println("------前置通知------"); Object rs = method.invoke(target, args); System.out.println("------后置通知------"); return rs; }}第三步定义代理工厂:public class ProxyFactory { public Object getProxy(Object target, InvocationHandler handler) { ClassLoader loader = this.getClass().getClassLoader(); Class[] interfaces = target.getClass().getInterfaces(); Object proxy = Proxy.newProxyInstance(loader, interfaces, handler); return proxy; }}第四步进行测试:public class ZTest { public static void main(String[] args) throws Exception { testSimple(); } public static void testSimple() { StudentJDKService target = new StudentJDKServiceImpl(); TransactionInvocationHandler handler = new TransactionInvocationHandler(target); ProxyFactory proxyFactory = new ProxyFactory(); Object proxy = proxyFactory.getProxy(target, handler); StudentJDKService studentService = (StudentJDKService) proxy; studentService.addStudent("JAVA前线"); }}ProxyGenerator.generateProxyClass是生成字节码文件核心方法,我们看一看动态字节码到底如何定义:public class ZTest { public static void main(String[] args) throws Exception { createProxyClassFile(); } public static void createProxyClassFile() { String name = "Student$Proxy"; byte[] data = ProxyGenerator.generateProxyClass(name, new Class[] { StudentJDKService.class }); FileOutputStream out = null; try { String fileName = "c:/test/" + name + ".class"; File file = new File(fileName); out = new FileOutputStream(file); out.write(data); } catch (Exception e) { System.out.println(e.getMessage()); } finally { if (null != out) { try { out.close(); } catch (IOException e) { e.printStackTrace(); } } } }}最终生成字节码文件如下,我们看到代理对象被织入了目标对象:import com.xpz.dubbo.simple.jdk.StudentJDKService;import java.lang.reflect.InvocationHandler;import java.lang.reflect.Method;import java.lang.reflect.Proxy;import java.lang.reflect.UndeclaredThrowableException;public final class Student$Proxy extends Proxy implements StudentJDKService { private static Method m1; private static Method m2; private static Method m4; private static Method m3; private static Method m0; public Student$Proxy(InvocationHandler paramInvocationHandler) { super(paramInvocationHandler); } public final boolean equals(Object paramObject) { try { return ((Boolean)this.h.invoke(this, m1, new Object[] { paramObject })).booleanValue(); } catch (Error | RuntimeException error) { throw null; } catch (Throwable throwable) { throw new UndeclaredThrowableException(throwable); } } public final String toString() { try { return (String)this.h.invoke(this, m2, null); } catch (Error | RuntimeException error) { throw null; } catch (Throwable throwable) { throw new UndeclaredThrowableException(throwable); } } public final void updateStudent(String paramString) { try { this.h.invoke(this, m4, new Object[] { paramString }); return; } catch (Error | RuntimeException error) { throw null; } catch (Throwable throwable) { throw new UndeclaredThrowableException(throwable); } } public final void addStudent(String paramString) { try { this.h.invoke(this, m3, new Object[] { paramString }); return; } catch (Error | RuntimeException error) { throw null; } catch (Throwable throwable) { throw new UndeclaredThrowableException(throwable); } } public final int hashCode() { try { return ((Integer)this.h.invoke(this, m0, null)).intValue(); } catch (Error | RuntimeException error) { throw null; } catch (Throwable throwable) { throw new UndeclaredThrowableException(throwable); } } static { try { m1 = Class.forName("java.lang.Object").getMethod("equals", new Class[] { Class.forName("java.lang.Object") }); m2 = Class.forName("java.lang.Object").getMethod("toString", new Class[0]); m4 = Class.forName("com.xpz.dubbo.simple.jdk.StudentJDKService").getMethod("updateStudent", new Class[] { Class.forName("java.lang.String") }); m3 = Class.forName("com.xpz.dubbo.simple.jdk.StudentJDKService").getMethod("addStudent", new Class[] { Class.forName("java.lang.String") }); m0 = Class.forName("java.lang.Object").getMethod("hashCode", new Class[0]); return; } catch (NoSuchMethodException noSuchMethodException) { throw new NoSuchMethodError(noSuchMethodException.getMessage()); } catch (ClassNotFoundException classNotFoundException) { throw new NoClassDefFoundError(classNotFoundException.getMessage()); } }}2.2 DUBBO源码应用那么在DUBBO源码中动态代理是如何体现的呢?我们知道消费者在消费方法时实际上执行的代理方法,这是消费者在refer时生成的代理方法。代理工厂AbstractProxyFactory有两个子类:JdkProxyFactoryJavassistProxyFactory通过下面源码我们可以分析得到,DUBBO通过InvokerInvocationHandler对象代理了invoker对象:public class JdkProxyFactory extends AbstractProxyFactory { @Override public T getProxy(Invoker invoker, Class[] interfaces) { return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker)); }}public class JavassistProxyFactory extends AbstractProxyFactory { @Override public T getProxy(Invoker invoker, Class[] interfaces) { return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker)); }}InvokerInvocationHandler将参数信息封装至RpcInvocation进行传递:public class InvokerInvocationHandler implements InvocationHandler { private final Invoker invoker; public InvokerInvocationHandler(Invoker handler) { this.invoker = handler; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class[] parameterTypes = method.getParameterTypes(); if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } // RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[JAVA前线], attachments={}] RpcInvocation rpcInvocation = createInvocation(method, args); return invoker.invoke(rpcInvocation).recreate(); } private RpcInvocation createInvocation(Method method, Object[] args) { RpcInvocation invocation = new RpcInvocation(method, args); if (RpcUtils.hasFutureReturnType(method)) { invocation.setAttachment(Constants.FUTURE_RETURNTYPE_KEY, "true"); invocation.setAttachment(Constants.ASYNC_KEY, "true"); } return invocation; }}3 策略模式在1995年出版的《设计模式:可复用面向对象软件的基础》给出了策略模式定义:Define a family of algorithms, encapsulate each one, and make them interchangeable. Strategy lets the algorithm vary independently from clients that use it定义一系列算法,封装每一个算法,并使它们可以互换。策略模式可以使算法的变化独立于使用它们的客户端代码。在设计模式原则中有一条开闭原则:对扩展开放,对修改关闭,我认为这是设计模式中最重要设计原则原因如下:(1) 当需求变化时应该通过扩展而不是通过修改已有代码来实现变化,这样就保证代码的稳定性,避免牵一发而动全身(2) 扩展也不是随意扩展,因为事先定义了算法,扩展也是根据算法扩展,体现了用抽象构建框架,用实现扩展细节(3) 标准意义的二十三种设计模式说到底最终都是在遵循开闭原则3.1 策略模式实例假设我们现在需要解析一段文本,这段文本有可能是HTML也有可能是TEXT,如果不使用策略模式应该怎么写呢?public enum DocTypeEnum { HTML(1, "HTML"), TEXT(2, "TEXT"); private int value; private String description; private DocTypeEnum(int value, String description) { this.value = value; this.description = description; } public int value() { return value; } }public class ParserManager { public void parse(Integer docType, String content) { // 文本类型是HTML if(docType == DocTypeEnum.HTML.getValue()) { // 解析逻辑 } // 文本类型是TEXT else if(docType == DocTypeEnum.TEXT.getValue()) { // 解析逻辑 } }}这种写法功能上没有问题,但是当本文类型越来越多时,那么parse方法将会越来越冗余和复杂,if else代码块也会越来越多,所以我们要使用策略模式。第一步定义业务类型和业务实体:public enum DocTypeEnum { HTML(1, "HTML"), TEXT(2, "TEXT"); private int value; private String description; private DocTypeEnum(int value, String description) { this.value = value; this.description = description; } public int value() { return value; }}public class BaseModel { // 公共字段}public class HtmlContentModel extends BaseModel { // HTML自定义字段}public class TextContentModel extends BaseModel { // TEXT自定义字段}第二步定义策略:public interface Strategy<T extends BaseModel> { public T parse(String sourceContent);}@Servicepublic class HtmlStrategy implements Strategy { @Override public HtmlContentModel parse(String sourceContent) { return new HtmlContentModel("html"); }}@Servicepublic class TextStrategy implements Strategy { @Override public TextContentModel parse(String sourceContent) { return new TextContentModel("text"); }}第三步定义策略工厂:@Servicepublic class StrategyFactory implements InitializingBean { private Map strategyMap = new HashMap<>(); @Resource private Strategy htmlStrategy ; @Resource private Strategy textStrategy ; @Override public void afterPropertiesSet() throws Exception { strategyMap.put(RechargeTypeEnum.HTML.value(), htmlStrategy); strategyMap.put(RechargeTypeEnum.TEXT.value(),textStrategy); } public Strategy getStrategy(int type) { return strategyMap.get(type); }} 第四步定义策略执行器:@Servicepublic class StrategyExecutor<T extends BaseModel> { @Resource private StrategyFactory strategyFactory; public T parse(String sourceContent, Integer type) { Strategy strategy = StrategyFactory.getStrategy(type); return strategy.parse(sourceContent); }}第五步执行测试用例:public class Test { @Resource private StrategyExecutor executor; @Test public void test() { // 解析HTML HtmlContentModel content1 = (HtmlContentModel) executor.parse("测试内容", DocTypeEnum.HTML.value()); System.out.println(content1); // 解析TEXT TextContentModel content2 = (TextContentModel)executor.calRecharge("测试内容", DocTypeEnum.TEXT.value()); System.out.println(content2); }}如果新增文本类型我们再扩展新策略即可。我们再回顾策略模式定义会有更深的体会:定义一系列算法,封装每一个算法,并使它们可以互换。策略模式可以使算法的变化独立于使用它们的客户端代码。3.2 DUBBO源码应用在上述实例中我们将策略存储在map容器,我们思考一下还有没有其它地方可以存储策略?答案是配置文件。下面就要介绍SPI机制,我认为这个机制在广义上实现了策略模式。SPI(Service Provider Interface)是一种服务发现机制,本质是将接口实现类的全限定名配置在文件中,并由服务加载器读取配置文件加载实现类,这样可以在运行时动态为接口替换实现类,我们通过SPI机制可以为程序提供拓展功能。3.2.1 JDK SPI我们首先分析JDK自身SPI机制,定义一个数据驱动接口并提供两个驱动实现,最后通过serviceLoader加载驱动。(1) 新建DataBaseDriver工程并定义接口public interface DataBaseDriver { String connect(String hostIp);}(2) 打包这个工程为JAR<dependency> <groupId>com.javafont.spigroupId> <artifactId>DataBaseDriverartifactId> <version>1.0.0-SNAPSHOTversion>dependency>(3) 新建MySQLDriver工程引用上述依赖并实现DataBaseDriver接口import com.javafont.database.driver.DataBaseDriver;public class MySQLDataBaseDriver implements DataBaseDriver { @Override public String connect(String hostIp) { return "MySQL DataBase Driver connect"; }}(4) 在MySQLDriver项目新建文件src/main/resources/META-INF/services/com.javafont.database.driver.DataBaseDriver(5) 在上述文件新增如下内容com.javafont.database.mysql.driver.MySQLDataBaseDriver(6) 按照上述相同步骤创建工程OracleDriver(7) 打包上述两个项目<dependency> <groupId>com.javafont.spigroupId> <artifactId>MySQLDriverartifactId> <version>1.0.0-SNAPSHOTversion>dependency><dependency> <groupId>com.javafont.spigroupId> <artifactId>OracleDriverartifactId> <version>1.0.0-SNAPSHOTversion>dependency>(8) 新建测试项目引入上述MySQLDriver、OracleDriverpublic class DataBaseConnector { public static void main(String[] args) { ServiceLoader serviceLoader = ServiceLoader.load(DataBaseDriver.class); Iterator iterator = serviceLoader.iterator(); while (iterator.hasNext()) { DataBaseDriver driver = iterator.next(); System.out.println(driver.connect("localhost")); } }}// 输出结果// MySQL DataBase Driver connect// Oracle DataBase Driver connect我们并没有指定使用哪个驱动连接数据库,而是通过ServiceLoader方式加载所有实现了DataBaseDriver接口的实现类。假设我们只需要使用MySQL数据库驱动那么直接引入相应依赖即可。3.2.2 DUBBO SPI我们发现JDK SPI机制还是有一些不完善之处:例如通过ServiceLoader会加载所有实现了某个接口的实现类,但是不能通过一个key去指定获取哪一个实现类,但是DUBBO自己实现的SPI机制解决了这个问题。例如Protocol接口有如下实现类:org.apache.dubbo.rpc.protocol.injvm.InjvmProtocolorg.apache.dubbo.rpc.protocol.dubbo.DubboProtocol我们现在将这些类配置信息在配置文件,配置文件在如下目录:META-INF/services/META-INF/dubbo/META-INF/dubbo/internal/配置方式和JDK SPI方式配置不一样,每个实现类都有key与之对应:dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocolinjvm=org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol使用时通过扩展点方式加载实现类:public class ReferenceConfig<T> extends AbstractReferenceConfig { private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); private T createProxy(Map map) { if (isJvmRefer) { URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map); invoker = refprotocol.refer(interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using injvm service " + interfaceClass.getName()); } } }}getAdaptiveExtension()是加载自适应扩展点,javassist会为自适应扩展点生成动态代码:public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol { public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException { if (arg1 == null) throw new IllegalArgumentException("url == null"); org.apache.dubbo.common.URL url = arg1; String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol()); if (extName == null) throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName); return extension.refer(arg0, arg1); }}extension对象就是根据url中protocol属性等于injvm最终加载InjvmProtocol对象,动态获取到了我们制定的业务对象,所以我认为SPI体现了策略模式。4 装饰器模式装饰器模式可以动态将责任附加到对象上,在不改变原始类接口情况下,对原始类功能进行增强,并且支持多个装饰器的嵌套使用。实现装饰器模式需要以下组件:(1) Component(抽象构件)核心业务抽象:可以使用接口或者抽象类(2) ConcreteComponent(具体构件)实现核心业务:最终执行的业务代码(3) Decorator(抽象装饰器)抽象装饰器类:实现Component并且组合一个Component对象(4) ConcreteDecorator(具体装饰器)具体装饰内容:装饰核心业务代码4.1 装饰器实例有一名足球运动员要去踢球,我们用球鞋和球袜为他装饰一番,这样可以使其战力值增加,我们使用装饰器模式实现这个实例。(1) Component/** * 抽象构件(可以用接口替代) */public abstract class Component { /** * 踢足球(业务核心方法) */ public abstract void playFootBall();}(2) ConcreteComponent/** * 具体构件 */public class ConcreteComponent extends Component { @Override public void playFootBall() { System.out.println("球员踢球"); }}(3) Decorator/** * 抽象装饰器 */public abstract class Decorator extends Component { private Component component = null; public Decorator(Component component) { this.component = component; } @Override public void playFootBall() { this.component.playFootBall(); }}(4) ConcreteDecorator/** * 球袜装饰器 */public class ConcreteDecoratorA extends Decorator { public ConcreteDecoratorA(Component component) { super(component); } /** * 定义球袜装饰逻辑 */ private void decorateMethod() { System.out.println("换上球袜战力值增加"); } /** * 重写父类方法 */ @Override public void playFootBall() { this.decorateMethod(); super.playFootBall(); }}/** * 球鞋装饰器 */public class ConcreteDecoratorB extends Decorator { public ConcreteDecoratorB(Component component) { super(component); } /** * 定义球鞋装饰逻辑 */ private void decorateMethod() { System.out.println("换上球鞋战力值增加"); } /** * 重写父类方法 */ @Override public void playFootBall() { this.decorateMethod(); super.playFootBall(); }}(5) 运行测试public class TestDecoratorDemo { public static void main(String[] args) { Component component = new ConcreteComponent(); component = new ConcreteDecoratorA(component); component = new ConcreteDecoratorB(component); component.playFootBall(); }}// 换上球鞋战力值增加// 换上球袜战力值增加// 球员踢球4.2 DUBBO源码应用DUBBO是通过SPI机制实现装饰器模式,我们以Protocol接口进行分析,首先分析装饰器类,抽象装饰器核心要点是实现了Component并且组合一个Component对象。public class ProtocolFilterWrapper implements Protocol { private final Protocol protocol; public ProtocolFilterWrapper(Protocol protocol) { if (protocol == null) { throw new IllegalArgumentException("protocol == null"); } this.protocol = protocol; }}public class ProtocolListenerWrapper implements Protocol { private final Protocol protocol; public ProtocolListenerWrapper(Protocol protocol) { if (protocol == null) { throw new IllegalArgumentException("protocol == null"); } this.protocol = protocol; }}在配置文件中配置装饰器:filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapperlistener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper通过SPI机制加载扩展点时会使用装饰器装饰具体构件:public class ReferenceConfig<T> extends AbstractReferenceConfig { private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); private T createProxy(Map map) { if (isJvmRefer) { URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map); invoker = refprotocol.refer(interfaceClass, url); if (logger.isInfoEnabled()) { logger.info("Using injvm service " + interfaceClass.getName()); } } }}最终生成refprotocol为如下对象:ProtocolFilterWrapper(ProtocolListenerWrapper(InjvmProtocol))5 责任链模式责任链模式将请求发送和接收解耦,让多个接收对象都有机会处理这个请求。这些接收对象串成一条链路并沿着这条链路传递这个请求,直到链路上某个接收对象能够处理它。我们介绍责任链模式两种应用场景和四种代码实现方式,最后介绍了DUBBO如何应用责任链构建过滤器链路。5.1 应用场景:命中立即中断实现一个关键词过滤功能。系统设置三个关键词过滤器,输入内容命中任何一个过滤器规则就返回校验不通过,链路立即中断无需继续进行。(1) 实现方式一public interface ContentFilter { public boolean filter(String content);}public class AaaContentFilter implements ContentFilter { private final static String KEY_CONTENT = "aaa"; @Override public boolean filter(String content) { boolean isValid = Boolean.FALSE; if (StringUtils.isEmpty(content)) { return isValid; } isValid = !content.contains(KEY_CONTENT); return isValid; }}public class BbbContentFilter implements ContentFilter { private final static String KEY_CONTENT = "bbb"; @Override public boolean filter(String content) { boolean isValid = Boolean.FALSE; if (StringUtils.isEmpty(content)) { return isValid; } isValid = !content.contains(KEY_CONTENT); return isValid; }}public class CccContentFilter implements ContentFilter { private final static String KEY_CONTENT = "ccc"; @Override public boolean filter(String content) { boolean isValid = Boolean.FALSE; if (StringUtils.isEmpty(content)) { return isValid; } isValid = !content.contains(KEY_CONTENT); return isValid; }}具体过滤器已经完成,我们下面构造过滤器责任链路:@Servicepublic class ContentFilterChain { private List filters = new ArrayList(); @PostConstruct public void init() { ContentFilter aaaContentFilter = new AaaContentFilter(); ContentFilter bbbContentFilter = new BbbContentFilter(); ContentFilter cccContentFilter = new CccContentFilter(); filters.add(aaaContentFilter); filters.add(bbbContentFilter); filters.add(cccContentFilter); } public void addFilter(ContentFilter filter) { filters.add(filter); } public boolean filter(String content) { if (CollectionUtils.isEmpty(filters)) { throw new RuntimeException("ContentFilterChain is empty"); } for (ContentFilter filter : filters) { boolean isValid = filter.filter(content); if (!isValid) { System.out.println("校验不通过"); return isValid; } } return Boolean.TRUE; }}public class Test { public static void main(String[] args) throws Exception { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/chain/spring-core.xml" }); ContentFilterChain chain = (ContentFilterChain) context.getBean("contentFilterChain"); System.out.println(context); boolean result1 = chain.filter("ccc"); boolean result2 = chain.filter("ddd"); System.out.println("校验结果1=" + result1); System.out.println("校验结果2=" + result2); }}(2) 实现方式二public abstract class FilterHandler { /** 下一个节点 **/ protected FilterHandler successor = null; public void setSuccessor(FilterHandler successor) { this.successor = successor; } public final boolean filter(String content) { /** 执行自身方法 **/ boolean isValid = doFilter(content); if (!isValid) { System.out.println("校验不通过"); return isValid; } /** 执行下一个节点链路 **/ if (successor != null && this != successor) { isValid = successor.filter(content); } return isValid; } /** 每个节点过滤方法 **/ protected abstract boolean doFilter(String content);}public class AaaContentFilterHandler extends FilterHandler { private final static String KEY_CONTENT = "aaa"; @Override protected boolean doFilter(String content) { boolean isValid = Boolean.FALSE; if (StringUtils.isEmpty(content)) { return isValid; } isValid = !content.contains(KEY_CONTENT); return isValid; }}// 省略其它过滤器代码具体过滤器已经完成,我们下面构造过滤器责任链路:@Servicepublic class FilterHandlerChain { private FilterHandler head = null; private FilterHandler tail = null; @PostConstruct public void init() { FilterHandler aaaHandler = new AaaContentFilterHandler(); FilterHandler bbbHandler = new BbbContentFilterHandler(); FilterHandler cccHandler = new CccContentFilterHandler(); addHandler(aaaHandler); addHandler(bbbHandler); addHandler(cccHandler); } public void addHandler(FilterHandler handler) { if (head == null) { head = tail = handler; } /** 设置当前tail继任者 **/ tail.setSuccessor(handler); /** 指针重新指向tail **/ tail = handler; } public boolean filter(String content) { if (null == head) { throw new RuntimeException("FilterHandlerChain is empty"); } /** head发起调用 **/ return head.filter(content); }}public class Test { public static void main(String[] args) throws Exception { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/chain/spring-core.xml" }); FilterHandlerChain chain = (FilterHandlerChain) context.getBean("filterHandlerChain"); System.out.println(context); boolean result1 = chain.filter("ccc"); boolean result2 = chain.filter("ddd"); System.out.println("校验结果1=" + result1); System.out.println("校验结果2=" + result2); }}5.2 应用场景:全链路执行我们实现一个考题生成功能。在线考试系统根据不同年级生成不同考题。系统设置三个考题生成器,每个生成器都会执行,根据学生年级决定是否生成考题,无需生成则执行下一个生成器。(1) 实现方式一public interface QuestionGenerator { public Question generateQuestion(String gradeInfo);}public class AaaQuestionGenerator implements QuestionGenerator { @Override public Question generateQuestion(String gradeInfo) { if (!gradeInfo.equals("一年级")) { return null; } Question question = new Question(); question.setId("aaa"); question.setScore(10); return question; }}// 省略其它生成器代码具体生成器已经编写完成,我们下面构造生成器责任链路:@Servicepublic class QuestionChain { private List generators = new ArrayList(); @PostConstruct public void init() { QuestionGenerator aaaQuestionGenerator = new AaaQuestionGenerator(); QuestionGenerator bbbQuestionGenerator = new BbbQuestionGenerator(); QuestionGenerator cccQuestionGenerator = new CccQuestionGenerator(); generators.add(aaaQuestionGenerator); generators.add(bbbQuestionGenerator); generators.add(cccQuestionGenerator); } public List generate(String gradeInfo) { if (CollectionUtils.isEmpty(generators)) { throw new RuntimeException("QuestionChain is empty"); } List questions = new ArrayList(); for (QuestionGenerator generator : generators) { Question question = generator.generateQuestion(gradeInfo); if (null == question) { continue; } questions.add(question); } return questions; }}public class Test { public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/chain/spring-core.xml" }); System.out.println(context); QuestionChain chain = (QuestionChain) context.getBean("questionChain"); List questions = chain.generate("一年级"); System.out.println(questions); }}(2) 实现方式二public abstract class GenerateHandler { /** 下一个节点 **/ protected GenerateHandler successor = null; public void setSuccessor(GenerateHandler successor) { this.successor = successor; } public final List generate(String gradeInfo) { List result = new ArrayList(); /** 执行自身方法 **/ Question question = doGenerate(gradeInfo); if (null != question) { result.add(question); } /** 执行下一个节点链路 **/ if (successor != null && this != successor) { List successorQuestions = successor.generate(gradeInfo); if (null != successorQuestions) { result.addAll(successorQuestions); } } return result; } /** 每个节点生成方法 **/ protected abstract Question doGenerate(String gradeInfo);}public class AaaGenerateHandler extends GenerateHandler { @Override protected Question doGenerate(String gradeInfo) { if (!gradeInfo.equals("一年级")) { return null; } Question question = new Question(); question.setId("aaa"); question.setScore(10); return question; }}// 省略其它生成器代码具体生成器已经完成,我们下面构造生成器责任链路:@Servicepublic class GenerateChain { private GenerateHandler head = null; private GenerateHandler tail = null; @PostConstruct public void init() { GenerateHandler aaaHandler = new AaaGenerateHandler(); GenerateHandler bbbHandler = new BbbGenerateHandler(); GenerateHandler cccHandler = new CccGenerateHandler(); addHandler(aaaHandler); addHandler(bbbHandler); addHandler(cccHandler); } public void addHandler(GenerateHandler handler) { if (head == null) { head = tail = handler; } /** 设置当前tail继任者 **/ tail.setSuccessor(handler); /** 指针重新指向tail **/ tail = handler; } public List generate(String gradeInfo) { if (null == head) { throw new RuntimeException("GenerateChain is empty"); } /** head发起调用 **/ return head.generate(gradeInfo); }}public class Test { public static void main(String[] args) { ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/chain/spring-core.xml" }); GenerateChain chain = (GenerateChain) context.getBean("generateChain"); System.out.println(context); List result = chain.generate("一年级"); System.out.println(result); }}5.3 DUBBO源码应用生产者和消费者最终执行对象都是过滤器链路最后一个节点,整个链路包含多个过滤器进行业务处理。我们看看生产者和消费者最终生成的过滤器链路。生产者过滤器链路EchoFilter > ClassloaderFilter > GenericFilter > ContextFilter > TraceFilter > TimeoutFilter > MonitorFilter > ExceptionFilter > AbstractProxyInvoker消费者过滤器链路ConsumerContextFilter > FutureFilter > MonitorFilter > DubboInvokerProtocolFilterWrapper作为链路生成核心通过匿名类方式构建过滤器链路,我们以消费者构建过滤器链路为例:public class ProtocolFilterWrapper implements Protocol { private static Invoker buildInvokerChain(final Invoker invoker, String key, String group) { // invoker = DubboInvoker Invoker last = invoker; // 查询符合条件过滤器列表 List filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group); if (!filters.isEmpty()) { for (int i = filters.size() - 1; i >= 0; i--) { final Filter filter = filters.get(i); final Invoker next = last; // 构造一个简化Invoker last = new Invoker() { @Override public Class getInterface() { return invoker.getInterface(); } @Override public URL getUrl() { return invoker.getUrl(); } @Override public boolean isAvailable() { return invoker.isAvailable(); } @Override public Result invoke(Invocation invocation) throws RpcException { // 构造过滤器链路 Result result = filter.invoke(next, invocation); if (result instanceof AsyncRpcResult) { AsyncRpcResult asyncResult = (AsyncRpcResult) result; asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation)); return asyncResult; } else { return filter.onResponse(result, invoker, invocation); } } @Override public void destroy() { invoker.destroy(); } @Override public String toString() { return invoker.toString(); } }; } } return last; } @Override public Invoker refer(Class type, URL url) throws RpcException { // RegistryProtocol不构造过滤器链路 if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } Invoker invoker = protocol.refer(type, url); return buildInvokerChain(invoker, Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER); }}6 保护性暂停模式在多线程编程实践中我们肯定会面临线程间数据交互的问题。在处理这类问题时需要使用一些设计模式,从而保证程序的正确性和健壮性。保护性暂停设计模式就是解决多线程间数据交互问题的一种模式。本文先从基础案例介绍保护性暂停基本概念和实践,再由浅入深,最终分析DUBBO源码中保护性暂停设计模式使用场景。6.1 保护性暂停实例我们设想这样一种场景:线程A生产数据,线程B读取数据这个数据。但是有一种情况:线程B准备读取数据时,此时线程A还没有生产出数据。在这种情况下线程B不能一直空转,也不能立即退出,线程B要等到生产数据完成并拿到数据之后才退出。那么在数据没有生产出这段时间,线程B需要执行一种等待机制,这样可以达到对系统保护目的,这就是保护性暂停。保护性暂停有多种实现方式,本文我们用synchronized/wait/notify的方式实现。class Resource { private MyData data; private Object lock = new Object(); public MyData getData(int timeOut) { synchronized (lock) { // 运行时长 long timePassed = 0; // 开始时间 long begin = System.currentTimeMillis(); // 如果结果为空 while (data == null) { try { // 如果运行时长大于超时时间退出循环 if (timePassed > timeOut) { break; } // 如果运行时长小于超时时间表示虚假唤醒 -> 只需再等待时间差值 long waitTime = timeOut - timePassed; // 等待时间差值 lock.wait(waitTime); // 结果不为空直接返回 if (data != null) { break; } // 被唤醒后计算运行时长 timePassed = System.currentTimeMillis() - begin; } catch (InterruptedException e) { e.printStackTrace(); } } if (data == null) { throw new RuntimeException("超时未获取到结果"); } return data; } } public void sendData(MyData data) { synchronized (lock) { this.data = data; lock.notifyAll(); } }}/** * 保护性暂停实例 */public class ProtectDesignTest { public static void main(String[] args) { Resource resource = new Resource(); new Thread(() -> { try { MyData data = new MyData("hello"); System.out.println(Thread.currentThread().getName() + "生产数据=" + data); // 模拟发送耗时 TimeUnit.SECONDS.sleep(3); resource.sendData(data); } catch (InterruptedException e) { e.printStackTrace(); } }, "t1").start(); new Thread(() -> { MyData data = resource.getData(1000); System.out.println(Thread.currentThread().getName() + "接收到数据=" + data); }, "t2").start(); }}6.2 加一个编号现在再来设想一个场景:现在有三个生产数据的线程1、2、3,三个获取数据的线程4、5、6,我们希望每个获取数据线程都只拿到其中一个生产线程的数据,不能多拿也不能少拿。这里引入一个Futures模型,这个模型为每个资源进行编号并存储在容器中,例如线程1生产的数据被拿走则从容器中删除,一直到容器为空结束。@Getter@Setterpublic class MyNewData implements Serializable { private static final long serialVersionUID = 1L; private static final AtomicLong ID = new AtomicLong(0); private Long id; private String message; public MyNewData(String message) { this.id = newId(); this.message = message; } /** * 自增到最大值会回到最小值(负值可以作为识别ID) */ private static long newId() { return ID.getAndIncrement(); } public Long getId() { return this.id; }}class MyResource { private MyNewData data; private Object lock = new Object(); public MyNewData getData(int timeOut) { synchronized (lock) { long timePassed = 0; long begin = System.currentTimeMillis(); while (data == null) { try { if (timePassed > timeOut) { break; } long waitTime = timeOut - timePassed; lock.wait(waitTime); if (data != null) { break; } timePassed = System.currentTimeMillis() - begin; } catch (InterruptedException e) { e.printStackTrace(); } } if (data == null) { throw new RuntimeException("超时未获取到结果"); } return data; } } public void sendData(MyNewData data) { synchronized (lock) { this.data = data; lock.notifyAll(); } }}class MyFutures { private static final Map FUTURES = new ConcurrentHashMap<>(); public static MyResource newResource(MyNewData data) { final MyResource future = new MyResource(); FUTURES.put(data.getId(), future); return future; } public static MyResource getResource(Long id) { return FUTURES.remove(id); } public static Set getIds() { return FUTURES.keySet(); }}/** * 保护性暂停实例 */public class ProtectDesignTest { public static void main(String[] args) throws Exception { for (int i = 0; i < 3; i++) { final int index = i; new Thread(() -> { try { MyNewData data = new MyNewData("hello_" + index); MyResource resource = MyFutures.newResource(data); // 模拟发送耗时 TimeUnit.SECONDS.sleep(1); resource.sendData(data); System.out.println("生产数据data=" + data); } catch (InterruptedException e) { e.printStackTrace(); } }).start(); } TimeUnit.SECONDS.sleep(1); for (Long i : MyFutures.getIds()) { final long index = i; new Thread(() -> { MyResource resource = MyFutures.getResource(index); int timeOut = 3000; System.out.println("接收数据data=" + resource.getData(timeOut)); }).start(); } }}6.3 DUBBO源码应用我们顺着这一个链路跟踪代码:消费者发送请求 > 提供者接收请求并执行,并且将运行结果发送给消费者 > 消费者接收结果。(1) 消费者发送请求消费者发送的数据包含请求ID,并且将关系维护进FUTURES容器final class HeaderExchangeChannel implements ExchangeChannel { @Override public ResponseFuture request(Object request, int timeout) throws RemotingException { if (closed) { throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!"); } Request req = new Request(); req.setVersion(Version.getProtocolVersion()); req.setTwoWay(true); req.setData(request); DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout); try { channel.send(req); } catch (RemotingException e) { future.cancel(); throw e; } return future; }}class DefaultFuture implements ResponseFuture { // FUTURES容器 private static final Map FUTURES = new ConcurrentHashMap<>(); private DefaultFuture(Channel channel, Request request, int timeout) { this.channel = channel; this.request = request; // 请求ID this.id = request.getId(); this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); FUTURES.put(id, this); CHANNELS.put(id, channel); }}(2) 提供者接收请求并执行,并且将运行结果发送给消费者public class HeaderExchangeHandler implements ChannelHandlerDelegate { void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException { // response与请求ID对应 Response res = new Response(req.getId(), req.getVersion()); if (req.isBroken()) { Object data = req.getData(); String msg; if (data == null) { msg = null; } else if (data instanceof Throwable) { msg = StringUtils.toString((Throwable) data); } else { msg = data.toString(); } res.setErrorMessage("Fail to decode request due to: " + msg); res.setStatus(Response.BAD_REQUEST); channel.send(res); return; } // message = RpcInvocation包含方法名、参数名、参数值等 Object msg = req.getData(); try { // DubboProtocol.reply执行实际业务方法 CompletableFuture future = handler.reply(channel, msg); // 如果请求已经完成则发送结果 if (future.isDone()) { res.setStatus(Response.OK); res.setResult(future.get()); channel.send(res); return; } } catch (Throwable e) { res.setStatus(Response.SERVICE_ERROR); res.setErrorMessage(StringUtils.toString(e)); channel.send(res); } }}(3) 消费者接收结果以下DUBBO源码很好体现了保护性暂停这个设计模式,说明参看注释class DefaultFuture implements ResponseFuture { private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); public static void received(Channel channel, Response response) { try { // 取出对应的请求对象 DefaultFuture future = FUTURES.remove(response.getId()); if (future != null) { future.doReceived(response); } else { logger.warn("The timeout response finally returned at " + (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())) + ", response " + response + (channel == null ? "" : ", channel: " + channel.getLocalAddress() + " -> " + channel.getRemoteAddress())); } } finally { CHANNELS.remove(response.getId()); } } @Override public Object get(int timeout) throws RemotingException { if (timeout <= 0) { timeout = Constants.DEFAULT_TIMEOUT; } if (!isDone()) { long start = System.currentTimeMillis(); lock.lock(); try { while (!isDone()) { // 放弃锁并使当前线程阻塞,直到发出信号中断它或者达到超时时间 done.await(timeout, TimeUnit.MILLISECONDS); // 阻塞结束后再判断是否完成 if (isDone()) { break; } // 阻塞结束后判断是否超时 if(System.currentTimeMillis() - start > timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } // response对象仍然为空则抛出超时异常 if (!isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } return returnFromResponse(); } private void doReceived(Response res) { lock.lock(); try { // 接收到服务器响应赋值response response = res; if (done != null) { // 唤醒get方法中处于等待的代码块 done.signal(); } } finally { lock.unlock(); } if (callback != null) { invokeCallback(callback); } }}7 双重检查锁模式单例设计模式可以保证在整个应用某个类只能存在一个对象实例,并且这个类只提供一个取得其对象实例方法,通常这个对象创建和销毁比较消耗资源,例如数据库连接对象等等。我们分析一个双重检查锁实现的单例模式实例。public class MyDCLConnection { private static volatile MyDCLConnection myConnection = null; private MyDCLConnection() { System.out.println(Thread.currentThread().getName() + " -> init connection"); } public static MyDCLConnection getConnection() { if (null == myConnection) { synchronized (MyDCLConnection.class) { if (null == myConnection) { myConnection = new MyDCLConnection(); } } } return myConnection; }}在DUBBO服务本地暴露时使用了双重检查锁模式判断exporter是否已经存在避免重复创建:public class RegistryProtocol implements Protocol { private ExporterChangeableWrapper doLocalExport(final Invoker originInvoker, URL providerUrl) { String key = getCacheKey(originInvoker); ExporterChangeableWrapper exporter = (ExporterChangeableWrapper) bounds.get(key); if (exporter == null) { synchronized (bounds) { exporter = (ExporterChangeableWrapper) bounds.get(key); if (exporter == null) { final Invoker invokerDelegete = new InvokerDelegate(originInvoker, providerUrl); final Exporter strongExporter = (Exporter) protocol.export(invokerDelegete); exporter = new ExporterChangeableWrapper(strongExporter, originInvoker); bounds.put(key, exporter); } } } return exporter; }}8 文章总结本文我们结合DUBBO源码分析了模板方法模式、动态代理模式、策略模式、装饰器模式、责任链模式、保护性暂停模式、双重检查锁模式,我认为在阅读源码时要学习其中优秀的设计模式和代码实例,这样有助于提高代码水平,希望本文对大家有所帮助。JAVA前线 欢迎大家关注公众号「JAVA前线」查看更多精彩分享,主要包括源码分析、实际应用、架构思维、职场分享、产品思考等等,同时也非常欢迎大家加我微信「java_front」一起交流学习 浏览 23点赞 评论 收藏 分享 手机扫一扫分享分享 举报 评论图片表情视频评价全部评论推荐 长文详解:DUBBO源码使用了哪些设计模式java12340长文详解:DUBBO源码使用了哪些设计模式Java资料站0面试题:DUBBO源码使用了哪些设计模式Hollis0设计模式详解——组合模式云中志0设计模式详解——复合模式云中志0设计模式详解——状态模式云中志0多线程设计模式:保护性暂停模式详解以及其在DUBBO应用源码分析java12340设计模式详解——外观模式云中志0设计模式详解——观察者模式云中志0设计模式详解——适配器模式云中志0点赞 评论 收藏 分享 手机扫一扫分享分享 举报