DUBBO泛化调用原理与设计思想
共 11181字,需浏览 23分钟
·
2022-01-11 16:51
JAVA前线
欢迎大家关注公众号「JAVA前线」查看更多精彩分享,主要包括源码分析、实际应用、架构思维、职场分享、产品思考等等,同时也非常欢迎大家加我微信「java_front」一起交流学习
1 泛化调用实例
对于JAVA服务端开发者而言在使用Dubbo时并不经常使用泛化调用,通常方法是在生产者发布服务之后,消费者可以通过引入生产者提供的client进行调用。那么泛化调用使用场景是什么呢?
第一种场景是消费者不希望引入生产者提供的client依赖,只希望关注调用哪个方法,需要传什么参数即可。第二种场景是消费者不是使用Java语言,而是使用例如Python语言,那么如何调用使用Java语言生产者提供的服务呢?这时我们可以选择泛化调用。
泛化调用使用方法并不复杂,下面我们编写一个泛化调用实例。首先生产者发布服务,这与普通服务发布没有任何区别。
package com.java.front.dubbo.demo.provider;
public interface HelloService {
public String sayHelloGeneric(Person person, String message);
}
public class HelloServiceImpl implements HelloService {
@Override
public String sayHelloGeneric(Person person, String message) throws Exception {
String result = "hello[" + person + "],message=" + message;
return result;
}
}
Person类声明:
package com.java.front.dubbo.demo.provider.model;
public class Person implements Serializable {
private String name;
}
provider.xml文件内容:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:dubbo="http://code.alibabatech.com/schema/dubbo"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://code.alibabatech.com/schema/dubbo
http://code.alibabatech.com/schema/dubbo/dubbo.xsd">
<dubbo:application name="java-front-provider" />
<dubbo:registry address="zookeeper://127.0.0.1:2181" />
<dubbo:protocol name="dubbo" port="9999" />
<bean id="helloService" class="com.java.front.dubbo.demo.provider.HelloServiceImpl" />
<dubbo:service interface="com.java.front.dubbo.demo.provider.HelloService" ref="helloService" />
beans>
消费者代码有所不同:
import org.apache.dubbo.config.ApplicationConfig;
import org.apache.dubbo.config.ReferenceConfig;
import org.apache.dubbo.config.RegistryConfig;
import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.service.GenericService;
public class Consumer {
public static void testGeneric() {
ReferenceConfig reference = new ReferenceConfig();
reference.setApplication(new ApplicationConfig("java-front-consumer"));
reference.setRegistry(new RegistryConfig("zookeeper://127.0.0.1:2181"));
reference.setInterface("com.java.front.dubbo.demo.provider.HelloService");
reference.setGeneric(true);
GenericService genericService = reference.get();
Map person = new HashMap();
person.put("name", "微信公众号「JAVA前线」");
String message = "你好";
Object result = genericService.$invoke("sayHelloGeneric", new String[] { "com.java.front.dubbo.demo.provider.model.Person", "java.lang.String" }, new Object[] { person, message });
System.out.println(result);
}
}
2 Invoker
我们通过源码分析讲解泛化调用原理,首先需要了解Invoker这个Dubbo重量级概念。
在生产者暴露服务流程总体分为两步,第一步是接口实现类转换为Invoker,第二步是Invoker转换为Exporter并放入ExporterMap,我们首先分析生产者暴露服务流程:
生产者通过ProxyFactory.getInvoker方法创建Invoker(AbstractProxyInvoker):
public class JdkProxyFactory extends AbstractProxyFactory {
@Override
public Invoker getInvoker(T proxy, Class type, URL url) {
return new AbstractProxyInvoker(proxy, type, url) {
@Override
protected Object doInvoke(T proxy, String methodName,
Class>[] parameterTypes,
Object[] arguments) throws Throwable {
// proxy为被代理对象 -> com.java.front.dubbo.demo.provider.HelloServiceImpl
Method method = proxy.getClass().getMethod(methodName, parameterTypes);
return method.invoke(proxy, arguments);
}
};
}
}
我们再分析消费者引用服务流程:
消费者Invoker通过显示实例化创建,例如本地暴露和远程暴露都是通过显示初始化的方法创建Invoker(AbstractInvoker):
new InjvmInvoker(serviceType, url, url.getServiceKey(), exporterMap)
new DubboInvoker(serviceType, url, getClients(url), invokers)
再通过ProxyFactory.getProxy创建代理:
public class JdkProxyFactory extends AbstractProxyFactory {
@Override
public T getProxy(Invoker invoker, Class>[] interfaces) {
InvokerInvocationHandler invokerInvocationHandler = new InvokerInvocationHandler(invoker);
return (T) Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), interfaces, invokerInvocationHandler);
}
}
无论是生产者还是消费者的Invoker都实现自org.apache.dubbo.rpc.Invoker:
public abstract class AbstractInvoker<T> implements Invoker<T>{
}
public abstract class AbstractProxyInvoker<T> implements Invoker<T> {
}
3 责任链模式
为什么生产者和消费者都要转换为Invoker,而不是不直接调用呢?我认为Invoker正是Dubbo设计精彩之处:真实调用都转换为Invoker,这样就可以通过责任链模式增强Invoker功能。
public class ProtocolFilterWrapper implements Protocol {
@Override
public Exporter export(Invoker invoker) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
return protocol.export(invoker);
}
// 增加过滤器链
Invoker invokerChain = buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER);
return protocol.export(invokerChain);
}
@Override
public Invoker refer(Class type, URL url) throws RpcException {
if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
return protocol.refer(type, url);
}
// 增加过滤器链
Invoker invoker = protocol.refer(type, url);
Invoker result = buildInvokerChain(invoker, Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
return result;
}
}
无论是生产者还是消费者都会创建过滤器链,我们看看buildInvokerChain这个方法:
public class ProtocolFilterWrapper implements Protocol {
private final Protocol protocol;
public ProtocolFilterWrapper(Protocol protocol) {
if (protocol == null) {
throw new IllegalArgumentException("protocol == null");
}
this.protocol = protocol;
}
private static Invoker buildInvokerChain(final Invoker invoker, String key, String group) {
Invoker last = invoker;
// (1)加载所有包含Activate注解的过滤器
// (2)根据group过滤得到过滤器列表
// (3)Invoker最终被放到过滤器链尾部
List filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
if (!filters.isEmpty()) {
for (int i = filters.size() - 1; i >= 0; i--) {
final Filter filter = filters.get(i);
final Invoker next = last;
// 构造一个简化Invoker
last = new Invoker() {
@Override
public Class getInterface() {
return invoker.getInterface();
}
@Override
public URL getUrl() {
return invoker.getUrl();
}
@Override
public boolean isAvailable() {
return invoker.isAvailable();
}
@Override
public Result invoke(Invocation invocation) throws RpcException {
// 构造过滤器链路
Result result = filter.invoke(next, invocation);
if (result instanceof AsyncRpcResult) {
AsyncRpcResult asyncResult = (AsyncRpcResult) result;
asyncResult.thenApplyWithContext(r -> filter.onResponse(r, invoker, invocation));
return asyncResult;
} else {
return filter.onResponse(result, invoker, invocation);
}
}
@Override
public void destroy() {
invoker.destroy();
}
@Override
public String toString() {
return invoker.toString();
}
};
}
}
return last;
}
}
加载所有包含Activate注解的过滤器,根据group过滤得到过滤器列表,Invoker最终被放到过滤器链尾部,生产者最终生成链路:
EchoFilter -> ClassloaderFilter -> GenericFilter -> ContextFilter -> TraceFilter -> TimeoutFilter -> MonitorFilter -> ExceptionFilter -> AbstractProxyInvoker
消费者最终生成链路:
ConsumerContextFilter -> FutureFilter -> MonitorFilter -> GenericImplFilter -> DubboInvoker
4 泛化调用原理
我们终于看到了泛化调用核心原理,在生产者链路看到GenericFilter过滤器,消费者链路看到GenericImplFilter过滤器,正是这两个过滤器实现了泛化调用。
(1) GenericImplFilter
@Activate(group = Constants.CONSUMER, value = Constants.GENERIC_KEY, order = 20000)
public class GenericImplFilter implements Filter {
@Override
public Result invoke(Invoker> invoker, Invocation invocation) throws RpcException {
// 方法名=$invoke
// invocation.getArguments()=("sayHelloGeneric", new String[] { "com.java.front.dubbo.demo.provider.model.Person", "java.lang.String" }, new Object[] { person, "你好" });
if (invocation.getMethodName().equals(Constants.$INVOKE)
&& invocation.getArguments() != null
&& invocation.getArguments().length == 3
&& ProtocolUtils.isGeneric(generic)) {
// 第一个参数表示方法名
// 第二个参数表示参数类型
// 第三个参数表示参数值 -> [{name=微信公众号「JAVA前线」},你好]
Object[] args = (Object[]) invocation.getArguments()[2];
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
for (Object arg : args) {
if (!(byte[].class == arg.getClass())) {
error(generic, byte[].class.getName(), arg.getClass().getName());
}
}
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
for (Object arg : args) {
if (!(arg instanceof JavaBeanDescriptor)) {
error(generic, JavaBeanDescriptor.class.getName(), arg.getClass().getName());
}
}
}
// 附加参数generic值设置为true
((RpcInvocation) invocation).setAttachment(Constants.GENERIC_KEY, invoker.getUrl().getParameter(Constants.GENERIC_KEY));
}
// 继续执行过滤器链路
return invoker.invoke(invocation);
}
}
(2) GenericFilter
@Activate(group = Constants.PROVIDER, order = -20000)
public class GenericFilter implements Filter {
@Override
public Result invoke(Invoker> invoker, Invocation inv) throws RpcException {
// RpcInvocation[methodName=$invoke, parameterTypes=[class java.lang.String, class [Ljava.lang.String;, class [Ljava.lang.Object;], arguments=[sayHelloGeneric, [Ljava.lang.String;@14e77f6b, [Ljava.lang.Object;@51e5f393], attachments={path=com.java.front.dubbo.demo.provider.HelloService, input=451, dubbo=2.0.2, interface=com.java.front.dubbo.demo.provider.HelloService, version=0.0.0, generic=true}]
if (inv.getMethodName().equals(Constants.$INVOKE)
&& inv.getArguments() != null
&& inv.getArguments().length == 3
&& !GenericService.class.isAssignableFrom(invoker.getInterface())) {
// sayHelloGeneric
String name = ((String) inv.getArguments()[0]).trim();
// [com.java.front.dubbo.demo.provider.model.Person, java.lang.String]
String[] types = (String[]) inv.getArguments()[1];
// [{name=微信公众号「JAVA前线」}, 你好]
Object[] args = (Object[]) inv.getArguments()[2];
// RpcInvocation[methodName=sayHelloGeneric, parameterTypes=[class com.java.front.dubbo.demo.provider.model.Person, class java.lang.String], arguments=[Person(name=JAVA前线), abc], attachments={path=com.java.front.dubbo.demo.provider.HelloService, input=451, dubbo=2.0.2, interface=com.java.front.dubbo.demo.provider.HelloService, version=0.0.0, generic=true}]
RpcInvocation rpcInvocation = new RpcInvocation(method, args, inv.getAttachments());
Result result = invoker.invoke(rpcInvocation);
}
}
}
5 文章总结
第一本文介绍了如何使用泛化调用,并引出泛化调用为什么生效这个问题。
第二本文介绍了重量级概念Invoker,并引出为什么Dubbo要创建Invoker这个问题。
第三通过源码分析知道了过滤器链增强了Invoker功能,并且是实现泛化调用的核心。
JAVA前线
欢迎大家关注公众号「JAVA前线」查看更多精彩分享,主要包括源码分析、实际应用、架构思维、职场分享、产品思考等等,同时也非常欢迎大家加我微信「java_front」一起交流学习