dubbo 客户端调用流程
西魏陶渊明 ... 2022-8-22 大约 4 分钟
前面我们学习了服务端如何启动暴露一个外部服务,本文主要学习客户端如何通过代理方式访问客户端请求
# 一、启动一个客户端Consumer
# 1. 定义一个接口
注意这里其实是引用的前文中的接口。生产中是服务提供方打一个jar包给客户端用。
public interface UserService {
void say(String message);
}
1
2
3
2
3
# 2. 生成本地服务
@Test
public void consumerTest() {
// 当前应用配置
ApplicationConfig application = new ApplicationConfig();
application.setName("consumerTest");
// 连接注册中心配置
RegistryConfig registry = new RegistryConfig();
registry.setAddress("zookeeper://127.0.0.1:2181");
// 注意:ReferenceConfig为重对象,内部封装了与注册中心的连接,以及与服务提供方的连接
// 引用远程服务
ReferenceConfig<UserService> reference = new ReferenceConfig<UserService>(); // 此实例很重,封装了与注册中心的连接以及与提供者的连接,请自行缓存,否则可能造成内存和连接泄漏
reference.setApplication(application);
reference.setRegistry(registry); // 多个注册中心可以用setRegistries()
reference.setInterface(UserService.class);
reference.setVersion("1.0.0");
UserService userService = reference.get();
userService.say("hello");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
# 3. 原理分析
首先客户端只有接口的,那么可以根据这个接口生成一个代理。而代理对象中逻辑就是,从zk中找到服务端地址。 然后通过netty客户端去请求服务端的数据。然后返回
# 二、源码分析
带着我们猜测的逻辑一起来看下ReferenceConfig
的实现原理。
public synchronized T get() {
if (destroyed){
throw new IllegalStateException("Already destroyed!");
}
if (ref == null) {
//逻辑就在init里面
init();
}
return ref;
}
1
2
3
4
5
6
7
8
9
10
2
3
4
5
6
7
8
9
10
init先做写检查信息,如这个方法是否存在接口中 createProxy#loadRegistries
# 1. 集群容错策略
可以看到一共有9中策略。
当时服务端是多个的时候,才会生成集群策略。另外既然是集群就要选择到底使用哪个来执行。这就是 负载均衡或者说叫路由策略。
# LoadBalance负载均衡
- directory中获取所有的invoker
- 如果有多个invoker就去看配置的负载均衡策略
- 根据负载均衡策略找到一个Inoker
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
public Result invoke(final Invocation invocation) throws RpcException {
checkWheatherDestoried();
LoadBalance loadbalance;
//获取所有的invoker
List<Invoker<T>> invokers = list(invocation);
//如果有多个invoker就去看配置的负载均衡策略
if (invokers != null && invokers.size() > 0) {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
.getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
} else {
loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
}
RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
//根据策略选一个
return doInvoke(invocation, invokers, loadbalance);
}
protected List<Invoker<T>> list(Invocation invocation) throws RpcException {
List<Invoker<T>> invokers = directory.list(invocation);
return invokers;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 2. invoker生成代理对象
代理的知识点不用说了。
# 3. 客户端的invoker逻辑
# Protocol#refer
主要看DubboProtocol的逻辑
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
// create rpc invoker.
DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
invokers.add(invoker);
return invoker;
}
1
2
3
4
5
6
2
3
4
5
6
# DubboInvoker
底层调用netty通信api发送数据到客户端。然后读取数据。
客户端doInvoke时候会生成ExchangeClient就是NettyClient。
public class DubboInvoker<T> extends AbstractInvoker<T> {
@Override
protected Result doInvoke(final Invocation invocation) throws Throwable {
RpcInvocation inv = (RpcInvocation) invocation;
final String methodName = RpcUtils.getMethodName(invocation);
inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
inv.setAttachment(Constants.VERSION_KEY, version);
ExchangeClient currentClient;
if (clients.length == 1) {
currentClient = clients[0];
} else {
currentClient = clients[index.getAndIncrement() % clients.length];
}
try {
boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
if (isOneway) {
boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
currentClient.send(inv, isSent);
RpcContext.getContext().setFuture(null);
return new RpcResult();
} else if (isAsync) {
ResponseFuture future = currentClient.request(inv, timeout) ;
RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
return new RpcResult();
} else {
RpcContext.getContext().setFuture(null);
return (Result) currentClient.request(inv, timeout).get();
}
} catch (TimeoutException e) {
throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
} catch (RemotingException e) {
throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
}
}
@Override
public boolean isAvailable() {
if (!super.isAvailable())
return false;
for (ExchangeClient client : clients){
if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)){
//cannot write == not Available ?
return true ;
}
}
return false;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
# 三、总结
在前文的基础上,客户端的代码算是比较简单的。主要是集群容错和负载均衡、路由。
主要是利用代理来实现的。
最后求关注,求订阅,谢谢你的阅读!
下一篇会讲,dubbo如何与Spring进行整合。
本文由西魏陶渊明版权所有。如若转载,请注明出处:西魏陶渊明