dubbo 客户端调用流程

西魏陶渊明 ... 2022-8-22 Java 大约 4 分钟

前面我们学习了服务端如何启动暴露一个外部服务,本文主要学习客户端如何通过代理方式访问客户端请求

# 一、启动一个客户端Consumer

# 1. 定义一个接口

注意这里其实是引用的前文中的接口。生产中是服务提供方打一个jar包给客户端用。

public interface UserService {
    void say(String message);
}
1
2
3

# 2. 生成本地服务

    @Test
    public void consumerTest() {
        // 当前应用配置
        ApplicationConfig application = new ApplicationConfig();
        application.setName("consumerTest");

        // 连接注册中心配置
        RegistryConfig registry = new RegistryConfig();
        registry.setAddress("zookeeper://127.0.0.1:2181");

        // 注意:ReferenceConfig为重对象,内部封装了与注册中心的连接,以及与服务提供方的连接
        // 引用远程服务
        ReferenceConfig<UserService> reference = new ReferenceConfig<UserService>(); // 此实例很重,封装了与注册中心的连接以及与提供者的连接,请自行缓存,否则可能造成内存和连接泄漏
        reference.setApplication(application);
        reference.setRegistry(registry); // 多个注册中心可以用setRegistries()
        reference.setInterface(UserService.class);
        reference.setVersion("1.0.0");
        UserService userService = reference.get();
        userService.say("hello");
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20

# 3. 原理分析

首先客户端只有接口的,那么可以根据这个接口生成一个代理。而代理对象中逻辑就是,从zk中找到服务端地址。 然后通过netty客户端去请求服务端的数据。然后返回

# 二、源码分析

带着我们猜测的逻辑一起来看下ReferenceConfig的实现原理。

  public synchronized T get() {
        if (destroyed){
            throw new IllegalStateException("Already destroyed!");
        }
    	if (ref == null) {
    	    //逻辑就在init里面
    		init();
    	}
    	return ref;
    }
1
2
3
4
5
6
7
8
9
10

init先做写检查信息,如这个方法是否存在接口中 createProxy#loadRegistries

# 1. 集群容错策略

可以看到一共有9中策略。

当时服务端是多个的时候,才会生成集群策略。另外既然是集群就要选择到底使用哪个来执行。这就是 负载均衡或者说叫路由策略。

# LoadBalance负载均衡

  • directory中获取所有的invoker
  • 如果有多个invoker就去看配置的负载均衡策略
  • 根据负载均衡策略找到一个Inoker
public abstract class AbstractClusterInvoker<T> implements Invoker<T> {
    public Result invoke(final Invocation invocation) throws RpcException {

        checkWheatherDestoried();

        LoadBalance loadbalance;
        //获取所有的invoker
        List<Invoker<T>> invokers = list(invocation);
        //如果有多个invoker就去看配置的负载均衡策略
        if (invokers != null && invokers.size() > 0) {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                    .getMethodParameter(invocation.getMethodName(),Constants.LOADBALANCE_KEY, Constants.DEFAULT_LOADBALANCE));
        } else {
            loadbalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(Constants.DEFAULT_LOADBALANCE);
        }
        RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
        //根据策略选一个
        return doInvoke(invocation, invokers, loadbalance);
    }
     
     protected  List<Invoker<T>> list(Invocation invocation) throws RpcException {
    	List<Invoker<T>> invokers = directory.list(invocation);
    	return invokers;
     }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 2. invoker生成代理对象

代理的知识点不用说了。

# 3. 客户端的invoker逻辑

# Protocol#refer

主要看DubboProtocol的逻辑

  public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException {
        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);
        return invoker;
  }
1
2
3
4
5
6

# DubboInvoker

底层调用netty通信api发送数据到客户端。然后读取数据。


客户端doInvoke时候会生成ExchangeClient就是NettyClientpublic class DubboInvoker<T> extends AbstractInvoker<T> {

    @Override
    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());
        inv.setAttachment(Constants.VERSION_KEY, version);
        
        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY,Constants.DEFAULT_TIMEOUT);
            if (isOneway) {
            	boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                RpcContext.getContext().setFuture(null);
                return new RpcResult();
            } else if (isAsync) {
            	ResponseFuture future = currentClient.request(inv, timeout) ;
                RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));
                return new RpcResult();
            } else {
            	RpcContext.getContext().setFuture(null);
                return (Result) currentClient.request(inv, timeout).get();
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
    
    @Override
    public boolean isAvailable() {
        if (!super.isAvailable())
            return false;
        for (ExchangeClient client : clients){
            if (client.isConnected() && !client.hasAttribute(Constants.CHANNEL_ATTRIBUTE_READONLY_KEY)){
                //cannot write == not Available ?
                return true ;
            }
        }
        return false;
    }

  
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

# 三、总结

在前文的基础上,客户端的代码算是比较简单的。主要是集群容错和负载均衡、路由。

主要是利用代理来实现的。

最后求关注,求订阅,谢谢你的阅读!

下一篇会讲,dubbo如何与Spring进行整合。


本文由西魏陶渊明版权所有。如若转载,请注明出处:西魏陶渊明
上次编辑于: 2022年8月22日 09:35
贡献者: lxchinesszz