恕我直言,我也是才知道ElasticSearch条件更新是这么玩的

共 5262字,需浏览 11分钟

 ·

2020-07-31 22:24

背景

ElasticSearch 的使用度越来越普及了,很多公司都在使用。有做日志搜索的,有做商品搜索的,有做订单搜索的。

大部分使用场景都是通过程序定期去导入数据到 ElasticSearch 中,或者通过 CDC 的方式来构建索引。在这种场景下,更新数据都是单条更新,比如 ID=1 的数据发生了修改操作,那么就会把 ElasticSearch 中 ID=1 的这条数据更新下。

但有些场景下需要根据条件同时更新多条数据,就像 Mysql 中我们使用 Update Table Set Name=XXX where Age=18 去更新一批数据一样。

正好有同学微信问我怎么批量更新,接下来就看看在 ElasticSearch 中是如何去进行按条件更新的操作。

单条更新

ElasticSearch 的客户端官方推荐使用 elasticsearch-rest-high-level-client。所以本文也是基于 elasticsearch-rest-high-level-client 来构建代码。

首先来回顾下单条数据的更新是怎么做的,代码如下:

UpdateRequest updateRequest = new UpdateRequest(index, type, id);
updateRequest.doc(documentJson, XContentType.JSON);
restHighLevelClient.update(updateRequest, options);

构建 UpdateRequest 的时候就指定了索引,类型,ID 三个字段,也就精确到了某一条数据,所以更新的自然也是这一条数据。

条件更新

首先我们准备几条测试数据,如下:

{
    id: 1,
    title: "Java怎么学",
    type: 1,
    userId: 1,
    tags: [
        "java"
    ],
    textContent: "我要学Java",
    status: 1,
    heat: 100
}
{
    id: 2,
    title: "Java怎么学",
    type: 1,
    userId: 1,
    tags: [
        "java"
    ],
    textContent: "我要学Java",
    status: 1,
    heat: 100
}

假如我们的需求是将 userId=1 的所有文档数据改成无效,也就是 status=0。如果不用按条件更新,你就得查询出 userId=1 的所有数据,然后一条条更新,这就太慢了。

下面看看按条件更新是如何使用的,如下:

POST http://47.105.66.210:9200/article_v1/doc/_update_by_query
{
    "script": {
        "source":"ctx._source['status']=0;"
    },
    "query": {
        "term": {
            "userId": 1
        }
    } 
}

按条件更新需要使用_update_by_query 来进行,query 用于指定更新数据的匹配条件,script 用于更新的逻辑。

详细使用文档:

https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html[1]

https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-using.html[2]

在 Java 代码中如何实现条件更新呢?

UpdateByQueryRequest request = new UpdateByQueryRequest("article_v1");
request.setQuery(new TermQueryBuilder("userId", 1));
request.setScript(new Script("ctx._source['status']=0;"));
restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);

是不是也很简单,跟单条数据更新差不多,使用 UpdateByQueryRequest 构建更新对象,然后设置 Query 和 Script 就可以了。

条件更新数组

比如我们的需求是要移除 tags 中的 java,如下:

POST http://47.105.66.210:9200/article_v1/doc/_update_by_query
{
    "script": {
        "source":"ctx._source['tags'].removeIf(item -> item == 'java');"
    },
    "query": {
        "term": {
            "userId": 1
        }
    } 
}

新增的话只需要将 removeIf 改成 add 就可以了。

ctx._source['tags'].add('java');

如果有特殊的业务逻辑,Script 中还可以写判断来判断是否需要修改。

POST http://47.105.66.210:9200/article_v1/doc/_update_by_query
{
    "script": {
        "source":"if(ctx._source.type == 11) {ctx._source['tags'].add('java');}"
    },
    "query": {
        "term": {
            "userId": 1
        }
    } 
}

封装通用的条件更新

大部分场景下的更新都比较简单,根据某个字段去更新某个值,或者去更新多个值。在 Java 中如果每个地方都去写脚本,就重复了,最好是抽一个比较通用的方法来更新。

下面是简单的示列,其中还有很多需要考虑的点,像数据类型我只处理了数字,字符串,和 List,其他的大家需要自己去扩展。

public BulkByScrollResponse updateByQuery(String index, QueryBuilder query, Map document) {
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(index);
updateByQueryRequest.setQuery(query);
StringBuilder script = new StringBuilder();
Set keys = document.keySet();
for (String key : keys) {
String appendValue = "";
Object value = document.get(key);
if (value instanceof Number) {
appendValue = value.toString();
} else if (value instanceof String) {
appendValue = "'" + value.toString() + "'";
} else if (value instanceof List){
appendValue = JsonUtils.toJson(value);
} else {
appendValue = value.toString();
}
script.append("ctx._source.").append(key).append("=").append(appendValue).append(";");
}
updateByQueryRequest.setScript(new Script(script.toString()));
return updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
}
public BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions options) {
Map catData = new HashMap<>(1);
catData.put(ElasticSearchConstant.UPDATE_BY_QUERY_REQUEST, updateByQueryRequest.toString());
return CatTransactionManager.newTransaction(() -> {
try {
return restHighLevelClient.updateByQuery(updateByQueryRequest, options);
}catch (IOException e) {
throw new RuntimeException(e);
}
}, ElasticSearchConstant.ES_CAT_TYPE, ElasticSearchConstant.UPDATE, catData);
}

如果有了这么一个方法,那么使用方式如下:

@Test
public void testUpdate5() {
Map document = new HashMap<>();
document.put("title", "Java");
document.put("status", 0);
document.put("tags", Lists.newArrayList("JS", "CSS"));
kittyRestHighLevelClient.updateByQuery(elasticSearchIndexConfig.getArticleSaveIndexName(), new TermQueryBuilder("userId", 1), document);
}

关于作者:尹吉欢,简单的技术爱好者,《Spring Cloud 微服务-全栈技术与案例解析》, 《Spring Cloud 微服务 入门 实战与进阶》作者, 公众号 猿天地 发起人。

参考资料

[1]

docs-update-by-query.html: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html

[2]

modules-scripting-using.html: https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-using.html

1. 人人都能看懂的 6 种限流实现方案!

2. 一个空格引发的“惨案“

3大型网站架构演化发展历程

4Java语言“坑爹”排行榜TOP 10

5. 我是一个Java类(附带精彩吐槽)

6. 看完这篇Redis缓存三大问题,保你能和面试官互扯

7. 程序员必知的 89 个操作系统核心概念

8. 深入理解 MySQL:快速学会分析SQL执行效率

9. API 接口设计规范

10. Spring Boot 面试,一个问题就干趴下了!



扫码二维码关注我


·end·

—如果本文有帮助,请分享到朋友圈吧—

我们一起愉快的玩耍!



你点的每个赞,我都认真当成了喜欢

浏览 105
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报
评论
图片
表情
推荐
点赞
评论
收藏
分享

手机扫一扫分享

分享
举报