elasticsearch使用指南之Elasticsearch Document Multi Get API、Bulk API详解、原理与示...
作者简介:《RocketMQ技术内幕》作者、中间件兴趣圈微信公众号维护者。
本文将详细介绍elasticsearch批量获取API(Multi Get API)和Bulk API的使用。
1、Multi Get API
详细API如下:
- public final MultiGetResponse mget(MultiGetRequest multiGetRequest, RequestOptions options) throws IOException
- public final void mgetAsync(MultiGetRequest multiGetRequest, RequestOptions options, ActionListener listener)
其核心需要关注MultiGetRequest 。
从上面所知,mget及批量获取文档,通过add方法添加多个Item,每一个item代表一个文件获取请求,其相关字段已在get API中详细介绍,这里就不做过多详解。
Mget API使用示例
public static void testMget() { RestHighLevelClient client = EsClient.getClient(); try { MultiGetRequest request = new MultiGetRequest(); request.add("twitter", "_doc", "10"); request.add("twitter", "_doc", "11"); request.add("twitter", "_doc", "12"); request.add("gisdemo", "_doc", "10"); MultiGetResponse result = client.mget(request, RequestOptions.DEFAULT); System.out.println(result); } catch (Throwable e) { e.printStackTrace(); } finally { EsClient.close(client); } }
返回的结果其本质是一个 GetResponse的数组,不会因为其中一个失败,整个请求失败,但其结果中会标明每一个是否成功。
其返回结果类图如下:
其字段过滤(Source filtering)、路由等机制与Get API相同,详情请参考:Elasticsearch Document Get API详解、原理与示例
2、Elasticsearch Bulk API
Bulk API可以在一次API调用中包含多个索引操作,例如更新索引,删除索引等,类比批量操作。
详细API如下:
- public final BulkResponse bulk(BulkRequest bulkRequest, RequestOptions options) throws IOException
- public final void bulkAsync(BulkRequest bulkRequest, RequestOptions options, ActionListener listener)
2.1 BulkRequest详解
我们先一一来看一下其核心属性与与典型方法:
- final List requests = new ArrayList<>():单个命令容器,DocWriteRequest的子类包括:IndexRequest、UpdateRequest、DeleteRequest。
- private final Set indices = new HashSet<>():List requests涉及到的索引。
List - protected TimeValue timeout = BulkShardRequest.DEFAULT_TIMEOUT:timeout机制,针对一个Bulk请求生效。
- private ActiveShardCount waitForActiveShards = ActiveShardCount.DEFAULT: waitForActiveShards,针对一个Bulk请求生效,各个请求中waitForActiveShards优先。
- private RefreshPolicy refreshPolicy = RefreshPolicy.NONE:刷新策略。
- private long sizeInBytes = 0:整个Bulk请求的大小。
通过add api为BulkRequest添加一个请求。
2.2 Bulk API请求格式详解
Bulk Rest请求协议基于如下格式:
POST _bulk { "index" : { "_index" : "test", "_type" : "_doc", "_id" : "1" } } { "field1" : "value1" } { "delete" : { "_index" : "test", "_type" : "_doc", "_id" : "2" } } { "create" : { "_index" : "test", "_type" : "_doc", "_id" : "3" } } { "field1" : "value3" } { "update" : {"_id" : "1", "_type" : "_doc", "_index" : "test"} } { "doc" : {"field2" : "value2"} }
请求格式如下(restfull):
- POST请求,其Content-Type为application/x-ndjson。
- 每一个命令占用两行,每行的结束字符为rn。
- 第一行为元数据,"opType" : {元数据}。
- 第二行为有效载体(非必选),例如Index操作,其有效载荷为IndexRequest#source字段。
- opType可选值 index、create、update、delete。
公用元数据(index、create、update、delete)如下
1)_index :索引名
2)_type:类型名
3)_id:文档ID
4)routing:路由值
5)parent
6)version:数据版本号
7)version_type:版本类型
各操作特有元数据
1、index | create
1)pipeline
2、update
1)retry_on_conflict :更新冲突时重试次数。
2)_source:字段过滤。
有效载荷说明
1、index | create
其有效载荷为_source字段。
2、update
其有效载荷为:partial doc, upsert and script。
3、delete
没有有效载荷。
请求格式为什么要设计成metdata+有效载体的方式,主要是为了在接收端节点(所谓的接收端节点是指收到命令的第一节点),只需解析 metadata,然后将请求直接转发给对应的数据节点。
2.3 bulk API通用特性分析
2.3.1 版本管理
每一个Bulk条目拥有独自的version,存在于请求条目的item的元数据中。
2.3.2 路由
每一个Bulk条目各自生效。
2.3.3 Wait For Active Shards
通常可以设置BulkRequest#waitForActiveShards来要求Bulk批量执行之前要求处于激活的最小副本数。
2.3.4 Bulk Demo
public static final void testBulk() { RestHighLevelClient client = EsClient.getClient(); try { IndexRequest indexRequest = new IndexRequest("twitter", "_doc", "12") .source(buildTwitter("dingw", "2009-11-18T14:12:12", "test bulk")); UpdateRequest updateRequest = new UpdateRequest("twitter", "_doc", "11") .doc(new IndexRequest("twitter", "_doc", "11") .source(buildTwitter("dingw", "2009-11-18T14:12:12", "test bulk update"))); BulkRequest request = new BulkRequest(); request.add(indexRequest); request.add(updateRequest); BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT); for (BulkItemResponse bulkItemResponse : bulkResponse) { if (bulkItemResponse.isFailed()) { BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); System.out.println(failure); continue; } DocWriteResponse itemResponse = bulkItemResponse.getResponse(); if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { IndexResponse indexResponse = (IndexResponse) itemResponse; System.out.println(indexRequest); } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { UpdateResponse updateResponse = (UpdateResponse) itemResponse; System.out.println(updateRequest); } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { DeleteResponse deleteResponse = (DeleteResponse) itemResponse; System.out.println(deleteResponse); } } } catch (Exception e) { e.printStackTrace(); } finally { EsClient.close(client); } }
批量更新bulk api就介绍到这里了。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
大数据入门学习?
大数据入门学习? 微笑数据工程师 2019-03-17 10:55 第一部分:了解大数据平台架构 大数据有非常大的价值,不管是从帮助企业创造营收还是从提高效率、节省企业成本角度。大数据要是做好了,将会是一个企业增长的发动机,推动业务突飞猛进的发展。要实现大数据的价值,真正让大数据为企业创造贡献,首先必须要积累有大数据,把日常的业务和用户行为数据收集起来。有些数据是可再生资源,但更多的数据是不可再生资源,大数据学习交流扣qun:74零零加4一3八yi这就需要我们搭建一个平台负责数据的采集、规整、运算、存储、应用、展现等,有了这样一个大数据平台,我们才能做好数据的积累,从小数据到大数据,数据是企业的资产,好的数据是企业的优质资产。大数据平台该怎样搭建呢?请看下面这幅图,不管我之前在阿里还是在腾讯工作,还是到哪个企业工作,基本上我都是通过这幅图进行一些简单的适应企业的调整,就可以完全搬过来使用了。 针对上面这幅图,有几点跟大家讲解说明下: 1)大数据平台由三个平台+一个服务组成:工具平台,大数据仓库基础平台、大数据门户,其中,工具平台又包含运维平台和数据采集平台,大数据门户又包含大数据分析平...
- 下一篇
【最全PPT下载+直播回顾】中国HBase技术社区第十届meetup(杭州站)
活动介绍 2019年3月16日14:00,在杭州阿里巴巴创新中心(下沙路演大厅)举办了中国HBase技术社区第十届meetup。本期沙龙以“HBase生态实践”为主题,是HBase的技术盛宴。我们邀请到了国内HBase圈子的众多大咖,对于关注这个领域的同学是个非常好的认识和建交机会,我们也非常欢迎大家一起加入探讨HBase技术,畅谈HBase那些事儿。有近200名的HBase开发者和爱好者现场参与了本次活动,聆听互联网大咖解构生态最佳实践,现场人气爆棚。 下面小编就为大家整理了满满的干货,供各位回顾和学习使用。 直播回顾 https://yq.aliyun.com/live/883 演讲主题及嘉宾: 本主题分享主要围绕以下四个方面:(1)当前面临的挑战(2)平台架构及案例(3)原理及最佳实践(4)阿里云HBase X-Pack服务 分享实录文字版》》 会上讲师向大家分享了HBase在有赞的产品定位,重点介绍了有赞HBase和相关管控平台的研发建设、以及在HBase 1.2.6版本之上所做的改造、改造原因以及给业务实践带来的价值。 分享实录文字版》》 对OpenTSDB简单介绍,以及使用...
相关文章
文章评论
共有0条评论来说两句吧...