Dubbo中重要的protocol【协议层】概述
共 26713字,需浏览 54分钟
·
2021-04-17 14:32
protocol【协议层】概述
协议通常是涉及到多方相互协作为实现某功能而遵守的规则
常见通讯协议 如:网络七层协议【OSI】、HTTP协议、TCP协议、UDP协议
网络七层协议【OSI】:应用层、表示层、会话层、传输层、网络层、数据链路层、物理层;规定了网络通讯顶层大框架,只有接口【很抽象】
HTTP协议:大概会想到 应用层协议、URL格式、httpheader、httpbody、get|post...【较具象】
TCP协议:大概会想到 传输层协议、三次握手/四次挥手、可靠性协议...【较具象】
不论那种通讯协议都是指定了网络通讯中某些【或全部】环节具体规则
dubbo框架有很多通讯协议可供选择,不同通讯协议相当于实现RPC功能的不同路径
将RPC功能类比成:“从A城市到B城市” 这么一件事情
多种协议 可类比成 从A到B 有多种可选择的交通工具
选择一种具体交通工具 就决定了:买什么票、司机是谁、舒适度、交通线路 ...
dubbo的任何一种协议也规定【或默认】了 序列化方式、【长|短】链接、底层协议【http|tcp】、同步或异步、消息处理线程模型 ...
dubbo中的Protocol
org.apache.dubbo.rpc.Protocol#export; provider 暴露invoker对象
核心功能1: 启动xxxServer,将服务执行对象【invoker】暴露在网络中
org.apache.dubbo.rpc.Protocol#refer ; consumer 将URL转换成invoker对象
核心功能2: 创建xxxClient,封装为调用对象【invoker】供consumer调用
Protocol 简图
// 默认rpc层协议采用:DubboProtocol
@SPI("dubbo")
public interface Protocol {
// provider 暴露服务阶段调用
// invoker:rpc接口实现类[impl]转成invoker对象
// 通过调用org.apache.dubbo.rpc.ProxyFactory#getInvoker进行【impl与invoker】 转换
// provider方法执行链:【.. -> Invoker#invoker -> Wrapper#invokeMethod -> Impl#xxx】
//-------------------invoker 说明 end------------------------
// 例:http协议暴露服务:启动http服务器,将对应接口实现类暴露成http 服务
// 服务暴露URL 例 : http://127.0.0.1:49178/org.apache.dubbo.rpc.protocol.http.HttpService?version=1.0.0
// 例:dubbo协议暴露服务:默认启动netty server进行
@Adaptive
<T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
// consumer 引用服务阶段执行
// 根据接口类型type,与远程服务url生成invoker对象
// consumer方法调用链:rpcInterface#xxx -> proxy#xxx -> invoker#invoker -> nettyClient#send【默认】
@Adaptive
<T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
void destroy();
。。。
}
AbstractProtocol 以及相关子类协议都在dubbo-rpc模块下
DubboProtocol、RedisProtocol、MemcacheProtocol 直接继承AbstractProtocol
其他常见dubbo-rpc包下协议需要继承AbstractProxyProtocol
RegistryProtocol 服务注册监听阶段功能处理【本文暂不分析】
ProtocolFilterWrapper, ProtocolListenerWrapper 协议包装【代理】类
public abstract class AbstractProtocol implements Protocol {
// 服务暴露map对象
protected final Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();
@Override
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
// 异步转同步invoker
return new AsyncToSyncInvoker<>(protocolBindingRefer(type, url));
}
// 子类【服务引用】实现该方法
protected abstract <T> Invoker<T> protocolBindingRefer(Class<T> type, URL url) throws RpcException;
。。。
}
AbstractProtocol 中定义了服务暴露exporterMap属性
定义新服务引用抽象方法 :protocolBindingRefer
dubbo-rpc模块中协议分类
代理协议
AbstractProxyProtocol
核心功能:rpc接口代理类与invoker对象之间进行转换【阅读完该类代码后,发现类名称很形象】
Protocol定义的export与refer方法对应的参数与返回值 与 AbstractProxyProtocol子类对应 doExport, doRefer 参数与返回值类型不一样;所以需要转换
AbstractProxyProtocol 中的 export与refer,会调用子类的doExport 与 doRefer, 并进行 proxy【接口代理类对象】与 invoker对象 之间转换
那么AbstractProxyProtocol 子类 就不能直接接收或返回与Protocol export或refer相同类型的参数或返回值么?或者自己进行转换?答案:不能直接接收或返回相同的参数或返回值【下文可以看到对应子类实现】;子类可以在内部转换,但每个子类都需要转换,所以相同功能可以抽到父类处理
【DubboProtocol 服务暴露与引用方法中 比AbstractProxyProtocol 少了一次转换】
public abstract class AbstractProxyProtocol extends AbstractProtocol {
private final List<Class<?>> rpcExceptions = new CopyOnWriteArrayList<Class<?>>();
// 代理工厂
protected ProxyFactory proxyFactory;
public AbstractProxyProtocol() {
}
public AbstractProxyProtocol(Class<?>... exceptions) {
for (Class<?> exception : exceptions) {
addRpcException(exception);
}
}
public ProxyFactory getProxyFactory() {
return proxyFactory;
}
public void setProxyFactory(ProxyFactory proxyFactory) {
this.proxyFactory = proxyFactory;
}
// 对应子类实现该服务暴露方法时 需传入rpc接口实现类impl【或实现代理类】
// 而Protocol接口方法是 :<T> Exporter<T> export(Invoker<T> invoker);
// 所以需要该代理协议 实现 invoker 与 代理实现类impl的转换
protected abstract <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException;
// 对应子类实现该引用服务方法时 返回值类型是 :rpc接口代理类
// 而Protocol接口方法:<T> Invoker<T> refer(Class<T> type, URL url);
// 所以需要该代理协议 实现 invoker 与 rpc接口代理类的转换
protected abstract <T> T doRefer(Class<T> type, URL url) throws RpcException;
@Override
@SuppressWarnings("unchecked")
public <T> Exporter<T> export(final Invoker<T> invoker) throws RpcException {
final String uri = serviceKey(invoker.getUrl());
Exporter<T> exporter = (Exporter<T>) exporterMap.get(uri);
if (exporter != null) {
// When modifying the configuration through override, you need to re-expose the newly modified service.
if (Objects.equals(exporter.getInvoker().getUrl(), invoker.getUrl())) {
return exporter;
}
}
// 服务暴露阶段将invoker对象转换成接口代理类对象进行暴露
// 【DubboProtocol没有这一次转换】
final Runnable runnable = doExport(proxyFactory.getProxy(invoker, true), invoker.getInterface(), invoker.getUrl());
exporter = new AbstractExporter<T>(invoker) {
@Override
public void unexport() {
super.unexport();
exporterMap.remove(uri);
if (runnable != null) {
try {
runnable.run();
} catch (Throwable t) {
logger.warn(t.getMessage(), t);
}
}
}
};
exporterMap.put(uri, exporter);
return exporter;
}
@Override
protected <T> Invoker<T> protocolBindingRefer(final Class<T> type, final URL url) throws RpcException {
// 服务引用阶段 多一次invoker 与 接口实现代理类的转换
// 【DubboProtocol没有这一次转换】
final Invoker<T> target = proxyFactory.getInvoker(doRefer(type, url), type, url);
Invoker<T> invoker = new AbstractInvoker<T>(type, url) {
@Override
protected Result doInvoke(Invocation invocation) throws Throwable {
try {
Result result = target.invoke(invocation);
// FIXME result is an AsyncRpcResult instance.
Throwable e = result.getException();
if (e != null) {
for (Class<?> rpcException : rpcExceptions) {
if (rpcException.isAssignableFrom(e.getClass())) {
throw getRpcException(type, url, invocation, e);
}
}
}
return result;
} catch (RpcException e) {
if (e.getCode() == RpcException.UNKNOWN_EXCEPTION) {
e.setCode(getErrorCode(e.getCause()));
}
throw e;
} catch (Throwable e) {
throw getRpcException(type, url, invocation, e);
}
}
};
invokers.add(invoker);
return invoker;
}
。。。
}
功能分类
单链接、长链接,NIO异步传输,TCP
DubboProtocol 【非代理协议】
服务暴露会调用Exchangers.bind(url, requestHandler);方法【指定requestHandler】
服务引用会调用Exchangers.connect(url, requestHandler);方法【指定requestHandler】
requestHandler:请求处理handler,同时也处理回调请求【服务端回调客户端】
dubbo中其他rpc模块下的协议类均没看到回调处理【均没有回调处理】
服务暴露与服务引用阶段不需要创建rpc接口代理对象【与AbstractProxyProtocol子协议代码结构上的一个不同点】
Exchangers 内部指定了【netty | mina | grizzly】java nio框架
服务暴露使用使用nettyServer【默认】, 服务引用可以采用minaClient【配置可指定 netty | mina | grizzly 三者之间选择】
默认采用hessian2序列化方式,dubbo自定义数据包结构
默认服务引用方创建一个长链接
public class DubboProtocol extends AbstractProtocol {
// 请求处理handler
// 回调请求也在该handler 处理
private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() {
@Override
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
Invocation inv = (Invocation) message;
Invoker<?> invoker = getInvoker(channel, inv);
// 是否有回调CallBack 处理
// need to consider backward-compatibility if it's a callback
if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) {
String methodsStr = invoker.getUrl().getParameters().get("methods");
。。。
}
}
RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
Result result = invoker.invoke(inv);
return result.thenApply(Function.identity());
}
。。。
};
// 直接将invoker对象进行暴露
// 没有生成接口实现代理对象【与AbstractProxyProtocol不同】
// 在openServer(url);方法中指定了requestHander进行处理所有请求
@Override
public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
URL url = invoker.getUrl();
// export service.
String key = serviceKey(url);
DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
exporterMap.put(key, exporter);
//export an stub service for dispatching event
Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
if (isStubSupportEvent && !isCallbackservice) {
String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
if (logger.isWarnEnabled()) {
logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
"], has set stubproxy support event ,but no stub methods founded."));
}
}
}
// 启动server
openServer(url);
optimizeSerialization(url);
return exporter;
}
private void openServer(URL url) {
// find server.
String key = url.getAddress();
//client can export a service which's only for server to invoke
boolean isServer = url.getParameter(IS_SERVER_KEY, true);
if (isServer) {
// serverMap 为AbstractProtocol中属性,
// 可当作服务server缓存
// 并可保证接口暴露幂等性
ProtocolServer server = serverMap.get(key);
if (server == null) {
synchronized (this) {
server = serverMap.get(key);
if (server == null) {
serverMap.put(key, createServer(url));
}
}
} else {
// server supports reset, use together with override
server.reset(url);
}
}
}
private ProtocolServer createServer(URL url) {
url = URLBuilder.from(url)
// send readonly event when server closes, it's enabled by default
.addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
// enable heartbeat by default
.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
.addParameter(CODEC_KEY, DubboCodec.NAME)
.build();
String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported server type: " + str + ", url: " + url);
}
ExchangeServer server;
try {
// 指定 requestHandler 来接收请求消息处理器
server = Exchangers.bind(url, requestHandler);
} catch (RemotingException e) {
throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
}
str = url.getParameter(CLIENT_KEY);
if (str != null && str.length() > 0) {
Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
if (!supportedTypes.contains(str)) {
throw new RpcException("Unsupported client type: " + str);
}
}
return new DubboProtocolServer(server);
}
// 继承AbstractProtocol 服务引用方法
@Override
public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
optimizeSerialization(url);
// create rpc invoker.
// 创建client
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
private ExchangeClient[] getClients(URL url) {
boolean useShareConnect = false;
int connections = url.getParameter(CONNECTIONS_KEY, 0);
List<ReferenceCountExchangeClient> shareClients = null;
// if not configured, connection is shared, otherwise, one connection for one service
if (connections == 0) {
useShareConnect = true;
String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
// 通常引用一个服务只启动一个client【tcp长链接】
shareClients = getSharedClient(url, connections);
}
// 通过connections 参数控制启动多个client【tcp长链接】
ExchangeClient[] clients = new ExchangeClient[connections];
for (int i = 0; i < clients.length; i++) {
if (useShareConnect) {
clients[i] = shareClients.get(i);
} else {
clients[i] = initClient(url);
}
}
return clients;
}
private ExchangeClient initClient(URL url) {
// client type setting.
String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
// BIO is not allowed since it has severe performance issue.
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
throw new RpcException("Unsupported client type: " + str + "," +
" supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
}
ExchangeClient client;
try {
// connection should be lazy
if (url.getParameter(LAZY_CONNECT_KEY, false)) {
client = new LazyConnectExchangeClient(url, requestHandler);
} else {
// 创建client也需要指定requestHandler
// 此处的requestHandler 处理回调callback请求处理【provider回调consumer】
client = Exchangers.connect(url, requestHandler);
}
} catch (RemotingException e) {
throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
}
return client;
}
。。。。。
}
多链接、短链接,同步传输,HTTP | TCP
HttpProtocol、HessionProtocol、RestProtocol、RmiProtocol、WebServiceProtocol、XmlRpcProtocol【代理协议子类】
每个协议类:创建server并启动,创建client,指定请求处理器handler
server 与 client都采用开源组件创建
各代理子类创建server或client对象 都依赖到接口实现类【或接口代理实现类】
Protocol接口中export与refer方法需要传入或返回invoker对象
AbstractProxyProtocol 中export 与 refer 方法有实现 接口代理实现类对象 与 invoker互转
采用通用的序列化方式【json 或 hessian】
RmiProtocol 采用TCP协议,其他协议都采用HTTP协议
dubbo 默认采用jetty作为http web服务器
无回调callback处理
源码示例
public class HttpProtocol extends AbstractProxyProtocol {
public static final String ACCESS_CONTROL_ALLOW_ORIGIN_HEADER = "Access-Control-Allow-Origin";
public static final String ACCESS_CONTROL_ALLOW_METHODS_HEADER = "Access-Control-Allow-Methods";
public static final String ACCESS_CONTROL_ALLOW_HEADERS_HEADER = "Access-Control-Allow-Headers";
private final Map<String, JsonRpcServer> skeletonMap = new ConcurrentHashMap<>();
private HttpBinder httpBinder;
public HttpProtocol() {
super(HttpException.class, JsonRpcClientException.class);
}
public void setHttpBinder(HttpBinder httpBinder) {
this.httpBinder = httpBinder;
}
@Override
public int getDefaultPort() {
return 80;
}
// 消息处理器
private class InternalHandler implements HttpHandler {
private boolean cors;
public InternalHandler(boolean cors) {
this.cors = cors;
}
@Override
public void handle(HttpServletRequest request, HttpServletResponse response)
throws ServletException {
String uri = request.getRequestURI();
JsonRpcServer skeleton = skeletonMap.get(uri);
if (cors) {
response.setHeader(ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*");
response.setHeader(ACCESS_CONTROL_ALLOW_METHODS_HEADER, "POST");
response.setHeader(ACCESS_CONTROL_ALLOW_HEADERS_HEADER, "*");
}
if (request.getMethod().equalsIgnoreCase("OPTIONS")) {
response.setStatus(200);
} else if (request.getMethod().equalsIgnoreCase("POST")) {
RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
try {
// skeleton:com.googlecode.jsonrpc4j.JsonRpcServer
// 采用JSON序列化
skeleton.handle(request.getInputStream(), response.getOutputStream());
} catch (Throwable e) {
throw new ServletException(e);
}
} else {
response.setStatus(500);
}
}
}
@Override
protected <T> Runnable doExport(final T impl, Class<T> type, URL url) throws RpcException {
String addr = getAddr(url);
ProtocolServer protocolServer = serverMap.get(addr);
if (protocolServer == null) {
// 指定消息处理器handler
RemotingServer remotingServer = httpBinder.bind(url, new InternalHandler(url.getParameter("cors", false)));
serverMap.put(addr, new ProxyProtocolServer(remotingServer));
}
final String path = url.getAbsolutePath();
final String genericPath = path + "/" + GENERIC_KEY;
// 创建 server
JsonRpcServer skeleton = new JsonRpcServer(impl, type);
// 创建com.googlecode.jsonrpc4j.JsonRpcServer 需要传入rpc接口实现【或代理类】对象
JsonRpcServer genericServer = new JsonRpcServer(impl, GenericService.class);
skeletonMap.put(path, skeleton);
skeletonMap.put(genericPath, genericServer);
return () -> {
skeletonMap.remove(path);
skeletonMap.remove(genericPath);
};
}
@SuppressWarnings("unchecked")
@Override
protected <T> T doRefer(final Class<T> serviceType, URL url) throws RpcException {
final String generic = url.getParameter(GENERIC_KEY);
final boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);
/********* com.googlecode.jsonrpc4j.spring.JsonProxyFactoryBean 创建代理对象过程【rpc-client】 **************/
JsonProxyFactoryBean jsonProxyFactoryBean = new JsonProxyFactoryBean();
JsonRpcProxyFactoryBean jsonRpcProxyFactoryBean = new JsonRpcProxyFactoryBean(jsonProxyFactoryBean);
jsonRpcProxyFactoryBean.setRemoteInvocationFactory((methodInvocation) -> {
RemoteInvocation invocation = new JsonRemoteInvocation(methodInvocation);
if (isGeneric) {
invocation.addAttribute(GENERIC_KEY, generic);
}
return invocation;
});
String key = url.setProtocol("http").toIdentityString();
if (isGeneric) {
key = key + "/" + GENERIC_KEY;
}
jsonRpcProxyFactoryBean.setServiceUrl(key);
// 指定服务接口
jsonRpcProxyFactoryBean.setServiceInterface(serviceType);
jsonProxyFactoryBean.afterPropertiesSet();
// 创建客户端代理类,该代理类实现rpc接口,并返回
// Protocol#refer 方法返回值类型为Invoker
// AbstractProxyProtocol 会将proxy 转成 Invoker 对象
// 【DubboProtocol 服务引用方法没有创建接口实现代理类】
return (T) jsonProxyFactoryBean.getObject();
/********* com.googlecode.jsonrpc4j.spring.JsonProxyFactoryBean 创建代理对象过程【rpc-client】 **************/
}
。。。
}
HessianProtocol 源码
public class HessianProtocol extends AbstractProxyProtocol {
private final Map<String, HessianSkeleton> skeletonMap = new ConcurrentHashMap<String, HessianSkeleton>();
private HttpBinder httpBinder;
public HessianProtocol() {
super(HessianException.class);
}
public void setHttpBinder(HttpBinder httpBinder) {
this.httpBinder = httpBinder;
}
@Override
public int getDefaultPort() {
return 80;
}
// 消息处理handler
private class HessianHandler implements HttpHandler {
@Override
public void handle(HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException {
String uri = request.getRequestURI();
// com.caucho.hessian.server.HessianSkeleton
HessianSkeleton skeleton = skeletonMap.get(uri);
if (!"POST".equalsIgnoreCase(request.getMethod())) {
response.setStatus(500);
} else {
RpcContext.getContext().setRemoteAddress(request.getRemoteAddr(), request.getRemotePort());
Enumeration<String> enumeration = request.getHeaderNames();
while (enumeration.hasMoreElements()) {
String key = enumeration.nextElement();
if (key.startsWith(DEFAULT_EXCHANGER)) {
RpcContext.getContext().setAttachment(key.substring(DEFAULT_EXCHANGER.length()),
request.getHeader(key));
}
}
try {
skeleton.invoke(request.getInputStream(), response.getOutputStream());
} catch (Throwable e) {
throw new ServletException(e);
}
}
}
}
@Override
protected <T> Runnable doExport(T impl, Class<T> type, URL url) throws RpcException {
String addr = getAddr(url);
ProtocolServer protocolServer = serverMap.get(addr);
if (protocolServer == null) {
// 指定消息处理handler
// 创建server
RemotingServer remotingServer = httpBinder.bind(url, new HessianHandler());
serverMap.put(addr, new ProxyProtocolServer(remotingServer));
}
final String path = url.getAbsolutePath();
// hessian框架服务提供方核型类:com.caucho.hessian.server.HessianSkeleton
// 创建 com.caucho.hessian.server.HessianSkeleton 对需要传入rpc接口实现【或代理类】对象
final HessianSkeleton skeleton = new HessianSkeleton(impl, type);
skeletonMap.put(path, skeleton);
final String genericPath = path + "/" + GENERIC_KEY;
skeletonMap.put(genericPath, new HessianSkeleton(impl, GenericService.class));
return new Runnable() {
@Override
public void run() {
skeletonMap.remove(path);
skeletonMap.remove(genericPath);
}
};
}
@Override
@SuppressWarnings("unchecked")
protected <T> T doRefer(Class<T> serviceType, URL url) throws RpcException {
String generic = url.getParameter(GENERIC_KEY);
boolean isGeneric = ProtocolUtils.isGeneric(generic) || serviceType.equals(GenericService.class);
if (isGeneric) {
RpcContext.getContext().setAttachment(GENERIC_KEY, generic);
url = url.setPath(url.getPath() + "/" + GENERIC_KEY);
}
/********* com.caucho.hessian.client.HessianProxyFactory 创建代理对象过程【rpc-client】 **************/
HessianProxyFactory hessianProxyFactory = new HessianProxyFactory();
boolean isHessian2Request = url.getParameter(HESSIAN2_REQUEST_KEY, DEFAULT_HESSIAN2_REQUEST);
hessianProxyFactory.setHessian2Request(isHessian2Request);
boolean isOverloadEnabled = url.getParameter(HESSIAN_OVERLOAD_METHOD_KEY, DEFAULT_HESSIAN_OVERLOAD_METHOD);
hessianProxyFactory.setOverloadEnabled(isOverloadEnabled);
String client = url.getParameter(CLIENT_KEY, DEFAULT_HTTP_CLIENT);
if ("httpclient".equals(client)) {
HessianConnectionFactory factory = new HttpClientConnectionFactory();
factory.setHessianProxyFactory(hessianProxyFactory);
hessianProxyFactory.setConnectionFactory(factory);
} else if (client != null && client.length() > 0 && !DEFAULT_HTTP_CLIENT.equals(client)) {
throw new IllegalStateException("Unsupported http protocol client=\"" + client + "\"!");
} else {
HessianConnectionFactory factory = new DubboHessianURLConnectionFactory();
factory.setHessianProxyFactory(hessianProxyFactory);
hessianProxyFactory.setConnectionFactory(factory);
}
int timeout = url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT);
hessianProxyFactory.setConnectTimeout(timeout);
hessianProxyFactory.setReadTimeout(timeout);
// 创建客户端代理类,该代理类实现rpc接口,并返回
// Protocol#refer 方法返回值类型为Invoker
// AbstractProxyProtocol 会将proxy 转成 Invoker 对象
// 【DubboProtocol 服务引用方法没有创建接口实现代理类】
return (T) hessianProxyFactory.create(serviceType, url.setProtocol("http").toJavaURL(), Thread.currentThread().getContextClassLoader());
/********* com.caucho.hessian.client.HessianProxyFactory 创建代理对象过程【rpc-client】 **************/
}
。。。
}
只有服务引用,没有服务暴露
RedisProtocol、MemcacheProtocol【非代理协议】
都有第三方【缓存】服务器
consumer只能调用对应get(k)、set(k,v)、delete(k) 三个方法, 与redis| memcache 交互
该协议就是对缓存基础方法的封装,感觉实用价值不大
RedisProtocol 源码【MemcacheProtocol代码类似】
public class RedisProtocol extends AbstractProtocol {
public static final int DEFAULT_PORT = 6379;
。。。
@Override
protected <T> Invoker<T> protocolBindingRefer(final Class<T> type, final URL url) throws RpcException {
try {
GenericObjectPoolConfig config = new GenericObjectPoolConfig();
config.setTestOnBorrow(url.getParameter("test.on.borrow", true));
config.setTestOnReturn(url.getParameter("test.on.return", false));
config.setTestWhileIdle(url.getParameter("test.while.idle", false));
if (url.getParameter("max.idle", 0) > 0) {
config.setMaxIdle(url.getParameter("max.idle", 0));
}
if (url.getParameter("min.idle", 0) > 0) {
config.setMinIdle(url.getParameter("min.idle", 0));
}
if (url.getParameter("max.active", 0) > 0) {
config.setMaxTotal(url.getParameter("max.active", 0));
}
if (url.getParameter("max.total", 0) > 0) {
config.setMaxTotal(url.getParameter("max.total", 0));
}
if (url.getParameter("max.wait", 0) > 0) {
config.setMaxWaitMillis(url.getParameter("max.wait", 0));
}
if (url.getParameter("num.tests.per.eviction.run", 0) > 0) {
config.setNumTestsPerEvictionRun(url.getParameter("num.tests.per.eviction.run", 0));
}
if (url.getParameter("time.between.eviction.runs.millis", 0) > 0) {
config.setTimeBetweenEvictionRunsMillis(url.getParameter("time.between.eviction.runs.millis", 0));
}
if (url.getParameter("min.evictable.idle.time.millis", 0) > 0) {
config.setMinEvictableIdleTimeMillis(url.getParameter("min.evictable.idle.time.millis", 0));
}
final JedisPool jedisPool = new JedisPool(config, url.getHost(), url.getPort(DEFAULT_PORT),
url.getParameter(TIMEOUT_KEY, DEFAULT_TIMEOUT),
StringUtils.isBlank(url.getPassword()) ? null : url.getPassword(),
url.getParameter("db.index", 0));
final int expiry = url.getParameter("expiry", 0);
// 可指定类似get的方法名
final String get = url.getParameter("get", "get");
// 可指定类似set的方法名
final String set = url.getParameter("set", Map.class.equals(type) ? "put" : "set");
// 可指定类似delete的方法名
final String delete = url.getParameter("delete", Map.class.equals(type) ? "remove" : "delete");
return new AbstractInvoker<T>(type, url) {
@Override
protected Result doInvoke(Invocation invocation) throws Throwable {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
if (get.equals(invocation.getMethodName())) {
// 不是get(k) 则报错
if (invocation.getArguments().length != 1) {
throw new IllegalArgumentException("The redis get method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
byte[] value = jedis.get(String.valueOf(invocation.getArguments()[0]).getBytes());
if (value == null) {
return AsyncRpcResult.newDefaultAsyncResult(invocation);
}
ObjectInput oin = getSerialization(url).deserialize(url, new ByteArrayInputStream(value));
return AsyncRpcResult.newDefaultAsyncResult(oin.readObject(), invocation);
} else if (set.equals(invocation.getMethodName())) {
// 不是set(k, v) 则报错
if (invocation.getArguments().length != 2) {
throw new IllegalArgumentException("The redis set method arguments mismatch, must be two arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
byte[] key = String.valueOf(invocation.getArguments()[0]).getBytes();
ByteArrayOutputStream output = new ByteArrayOutputStream();
ObjectOutput value = getSerialization(url).serialize(url, output);
value.writeObject(invocation.getArguments()[1]);
jedis.set(key, output.toByteArray());
if (expiry > 1000) {
jedis.expire(key, expiry / 1000);
}
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else if (delete.equals(invocation.getMethodName())) {
// 不是delete(k) 则报错
if (invocation.getArguments().length != 1) {
throw new IllegalArgumentException("The redis delete method arguments mismatch, must only one arguments. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url);
}
jedis.del(String.valueOf(invocation.getArguments()[0]).getBytes());
return AsyncRpcResult.newDefaultAsyncResult(invocation);
} else {
// 其余方法一律不支持
throw new UnsupportedOperationException("Unsupported method " + invocation.getMethodName() + " in redis service.");
}
} catch (Throwable t) {
RpcException re = new RpcException("Failed to invoke redis service method. interface: " + type.getName() + ", method: " + invocation.getMethodName() + ", url: " + url + ", cause: " + t.getMessage(), t);
if (t instanceof TimeoutException || t instanceof SocketTimeoutException) {
re.setCode(RpcException.TIMEOUT_EXCEPTION);
} else if (t instanceof JedisConnectionException || t instanceof IOException) {
re.setCode(RpcException.NETWORK_EXCEPTION);
} else if (t instanceof JedisDataException) {
re.setCode(RpcException.SERIALIZATION_EXCEPTION);
}
throw re;
} finally {
if (jedis != null) {
try {
jedis.close();
} catch (Throwable t) {
logger.warn("returnResource error: " + t.getMessage(), t);
}
}
}
}
@Override
public void destroy() {
super.destroy();
try {
jedisPool.destroy();
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
}
}
};
} catch (Throwable t) {
throw new RpcException("Failed to refer redis service. interface: " + type.getName() + ", url: " + url + ", cause: " + t.getMessage(), t);
}
}
}
扩展第三方RPC服务协议
GrpcProtocol,ThriftProtocol【代理协议子类】
【暂时没研究,贴一些概念】
gRPC 是一个高性能、开源和通用的 RPC 框架,面向移动和 HTTP/2 设计, ThriftProtocol
Thrift是一种接口描述语言和二进制通讯协议,它被用来定义和创建跨语言的服务。它被当作一个远程过程调用(RPC)框架来使用,是由Facebook为“大规模跨语言服务开发”而开发的。
非RPC层协议
QosProtocolWrapper:服务健康检查、质量探测 会用到该协议
RegistryProtocol:注册阶段使用,在服务注册文章中重点分析
ProtocolFilterWrapper:filter协议包装类, 可参考dubbo filter加载,extension Wrapper扩展