长文详解:DUBBO源码使用了哪些设计模式
共 48818字,需浏览 98分钟
·
2021-01-17 09:18
JAVA前线
欢迎大家关注公众号「JAVA前线」查看更多精彩分享,主要包括源码分析、实际应用、架构思维、职场分享、产品思考等等,同时也非常欢迎大家加我微信「java_front」一起交流学习
0 文章概述
DUBBO作为RPC领域优秀开源的框架在业界十分流行,本文我们阅读其源码并对其使用到的设计模式进行分析。需要说明的是本文所说的设计模式更加广义,不仅包括标准意义上23种设计模式,还有一些常见经过检验的代码模式例如双重检查锁模式、多线程保护性暂停模式等等。
1 模板方法
模板方法模式定义一个操作中的算法骨架,一般使用抽象类定义算法骨架。抽象类同时定义一些抽象方法,这些抽象方法延迟到子类实现,这样子类不仅遵守了算法骨架约定,也实现了自己的算法。既保证了规约也兼顾灵活性。这就是用抽象构建框架,用实现扩展细节。
DUBBO源码中有一个非常重要的核心概念Invoker,我们可以理解为执行器或者说一个可执行对象,能够根据方法的名称、参数得到相应执行结果,这个特性体现了代理模式我们后面章节再说,本章节我们先分析其中的模板方法模式。
public abstract class AbstractInvoker<T> implements Invoker<T> {
@Override
public Result invoke(Invocation inv) throws RpcException {
RpcInvocation invocation = (RpcInvocation) inv;
invocation.setInvoker(this);
if (attachment != null && attachment.size() > 0) {
invocation.addAttachmentsIfAbsent(attachment);
}
Map contextAttachments = RpcContext.getContext().getAttachments();
if (contextAttachments != null && contextAttachments.size() != 0) {
invocation.addAttachments(contextAttachments);
}
if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {
invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
try {
return doInvoke(invocation);
} catch (InvocationTargetException e) {
Throwable te = e.getTargetException();
if (te == null) {
return new RpcResult(e);
} else {
if (te instanceof RpcException) {
((RpcException) te).setCode(RpcException.BIZ_EXCEPTION);
}
return new RpcResult(te);
}
} catch (RpcException e) {
if (e.isBiz()) {
return new RpcResult(e);
} else {
throw e;
}
} catch (Throwable e) {
return new RpcResult(e);
}
}
protected abstract Result doInvoke(Invocation invocation) throws Throwable;
}
AbstractInvoker作为抽象父类定义了invoke方法这个方法骨架,并且定义了doInvoke抽象方法供子类扩展,例如子类InjvmInvoker、DubboInvoker各自实现了doInvoke方法。
InjvmInvoker是本地引用,调用时直接从本地暴露生产者容器获取生产者Exporter对象即可。
class InjvmInvoker<T> extends AbstractInvoker<T> {
@Override
public Result doInvoke(Invocation invocation) throws Throwable {
Exporter> exporter = InjvmProtocol.getExporter(exporterMap, getUrl());
if (exporter == null) {
throw new RpcException("Service [" + key + "] not found.");
}
RpcContext.getContext().setRemoteAddress(Constants.LOCALHOST_VALUE, 0);
return exporter.getInvoker().invoke(invocation);
}
}
DubboInvoker相对复杂一些,需要考虑同步异步调用方式,配置优先级,远程通信等等。
public class DubboInvoker<T> extends AbstractInvoker<T> {
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isAsyncFuture = RpcUtils.isReturnTypeFuture(inv);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
// 超时时间方法级别配置优先级最高
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout);
FutureAdapter
2 动态代理
代理模式核心是为一个目标对象提供一个代理,以控制对这个对象的访问,我们可以通过代理对象访问目标对象,这样可以增强目标对象功能。
代理模式分为静态代理与动态代理,动态代理又分为JDK代理和Cglib代理,JDK代理只能代理实现类接口的目标对象,但是Cglib没有这种要求。
2.1 JDK动态代理
动态代理本质是通过生成字节码的方式将代理对象织入目标对象,本文以JDK动态代理为例。
第一步定义业务方法,即被代理的目标对象:
public interface StudentJDKService {
public void addStudent(String name);
public void updateStudent(String name);
}
public class StudentJDKServiceImpl implements StudentJDKService {
@Override
public void addStudent(String name) {
System.out.println("add student=" + name);
}
@Override
public void updateStudent(String name) {
System.out.println("update student=" + name);
}
}
第二步定义一个事务代理对象:
public class TransactionInvocationHandler implements InvocationHandler {
private Object target;
public TransactionInvocationHandler(Object target) {
this.target = target;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
System.out.println("------前置通知------");
Object rs = method.invoke(target, args);
System.out.println("------后置通知------");
return rs;
}
}
第三步定义代理工厂:
public class ProxyFactory {
public Object getProxy(Object target, InvocationHandler handler) {
ClassLoader loader = this.getClass().getClassLoader();
Class>[] interfaces = target.getClass().getInterfaces();
Object proxy = Proxy.newProxyInstance(loader, interfaces, handler);
return proxy;
}
}
第四步进行测试:
public class ZTest {
public static void main(String[] args) throws Exception {
testSimple();
}
public static void testSimple() {
StudentJDKService target = new StudentJDKServiceImpl();
TransactionInvocationHandler handler = new TransactionInvocationHandler(target);
ProxyFactory proxyFactory = new ProxyFactory();
Object proxy = proxyFactory.getProxy(target, handler);
StudentJDKService studentService = (StudentJDKService) proxy;
studentService.addStudent("JAVA前线");
}
}
ProxyGenerator.generateProxyClass是生成字节码文件核心方法,我们看一看动态字节码到底如何定义:
public class ZTest {
public static void main(String[] args) throws Exception {
createProxyClassFile();
}
public static void createProxyClassFile() {
String name = "Student$Proxy";
byte[] data = ProxyGenerator.generateProxyClass(name, new Class[] { StudentJDKService.class });
FileOutputStream out = null;
try {
String fileName = "c:/test/" + name + ".class";
File file = new File(fileName);
out = new FileOutputStream(file);
out.write(data);
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
if (null != out) {
try {
out.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
}
最终生成字节码文件如下,我们看到代理对象被织入了目标对象:
import com.xpz.dubbo.simple.jdk.StudentJDKService;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.lang.reflect.UndeclaredThrowableException;
public final class Student$Proxy extends Proxy implements StudentJDKService {
private static Method m1;
private static Method m2;
private static Method m4;
private static Method m3;
private static Method m0;
public Student$Proxy(InvocationHandler paramInvocationHandler) {
super(paramInvocationHandler);
}
public final boolean equals(Object paramObject) {
try {
return ((Boolean)this.h.invoke(this, m1, new Object[] { paramObject })).booleanValue();
} catch (Error | RuntimeException error) {
throw null;
} catch (Throwable throwable) {
throw new UndeclaredThrowableException(throwable);
}
}
public final String toString() {
try {
return (String)this.h.invoke(this, m2, null);
} catch (Error | RuntimeException error) {
throw null;
} catch (Throwable throwable) {
throw new UndeclaredThrowableException(throwable);
}
}
public final void updateStudent(String paramString) {
try {
this.h.invoke(this, m4, new Object[] { paramString });
return;
} catch (Error | RuntimeException error) {
throw null;
} catch (Throwable throwable) {
throw new UndeclaredThrowableException(throwable);
}
}
public final void addStudent(String paramString) {
try {
this.h.invoke(this, m3, new Object[] { paramString });
return;
} catch (Error | RuntimeException error) {
throw null;
} catch (Throwable throwable) {
throw new UndeclaredThrowableException(throwable);
}
}
public final int hashCode() {
try {
return ((Integer)this.h.invoke(this, m0, null)).intValue();
} catch (Error | RuntimeException error) {
throw null;
} catch (Throwable throwable) {
throw new UndeclaredThrowableException(throwable);
}
}
static {
try {
m1 = Class.forName("java.lang.Object").getMethod("equals", new Class[] { Class.forName("java.lang.Object") });
m2 = Class.forName("java.lang.Object").getMethod("toString", new Class[0]);
m4 = Class.forName("com.xpz.dubbo.simple.jdk.StudentJDKService").getMethod("updateStudent", new Class[] { Class.forName("java.lang.String") });
m3 = Class.forName("com.xpz.dubbo.simple.jdk.StudentJDKService").getMethod("addStudent", new Class[] { Class.forName("java.lang.String") });
m0 = Class.forName("java.lang.Object").getMethod("hashCode", new Class[0]);
return;
} catch (NoSuchMethodException noSuchMethodException) {
throw new NoSuchMethodError(noSuchMethodException.getMessage());
} catch (ClassNotFoundException classNotFoundException) {
throw new NoClassDefFoundError(classNotFoundException.getMessage());
}
}
}
2.2 DUBBO源码应用
那么在DUBBO源码中动态代理是如何体现的呢?我们知道消费者在消费方法时实际上执行的代理方法,这是消费者在refer时生成的代理方法。
代理工厂AbstractProxyFactory有两个子类:
JdkProxyFactory
JavassistProxyFactory
通过下面源码我们可以分析得到,DUBBO通过InvokerInvocationHandler对象代理了invoker对象:
public class JdkProxyFactory extends AbstractProxyFactory {
@Override
public T getProxy(Invoker invoker, Class>[] interfaces) {
return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, new InvokerInvocationHandler(invoker));
}
}
public class JavassistProxyFactory extends AbstractProxyFactory {
@Override
public T getProxy(Invoker invoker, Class>[] interfaces) {
return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
}
}
InvokerInvocationHandler将参数信息封装至RpcInvocation进行传递:
public class InvokerInvocationHandler implements InvocationHandler {
private final Invoker> invoker;
public InvokerInvocationHandler(Invoker> handler) {
this.invoker = handler;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
String methodName = method.getName();
Class>[] parameterTypes = method.getParameterTypes();
if (method.getDeclaringClass() == Object.class) {
return method.invoke(invoker, args);
}
if ("toString".equals(methodName) && parameterTypes.length == 0) {
return invoker.toString();
}
if ("hashCode".equals(methodName) && parameterTypes.length == 0) {
return invoker.hashCode();
}
if ("equals".equals(methodName) && parameterTypes.length == 1) {
return invoker.equals(args[0]);
}
// RpcInvocation [methodName=sayHello, parameterTypes=[class java.lang.String], arguments=[JAVA前线], attachments={}]
RpcInvocation rpcInvocation = createInvocation(method, args);
return invoker.invoke(rpcInvocation).recreate();
}
private RpcInvocation createInvocation(Method method, Object[] args) {
RpcInvocation invocation = new RpcInvocation(method, args);
if (RpcUtils.hasFutureReturnType(method)) {
invocation.setAttachment(Constants.FUTURE_RETURNTYPE_KEY, "true");
invocation.setAttachment(Constants.ASYNC_KEY, "true");
}
return invocation;
}
}
3 策略模式
在1995年出版的《设计模式:可复用面向对象软件的基础》给出了策略模式定义:
Define a family of algorithms, encapsulate each one, and make them interchangeable. Strategy lets the algorithm vary independently from clients that use it
定义一系列算法,封装每一个算法,并使它们可以互换。策略模式可以使算法的变化独立于使用它们的客户端代码。
在设计模式原则中有一条开闭原则:对扩展开放,对修改关闭,我认为这是设计模式中最重要设计原则原因如下:
(1) 当需求变化时应该通过扩展而不是通过修改已有代码来实现变化,这样就保证代码的稳定性,避免牵一发而动全身
(2) 扩展也不是随意扩展,因为事先定义了算法,扩展也是根据算法扩展,体现了用抽象构建框架,用实现扩展细节
(3) 标准意义的二十三种设计模式说到底最终都是在遵循开闭原则
3.1 策略模式实例
假设我们现在需要解析一段文本,这段文本有可能是HTML也有可能是TEXT,如果不使用策略模式应该怎么写呢?
public enum DocTypeEnum {
HTML(1, "HTML"),
TEXT(2, "TEXT");
private int value;
private String description;
private DocTypeEnum(int value, String description) {
this.value = value;
this.description = description;
}
public int value() {
return value;
}
}
public class ParserManager {
public void parse(Integer docType, String content) {
// 文本类型是HTML
if(docType == DocTypeEnum.HTML.getValue()) {
// 解析逻辑
}
// 文本类型是TEXT
else if(docType == DocTypeEnum.TEXT.getValue()) {
// 解析逻辑
}
}
}
这种写法功能上没有问题,但是当本文类型越来越多时,那么parse方法将会越来越冗余和复杂,if else代码块也会越来越多,所以我们要使用策略模式。
第一步定义业务类型和业务实体:
public enum DocTypeEnum {
HTML(1, "HTML"),
TEXT(2, "TEXT");
private int value;
private String description;
private DocTypeEnum(int value, String description) {
this.value = value;
this.description = description;
}
public int value() {
return value;
}
}
public class BaseModel {
// 公共字段
}
public class HtmlContentModel extends BaseModel {
// HTML自定义字段
}
public class TextContentModel extends BaseModel {
// TEXT自定义字段
}
第二步定义策略:
public interface Strategy<T extends BaseModel> {
public T parse(String sourceContent);
}
@Service
public class HtmlStrategy implements Strategy {
@Override
public HtmlContentModel parse(String sourceContent) {
return new HtmlContentModel("html");
}
}
@Service
public class TextStrategy implements Strategy {
@Override
public TextContentModel parse(String sourceContent) {
return new TextContentModel("text");
}
}
第三步定义策略工厂:
@Service
public class StrategyFactory implements InitializingBean {
private Map strategyMap = new HashMap<>();
@Resource
private Strategy htmlStrategy ;
@Resource
private Strategy textStrategy ;
@Override
public void afterPropertiesSet() throws Exception {
strategyMap.put(RechargeTypeEnum.HTML.value(), htmlStrategy);
strategyMap.put(RechargeTypeEnum.TEXT.value(),textStrategy);
}
public Strategy getStrategy(int type) {
return strategyMap.get(type);
}
}
第四步定义策略执行器:
@Service
public class StrategyExecutor<T extends BaseModel> {
@Resource
private StrategyFactory strategyFactory;
public T parse(String sourceContent, Integer type) {
Strategy strategy = StrategyFactory.getStrategy(type);
return strategy.parse(sourceContent);
}
}
第五步执行测试用例:
public class Test {
@Resource
private StrategyExecutor executor;
@Test
public void test() {
// 解析HTML
HtmlContentModel content1 = (HtmlContentModel) executor.parse("测试内容", DocTypeEnum.HTML.value());
System.out.println(content1);
// 解析TEXT
TextContentModel content2 = (TextContentModel)executor.calRecharge("测试内容", DocTypeEnum.TEXT.value());
System.out.println(content2);
}
}
如果新增文本类型我们再扩展新策略即可。我们再回顾策略模式定义会有更深的体会:定义一系列算法,封装每一个算法,并使它们可以互换。策略模式可以使算法的变化独立于使用它们的客户端代码。
3.2 DUBBO源码应用
在上述实例中我们将策略存储在map容器,我们思考一下还有没有其它地方可以存储策略?答案是配置文件。下面就要介绍SPI机制,我认为这个机制在广义上实现了策略模式。
SPI(Service Provider Interface)是一种服务发现机制,本质是将接口实现类的全限定名配置在文件中,并由服务加载器读取配置文件加载实现类,这样可以在运行时动态为接口替换实现类,我们通过SPI机制可以为程序提供拓展功能。
3.2.1 JDK SPI
我们首先分析JDK自身SPI机制,定义一个数据驱动接口并提供两个驱动实现,最后通过serviceLoader加载驱动。
(1) 新建DataBaseDriver工程并定义接口
public interface DataBaseDriver {
String connect(String hostIp);
}
(2) 打包这个工程为JAR
<dependency>
<groupId>com.javafont.spigroupId>
<artifactId>DataBaseDriverartifactId>
<version>1.0.0-SNAPSHOTversion>
dependency>
(3) 新建MySQLDriver工程引用上述依赖并实现DataBaseDriver接口
import com.javafont.database.driver.DataBaseDriver;
public class MySQLDataBaseDriver implements DataBaseDriver {
@Override
public String connect(String hostIp) {
return "MySQL DataBase Driver connect";
}
}
(4) 在MySQLDriver项目新建文件
src/main/resources/META-INF/services/com.javafont.database.driver.DataBaseDriver
(5) 在上述文件新增如下内容
com.javafont.database.mysql.driver.MySQLDataBaseDriver
(6) 按照上述相同步骤创建工程OracleDriver
(7) 打包上述两个项目
<dependency>
<groupId>com.javafont.spigroupId>
<artifactId>MySQLDriverartifactId>
<version>1.0.0-SNAPSHOTversion>
dependency>
<dependency>
<groupId>com.javafont.spigroupId>
<artifactId>OracleDriverartifactId>
<version>1.0.0-SNAPSHOTversion>
dependency>
(8) 新建测试项目引入上述MySQLDriver、OracleDriver
public class DataBaseConnector {
public static void main(String[] args) {
ServiceLoader serviceLoader = ServiceLoader.load(DataBaseDriver.class);
Iterator iterator = serviceLoader.iterator();
while (iterator.hasNext()) {
DataBaseDriver driver = iterator.next();
System.out.println(driver.connect("localhost"));
}
}
}
// 输出结果
// MySQL DataBase Driver connect
// Oracle DataBase Driver connect
我们并没有指定使用哪个驱动连接数据库,而是通过ServiceLoader方式加载所有实现了DataBaseDriver接口的实现类。假设我们只需要使用MySQL数据库驱动那么直接引入相应依赖即可。
3.2.2 DUBBO SPI
我们发现JDK SPI机制还是有一些不完善之处:例如通过ServiceLoader会加载所有实现了某个接口的实现类,但是不能通过一个key去指定获取哪一个实现类,但是DUBBO自己实现的SPI机制解决了这个问题。
例如Protocol接口有如下实现类:
org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
我们现在将这些类配置信息在配置文件,配置文件在如下目录:
META-INF/services/
META-INF/dubbo/
META-INF/dubbo/internal/
配置方式和JDK SPI方式配置不一样,每个实现类都有key与之对应:
dubbo=org.apache.dubbo.rpc.protocol.dubbo.DubboProtocol
injvm=org.apache.dubbo.rpc.protocol.injvm.InjvmProtocol
使用时通过扩展点方式加载实现类:
public class ReferenceConfig<T> extends AbstractReferenceConfig {
private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private T createProxy(Map map) {
if (isJvmRefer) {
URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
}
}
}
getAdaptiveExtension()是加载自适应扩展点,javassist会为自适应扩展点生成动态代码:
public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
if (arg1 == null)
throw new IllegalArgumentException("url == null");
org.apache.dubbo.common.URL url = arg1;
String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
if (extName == null)
throw new IllegalStateException("Fail to get extension(org.apache.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])");
org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
return extension.refer(arg0, arg1);
}
}
extension对象就是根据url中protocol属性等于injvm最终加载InjvmProtocol对象,动态获取到了我们制定的业务对象,所以我认为SPI体现了策略模式。
4 装饰器模式
装饰器模式可以动态将责任附加到对象上,在不改变原始类接口情况下,对原始类功能进行增强,并且支持多个装饰器的嵌套使用。实现装饰器模式需要以下组件:
(1) Component(抽象构件)
核心业务抽象:可以使用接口或者抽象类
(2) ConcreteComponent(具体构件)
实现核心业务:最终执行的业务代码
(3) Decorator(抽象装饰器)
抽象装饰器类:实现Component并且组合一个Component对象
(4) ConcreteDecorator(具体装饰器)
具体装饰内容:装饰核心业务代码
4.1 装饰器实例
有一名足球运动员要去踢球,我们用球鞋和球袜为他装饰一番,这样可以使其战力值增加,我们使用装饰器模式实现这个实例。
(1) Component
/**
* 抽象构件(可以用接口替代)
*/
public abstract class Component {
/**
* 踢足球(业务核心方法)
*/
public abstract void playFootBall();
}
(2) ConcreteComponent
/**
* 具体构件
*/
public class ConcreteComponent extends Component {
@Override
public void playFootBall() {
System.out.println("球员踢球");
}
}
(3) Decorator
/**
* 抽象装饰器
*/
public abstract class Decorator extends Component {
private Component component = null;
public Decorator(Component component) {
this.component = component;
}
@Override
public void playFootBall() {
this.component.playFootBall();
}
}
(4) ConcreteDecorator
/**
* 球袜装饰器
*/
public class ConcreteDecoratorA extends Decorator {
public ConcreteDecoratorA(Component component) {
super(component);
}
/**
* 定义球袜装饰逻辑
*/
private void decorateMethod() {
System.out.println("换上球袜战力值增加");
}
/**
* 重写父类方法
*/
@Override
public void playFootBall() {
this.decorateMethod();
super.playFootBall();
}
}
/**
* 球鞋装饰器
*/
public class ConcreteDecoratorB extends Decorator {
public ConcreteDecoratorB(Component component) {
super(component);
}
/**
* 定义球鞋装饰逻辑
*/
private void decorateMethod() {
System.out.println("换上球鞋战力值增加");
}
/**
* 重写父类方法
*/
@Override
public void playFootBall() {
this.decorateMethod();
super.playFootBall();
}
}
(5) 运行测试
public class TestDecoratorDemo {
public static void main(String[] args) {
Component component = new ConcreteComponent();
component = new ConcreteDecoratorA(component);
component = new ConcreteDecoratorB(component);
component.playFootBall();
}
}
// 换上球鞋战力值增加
// 换上球袜战力值增加
// 球员踢球
4.2 DUBBO源码应用
DUBBO是通过SPI机制实现装饰器模式,我们以Protocol接口进行分析,首先分析装饰器类,抽象装饰器核心要点是实现了Component并且组合一个Component对象。
public class ProtocolFilterWrapper implements Protocol {
private final Protocol protocol;
public ProtocolFilterWrapper(Protocol protocol) {
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
}
public class ProtocolListenerWrapper implements Protocol {
private final Protocol protocol;
public ProtocolListenerWrapper(Protocol protocol) {
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
}
在配置文件中配置装饰器:
filter=org.apache.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=org.apache.dubbo.rpc.protocol.ProtocolListenerWrapper
通过SPI机制加载扩展点时会使用装饰器装饰具体构件:
public class ReferenceConfig<T> extends AbstractReferenceConfig {
private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();
private T createProxy(Map map) {
if (isJvmRefer) {
URL url = new URL(Constants.LOCAL_PROTOCOL, Constants.LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
invoker = refprotocol.refer(interfaceClass, url);
if (logger.isInfoEnabled()) {
logger.info("Using injvm service " + interfaceClass.getName());
}
}
}
}
最终生成refprotocol为如下对象:
ProtocolFilterWrapper(ProtocolListenerWrapper(InjvmProtocol))
5 责任链模式
责任链模式将请求发送和接收解耦,让多个接收对象都有机会处理这个请求。这些接收对象串成一条链路并沿着这条链路传递这个请求,直到链路上某个接收对象能够处理它。我们介绍责任链模式两种应用场景和四种代码实现方式,最后介绍了DUBBO如何应用责任链构建过滤器链路。
5.1 应用场景:命中立即中断
实现一个关键词过滤功能。系统设置三个关键词过滤器,输入内容命中任何一个过滤器规则就返回校验不通过,链路立即中断无需继续进行。
(1) 实现方式一
public interface ContentFilter {
public boolean filter(String content);
}
public class AaaContentFilter implements ContentFilter {
private final static String KEY_CONTENT = "aaa";
@Override
public boolean filter(String content) {
boolean isValid = Boolean.FALSE;
if (StringUtils.isEmpty(content)) {
return isValid;
}
isValid = !content.contains(KEY_CONTENT);
return isValid;
}
}
public class BbbContentFilter implements ContentFilter {
private final static String KEY_CONTENT = "bbb";
@Override
public boolean filter(String content) {
boolean isValid = Boolean.FALSE;
if (StringUtils.isEmpty(content)) {
return isValid;
}
isValid = !content.contains(KEY_CONTENT);
return isValid;
}
}
public class CccContentFilter implements ContentFilter {
private final static String KEY_CONTENT = "ccc";
@Override
public boolean filter(String content) {
boolean isValid = Boolean.FALSE;
if (StringUtils.isEmpty(content)) {
return isValid;
}
isValid = !content.contains(KEY_CONTENT);
return isValid;
}
}
具体过滤器已经完成,我们下面构造过滤器责任链路:
@Service
public class ContentFilterChain {
private List filters = new ArrayList();
@PostConstruct
public void init() {
ContentFilter aaaContentFilter = new AaaContentFilter();
ContentFilter bbbContentFilter = new BbbContentFilter();
ContentFilter cccContentFilter = new CccContentFilter();
filters.add(aaaContentFilter);
filters.add(bbbContentFilter);
filters.add(cccContentFilter);
}
public void addFilter(ContentFilter filter) {
filters.add(filter);
}
public boolean filter(String content) {
if (CollectionUtils.isEmpty(filters)) {
throw new RuntimeException("ContentFilterChain is empty");
}
for (ContentFilter filter : filters) {
boolean isValid = filter.filter(content);
if (!isValid) {
System.out.println("校验不通过");
return isValid;
}
}
return Boolean.TRUE;
}
}
public class Test {
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/chain/spring-core.xml" });
ContentFilterChain chain = (ContentFilterChain) context.getBean("contentFilterChain");
System.out.println(context);
boolean result1 = chain.filter("ccc");
boolean result2 = chain.filter("ddd");
System.out.println("校验结果1=" + result1);
System.out.println("校验结果2=" + result2);
}
}
(2) 实现方式二
public abstract class FilterHandler {
/** 下一个节点 **/
protected FilterHandler successor = null;
public void setSuccessor(FilterHandler successor) {
this.successor = successor;
}
public final boolean filter(String content) {
/** 执行自身方法 **/
boolean isValid = doFilter(content);
if (!isValid) {
System.out.println("校验不通过");
return isValid;
}
/** 执行下一个节点链路 **/
if (successor != null && this != successor) {
isValid = successor.filter(content);
}
return isValid;
}
/** 每个节点过滤方法 **/
protected abstract boolean doFilter(String content);
}
public class AaaContentFilterHandler extends FilterHandler {
private final static String KEY_CONTENT = "aaa";
@Override
protected boolean doFilter(String content) {
boolean isValid = Boolean.FALSE;
if (StringUtils.isEmpty(content)) {
return isValid;
}
isValid = !content.contains(KEY_CONTENT);
return isValid;
}
}
// 省略其它过滤器代码
具体过滤器已经完成,我们下面构造过滤器责任链路:
@Service
public class FilterHandlerChain {
private FilterHandler head = null;
private FilterHandler tail = null;
@PostConstruct
public void init() {
FilterHandler aaaHandler = new AaaContentFilterHandler();
FilterHandler bbbHandler = new BbbContentFilterHandler();
FilterHandler cccHandler = new CccContentFilterHandler();
addHandler(aaaHandler);
addHandler(bbbHandler);
addHandler(cccHandler);
}
public void addHandler(FilterHandler handler) {
if (head == null) {
head = tail = handler;
}
/** 设置当前tail继任者 **/
tail.setSuccessor(handler);
/** 指针重新指向tail **/
tail = handler;
}
public boolean filter(String content) {
if (null == head) {
throw new RuntimeException("FilterHandlerChain is empty");
}
/** head发起调用 **/
return head.filter(content);
}
}
public class Test {
public static void main(String[] args) throws Exception {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/chain/spring-core.xml" });
FilterHandlerChain chain = (FilterHandlerChain) context.getBean("filterHandlerChain");
System.out.println(context);
boolean result1 = chain.filter("ccc");
boolean result2 = chain.filter("ddd");
System.out.println("校验结果1=" + result1);
System.out.println("校验结果2=" + result2);
}
}
5.2 应用场景:全链路执行
我们实现一个考题生成功能。在线考试系统根据不同年级生成不同考题。系统设置三个考题生成器,每个生成器都会执行,根据学生年级决定是否生成考题,无需生成则执行下一个生成器。
(1) 实现方式一
public interface QuestionGenerator {
public Question generateQuestion(String gradeInfo);
}
public class AaaQuestionGenerator implements QuestionGenerator {
@Override
public Question generateQuestion(String gradeInfo) {
if (!gradeInfo.equals("一年级")) {
return null;
}
Question question = new Question();
question.setId("aaa");
question.setScore(10);
return question;
}
}
// 省略其它生成器代码
具体生成器已经编写完成,我们下面构造生成器责任链路:
@Service
public class QuestionChain {
private List generators = new ArrayList();
@PostConstruct
public void init() {
QuestionGenerator aaaQuestionGenerator = new AaaQuestionGenerator();
QuestionGenerator bbbQuestionGenerator = new BbbQuestionGenerator();
QuestionGenerator cccQuestionGenerator = new CccQuestionGenerator();
generators.add(aaaQuestionGenerator);
generators.add(bbbQuestionGenerator);
generators.add(cccQuestionGenerator);
}
public List generate(String gradeInfo) {
if (CollectionUtils.isEmpty(generators)) {
throw new RuntimeException("QuestionChain is empty");
}
List questions = new ArrayList();
for (QuestionGenerator generator : generators) {
Question question = generator.generateQuestion(gradeInfo);
if (null == question) {
continue;
}
questions.add(question);
}
return questions;
}
}
public class Test {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/chain/spring-core.xml" });
System.out.println(context);
QuestionChain chain = (QuestionChain) context.getBean("questionChain");
List questions = chain.generate("一年级");
System.out.println(questions);
}
}
(2) 实现方式二
public abstract class GenerateHandler {
/** 下一个节点 **/
protected GenerateHandler successor = null;
public void setSuccessor(GenerateHandler successor) {
this.successor = successor;
}
public final List generate(String gradeInfo) {
List result = new ArrayList();
/** 执行自身方法 **/
Question question = doGenerate(gradeInfo);
if (null != question) {
result.add(question);
}
/** 执行下一个节点链路 **/
if (successor != null && this != successor) {
List successorQuestions = successor.generate(gradeInfo);
if (null != successorQuestions) {
result.addAll(successorQuestions);
}
}
return result;
}
/** 每个节点生成方法 **/
protected abstract Question doGenerate(String gradeInfo);
}
public class AaaGenerateHandler extends GenerateHandler {
@Override
protected Question doGenerate(String gradeInfo) {
if (!gradeInfo.equals("一年级")) {
return null;
}
Question question = new Question();
question.setId("aaa");
question.setScore(10);
return question;
}
}
// 省略其它生成器代码
具体生成器已经完成,我们下面构造生成器责任链路:
@Service
public class GenerateChain {
private GenerateHandler head = null;
private GenerateHandler tail = null;
@PostConstruct
public void init() {
GenerateHandler aaaHandler = new AaaGenerateHandler();
GenerateHandler bbbHandler = new BbbGenerateHandler();
GenerateHandler cccHandler = new CccGenerateHandler();
addHandler(aaaHandler);
addHandler(bbbHandler);
addHandler(cccHandler);
}
public void addHandler(GenerateHandler handler) {
if (head == null) {
head = tail = handler;
}
/** 设置当前tail继任者 **/
tail.setSuccessor(handler);
/** 指针重新指向tail **/
tail = handler;
}
public List generate(String gradeInfo) {
if (null == head) {
throw new RuntimeException("GenerateChain is empty");
}
/** head发起调用 **/
return head.generate(gradeInfo);
}
}
public class Test {
public static void main(String[] args) {
ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(new String[] { "classpath*:META-INF/chain/spring-core.xml" });
GenerateChain chain = (GenerateChain) context.getBean("generateChain");
System.out.println(context);
List result = chain.generate("一年级");
System.out.println(result);
}
}
5.3 DUBBO源码应用
生产者和消费者最终执行对象都是过滤器链路最后一个节点,整个链路包含多个过滤器进行业务处理。我们看看生产者和消费者最终生成的过滤器链路。
生产者过滤器链路
EchoFilter > ClassloaderFilter > GenericFilter > ContextFilter > TraceFilter > TimeoutFilter > MonitorFilter > ExceptionFilter > AbstractProxyInvoker
消费者过滤器链路
ConsumerContextFilter > FutureFilter > MonitorFilter > DubboInvoker
ProtocolFilterWrapper作为链路生成核心通过匿名类方式构建过滤器链路,我们以消费者构建过滤器链路为例:
public class ProtocolFilterWrapper implements Protocol {
private static Invoker buildInvokerChain(final Invoker invoker, String key, String group) {
// invoker = DubboInvoker
Invoker last = invoker;
// 查询符合条件过滤器列表
List filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker next = last;
// 构造一个简化Invoker
last = new Invoker() {
@Override
public Class getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
// 构造过滤器链路
Result result = filter.invoke(next, invocation);
if (result instanceof AsyncRpcResult) {
AsyncRpcResult asyncResult = (AsyncRpcResult) result;
asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));
return asyncResult;
} else {
return filter.onResponse(result, invoker, invocation);
}
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
@Override
public Invoker refer(Class type, URL url) throws RpcException {
// RegistryProtocol不构造过滤器链路
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
Invoker invoker = protocol.refer(type, url);
return buildInvokerChain(invoker, Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
}
}
6 保护性暂停模式
在多线程编程实践中我们肯定会面临线程间数据交互的问题。在处理这类问题时需要使用一些设计模式,从而保证程序的正确性和健壮性。
保护性暂停设计模式就是解决多线程间数据交互问题的一种模式。本文先从基础案例介绍保护性暂停基本概念和实践,再由浅入深,最终分析DUBBO源码中保护性暂停设计模式使用场景。
6.1 保护性暂停实例
我们设想这样一种场景:线程A生产数据,线程B读取数据这个数据。
但是有一种情况:线程B准备读取数据时,此时线程A还没有生产出数据。
在这种情况下线程B不能一直空转,也不能立即退出,线程B要等到生产数据完成并拿到数据之后才退出。
那么在数据没有生产出这段时间,线程B需要执行一种等待机制,这样可以达到对系统保护目的,这就是保护性暂停。
保护性暂停有多种实现方式,本文我们用synchronized/wait/notify的方式实现。
class Resource {
private MyData data;
private Object lock = new Object();
public MyData getData(int timeOut) {
synchronized (lock) {
// 运行时长
long timePassed = 0;
// 开始时间
long begin = System.currentTimeMillis();
// 如果结果为空
while (data == null) {
try {
// 如果运行时长大于超时时间退出循环
if (timePassed > timeOut) {
break;
}
// 如果运行时长小于超时时间表示虚假唤醒 -> 只需再等待时间差值
long waitTime = timeOut - timePassed;
// 等待时间差值
lock.wait(waitTime);
// 结果不为空直接返回
if (data != null) {
break;
}
// 被唤醒后计算运行时长
timePassed = System.currentTimeMillis() - begin;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (data == null) {
throw new RuntimeException("超时未获取到结果");
}
return data;
}
}
public void sendData(MyData data) {
synchronized (lock) {
this.data = data;
lock.notifyAll();
}
}
}
/**
* 保护性暂停实例
*/
public class ProtectDesignTest {
public static void main(String[] args) {
Resource resource = new Resource();
new Thread(() -> {
try {
MyData data = new MyData("hello");
System.out.println(Thread.currentThread().getName() + "生产数据=" + data);
// 模拟发送耗时
TimeUnit.SECONDS.sleep(3);
resource.sendData(data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}, "t1").start();
new Thread(() -> {
MyData data = resource.getData(1000);
System.out.println(Thread.currentThread().getName() + "接收到数据=" + data);
}, "t2").start();
}
}
6.2 加一个编号
现在再来设想一个场景:现在有三个生产数据的线程1、2、3,三个获取数据的线程4、5、6,我们希望每个获取数据线程都只拿到其中一个生产线程的数据,不能多拿也不能少拿。
这里引入一个Futures模型,这个模型为每个资源进行编号并存储在容器中,例如线程1生产的数据被拿走则从容器中删除,一直到容器为空结束。
@Getter
@Setter
public class MyNewData implements Serializable {
private static final long serialVersionUID = 1L;
private static final AtomicLong ID = new AtomicLong(0);
private Long id;
private String message;
public MyNewData(String message) {
this.id = newId();
this.message = message;
}
/**
* 自增到最大值会回到最小值(负值可以作为识别ID)
*/
private static long newId() {
return ID.getAndIncrement();
}
public Long getId() {
return this.id;
}
}
class MyResource {
private MyNewData data;
private Object lock = new Object();
public MyNewData getData(int timeOut) {
synchronized (lock) {
long timePassed = 0;
long begin = System.currentTimeMillis();
while (data == null) {
try {
if (timePassed > timeOut) {
break;
}
long waitTime = timeOut - timePassed;
lock.wait(waitTime);
if (data != null) {
break;
}
timePassed = System.currentTimeMillis() - begin;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if (data == null) {
throw new RuntimeException("超时未获取到结果");
}
return data;
}
}
public void sendData(MyNewData data) {
synchronized (lock) {
this.data = data;
lock.notifyAll();
}
}
}
class MyFutures {
private static final Map FUTURES = new ConcurrentHashMap<>();
public static MyResource newResource(MyNewData data) {
final MyResource future = new MyResource();
FUTURES.put(data.getId(), future);
return future;
}
public static MyResource getResource(Long id) {
return FUTURES.remove(id);
}
public static Set getIds() {
return FUTURES.keySet();
}
}
/**
* 保护性暂停实例
*/
public class ProtectDesignTest {
public static void main(String[] args) throws Exception {
for (int i = 0; i < 3; i++) {
final int index = i;
new Thread(() -> {
try {
MyNewData data = new MyNewData("hello_" + index);
MyResource resource = MyFutures.newResource(data);
// 模拟发送耗时
TimeUnit.SECONDS.sleep(1);
resource.sendData(data);
System.out.println("生产数据data=" + data);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
TimeUnit.SECONDS.sleep(1);
for (Long i : MyFutures.getIds()) {
final long index = i;
new Thread(() -> {
MyResource resource = MyFutures.getResource(index);
int timeOut = 3000;
System.out.println("接收数据data=" + resource.getData(timeOut));
}).start();
}
}
}
6.3 DUBBO源码应用
我们顺着这一个链路跟踪代码:消费者发送请求 > 提供者接收请求并执行,并且将运行结果发送给消费者 > 消费者接收结果。
(1) 消费者发送请求
消费者发送的数据包含请求ID,并且将关系维护进FUTURES容器
final class HeaderExchangeChannel implements ExchangeChannel {
@Override
public ResponseFuture request(Object request, int timeout) throws RemotingException {
if (closed) {
throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
}
Request req = new Request();
req.setVersion(Version.getProtocolVersion());
req.setTwoWay(true);
req.setData(request);
DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout);
try {
channel.send(req);
} catch (RemotingException e) {
future.cancel();
throw e;
}
return future;
}
}
class DefaultFuture implements ResponseFuture {
// FUTURES容器
private static final Map FUTURES = new ConcurrentHashMap<>();
private DefaultFuture(Channel channel, Request request, int timeout) {
this.channel = channel;
this.request = request;
// 请求ID
this.id = request.getId();
this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);
FUTURES.put(id, this);
CHANNELS.put(id, channel);
}
}
(2) 提供者接收请求并执行,并且将运行结果发送给消费者
public class HeaderExchangeHandler implements ChannelHandlerDelegate {
void handleRequest(final ExchangeChannel channel, Request req) throws RemotingException {
// response与请求ID对应
Response res = new Response(req.getId(), req.getVersion());
if (req.isBroken()) {
Object data = req.getData();
String msg;
if (data == null) {
msg = null;
} else if (data instanceof Throwable) {
msg = StringUtils.toString((Throwable) data);
} else {
msg = data.toString();
}
res.setErrorMessage("Fail to decode request due to: " + msg);
res.setStatus(Response.BAD_REQUEST);
channel.send(res);
return;
}
// message = RpcInvocation包含方法名、参数名、参数值等
Object msg = req.getData();
try {
// DubboProtocol.reply执行实际业务方法
CompletableFuture
(3) 消费者接收结果
以下DUBBO源码很好体现了保护性暂停这个设计模式,说明参看注释
class DefaultFuture implements ResponseFuture {
private final Lock lock = new ReentrantLock();
private final Condition done = lock.newCondition();
public static void received(Channel channel, Response response) {
try {
// 取出对应的请求对象
DefaultFuture future = FUTURES.remove(response.getId());
if (future != null) {
future.doReceived(response);
} else {
logger.warn("The timeout response finally returned at "
+ (new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date()))
+ ", response " + response
+ (channel == null ? "" : ", channel: " + channel.getLocalAddress()
+ " -> " + channel.getRemoteAddress()));
}
} finally {
CHANNELS.remove(response.getId());
}
}
@Override
public Object get(int timeout) throws RemotingException {
if (timeout <= 0) {
timeout = Constants.DEFAULT_TIMEOUT;
}
if (!isDone()) {
long start = System.currentTimeMillis();
lock.lock();
try {
while (!isDone()) {
// 放弃锁并使当前线程阻塞,直到发出信号中断它或者达到超时时间
done.await(timeout, TimeUnit.MILLISECONDS);
// 阻塞结束后再判断是否完成
if (isDone()) {
break;
}
// 阻塞结束后判断是否超时
if(System.currentTimeMillis() - start > timeout) {
break;
}
}
} catch (InterruptedException e) {
throw new RuntimeException(e);
} finally {
lock.unlock();
}
// response对象仍然为空则抛出超时异常
if (!isDone()) {
throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false));
}
}
return returnFromResponse();
}
private void doReceived(Response res) {
lock.lock();
try {
// 接收到服务器响应赋值response
response = res;
if (done != null) {
// 唤醒get方法中处于等待的代码块
done.signal();
}
} finally {
lock.unlock();
}
if (callback != null) {
invokeCallback(callback);
}
}
}
7 双重检查锁模式
单例设计模式可以保证在整个应用某个类只能存在一个对象实例,并且这个类只提供一个取得其对象实例方法,通常这个对象创建和销毁比较消耗资源,例如数据库连接对象等等。我们分析一个双重检查锁实现的单例模式实例。
public class MyDCLConnection {
private static volatile MyDCLConnection myConnection = null;
private MyDCLConnection() {
System.out.println(Thread.currentThread().getName() + " -> init connection");
}
public static MyDCLConnection getConnection() {
if (null == myConnection) {
synchronized (MyDCLConnection.class) {
if (null == myConnection) {
myConnection = new MyDCLConnection();
}
}
}
return myConnection;
}
}
在DUBBO服务本地暴露时使用了双重检查锁模式判断exporter是否已经存在避免重复创建:
public class RegistryProtocol implements Protocol {
private ExporterChangeableWrapper doLocalExport(final Invoker originInvoker, URL providerUrl) {
String key = getCacheKey(originInvoker);
ExporterChangeableWrapper exporter = (ExporterChangeableWrapper) bounds.get(key);
if (exporter == null) {
synchronized (bounds) {
exporter = (ExporterChangeableWrapper) bounds.get(key);
if (exporter == null) {
final Invoker> invokerDelegete = new InvokerDelegate(originInvoker, providerUrl);
final Exporter strongExporter = (Exporter) protocol.export(invokerDelegete);
exporter = new ExporterChangeableWrapper(strongExporter, originInvoker);
bounds.put(key, exporter);
}
}
}
return exporter;
}
}
8 文章总结
本文我们结合DUBBO源码分析了模板方法模式、动态代理模式、策略模式、装饰器模式、责任链模式、保护性暂停模式、双重检查锁模式,我认为在阅读源码时要学习其中优秀的设计模式和代码实例,这样有助于提高代码水平,希望本文对大家有所帮助。
JAVA前线
欢迎大家关注公众号「JAVA前线」查看更多精彩分享,主要包括源码分析、实际应用、架构思维、职场分享、产品思考等等,同时也非常欢迎大家加我微信「java_front」一起交流学习