由SOFARPC示例介绍基本流程和基础源码

news2024/11/16 19:37:04

由SOFARPC示例介绍基本流程和基础源码

1. Server

先看 Server 端测试方法:

public class QuickStartServer {

    public static void main(String[] args) {
        ServerConfig serverConfig = new ServerConfig()
            .setProtocol("bolt") // 设置一个协议,默认bolt
            .setPort(12200) // 设置一个端口,默认12200
            .setDaemon(false); // 非守护线程

        ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
            .setInterfaceId(HelloService.class.getName()) // 指定接口
            .setRef(new HelloServiceImpl()) // 指定实现
            .setServer(serverConfig); // 指定服务端

        providerConfig.export(); // 发布服务
    }
}

ProviderConfig类中,获得一个服务提供者启动类,负责发布服务。

public synchronized void export() {
    if (providerBootstrap == null) {
        providerBootstrap = Bootstraps.from(this);
    }
    providerBootstrap.export();
}

com.alipay.sofa.rpc.bootstrap.Bootstraps#from(com.alipay.sofa.rpc.config.ProviderConfig)

这里看一下Bootstraps#from方法,构造一个发布服务的包装类。

public static <T> ProviderBootstrap<T> from(ProviderConfig<T> providerConfig) {
    String bootstrap = providerConfig.getBootstrap();
    if (StringUtils.isEmpty(bootstrap)) {
        // Use default provider bootstrap
        bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_PROVIDER_BOOTSTRAP);
        providerConfig.setBootstrap(bootstrap);
    }
    ProviderBootstrap providerBootstrap = ExtensionLoaderFactory
        .getExtensionLoader(ProviderBootstrap.class) // 从工厂拿到一个Loader
        .getExtension(bootstrap, new Class[] { ProviderConfig.class }, new Object[] { providerConfig }); // 得到一个扩展实例
    return (ProviderBootstrap<T>) providerBootstrap;
}

先学习一下ExtensionLoadeFactory的工厂方法的写法:

/**
 * All extension loader {Class : ExtensionLoader}
 * ConcurrentHashMap
 */
private static final ConcurrentMap<Class, ExtensionLoader> LOADER_MAP = new ConcurrentHashMap<Class, ExtensionLoader>();

/**
 * Get extension loader by extensible class with listener
 */
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> clazz, ExtensionLoaderListener<T> listener) {
    ExtensionLoader<T> loader = LOADER_MAP.get(clazz);
    if (loader == null) { // a
        synchronized (ExtensionLoaderFactory.class) {
            loader = LOADER_MAP.get(clazz);
            if (loader == null) {
                loader = new ExtensionLoader<T>(clazz, listener); // b
                LOADER_MAP.put(clazz, loader);
            }
        }
    }
    return loader;
}

/**
 * Get extension loader by extensible class without listener
 */
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> clazz) {
    return getExtensionLoader(clazz, null);
}

因为 b 处产生的 loader 一定在被初始化后才会被放进 map 中,所以不存在双重检查锁定的因指令重排导致的问题。

再看看获得实例的getExtension方法:

public T getExtension(String alias, Class[] argTypes, Object[] args) {
    ExtensionClass<T> extensionClass = getExtensionClass(alias);
    if (extensionClass == null) {
        throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_EXTENSION_NOT_FOUND, interfaceName, alias));
    } else {
        if (extensible.singleton() && factory != null) {
            T t = factory.get(alias);
            if (t == null) {
                synchronized (this) {
                    t = factory.get(alias);
                    if (t == null) {
                        t = extensionClass.getExtInstance(argTypes, args);
                        factory.put(alias, t);
                    }
                }
            }
            return t;
        } else {
            return extensionClass.getExtInstance(argTypes, args);
        }
    }
}

com.alipay.sofa.rpc.bootstrap.DefaultProviderBootstrap#export

image-20221215221115052

这里看一下 default 的服务发布实现,实际还有另外几种

@Override
public void export() {
    if (providerConfig.getDelay() > 0) { // 延迟加载,单位毫秒
        Thread thread = factory.newThread(new Runnable() {
            @Override
            public void run() {
                try {
                    Thread.sleep(providerConfig.getDelay());
                } catch (Throwable ignore) { // NOPMD
                }
                doExport();
            }
        });
        thread.start();
    } else {
        doExport();
    }
}

catch 的异常可以用 ignore 代替,下面看关键的doExport

private void doExport() {
    if (exported) {
        return;
    }
    // 检查参数
    checkParameters();
    String appName = providerConfig.getAppName();

    // key is the protocol of server,for concurrent safe
    Map<String, Boolean> hasExportedInCurrent = new ConcurrentHashMap<String, Boolean>();
    // 将处理器注册到server,遍历Server列表,在示例中就一个bolt\12200\not daemon
    List<ServerConfig> serverConfigs = providerConfig.getServer();
    for (ServerConfig serverConfig : serverConfigs) {
        String protocol = serverConfig.getProtocol();

        String key = providerConfig.buildKey() + ":" + protocol;

        if (LOGGER.isInfoEnabled(appName)) {
            LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId());
        }

        // 注意同一interface,同一uniqueId,不同server情况
        AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器
        if (cnt == null) { // 没有发布过
            cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
        }
        int c = cnt.incrementAndGet();
        // 注册为已发布
        hasExportedInCurrent.put(serverConfig.getProtocol(), true);
        // 最大发布次数
        int maxProxyCount = providerConfig.getRepeatedExportLimit();
        if (maxProxyCount > 0) {
            if (c > maxProxyCount) {
                decrementCounter(hasExportedInCurrent);
                // 超过最大数量,直接抛出异常
                throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_DUPLICATE_PROVIDER_CONFIG, key,
                    maxProxyCount));
            } else if (c > 1) {
                if (LOGGER.isInfoEnabled(appName)) {
                    LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.WARN_DUPLICATE_PROVIDER_CONFIG, key, c));
                }
            }
        }

    }

    try {
        // 构造请求调用器
        providerProxyInvoker = new ProviderProxyInvoker(providerConfig);

        preProcessProviderTarget(providerConfig, (ProviderProxyInvoker) providerProxyInvoker);
        // 初始化注册中心
        if (providerConfig.isRegister()) {
            List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
            if (CommonUtils.isNotEmpty(registryConfigs)) {
                for (RegistryConfig registryConfig : registryConfigs) {
                    RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry
                }
            }
        }
        // 将将请求调用处理器Invoker注册到server
        for (ServerConfig serverConfig : serverConfigs) {
            try {
                Server server = serverConfig.buildIfAbsent();
                // 注册请求调用器
                server.registerProcessor(providerConfig, providerProxyInvoker);
                if (serverConfig.isAutoStart()) {
                    server.start();
                }

            } catch (SofaRpcRuntimeException e) {
                throw e;
            } catch (Exception e) {
                LOGGER.errorWithApp(appName,
                    LogCodes.getLog(LogCodes.ERROR_REGISTER_PROCESSOR_TO_SERVER, serverConfig.getId()), e);
            }
        }

        //如果是泛型接口则需要在JSON序列化器中注册跟真实实现类的对应关系,因为反序列化时无法从泛型接口中拿到真实的数据类型
        if (providerConfig.getProxyClass().getTypeParameters().length > 0) { // getTypeParameters().length > 0
            String serviceName = ConfigUniqueNameGenerator.getUniqueName(providerConfig);
            AbstractSerializer.registerGenericService(serviceName, providerConfig.getRef().getClass().getName());
        }

        // 注册到注册中心
        providerConfig.setConfigListener(new ProviderAttributeListener());
        register();
    } catch (Exception e) {
        decrementCounter(hasExportedInCurrent);
        if (e instanceof SofaRpcRuntimeException) {
            throw e;
        }
        throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_BUILD_PROVIDER_PROXY), e);
    }

    // 记录一些缓存数据,供后续销毁等等使用
    RpcRuntimeContext.cacheProviderConfig(this);
    exported = true;
}

关键的注册操作:

protected void register() {
    if (providerConfig.isRegister()) {
        List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
        if (registryConfigs != null) {
            for (RegistryConfig registryConfig : registryConfigs) {
                Registry registry = RegistryFactory.getRegistry(registryConfig);
                // 根据具体提供的registry,调用不同的init和start
                registry.init();
                registry.start();
                try {
                    // 注册
                    registry.register(providerConfig);
                } catch (SofaRpcRuntimeException e) {
                    throw e;
                } catch (Throwable e) {
                    String appName = providerConfig.getAppName();
                    if (LOGGER.isWarnEnabled(appName)) {
                        LOGGER.errorWithApp(appName,
                            LogCodes.getLog(LogCodes.ERROR_REGISTER_TO_REGISTRY, registryConfig.getId()), e);
                    }
                }
            }
        }
    }
}
image-20221215225449270

2. Client

先看 Client 端测试方法:

public class QuickStartClient {

    private final static Logger LOGGER = LoggerFactory.getLogger(QuickStartClient.class);

    public static void main(String[] args) {

        ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
            .setInterfaceId(HelloService.class.getName()) // 指定接口
            .setProtocol("bolt") // 指定协议
            .setDirectUrl("bolt://127.0.0.1:12200") // 指定直连地址
            .setConnectTimeout(10 * 1000);

        HelloService helloService = consumerConfig.refer();

        while (true) {
            try {
                LOGGER.info(helloService.sayHello("world"));
            } catch (Exception e) {
                e.printStackTrace();
            }

            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}

直接看com.alipay.sofa.rpc.config.ConsumerConfig#refer方法

public T refer() {
    if (consumerBootstrap == null) {
    	// 构造服务消费者启动类
        consumerBootstrap = Bootstraps.from(this);
    }
    // 获得代理对象
    return consumerBootstrap.refer();
}

com.alipay.sofa.rpc.bootstrap.Bootstraps#from(com.alipay.sofa.rpc.config.ConsumerConfig)

这里看一下Bootstraps#from方法,构造一个引用服务的包装类。

public static <T> ConsumerBootstrap<T> from(ConsumerConfig<T> consumerConfig) {
    String bootstrap = consumerConfig.getBootstrap();
    ConsumerBootstrap consumerBootstrap;
    if (StringUtils.isNotEmpty(bootstrap)) {
        consumerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class)
            .getExtension(bootstrap,
                new Class[] { ConsumerConfig.class },
                new Object[] { consumerConfig });
    } else {
        // default is same with protocol
        bootstrap = consumerConfig.getProtocol();
        ExtensionLoader extensionLoader = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class);
        ExtensionClass<ConsumerBootstrap> extensionClass = extensionLoader.getExtensionClass(bootstrap);
        if (extensionClass == null) {
            // if not exist, use default consumer bootstrap
            bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_CONSUMER_BOOTSTRAP);
            consumerConfig.setBootstrap(bootstrap);
            consumerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class)
                .getExtension(bootstrap, new Class[] { ConsumerConfig.class }, new Object[] { consumerConfig });
        } else {
            consumerConfig.setBootstrap(bootstrap);
            consumerBootstrap = extensionClass.getExtInstance(
                new Class[] { ConsumerConfig.class }, new Object[] { consumerConfig });
        }
    }
    return (ConsumerBootstrap<T>) consumerBootstrap;
}

com.alipay.sofa.rpc.bootstrap.DefaultConsumerBootstrap#refer

@Override
public T refer() {
    if (proxyIns != null) {
        return proxyIns;
    }
    synchronized (this) {
        if (proxyIns != null) {
            return proxyIns;
        }
        String key = consumerConfig.buildKey();
        String appName = consumerConfig.getAppName();
        // 检查参数
        checkParameters();
        // 提前检查接口类
        if (LOGGER.isInfoEnabled(appName)) {
            LOGGER.infoWithApp(appName, "Refer consumer config : {} with bean id {}", key, consumerConfig.getId());
        }

        // 注意同一interface,同一tags,同一protocol情况
        AtomicInteger cnt = REFERRED_KEYS.get(key); // 计数器
        if (cnt == null) { // 没有发布过
            cnt = CommonUtils.putToConcurrentMap(REFERRED_KEYS, key, new AtomicInteger(0));
        }
        int c = cnt.incrementAndGet();
        int maxProxyCount = consumerConfig.getRepeatedReferLimit();
        if (maxProxyCount > 0) {
            if (c > maxProxyCount) {
                cnt.decrementAndGet();
                // 超过最大数量,直接抛出异常
                throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_DUPLICATE_CONSUMER_CONFIG, key,
                    maxProxyCount));
            } else if (c > 1) {
                if (LOGGER.isInfoEnabled(appName)) {
                    LOGGER.infoWithApp(appName, "Duplicate consumer config with key {} has been referred!"
                        + " Maybe it's wrong config, please check it."
                        + " Ignore this if you did that on purpose!", key);
                }
            }
        }

        try {
            // build cluster
            cluster = ClusterFactory.getCluster(this);
            // build listeners
            consumerConfig.setConfigListener(buildConfigListener(this));
            consumerConfig.setProviderInfoListener(buildProviderInfoListener(this));
            // init cluster
            cluster.init();
            // 构造Invoker对象(执行链)
            proxyInvoker = buildClientProxyInvoker(this);
            // 创建代理类
            proxyIns = (T) ProxyFactory.buildProxy(consumerConfig.getProxy(), consumerConfig.getProxyClass(),
                proxyInvoker);

            //动态配置
            final String dynamicAlias = consumerConfig.getParameter(DynamicConfigKeys.DYNAMIC_ALIAS);
            if (StringUtils.isNotBlank(dynamicAlias)) {
                final DynamicConfigManager dynamicManager = DynamicConfigManagerFactory.getDynamicManager(
                    consumerConfig.getAppName(), dynamicAlias);
                dynamicManager.initServiceConfiguration(consumerConfig.getInterfaceId());
            }
        } catch (Exception e) {
            if (cluster != null) {
                cluster.destroy();
                cluster = null;
            }
            consumerConfig.setConfigListener(null);
            consumerConfig.setProviderInfoListener(null);
            cnt.decrementAndGet(); // 发布失败不计数
            if (e instanceof SofaRpcRuntimeException) {
                throw (SofaRpcRuntimeException) e;
            } else {
                throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_BUILD_CONSUMER_PROXY), e);
            }
        }
        if (consumerConfig.getOnAvailable() != null && cluster != null) {
            cluster.checkStateChange(false); // 状态变化通知监听器
        }
        RpcRuntimeContext.cacheConsumerConfig(this);
        return proxyIns;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/111862.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

内容资产管理11问

&#x1f447;点击一键关注主笔&#xff1a;邹小困、邝晴岚主持人&#xff1a;增长黑盒分析师Emma出品&#xff1a;增长黑盒研究组前言在这个信息爆炸的数据时代&#xff0c;各个行业正积极推进数字化转型&#xff0c;产业升级与技术赋能成为主题之一。在推进企业线上线下融合的…

SEO 已死,LLMO 万岁

“北风那个吹&#xff0c;雪花那个飘”&#xff0c;我手捧一杯咖啡&#xff0c;听着白毛女。朋友坐在对面高谈阔论&#xff1a;“有了 ChatGPT&#xff0c;我再也不缺内容了&#xff0c;SEO 的春天就要来了&#xff01;”然而他没有看到真相是 —— ChatGPT 直接颠覆了 SEO 行业…

JVM【垃圾回收相关概念和垃圾回收器】

垃圾回收相关概念 System.gc()的理解 在默认情况下&#xff0c;通过**system.gc&#xff08;&#xff09;**者Runtime.getRuntime().gc() 的调用&#xff0c;会显式触发FullGC&#xff0c;同时对老年代和新生代进行回收&#xff0c;尝试释放被丢弃对象占用的内存。 然而syste…

AutoDL+Xftp+Xshell+VSCode配合使用教程

一、AutoDL AutoDL是一款经过众多算法业内大牛精心调教的云GPU深度学习环境出租平台。随着人工智能发展逐渐变成混合学科经验科学学科的深海区神器&#xff0c;一款高效的云GPU深度学习出租平台成为每一个深度学习的从业者的最大痛点。如何解决数据痛点&#xff0c;计算资源痛…

python--可重用的登录注册系统(上)

文章目录预期目标一、基本逻辑设计数据库模型二、前端界面设计与优化完善登录的视图函数三、session会话与登录的视图函数四、将项目上传到远程仓库预期目标 实现注册&#xff08;邮箱、手机、qq、微信&#xff09; 登录 注销等功能 路由配置 视图配置 数据库模型 模板&#xf…

生物系转行学编程,如今身家26亿

在编程界有许多明星级别的大牛&#xff0c;他们有些人学习成绩很差&#xff0c;有些人甚至不是科班出身&#xff0c;但对编程的狂热和努力&#xff0c;成就了他们在IT界“名利双收”的地位。 在我们中国编程界就有这样一位大牛&#xff0c;非科班出身&#xff0c;却做到了神一般…

el-pagination 动态切换每页条数、页数切换

目录 业务场景 官方链接 实现效果 使用框架 代码展示 template代码 script代码 变量定义 事件定义 handleSizeChange事件--实现每页条数改变表格动态变化 handleCurrentChange事件--切换页码 css代码 完整代码 总结 业务场景 当表格中的数据量如果非常庞大的时候我们…

【Javascript基础】--零基础--超详细且简洁的Javascript笔记--代码质量(03)

在浏览器中调试 在编写代码前看看调试。 调试是指在一个脚本中找出并修复错误的过程。 在这里我们将会使用 Chrome&#xff08;谷歌浏览器&#xff09;&#xff0c;因为它拥有足够多的功能&#xff0c;其他大部分浏览器的功能也与之类似。 “资源&#xff08;Sources&#…

为什么编程入门从Python学起?

目前&#xff0c;青岛市的小学、初中、高中对于编程教育和信息学的推进几乎都选中了Python。 浙江省新高中信息技术教材改革项目中&#xff0c;高中新生开始使用新教材&#xff0c;里面的编程语言将换用 Python&#xff0c;Python 将正式纳入高考内容。 Python是一种代表简单主…

看BP(后向投影算法)英文文献生词记录

看BP&#xff08;后向投影算法&#xff09;英文文献生词记录 总的来说&#xff0c;该论文是在讲CAT和SAR的后向投影算法之间的联系与区别 acoustic imaging 原声成像 polychromatic 美 [pɒlɪkroʊ’mtɪk] 英 [pɒlɪkrəʊ’mtɪk] adj.多色的 illumination 美 [ɪˌlum…

建筑“光储直柔”配用电系统关键技术分析

低碳发展背景下的建筑“光储直柔”配用电系统关键技术分析&#xff08;2021&#xff09; 摘 要 在低碳发展的背景下&#xff0c;为适应高比例的可再生能源结构&#xff0c;建筑电气化已经成为未来的发展趋势。建筑电气化不仅要提高建筑电气化率&#xff0c;还要发展新型建筑配…

RV1126笔记九:RTMP服务器搭建

若该文为原创文章,转载请注明原文出处 一、介绍 搭建RTMP服务器主要是为了在RV1126上实现RTMP推拉流功能测试使用,如果条件允许可以把RTMP服务器部署到公网服务器上,搭建的RTMP服务器只支持h264,h265需要自行修改。 这里介绍两种方式搭建RTMP服务器: 一、使用开源的SRS…

从架构层面了解Kubernetes

一. 背景 1、 为什么K8s战胜了Swarm、Mesos 从使用上来说以声明式API来降低运维的操作成本。在生态系统建设方面以极高的可扩展性来提升社区活跃度。从这两个方面既可以填充K8s的不足&#xff0c;也极大的简化了运维操作过程。 2、 架构侧面 在K8s的各种文档、书籍中都没有…

【源码共读】Vue2 中为什么可以使用 this 访问各种选项中的属性?

如何阅读源码 网上有很多关于源码阅读的文章&#xff0c;每个人都有自己的方式&#xff0c;但是网上的文章都是精炼之后的&#xff0c;告诉你哪个文件、那个函数、那个变量是干什么的&#xff1b; 但是没有告诉你这些是怎么找到的&#xff0c;这些是怎么理解的&#xff0c;这…

港科夜闻|叶玉如校长回应「香港创科发展蓝图」

关注并星标每周阅读港科夜闻建立新视野 开启新思维1、香港科大校长叶玉如教授回应「香港创科发展蓝图」。近日&#xff0c;粤港澳大湾区院士联盟表示希望特区政府切实落实「蓝图」内容&#xff0c;设立具体行动措施和可量化的指标&#xff0c;以更大的魄力和决心实现当中的目标…

C++11标准模板(STL)- 算法(std::iota)

定义于头文件 <algorithm> 算法库提供大量用途的函数&#xff08;例如查找、排序、计数、操作&#xff09;&#xff0c;它们在元素范围上操作。注意范围定义为 [first, last) &#xff0c;其中 last 指代要查询或修改的最后元素的后一个元素。 用从起始值开始连续递增的…

10.1、Django框架入门--后台管理

文章目录预备知识MVC模式和MTV模式MVC模式MTV 模式Django框架Django框架简介Django框架中的后台管理启动后台admin站点管理数据库迁移创建管理员用户管理界面本地化创建并使用一个应用bookapp项目的数据库模型创建数据库模型生成数据库表数据库上的基本操作启用后台admin站点管…

【源码共读】如何优雅的处理 Promise 的错误

Promise解决了优雅的解决了回调地域的问题&#xff0c;现在已经大范围的使用Promise&#xff0c;但是在使用Promise的过程中&#xff0c;最令人头疼的就是错误处理的方式。 Promise 的错误处理方式 据我对Promise的了解&#xff0c;Promise的错误处理分为下面的几种方式&…

324页13万字高校数字化校园大数据中心及大数据平台建设方案

一、 数据中心总体规划 云资源中心加大数据分析与高性能主要分为计算资源、内存资源、存储资源、网络资源&#xff0c;大数据分析系统&#xff0c;高性能作业调度系统&#xff0c;本项目在充分整合XXX高校数据中心资源的基础上&#xff0c;配置必要软硬件设备&#xff0c;为XXX…