一、前言
上篇文章我们了解了ES的插入和批量插入文档的操作,分别通过ES的kibana客户端以及Java高级Rest客户端进行学习,那么本篇则进入到对文档的修改操作,同新增文档,也有更新单条文档和批量更新文档操作,但还多出一个根据条件更新文档,我们本篇均会涉及到。
二、更新文档
2.1、更新单条文档
在ES中更新索引的请求类型是POST,其请求形式如下:
POST /${index_name}/_update/${_id}
{
.... //需要更新的数据,在URL中指定文档_id
}
上面的_id就是将要修改的ES文档中的_Id,修改后的字段和值将会填写到大括号中,其格式是JSON形式。例如把_id为017的文档中的city修改成下面的数据:
POST /hotel/_update/017
{
"doc": {
"city":"南昌"
}
}
ES返回结果如下图:
通过结果可知,已经成功更新文档信息,并且本次修改后文档的版本变为2.下面根据_id搜索文档的命令进行验证:
GET /hotel/_doc/017
ES返回内容如下:
通过返回结果可知,文档017的对应字段已经被修改为目标数据。并且要注意的是,我只修改了city,其他的没有修改的数据并没有改变,仍然保持原值。
那么在这里需要提一下,虽然上篇文章也提到过,就是另一个命令也可以起到修改的作用,它就是写入文档的命令:
POST /${index_name}/_doc/${_id}
{
//需要修改的文档数据
}
我们如果需要通过该命令修改指定索引的文档,只需要将_id改成该索引中需要修改的文档的id即可,那么执行该命令后,结果不再是created,而是updated,并且使version+1.
例如,我将文档018的内容修改如下,我也只修改一个city,原文档18的内容如下
然后执行如下命令,只修改city:
POST /hotel/_doc/018
{
"city":"厦门"
}
但是此时查看文档,发现只剩下city属性了,意味着其他没有变动的属性,该命令会视为改为null,这是和前面update命令的区别所在,update命令不会影响没有变动的属性,仍然保持原值:
除了普通的update功能,ES还提供了upsert.upsert即是update和insert的合体字,表示更新/插入数据。如果目标文档存在,则执行更新逻辑;否则执行插入逻辑。以下DSL演示了upsert的应用:
POST /hotel/_update/030
{
"doc": {
"city":"南昌"
},
"upsert": {
"city":"厦门"
}
}
那么文档030不存在,所以执行后会新增该文档:
在Java高级REST客户端中,更新单条文档需要创建UpdateRequest对象并设置对应的索引和_id字段名称,执行时,调用客户端的update()方法并把UpdateRequest对象传入即可。update()方法返回UpdateResponse对象,通过该对象可以获取当前请求的索引名称,文档_Id和版本号等。以下代码演示了向索引中添加单条文档的方法:
那么我们首先在service层建立ESUpdateDocService类,注入client后,写入以下代码,需要说明的是,如果需要使用upsert功能需要在调用update()方法之前将可能需要插入的map对象传入upsert方法即可:
public Map<String, Object> singleUpsert(String indexName, String docIdKey, Map<String, Object> recordMap) {
String docId = recordMap.get(docIdKey).toString();
//将ID字段从map中移除,这步可有可无
recordMap.remove(docIdKey);
UpdateRequest updateRequest = new UpdateRequest(indexName, docId);
//如果有则进行修改,没有该文档则插入,可以支持链式编程
updateRequest.doc(recordMap).upsert(recordMap);
try {
UpdateResponse updateResponse = client.update(updateRequest, RequestOptions.DEFAULT);
HashMap<String, Object> resultMap = new HashMap<>();
String id = updateResponse.getId(); //文档ID
String index = updateResponse.getIndex(); //索引名称
long version = updateResponse.getVersion(); //文档版本
resultMap.put("id", id);
resultMap.put("index", index);
resultMap.put("version", version);
return resultMap;
} catch (IOException e) {
log.warn(e.getMessage());
throw new SearchException("搜索错误,原因:" + e.getMessage());
}
}
在controller层建立ESUpdateController,然后建立单条修改文档的方法,代码如下:
@PostMapping("/update/doc")
public FoundationResponse<Map<String, Object>> singleUpdate(@RequestBody HotelDocRequest hotelDocRequest) {
String indexName = hotelDocRequest.getIndexName();
if (CharSequenceUtil.isBlank(indexName)) {
return FoundationResponse.error(100, "索引名不能为空");
}
Hotel hotel = hotelDocRequest.getHotel();
HashMap<String, Object> dataMap = new HashMap<>();
//这里对比之前的插入单条文档,需要多加入一个id
dataMap.put("id", hotel.getId());
dataMap.put("title", hotel.getTitle());
dataMap.put("city", hotel.getCity());
dataMap.put("price", hotel.getPrice());
try {
Map<String, Object> resultMap = esUpdateDocService.singleUpsert(indexName, hotelDocRequest.getDocIdKey(), dataMap);
return FoundationResponse.success(resultMap);
} catch (SearchException e) {
log.warn("搜索发生异常,原因为:{}", e.getMessage());
return FoundationResponse.error(100, e.getMessage());
} catch (Exception e) {
log.error("服务发生异常,原因为:{}", e.getMessage());
return FoundationResponse.error(100, e.getMessage());
}
}
postman中执行该接口,body内容如下:
{
"hotel": {
"id": "020",
"title": "可莉酒店3",
"city": "上海",
"price": 648
},
"indexName":"hotel",
"docIdKey":"id"
}
2.2、批量更新文档
与批量写入文档相似,批量更新文档的请求形式如下:
POST /_bulk
{"update":{"_index":"${index_name}","_id":"${_id}"}}
{"doc":{"修改的json数据"},"upsert":{"需要插入的json数据"}
{"update":{"_index":"${index_name}","_id":"${_id}"}}
{"doc":{"修改的json数据"},"upsert":{"需要插入的json数据"}
注意,与批量写入文档不同的是,批量更新文档必须在元数据中填写需要更新的文档_id.且与单条文档类似的是,同样也可以加入upsert功能。下面的DSL将批量更新_id为001和002的文档:
POST /_bulk
{"update":{"_index":"hotel_order","_id":"004"}}
{"doc":{"username":"Mike JorDan"},"upsert":{"username":"Mike JorDan"}}
{"update":{"_index":"hotel_order","_id":"002"}}
{"doc":{"username":"Tom JorDan"}}
{"update":{"_index":"hotel_order","_id":"003"}}
{"doc":{"username":"Kobi JorDan"}}
在java客户端接口中,批量更新文档需要创建BulkRequest对象并设置对应的索引名称,这一点和批量写入是相同的。对于多条需要更新的文档,可构建多个UpdateRequest对象并调用BulkRequest.add()方法添加这些UpdateRequest对象,执行时,调用客户端的bulk()方法并把BulkRequest对象传入即可。
首先在service层写批量更新的方法:
public String bulkUpdate(HotelDocRequest hotelDocRequest) {
String indexName = hotelDocRequest.getIndexName();
if (CharSequenceUtil.isBlank(indexName)) {
throw new SearchException("索引名不能为空");
}
BulkRequest bulkRequest = new BulkRequest();
List<Map<String, Object>> recordMapList = hotelDocRequest.getRecordMapList();
for (Map<String, Object> dataMap : recordMapList) {
String docIdKey = hotelDocRequest.getDocIdKey();
String docId = dataMap.get(docIdKey).toString();
//将ID字段从map中移除,这步可有可无,这个操作和单条修改的基本一致
dataMap.remove(docIdKey);
bulkRequest.add(new UpdateRequest(indexName, docId).doc(dataMap).upsert(dataMap));
}
BulkResponse bulkResponse;
try {
bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
if (bulkResponse.hasFailures()) {
return "失败,原因:" + bulkResponse.buildFailureMessage();
} else {
return "成功";
}
} catch (IOException e) {
throw new SearchException("批量修改服务错误");
}
}
然后controller层调用service层,这里和之前一样,也是通过后端做一次转化,前台就无需输入复杂的map json串
@PostMapping("/bulk/update/doc")
public FoundationResponse<String> bulkUpdateDoc(@RequestBody HotelDocRequest hotelDocRequest) {
List<Hotel> hotelList = hotelDocRequest.getHotelList();
if (CollUtil.isEmpty(hotelList)) {
return FoundationResponse.error(100, "无可修改的有效文档");
}
//这里之所以转化是因为json输入List<Map<k,v>>这个结构非常复杂,所以由后端这边做一次转化,这样前台只需要输入List<Hotel>的json
ArrayList<Map<String, Object>> recordListMap = new ArrayList<>();
hotelList.forEach(hotel -> {
HashMap<String, Object> dataMap = new HashMap<>();
//这里对比之前的插入单条文档,需要多加入一个id
dataMap.put("id", hotel.getId());
dataMap.put("title", hotel.getTitle());
dataMap.put("city", hotel.getCity());
dataMap.put("price", hotel.getPrice());
recordListMap.add(dataMap);
});
hotelDocRequest.setRecordMapList(recordListMap);
try {
String s = esUpdateDocService.bulkUpdate(hotelDocRequest);
return FoundationResponse.success(s);
} catch (SearchException e) {
log.warn("批量修改发生异常,原因为:{}", e.getMessage());
return FoundationResponse.error(100, e.getMessage());
} catch (Exception e) {
log.error("服务发生异常,原因为:{}", e.getMessage());
return FoundationResponse.error(100, e.getMessage());
}
}
postman中执行该接口,body中输入以下内容:
{
"hotelList": [
{
"id": "021",
"title": "可莉酒店4",
"city": "上海",
"price": 648
},
{
"id": "018",
"title": "可莉酒店5",
"city": "上海",
"price": 648
}
],
"docIdKey":"id",
"indexName":"hotel"
}
执行成功:
2.3、根据条件更新文档
在索引数据的更新操作中,有些场景需要根据某些条件同时更新多条数据,类似于在关系型数据库中使用update table table_name set … where … 更新一批数据。为了满足这样的需求,ES为用户提供了_update_by_query功能,其请求形式如下:
POST /${index_name}/_update_by_query
{
"query": {
... //条件更新的查询条件
},
"script": {
... //条件更新的具体更新脚本代码
}
}
上面的query用于指定更新数据的匹配条件,相当于SQL中的where语句;script用于指定具体的更新操作,相当于SQL的set内容。script的知识点将在后面的章节中进行介绍,这里仅简单应用一下,请求的DSL如下:
POST /hotel/_update_by_query
{
"query": {
"term":{
"city": {
"value": "上海" //更新文档的查询条件:城市为上海的文档
}
}
},
"script": {
"source": "ctx._source['price']='6480'", //条件更新的更新脚本,将price改为6480
"lang": "painless"
}
}
执行以上DSL后,ES将先搜素城市为“上海”的酒店,然后把这些酒店的价格改为6480.
在Java高级客户端,执行根据条件更新文档,需要创建UpdateByQueryRequest对象并设置对应的索引名称,类似于DSL中的query子句,通过调用UpdateByQueryRequest.setQuery()方法设置查询逻辑,script子句通过UpdateByQueryRequest.setScript()方法设置更新逻辑,然后执行客户端的updateByQuery()方法并把UpdateByQueryRequest对象传入即可。一下代码演示了根据城市字段查找文档然后更新价格字段的方法,首先在service层操作:
public String updatePriceByCity(String indexName, String oldCity, String newPrice) {
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(indexName);
//设置按照城市查找文档的query
updateByQueryRequest.setQuery(new TermQueryBuilder("city", oldCity));
updateByQueryRequest.setScript(new Script("ctx._source['price']='" + newPrice + "';"));
try {
BulkByScrollResponse response = client.updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
return response.toString();
} catch (IOException e) {
throw new SearchException("按照条件修改服务错误");
}
}
然后controller层调用service方法:
@PostMapping("/update/byCity")
public FoundationResponse<String> updatePriceByQueryCity(String indexName, String oldCity, String newPrice) {
if (CharSequenceUtil.isBlank(indexName)) {
throw new SearchException("索引名不能为空");
}
try {
String result = esUpdateDocService.updatePriceByCity(indexName, oldCity, newPrice);
return FoundationResponse.success(result);
} catch (SearchException e) {
log.warn("搜索发生异常,原因为:{}", e.getMessage());
return FoundationResponse.error(100, e.getMessage());
} catch (Exception e) {
log.error("服务发生异常,原因为:{}", e.getMessage());
return FoundationResponse.error(100, e.getMessage());
}
}
postman调用该接口
如果更新所有文档中的某个字段应该如何操作呢?其实,_update_by_query中的query子句可以不定义,这种情况下ES会选中所有的文档执行script中的内容。以下为修改所有酒店中城市为"上海"的DSL:
POST /hotel/_update_by_query
{
"script": {
"source": "ctx._source['city']='上海'",
"lang": "painless"
}
}