由SOFARPC示例介绍基本流程和基础源码
1. Server
先看 Server 端测试方法:
public class QuickStartServer {
public static void main(String[] args) {
ServerConfig serverConfig = new ServerConfig()
.setProtocol("bolt") // 设置一个协议,默认bolt
.setPort(12200) // 设置一个端口,默认12200
.setDaemon(false); // 非守护线程
ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>()
.setInterfaceId(HelloService.class.getName()) // 指定接口
.setRef(new HelloServiceImpl()) // 指定实现
.setServer(serverConfig); // 指定服务端
providerConfig.export(); // 发布服务
}
}
在ProviderConfig
类中,获得一个服务提供者启动类,负责发布服务。
public synchronized void export() {
if (providerBootstrap == null) {
providerBootstrap = Bootstraps.from(this);
}
providerBootstrap.export();
}
com.alipay.sofa.rpc.bootstrap.Bootstraps#from(com.alipay.sofa.rpc.config.ProviderConfig)
这里看一下Bootstraps#from
方法,构造一个发布服务的包装类。
public static <T> ProviderBootstrap<T> from(ProviderConfig<T> providerConfig) {
String bootstrap = providerConfig.getBootstrap();
if (StringUtils.isEmpty(bootstrap)) {
// Use default provider bootstrap
bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_PROVIDER_BOOTSTRAP);
providerConfig.setBootstrap(bootstrap);
}
ProviderBootstrap providerBootstrap = ExtensionLoaderFactory
.getExtensionLoader(ProviderBootstrap.class) // 从工厂拿到一个Loader
.getExtension(bootstrap, new Class[] { ProviderConfig.class }, new Object[] { providerConfig }); // 得到一个扩展实例
return (ProviderBootstrap<T>) providerBootstrap;
}
先学习一下ExtensionLoadeFactory
的工厂方法的写法:
/**
* All extension loader {Class : ExtensionLoader}
* ConcurrentHashMap
*/
private static final ConcurrentMap<Class, ExtensionLoader> LOADER_MAP = new ConcurrentHashMap<Class, ExtensionLoader>();
/**
* Get extension loader by extensible class with listener
*/
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> clazz, ExtensionLoaderListener<T> listener) {
ExtensionLoader<T> loader = LOADER_MAP.get(clazz);
if (loader == null) { // a
synchronized (ExtensionLoaderFactory.class) {
loader = LOADER_MAP.get(clazz);
if (loader == null) {
loader = new ExtensionLoader<T>(clazz, listener); // b
LOADER_MAP.put(clazz, loader);
}
}
}
return loader;
}
/**
* Get extension loader by extensible class without listener
*/
public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> clazz) {
return getExtensionLoader(clazz, null);
}
因为 b 处产生的 loader 一定在被初始化后才会被放进 map 中,所以不存在双重检查锁定的因指令重排导致的问题。
再看看获得实例的getExtension
方法:
public T getExtension(String alias, Class[] argTypes, Object[] args) {
ExtensionClass<T> extensionClass = getExtensionClass(alias);
if (extensionClass == null) {
throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_EXTENSION_NOT_FOUND, interfaceName, alias));
} else {
if (extensible.singleton() && factory != null) {
T t = factory.get(alias);
if (t == null) {
synchronized (this) {
t = factory.get(alias);
if (t == null) {
t = extensionClass.getExtInstance(argTypes, args);
factory.put(alias, t);
}
}
}
return t;
} else {
return extensionClass.getExtInstance(argTypes, args);
}
}
}
com.alipay.sofa.rpc.bootstrap.DefaultProviderBootstrap#export
这里看一下 default 的服务发布实现,实际还有另外几种
@Override
public void export() {
if (providerConfig.getDelay() > 0) { // 延迟加载,单位毫秒
Thread thread = factory.newThread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(providerConfig.getDelay());
} catch (Throwable ignore) { // NOPMD
}
doExport();
}
});
thread.start();
} else {
doExport();
}
}
catch 的异常可以用 ignore 代替,下面看关键的doExport
private void doExport() {
if (exported) {
return;
}
// 检查参数
checkParameters();
String appName = providerConfig.getAppName();
// key is the protocol of server,for concurrent safe
Map<String, Boolean> hasExportedInCurrent = new ConcurrentHashMap<String, Boolean>();
// 将处理器注册到server,遍历Server列表,在示例中就一个bolt\12200\not daemon
List<ServerConfig> serverConfigs = providerConfig.getServer();
for (ServerConfig serverConfig : serverConfigs) {
String protocol = serverConfig.getProtocol();
String key = providerConfig.buildKey() + ":" + protocol;
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Export provider config : {} with bean id {}", key, providerConfig.getId());
}
// 注意同一interface,同一uniqueId,不同server情况
AtomicInteger cnt = EXPORTED_KEYS.get(key); // 计数器
if (cnt == null) { // 没有发布过
cnt = CommonUtils.putToConcurrentMap(EXPORTED_KEYS, key, new AtomicInteger(0));
}
int c = cnt.incrementAndGet();
// 注册为已发布
hasExportedInCurrent.put(serverConfig.getProtocol(), true);
// 最大发布次数
int maxProxyCount = providerConfig.getRepeatedExportLimit();
if (maxProxyCount > 0) {
if (c > maxProxyCount) {
decrementCounter(hasExportedInCurrent);
// 超过最大数量,直接抛出异常
throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_DUPLICATE_PROVIDER_CONFIG, key,
maxProxyCount));
} else if (c > 1) {
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, LogCodes.getLog(LogCodes.WARN_DUPLICATE_PROVIDER_CONFIG, key, c));
}
}
}
}
try {
// 构造请求调用器
providerProxyInvoker = new ProviderProxyInvoker(providerConfig);
preProcessProviderTarget(providerConfig, (ProviderProxyInvoker) providerProxyInvoker);
// 初始化注册中心
if (providerConfig.isRegister()) {
List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
if (CommonUtils.isNotEmpty(registryConfigs)) {
for (RegistryConfig registryConfig : registryConfigs) {
RegistryFactory.getRegistry(registryConfig); // 提前初始化Registry
}
}
}
// 将将请求调用处理器Invoker注册到server
for (ServerConfig serverConfig : serverConfigs) {
try {
Server server = serverConfig.buildIfAbsent();
// 注册请求调用器
server.registerProcessor(providerConfig, providerProxyInvoker);
if (serverConfig.isAutoStart()) {
server.start();
}
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Exception e) {
LOGGER.errorWithApp(appName,
LogCodes.getLog(LogCodes.ERROR_REGISTER_PROCESSOR_TO_SERVER, serverConfig.getId()), e);
}
}
//如果是泛型接口则需要在JSON序列化器中注册跟真实实现类的对应关系,因为反序列化时无法从泛型接口中拿到真实的数据类型
if (providerConfig.getProxyClass().getTypeParameters().length > 0) { // getTypeParameters().length > 0
String serviceName = ConfigUniqueNameGenerator.getUniqueName(providerConfig);
AbstractSerializer.registerGenericService(serviceName, providerConfig.getRef().getClass().getName());
}
// 注册到注册中心
providerConfig.setConfigListener(new ProviderAttributeListener());
register();
} catch (Exception e) {
decrementCounter(hasExportedInCurrent);
if (e instanceof SofaRpcRuntimeException) {
throw e;
}
throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_BUILD_PROVIDER_PROXY), e);
}
// 记录一些缓存数据,供后续销毁等等使用
RpcRuntimeContext.cacheProviderConfig(this);
exported = true;
}
关键的注册操作:
protected void register() {
if (providerConfig.isRegister()) {
List<RegistryConfig> registryConfigs = providerConfig.getRegistry();
if (registryConfigs != null) {
for (RegistryConfig registryConfig : registryConfigs) {
Registry registry = RegistryFactory.getRegistry(registryConfig);
// 根据具体提供的registry,调用不同的init和start
registry.init();
registry.start();
try {
// 注册
registry.register(providerConfig);
} catch (SofaRpcRuntimeException e) {
throw e;
} catch (Throwable e) {
String appName = providerConfig.getAppName();
if (LOGGER.isWarnEnabled(appName)) {
LOGGER.errorWithApp(appName,
LogCodes.getLog(LogCodes.ERROR_REGISTER_TO_REGISTRY, registryConfig.getId()), e);
}
}
}
}
}
}
2. Client
先看 Client 端测试方法:
public class QuickStartClient {
private final static Logger LOGGER = LoggerFactory.getLogger(QuickStartClient.class);
public static void main(String[] args) {
ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>()
.setInterfaceId(HelloService.class.getName()) // 指定接口
.setProtocol("bolt") // 指定协议
.setDirectUrl("bolt://127.0.0.1:12200") // 指定直连地址
.setConnectTimeout(10 * 1000);
HelloService helloService = consumerConfig.refer();
while (true) {
try {
LOGGER.info(helloService.sayHello("world"));
} catch (Exception e) {
e.printStackTrace();
}
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
直接看com.alipay.sofa.rpc.config.ConsumerConfig#refer
方法
public T refer() {
if (consumerBootstrap == null) {
// 构造服务消费者启动类
consumerBootstrap = Bootstraps.from(this);
}
// 获得代理对象
return consumerBootstrap.refer();
}
com.alipay.sofa.rpc.bootstrap.Bootstraps#from(com.alipay.sofa.rpc.config.ConsumerConfig)
这里看一下Bootstraps#from
方法,构造一个引用服务的包装类。
public static <T> ConsumerBootstrap<T> from(ConsumerConfig<T> consumerConfig) {
String bootstrap = consumerConfig.getBootstrap();
ConsumerBootstrap consumerBootstrap;
if (StringUtils.isNotEmpty(bootstrap)) {
consumerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class)
.getExtension(bootstrap,
new Class[] { ConsumerConfig.class },
new Object[] { consumerConfig });
} else {
// default is same with protocol
bootstrap = consumerConfig.getProtocol();
ExtensionLoader extensionLoader = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class);
ExtensionClass<ConsumerBootstrap> extensionClass = extensionLoader.getExtensionClass(bootstrap);
if (extensionClass == null) {
// if not exist, use default consumer bootstrap
bootstrap = RpcConfigs.getStringValue(RpcOptions.DEFAULT_CONSUMER_BOOTSTRAP);
consumerConfig.setBootstrap(bootstrap);
consumerBootstrap = ExtensionLoaderFactory.getExtensionLoader(ConsumerBootstrap.class)
.getExtension(bootstrap, new Class[] { ConsumerConfig.class }, new Object[] { consumerConfig });
} else {
consumerConfig.setBootstrap(bootstrap);
consumerBootstrap = extensionClass.getExtInstance(
new Class[] { ConsumerConfig.class }, new Object[] { consumerConfig });
}
}
return (ConsumerBootstrap<T>) consumerBootstrap;
}
com.alipay.sofa.rpc.bootstrap.DefaultConsumerBootstrap#refer
@Override
public T refer() {
if (proxyIns != null) {
return proxyIns;
}
synchronized (this) {
if (proxyIns != null) {
return proxyIns;
}
String key = consumerConfig.buildKey();
String appName = consumerConfig.getAppName();
// 检查参数
checkParameters();
// 提前检查接口类
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Refer consumer config : {} with bean id {}", key, consumerConfig.getId());
}
// 注意同一interface,同一tags,同一protocol情况
AtomicInteger cnt = REFERRED_KEYS.get(key); // 计数器
if (cnt == null) { // 没有发布过
cnt = CommonUtils.putToConcurrentMap(REFERRED_KEYS, key, new AtomicInteger(0));
}
int c = cnt.incrementAndGet();
int maxProxyCount = consumerConfig.getRepeatedReferLimit();
if (maxProxyCount > 0) {
if (c > maxProxyCount) {
cnt.decrementAndGet();
// 超过最大数量,直接抛出异常
throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_DUPLICATE_CONSUMER_CONFIG, key,
maxProxyCount));
} else if (c > 1) {
if (LOGGER.isInfoEnabled(appName)) {
LOGGER.infoWithApp(appName, "Duplicate consumer config with key {} has been referred!"
+ " Maybe it's wrong config, please check it."
+ " Ignore this if you did that on purpose!", key);
}
}
}
try {
// build cluster
cluster = ClusterFactory.getCluster(this);
// build listeners
consumerConfig.setConfigListener(buildConfigListener(this));
consumerConfig.setProviderInfoListener(buildProviderInfoListener(this));
// init cluster
cluster.init();
// 构造Invoker对象(执行链)
proxyInvoker = buildClientProxyInvoker(this);
// 创建代理类
proxyIns = (T) ProxyFactory.buildProxy(consumerConfig.getProxy(), consumerConfig.getProxyClass(),
proxyInvoker);
//动态配置
final String dynamicAlias = consumerConfig.getParameter(DynamicConfigKeys.DYNAMIC_ALIAS);
if (StringUtils.isNotBlank(dynamicAlias)) {
final DynamicConfigManager dynamicManager = DynamicConfigManagerFactory.getDynamicManager(
consumerConfig.getAppName(), dynamicAlias);
dynamicManager.initServiceConfiguration(consumerConfig.getInterfaceId());
}
} catch (Exception e) {
if (cluster != null) {
cluster.destroy();
cluster = null;
}
consumerConfig.setConfigListener(null);
consumerConfig.setProviderInfoListener(null);
cnt.decrementAndGet(); // 发布失败不计数
if (e instanceof SofaRpcRuntimeException) {
throw (SofaRpcRuntimeException) e;
} else {
throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_BUILD_CONSUMER_PROXY), e);
}
}
if (consumerConfig.getOnAvailable() != null && cluster != null) {
cluster.checkStateChange(false); // 状态变化通知监听器
}
RpcRuntimeContext.cacheConsumerConfig(this);
return proxyIns;
}
}