ElasticSearch
1、ElasticSearch学习随笔之基础介绍
2、ElasticSearch学习随笔之简单操作
3、ElasticSearch学习随笔之java api 操作
4、ElasticSearch学习随笔之SpringBoot Starter 操作
5、ElasticSearch学习随笔之嵌套操作
6、ElasticSearch学习随笔之分词算法
7、ElasticSearch学习随笔之高级检索
8、ELK技术栈介绍
9、Logstash部署与使用
10、ElasticSearch 7.x 版本使用 BulkProcessor 实现批量添加数据
11、ElasticSearch 8.x 弃用了 High Level REST Client,移除了 Java Transport Client,推荐使用 Elasticsearch Java API
12、ElasticSearch 8.x 使用 snapshot(快照)进行数据迁移
13、ElasticSearch 8.x 版本如何使用 SearchRequestBuilder 检索
ElasticSearch,创始人 Shay Banon(谢巴农)
本文主要讲解ElasticSearch 数据快速迁移。
文章目录
- ElasticSearch
- 前言
- 一、什么是ES的快照
- 二、快照生成
- 2.1 新建仓库
- 2.1.1 Kibana操作
- 2.1.2 JAVA API操作
- 2.1.2.1 引入 pom 依赖
- 2.1.2.2 初始化客户端
- 2.1.2.3 创建仓库(参数方式)
- 2.1.2.3 创建仓库(配置文件方式)
- 2.2 生成快照
- 2.2.1 Kibana操作
- 2.2.2 JAVA API操作
- 2.3 删除快照
- 2.3.1 Kibana操作
- 2.3.2 JAVA API操作
- 三、恢复快照
- 3.1 Kibana操作
- 3.2 JAVA API操作
前言
平时在工作中,很多时候都是需要搭建好几个测试平台来保证应用的运行是否流畅,有没有 Bug,数据是否正确,一般都是 dev、pre 等测试环境,通过一系列的测试之后没有什么问题,才会把 应用 和 数据 部署到 prod 环境,应用一般都是大包 prod 环境的部署包,直接部署,而数据则就需要很长的实际来同步了,如果是一些统计咨询类的企业应用,那数据上线就有可能花费掉大量的时间了。
此篇就来浅谈一下,项目中应用到了 ElasticSearch 来做数据检索,那在项目上线的时候,我们如何快速的把大量数据快速从一个 ElasticSearch索引迁移到另一个索引库。
A snapshot is a backup of a running Elasticsearch cluster.
官网上说,一个快照就是一个备份在 Elasticsearch 集群上运行的时候,快照可以用于以下几点:
- 集群服务不停的情况下定期备份数据;
- 删除或者硬件出错后恢复数据;
- 在两台集群中间传输数据;
- 通过在冷和冻结数据层中使用可搜索的快照来降低存储成本;
那具体什么是快照呢?
一、什么是ES的快照
Elasticsearch 的快照就是指对数据和元数据的定期备份。因为快照中不仅包含了所有数据,也包含了所有的相关信息,比如:映射、配置等;这些快照可以保存在本地文件系统,也可以保存在共享文件系统或者专门存储快照的地方。
在快照的概念中,数据并不都是每次全部备份一遍,而是采用增量的方式进行进行备份。首次备份会进行全力备份,而后续的备份则是在上一次备份后的更新的数据,增量备份可以有效地减少备份所需的存储空间和节省时间。
对于 Elasticsearch
的迁移,快照和恢复则是很常用的一种强大的方式,在源集群上创建索引的快照,拷贝到其他集群后在恢复,也可以用 API 的方式来进行快照和恢复,这里就用 JAVA + Elasticsearch Client
实现,这样在我们后台应用就可以完成数据迁移了。
二、快照生成
2.1 新建仓库
2.1.1 Kibana操作
可以通过 Kibana 指令来注册快照,命令如下:
POST _snapshot/productInfo
{
"type": "fs",
"settings": {
"location": "/data/esbackup/product_info"
}
}
执行以上指令,有可能报错,一般会报如下错误:
"caused_by": {
"type": "repository_exception",
"reason": "[productInfo] location [/data/esbackup] doesn't match any of the locations specified by path.repo because this setting is empty"
}
报上述错误一般是 Elasticsearch 服务没有配置 path.repo
这个参数,只需要在 elasticsearch.yml 中配置 path.repo: /data/repository
即可。
如果报错如下:
"caused_by": {
"type": "access_denied_exception",
"reason": "/data/repository/tests-gU8mf7EvREG_1qMc3ZMApQ"
}
说明你配置的 path.repo 的路径没有访问权限,赋予权限即可:
chown -R elasticsearch:elasticsearch /data/esbackup/productInfo/
2.1.2 JAVA API操作
2.1.2.1 引入 pom 依赖
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.3.2</version>
</dependency>
注意:
再次感受到 elasticsearch-rest-high-level-client
对版本的严格区分,可能是我的 ElasticSearch 配置或者是其他原因,低版本的客户端使用 BulkProcessor 是没有问题的,但是在 获取快照 操作测报错,高版本的客户端在 获取快照时没问题,但是在 BulkProcessor
批量操作数据时报错,你的是否也有这个问题呢?
2.1.2.2 初始化客户端
/**
* 通过认证连接ES,获取客户端
*/
public static RestHighLevelClient createClient(){
String hostname = "192.168.*.*";
int port = 9200;
String username = "your username";
String password = "your password";
final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
RestClientBuilder restClientBuilder = RestClient.builder(new HttpHost(hostname, port))
.setHttpClientConfigCallback(httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider));
return new RestHighLevelClient(restClientBuilder);
}
2.1.2.3 创建仓库(参数方式)
/**
* 创建仓库
*/
public static void createRepository(RestHighLevelClient client, String repositoryName) {
GetRepositoriesRequest getRepositoriesRequest = new GetRepositoriesRequest(new String[]{repositoryName});
boolean hasRepository = false;
try {
GetRepositoriesResponse repository = client.snapshot().getRepository(getRepositoriesRequest, RequestOptions.DEFAULT);
List<RepositoryMetadata> repositories = repository.repositories();
for (RepositoryMetadata repositoryMetadata : repositories) {
if(repositoryMetadata.name().equals(repositoryName)){
hasRepository = true;
break;
}
}
} catch (ElasticsearchStatusException ee) {
System.out.println("仓库不存在:" + ee.getMessage());
} catch (IOException ioException) {
// 客户端版本略低,所以用异常捕获方式判断,索引存在会抛到这里异常
System.err.println(ioException.getMessage());
hasRepository = true;
}
if(!hasRepository){
try {
PutRepositoryRequest repositoryRequest = new PutRepositoryRequest();
repositoryRequest.name(repositoryName);
repositoryRequest.verify(false);
repositoryRequest.name(repositoryName);
repositoryRequest.type("fs");
repositoryRequest.settings(Settings.builder().put("location", "/data/esbackup/productInfo"));
AcknowledgedResponse acknowledgedResponse = client.snapshot().createRepository(repositoryRequest, RequestOptions.DEFAULT);
if (acknowledgedResponse.isAcknowledged()) {
System.out.println("创建仓库成功: " + repositoryName);
}
} catch (IOException e) {
e.printStackTrace();
}
}
try {
client.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
2.1.2.3 创建仓库(配置文件方式)
- 配置文件如下:
src/main/resources/productInfo.json
{
"type": "fs",
"settings": {
"location": "/data/esbackup/productInfo",
"compress": true
}
}
- 读取配置,创建仓库:
注意
:仓库名称 repositoryName 必须小写
/**
* 创建仓库
*/
public static void createRepository(RestHighLevelClient client, String settingPath, String repositoryName) {
GetRepositoriesRequest getRepositoriesRequest = new GetRepositoriesRequest(new String[]{repositoryName});
boolean hasRepository = false;
try {
GetRepositoriesResponse repository = client.snapshot().getRepository(getRepositoriesRequest, RequestOptions.DEFAULT);
List<RepositoryMetadata> repositories = repository.repositories();
for (RepositoryMetadata repositoryMetadata : repositories) {
if(repositoryMetadata.name().equals(repositoryName)){
hasRepository = true;
break;
}
}
} catch (ElasticsearchStatusException ee) {
System.out.println("仓库不存在:" + ee.getMessage());
} catch (IOException ioException) {
// 客户端版本略低,所以用异常捕获方式判断,索引存在会抛到这里异常
System.err.println(ioException.getMessage());
hasRepository = true;
}
if(!hasRepository){
try {
String jsonString = new String(Files.readAllBytes(Paths.get(settingPath)), StandardCharsets.UTF_8);
// 解析 json
PutRepositoryRequest repositoryRequest = new PutRepositoryRequest();
XContentParser contentParser = XContentFactory.xContent(XContentType.JSON).createParser(NamedXContentRegistry.EMPTY, DeprecationHandler.THROW_UNSUPPORTED_OPERATION, jsonString);
repositoryRequest.name(repositoryName);
repositoryRequest.verify(false);
repositoryRequest.source(contentParser.map());
AcknowledgedResponse acknowledgedResponse = client.snapshot().createRepository(repositoryRequest, RequestOptions.DEFAULT);
if (acknowledgedResponse.isAcknowledged()) {
System.out.println("创建仓库成功: " + repositoryName);
}
} catch (IOException e) {
e.printStackTrace();
}
}
try {
client.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
2.2 生成快照
2.2.1 Kibana操作
POST _snapshot/product_info/product_info
2.2.2 JAVA API操作
注:
快照名称 snapshotName 必须小写
/**
* 生成快照
*/
public static boolean createSnapshot(RestHighLevelClient client, String snapshotName, String repositoryName, String... indexes) {
CreateSnapshotRequest createSnapshotRequest = new CreateSnapshotRequest();
createSnapshotRequest.indices(indexes);
createSnapshotRequest.snapshot(snapshotName);
createSnapshotRequest.repository(repositoryName);
createSnapshotRequest.waitForCompletion(true);
createSnapshotRequest.includeGlobalState(false);
try {
CreateSnapshotResponse snapshotResponse = client.snapshot().create(createSnapshotRequest, RequestOptions.DEFAULT);
SnapshotInfo snapshotInfo = snapshotResponse.getSnapshotInfo();
if (snapshotInfo.status().getStatus() == 200) {
System.out.println("快照创建成功:" + snapshotInfo);
return true;
}
} catch (IOException e){
e.printStackTrace();
} finally {
try {
client.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return false;
}
快照生成成功,如图:
2.3 删除快照
2.3.1 Kibana操作
DELETE _snapshot/product_info
2.3.2 JAVA API操作
/**
* 删除快照
*/
public static boolean deleteSnapshot(RestHighLevelClient client, String repositoryName) {
DeleteSnapshotRequest deleteSnapshotRequest = new DeleteSnapshotRequest();
deleteSnapshotRequest.snapshots(repositoryName);
deleteSnapshotRequest.repository(repositoryName);
try {
AcknowledgedResponse deleteSnapshotResponse = client.snapshot().delete(deleteSnapshotRequest, RequestOptions.DEFAULT);
if (deleteSnapshotResponse.isAcknowledged()) {
System.out.println("快照删除成功!");
return true;
}
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
client.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return false;
}
三、恢复快照
3.1 Kibana操作
POST _snapshot/product_info/product_info/_restore
{
"indices": "product_info"
}
3.2 JAVA API操作
/**
* 恢复快照
*/
public static void restoreSnapshot(RestHighLevelClient client, String snapshotName, String repositoryName, String indexes){
RestoreSnapshotRequest restoreSnapshotRequest = new RestoreSnapshotRequest();
restoreSnapshotRequest.snapshot(snapshotName);
restoreSnapshotRequest.repository(repositoryName);
restoreSnapshotRequest.indices(indexes);
try {
RestoreSnapshotResponse restoreSnapshotResponse = client.snapshot().restore(restoreSnapshotRequest, RequestOptions.DEFAULT);
System.out.println("快照恢复成功:" + restoreSnapshotResponse.getRestoreInfo().toString());
} catch (IOException e) {
e.printStackTrace();
} finally {
try {
client.close();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}