Source Code Analysis: How Spring Boot Integrates the Elasticsearch Client
Elasticsearch Version
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
    <version>2.2.5.RELEASE</version>
</dependency>
Creating the RestHighLevelClient
RestClientConfigurations registers a RestHighLevelClient bean, which is constructed from a RestClientBuilder.
@Bean
@ConditionalOnMissingBean
RestHighLevelClient elasticsearchRestHighLevelClient(RestClientBuilder restClientBuilder) {
    return new RestHighLevelClient(restClientBuilder);
}
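- The RestClientBuilder is created from a RestClientProperties instance. RestClientProperties is annotated with @ConfigurationProperties(prefix = "spring.elasticsearch.rest"), so every property under the spring.elasticsearch.rest prefix in the configuration file is bound to it. These bound properties provide the IP addresses and ports of the Elasticsearch servers, along with the optional username, password, and timeouts.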
@Bean
@ConditionalOnMissingBean
RestClientBuilder elasticsearchRestClientBuilder(RestClientProperties properties,
        ObjectProvider<RestClientBuilderCustomizer> builderCustomizers) {
    HttpHost[] hosts = properties.getUris().stream().map(HttpHost::create).toArray(HttpHost[]::new);
    RestClientBuilder builder = RestClient.builder(hosts);
    PropertyMapper map = PropertyMapper.get();
    map.from(properties::getUsername).whenHasText().to((username) -> {
        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
        Credentials credentials = new UsernamePasswordCredentials(properties.getUsername(), properties.getPassword());
        credentialsProvider.setCredentials(AuthScope.ANY, credentials);
        builder.setHttpClientConfigCallback((httpClientBuilder) -> {
            return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
        });
    });
    builder.setRequestConfigCallback((requestConfigBuilder) -> {
        map.from(properties::getConnectionTimeout).whenNonNull().asInt(Duration::toMillis)
                .to(requestConfigBuilder::setConnectTimeout);
        map.from(properties::getReadTimeout).whenNonNull().asInt(Duration::toMillis)
                .to(requestConfigBuilder::setSocketTimeout);
        return requestConfigBuilder;
    });
    builderCustomizers.orderedStream().forEach((customizer) -> {
        customizer.customize(builder);
    });
    return builder;
}
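To make the first line of the builder method more concrete, here is a minimal JDK-only sketch of turning configured URI strings into host entries, similar in spirit to mapping each element of properties.getUris() through HttpHost::create. The class HostParsingSketch and the HostAndPort type are hypothetical names used only for illustration, and java.net.URI stands in for the real parsing logic; this is not the Apache HttpClient implementation.

```java
import java.net.URI;
import java.util.Arrays;
import java.util.List;

class HostParsingSketch {

    /** Hypothetical value type standing in for org.apache.http.HttpHost. */
    static final class HostAndPort {
        final String scheme;
        final String host;
        final int port; // -1 when the URI does not specify a port

        HostAndPort(String scheme, String host, int port) {
            this.scheme = scheme;
            this.host = host;
            this.port = port;
        }
    }

    /** Parses one configured URI such as "http://localhost:9200". */
    static HostAndPort parse(String uri) {
        URI parsed = URI.create(uri);
        if (parsed.getHost() == null) {
            throw new IllegalArgumentException("No host in URI: " + uri);
        }
        return new HostAndPort(parsed.getScheme(), parsed.getHost(), parsed.getPort());
    }

    /** Same stream/map/toArray shape as the hosts line in the builder method. */
    static HostAndPort[] parseAll(List<String> uris) {
        return uris.stream().map(HostParsingSketch::parse).toArray(HostAndPort[]::new);
    }

    public static void main(String[] args) {
        List<String> uris = Arrays.asList("http://localhost:9200", "http://10.0.0.2:9201");
        for (HostAndPort h : parseAll(uris)) {
            System.out.println(h.scheme + "://" + h.host + ":" + h.port);
        }
    }
}
```

In a real application the list of URIs would come from the spring.elasticsearch.rest.uris property rather than from a hard-coded list.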
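Scanning and Registering ElasticsearchRepositoryFactoryBean
- Add the annotation @EnableElasticsearchRepositories(basePackages = {"*"}) to the application's main class. This imports ElasticsearchRepositoriesRegistrar into the Spring container. Through the registerBeanDefinitions() method that it inherits from RepositoryBeanDefinitionRegistrarSupport, the registrar scans the packages given by basePackages and registers a bean of type ElasticsearchRepositoryFactoryBean for each repository interface it finds.
- ElasticsearchRepositoryFactoryBean returns the proxy object for a repository. The core method is RepositoryFactorySupport#getRepository().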
public <T> T getRepository(Class<T> repositoryInterface, RepositoryFragments fragments) {
    if (LOG.isDebugEnabled()) {
        LOG.debug("Initializing repository instance for {}…", repositoryInterface.getName());
    }
    Assert.notNull(repositoryInterface, "Repository interface must not be null!");
    Assert.notNull(fragments, "RepositoryFragments must not be null!");
    RepositoryMetadata metadata = getRepositoryMetadata(repositoryInterface);
    RepositoryComposition composition = getRepositoryComposition(metadata, fragments);
    RepositoryInformation information = getRepositoryInformation(metadata, composition);
    validate(information, composition);
    Object target = getTargetRepository(information);
    // Create proxy
    ProxyFactory result = new ProxyFactory();
    result.setTarget(target);
    result.setInterfaces(repositoryInterface, Repository.class, TransactionalProxy.class);
    if (MethodInvocationValidator.supports(repositoryInterface)) {
        result.addAdvice(new MethodInvocationValidator());
    }
    result.addAdvisor(ExposeInvocationInterceptor.ADVISOR);
    postProcessors.forEach(processor -> processor.postProcess(result, information));
    if (DefaultMethodInvokingMethodInterceptor.hasDefaultMethods(repositoryInterface)) {
        result.addAdvice(new DefaultMethodInvokingMethodInterceptor());
    }
    ProjectionFactory projectionFactory = getProjectionFactory(classLoader, beanFactory);
    result.addAdvice(new QueryExecutorMethodInterceptor(information, projectionFactory));
    composition = composition.append(RepositoryFragment.implemented(target));
    result.addAdvice(new ImplementationMethodExecutionInterceptor(composition));
    T repository = (T) result.getProxy(classLoader);
    if (LOG.isDebugEnabled()) {
        LOG.debug("Finished creation of repository instance for {}.", repositoryInterface.getName());
    }
    return repository;
}
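The proxy idea in getRepository() can be illustrated with a much smaller sketch built on the JDK's java.lang.reflect.Proxy. It assumes a hypothetical BookRepository interface and an InMemoryTarget class standing in for the target repository. Methods that the target implements are delegated to it (the role ImplementationMethodExecutionInterceptor plays above), while a method with no implementation is answered by a stand-in for query derivation (the role of QueryExecutorMethodInterceptor). Spring's ProxyFactory and its interceptor chain are considerably richer than this; the sketch only shows the dispatch idea.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

class RepositoryProxySketch {

    /** Hypothetical repository interface; no class implements it by hand. */
    interface BookRepository {
        void save(String id, String title);
        String findById(String id);
        String findIdByTitle(String title); // "derived query", not implemented by the target
    }

    /** Hypothetical target that only knows the basic CRUD methods. */
    static class InMemoryTarget {
        private final Map<String, String> titlesById = new HashMap<>();
        public void save(String id, String title) { titlesById.put(id, title); }
        public String findById(String id) { return titlesById.get(id); }
    }

    static BookRepository create() {
        InMemoryTarget target = new InMemoryTarget();
        InvocationHandler handler = (proxy, method, args) -> {
            Method implementation = findImplementation(method);
            if (implementation == null) {
                // No implementation on the target: treat the call as a query method.
                return executeDerivedQuery(target, method, args);
            }
            try {
                return implementation.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause(); // surface the target's own exception
            }
        };
        return (BookRepository) Proxy.newProxyInstance(
                BookRepository.class.getClassLoader(),
                new Class<?>[] {BookRepository.class},
                handler);
    }

    /** Returns the target method with the same name and parameter types, or null. */
    private static Method findImplementation(Method method) {
        try {
            return InMemoryTarget.class.getMethod(method.getName(), method.getParameterTypes());
        } catch (NoSuchMethodException e) {
            return null;
        }
    }

    /** Stand-in for query derivation; it only understands findIdByTitle. */
    private static Object executeDerivedQuery(InMemoryTarget target, Method method, Object[] args) {
        if (!method.getName().equals("findIdByTitle")) {
            throw new UnsupportedOperationException("No query derivation for " + method.getName());
        }
        for (Map.Entry<String, String> entry : target.titlesById.entrySet()) {
            if (entry.getValue().equals(args[0])) {
                return entry.getKey();
            }
        }
        return null;
    }

    public static void main(String[] args) {
        BookRepository repository = create();
        repository.save("1", "Dune");
        System.out.println(repository.findById("1"));
        System.out.println(repository.findIdByTitle("Dune"));
    }
}
```

The caller only ever sees the BookRepository interface, which mirrors how an application injects its repository interface and receives the proxy built by the factory bean.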
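Creating the Index and Setting the Mapping
SimpleElasticsearchRepository: the constructor of this class calls the parent class constructor for initialization. When createIndexAndMapping() returns true, that parent constructor creates the index and puts the mapping.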
public AbstractElasticsearchRepository(ElasticsearchEntityInformation<T, ID> metadata,
        ElasticsearchOperations elasticsearchOperations) {
    this(elasticsearchOperations);
    Assert.notNull(metadata, "ElasticsearchEntityInformation must not be null!");
    this.entityInformation = metadata;
    setEntityClass(this.entityInformation.getJavaType());
    try {
        if (createIndexAndMapping()) {
            createIndex();
            putMapping();
        }
    } catch (ElasticsearchException exception) {
        LOGGER.error("failed to load elasticsearch nodes : {}", exception.getDetailedMessage());
    }
}
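The createIndex() and putMapping() steps in this constructor run every time the repository is instantiated, so they have to be safe to repeat. A minimal sketch of that "create only if missing, then put the mapping" flow follows. FakeIndices is a hypothetical in-memory stand-in for the Elasticsearch indices API, and all method names here are assumptions for illustration rather than the Spring Data Elasticsearch implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class IndexBootstrapSketch {

    /** Hypothetical in-memory stand-in for the Elasticsearch indices API. */
    static class FakeIndices {
        final Map<String, String> mappingsByIndex = new LinkedHashMap<>();
        int createCalls = 0;

        boolean exists(String index) {
            return mappingsByIndex.containsKey(index);
        }

        void create(String index) {
            if (exists(index)) {
                throw new IllegalStateException("index already exists: " + index);
            }
            createCalls++;
            mappingsByIndex.put(index, "{}");
        }

        void putMapping(String index, String mapping) {
            if (!exists(index)) {
                throw new IllegalStateException("no such index: " + index);
            }
            mappingsByIndex.put(index, mapping);
        }
    }

    /** Creates the index only when it is missing; returns true if it was created. */
    static boolean createIndexIfMissing(FakeIndices indices, String index) {
        if (indices.exists(index)) {
            return false;
        }
        indices.create(index);
        return true;
    }

    /** Mirrors the constructor's order: create the index first, then put the mapping. */
    static void bootstrap(FakeIndices indices, String index, String mapping) {
        createIndexIfMissing(indices, index);
        indices.putMapping(index, mapping);
    }

    public static void main(String[] args) {
        FakeIndices indices = new FakeIndices();
        bootstrap(indices, "books", "{\"title\":\"text\"}");
        bootstrap(indices, "books", "{\"title\":\"text\"}"); // second run does not recreate the index
        System.out.println("create calls: " + indices.createCalls);
    }
}
```

Because the existence check comes first, restarting the application against an index that already exists does not attempt to create it again.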
- Creating the index first checks whether the index already exists.
public boolean exists(org.elasticsearch.action.admin.indices.get.GetIndexRequest request, RequestOptions options) throws IOException {
    return restHighLevelClient.performRequest(
        request,
        IndicesRequestConverters::indicesExist,
        options,
        RestHighLevelClient::convertExistsResponse,
        Collections.emptySet()
    );
}
- The request is sent through RestHighLevelClient#performRequest(). Internally the call reaches RestClient#performRequestAsync(), which sends the request with the underlying httpClient.
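A blocking call that ends up in an asynchronous one is commonly built by handing the async method a listener and then waiting on it. The sketch below shows that pattern with JDK classes only. The ResponseListener interface, the fake transport, and the 5-second timeout are assumptions for illustration; this is not the RestClient source.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SyncOverAsyncSketch {

    /** Hypothetical callback interface for an asynchronous request. */
    interface ResponseListener {
        void onSuccess(String response);
        void onFailure(Exception exception);
    }

    /** Fake async transport: answers on another thread, standing in for an HTTP client. */
    static void performRequestAsync(String endpoint, ResponseListener listener) {
        CompletableFuture.runAsync(() -> {
            if (endpoint.startsWith("/")) {
                listener.onSuccess("200 OK " + endpoint);
            } else {
                listener.onFailure(new IllegalArgumentException("endpoint must start with '/': " + endpoint));
            }
        });
    }

    /** Blocking variant: starts the async request and waits for the listener to fire. */
    static String performRequest(String endpoint) throws IOException {
        CompletableFuture<String> result = new CompletableFuture<>();
        performRequestAsync(endpoint, new ResponseListener() {
            @Override
            public void onSuccess(String response) {
                result.complete(response);
            }

            @Override
            public void onFailure(Exception exception) {
                result.completeExceptionally(exception);
            }
        });
        try {
            return result.get(5, TimeUnit.SECONDS);
        } catch (ExecutionException e) {
            throw new IOException("request failed", e.getCause());
        } catch (TimeoutException e) {
            throw new IOException("request timed out", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            throw new IOException("interrupted while waiting for the response", e);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(performRequest("/my-index"));
    }
}
```

The caller of performRequest() sees an ordinary synchronous method, while the actual I/O stays on the asynchronous path.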