恕我直言,我也是才知道ElasticSearch条件更新是这么玩的
背景
ElasticSearch 的使用度越来越普及了,很多公司都在使用。有做日志搜索的,有做商品搜索的,有做订单搜索的。
大部分使用场景都是通过程序定期去导入数据到 ElasticSearch 中,或者通过 CDC 的方式来构建索引。在这种场景下,更新数据都是单条更新,比如 ID=1 的数据发生了修改操作,那么就会把 ElasticSearch 中 ID=1 的这条数据更新下。
但有些场景下需要根据条件同时更新多条数据,就像 Mysql 中我们使用 Update Table Set Name=XXX where Age=18 去更新一批数据一样。
正好有同学微信问我怎么批量更新,接下来就看看在 ElasticSearch 中是如何去进行按条件更新的操作。
单条更新
ElasticSearch 的客户端官方推荐使用 elasticsearch-rest-high-level-client。所以本文也是基于 elasticsearch-rest-high-level-client 来构建代码。
首先来回顾下单条数据的更新是怎么做的,代码如下:
UpdateRequest updateRequest = new UpdateRequest(index, type, id); updateRequest.doc(documentJson, XContentType.JSON); restHighLevelClient.update(updateRequest, options);
构建 UpdateRequest 的时候就指定了索引,类型,ID 三个字段,也就精确到了某一条数据,所以更新的自然也是这一条数据。
条件更新
首先我们准备几条测试数据,如下:
{ id: 1, title: "Java怎么学", type: 1, userId: 1, tags: [ "java" ], textContent: "我要学Java", status: 1, heat: 100 } { id: 2, title: "Java怎么学", type: 1, userId: 1, tags: [ "java" ], textContent: "我要学Java", status: 1, heat: 100 }
假如我们的需求是将 userId=1 的所有文档数据改成无效,也就是 status=0。如果不用按条件更新,你就得查询出 userId=1 的所有数据,然后一条条更新,这就太慢了。
下面看看按条件更新是如何使用的,如下:
POST http://47.105.66.210:9200/article_v1/doc/_update_by_query { "script": { "source":"ctx._source['status']=0;" }, "query": { "term": { "userId": 1 } } }
按条件更新需要使用_update_by_query 来进行,query 用于指定更新数据的匹配条件,script 用于更新的逻辑。
详细使用文档:
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html[1]
https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-using.html[2]
在 Java 代码中如何实现条件更新呢?
UpdateByQueryRequest request = new UpdateByQueryRequest("article_v1"); request.setQuery(new TermQueryBuilder("userId", 1)); request.setScript(new Script("ctx._source['status']=0;")); restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
是不是也很简单,跟单条数据更新差不多,使用 UpdateByQueryRequest 构建更新对象,然后设置 Query 和 Script 就可以了。
条件更新数组
比如我们的需求是要移除 tags 中的 java,如下:
POST http://47.105.66.210:9200/article_v1/doc/_update_by_query { "script": { "source":"ctx._source['tags'].removeIf(item -> item == 'java');" }, "query": { "term": { "userId": 1 } } }
新增的话只需要将 removeIf 改成 add 就可以了。
ctx._source['tags'].add('java');
如果有特殊的业务逻辑,Script 中还可以写判断来判断是否需要修改。
POST http://47.105.66.210:9200/article_v1/doc/_update_by_query { "script": { "source":"if(ctx._source.type == 11) {ctx._source['tags'].add('java');}" }, "query": { "term": { "userId": 1 } } }
封装通用的条件更新
大部分场景下的更新都比较简单,根据某个字段去更新某个值,或者去更新多个值。在 Java 中如果每个地方都去写脚本,就重复了,最好是抽一个比较通用的方法来更新。
下面是简单的示列,其中还有很多需要考虑的点,像数据类型我只处理了数字,字符串,和 List,其他的大家需要自己去扩展。
public BulkByScrollResponse updateByQuery(String index, QueryBuilder query, Map<String, Object> document) { UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(index); updateByQueryRequest.setQuery(query); StringBuilder script = new StringBuilder(); Set<String> keys = document.keySet(); for (String key : keys) { String appendValue = ""; Object value = document.get(key); if (value instanceof Number) { appendValue = value.toString(); } else if (value instanceof String) { appendValue = "'" + value.toString() + "'"; } else if (value instanceof List){ appendValue = JsonUtils.toJson(value); } else { appendValue = value.toString(); } script.append("ctx._source.").append(key).append("=").append(appendValue).append(";"); } updateByQueryRequest.setScript(new Script(script.toString())); return updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT); } public BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions options) { Map<String, Object> catData = new HashMap<>(1); catData.put(ElasticSearchConstant.UPDATE_BY_QUERY_REQUEST, updateByQueryRequest.toString()); return CatTransactionManager.newTransaction(() -> { try { return restHighLevelClient.updateByQuery(updateByQueryRequest, options); }catch (IOException e) { throw new RuntimeException(e); } }, ElasticSearchConstant.ES_CAT_TYPE, ElasticSearchConstant.UPDATE, catData); }
如果有了这么一个方法,那么使用方式如下:
@Test public void testUpdate5() { Map<String, Object> document = new HashMap<>(); document.put("title", "Java"); document.put("status", 0); document.put("tags", Lists.newArrayList("JS", "CSS")); kittyRestHighLevelClient.updateByQuery(elasticSearchIndexConfig.getArticleSaveIndexName(), new TermQueryBuilder("userId", 1), document); }
关于作者:尹吉欢,简单的技术爱好者,《Spring Cloud 微服务-全栈技术与案例解析》, 《Spring Cloud 微服务 入门 实战与进阶》作者, 公众号 猿天地 发起人。
参考资料
[1]
docs-update-by-query.html: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html
[2]
modules-scripting-using.html: https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-using.html

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
不容错过!私藏的几个学操作系统好东西
本文转载自微信公众号「编程技术宇宙」,作者轩辕之风 。转载本文请联系编程技术宇宙公众号。 大家可能读过不少的操作系统的文章,不过这其中很多都是大谈操作系统的理论技术,鲜有探讨现代操作系统实际的实现。 你可能知道线程的几个状态和状态的变化,但你知道Linux中是如何进行线程调度的吗? 你可能知道线程的栈是怎么进进出出,但你知道Linux的线程栈如何分配管理,栈的最下面藏了什么秘密吗? 你可能对TCP/IP协议了然于胸,但你知道Linux中是如何实现TCP协议吗,跟RFC规范又有哪些不同? 你可能知道Signal机制,但你知道内核是如何派发信号和执行信号处理函数的吗? 你可能知道操作系统内存管理技术,但你知道Linux和Windows同样在x86架构下的实现有哪些区别吗? ······ 学一项技术,不能只是纸上谈兵,知其然还能知其所以然,还能知如何以然,方达至高境界。 阅读操作系统源代码,是学习操作系统的不二法门。下面就最常见的Linux和Windows系统分别介绍一下。 Linux Linux属于开源系统,想获取源码轻而易举,不过这里轩辕推荐一个我私藏的网站,不需要你科学上网,不需要你下...
- 下一篇
Node.js v14.7.0 (Current) 发布
Node.js v14.7.0 已发布,增加了不少新特性: deps: 将 npm 升级至6.14.7 #34468 dgram: (SEMVER-MINOR)将 IPv6 scope id 后缀添加到接收到的 udp6 数据报#14500 src: (SEMVER-MINOR)支持阻止SetPromiseRejectCallback#34387 (SEMVER-MINOR)支持为所有诊断输出设置目录#33584 worker: (SEMVER-MINOR)使 MessagePort 继承自 EventTarget#34057 zlib: 针对 zlib 流切换至延迟初始化(lazy init)#34048 New Collaborators: 将 rexagod 添加到协作者中 #34457 将 AshCripps添加到协作者中#34494 将 HarshithaKP添加到协作者中 #34417 为 Richard Lau 添加发布密钥 #34397 其他的 commit 主要是对文档和测试功能进行优化,以及改进构建和编译相关的功能,详情查看发布公告。
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- 设置Eclipse缩进为4个空格,增强代码规范
- CentOS8编译安装MySQL8.0.19
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- CentOS关闭SELinux安全模块
- CentOS8,CentOS7,CentOS6编译安装Redis5.0.7
- SpringBoot2整合Redis,开启缓存,提高访问速度
- CentOS7安装Docker,走上虚拟化容器引擎之路
- Docker使用Oracle官方镜像安装(12C,18C,19C)