elasticsearc使用指南之ES管道聚合(Pipeline Aggregation)
管道聚合处理来自其他聚合而不是文档集的输出,将信息添加到输出树中。
注:关于脚本聚合目前在本文中暂时不会涉及。
主要有如下两种管道聚合方式:
- parent
- sibling
下面一一介绍ES定义的管道聚合。
1、Avg Bucket Aggregation
同级管道聚合,它计算同级聚合中指定度量的平均值。同级聚合必须是多桶聚合,针对的是度量聚合(metric Aggregation)。
示例如下:
{ "avg_bucket": { "buckets_path": "the_sum" // @1 } }
- buckets_path:指定聚合的名称,支持多级嵌套聚合。
其他参数: - gap_policy
当管道聚合遇到不存在的值,有点类似于term等聚合的(missing)时所采取的策略,可选择值为:skip、insert_zeros。 - skip:此选项将丢失的数据视为bucket不存在。它将跳过桶并使用下一个可用值继续计算。
- insert_zeros:默认使用0代替。
- format
用于格式化聚合桶的输出(key)。
示例如下:
POST /_search { "size": 0, "aggs": { "sales_per_month": { // @1 "date_histogram": { "field": "date", "interval": "month" }, "aggs": { // @2 "sales": { "sum": { "field": "price" } } } }, "avg_monthly_sales": { // @3 "avg_bucket": { "buckets_path": "sales_per_month>sales" } } } }
代码@1:首先定义第一级聚合(按月)直方图聚合。
代码@2:定义第二级聚合,在按月聚合的基础上,对每个月的文档求sum。
代码@3:对上面的聚合求平均值。
其返回结果如下:
{ ... // 省略 "aggregations": { "sales_per_month": { "buckets": [ { "key_as_string": "2015/01/01 00:00:00", "key": 1420070400000, "doc_count": 3, "sales": { "value": 550.0 } }, { "key_as_string": "2015/02/01 00:00:00", "key": 1422748800000, "doc_count": 2, "sales": { "value": 60.0 } } ] }, "avg_monthly_sales": { // 这是对二级聚合的结果再进行一次求平均值聚合。 "value": 328.33333333333333 } } }
对应的JAVA示例如下:
public static void test_pipeline_avg_buncket_aggregation() { RestHighLevelClient client = EsClient.getClient(); try { SearchRequest searchRequest = new SearchRequest(); searchRequest.indices("aggregations_index02"); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); AggregationBuilder aggregationBuild = AggregationBuilders.terms("seller_agg") .field("sellerId") .subAggregation(AggregationBuilders.sum("seller_num_agg") .field("num") ) ; sourceBuilder.aggregation(aggregationBuild); // 添加 avg bucket pipeline sourceBuilder.aggregation(new AvgBucketPipelineAggregationBuilder("seller_num_agg_av", "seller_agg>seller_num_agg")); sourceBuilder.size(0); searchRequest.source(sourceBuilder); SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT); System.out.println(result); } catch (Throwable e) { e.printStackTrace(); } finally { EsClient.close(client); } }
2、Percentiles Bucket Aggregation
同级管道聚合,百分位管道聚合。其JAVA示例如下:
public static void test_Percentiles_buncket_aggregation() { RestHighLevelClient client = EsClient.getClient(); try { SearchRequest searchRequest = new SearchRequest(); searchRequest.indices("aggregations_index02"); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); AggregationBuilder aggregationBuild = AggregationBuilders.terms("seller_agg") .field("sellerId") .subAggregation(AggregationBuilders.sum("seller_num_agg") .field("num") ) ; sourceBuilder.aggregation(aggregationBuild); // 添加 avg bucket pipeline sourceBuilder.aggregation(new PercentilesBucketPipelineAggregationBuilder("seller_num_agg_av", "seller_agg>seller_num_agg")); sourceBuilder.size(0); searchRequest.source(sourceBuilder); SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT); System.out.println(result); } catch (Throwable e) { e.printStackTrace(); } finally { EsClient.close(client); } }
其返回值如下:
{ ... // 省略其他属性 "aggregations":{ "lterms#seller_agg":{ "doc_count_error_upper_bound":0, "sum_other_doc_count":12, "buckets":[ { "key":45, "doc_count":567, "sum#seller_num_agg":{ "value":911 } }, { "key":31, "doc_count":324, "sum#seller_num_agg":{ "value":353 } } // 省略其他桶的显示 ] }, "percentiles_bucket#seller_num_agg_av":{ "values":{ "1.0":5, "5.0":5, "25.0":10, "50.0":20, "75.0":290, "95.0":911, "99.0":911 } } } }
3、Cumulative Sum Aggregation
累积管道聚合,就是就是依次将每个管道的sum聚合进行累加。
其语法(restfull)如下:
{ "cumulative_sum": { "buckets_path": "the_sum" } }
支持的参数说明:
- buckets_path
桶聚合名称,作为管道聚合的输入信息。 - format
格式化key。
使用示例如下:
POST /sales/_search { "size": 0, "aggs" : { "sales_per_month" : { "date_histogram" : { "field" : "date", "interval" : "month" }, "aggs": { "sales": { "sum": { "field": "price" } }, "cumulative_sales": { "cumulative_sum": { "buckets_path": "sales" } } } } } }
其返回结果如下:
{ "took": 11, "timed_out": false, "_shards": ..., "hits": ..., "aggregations": { "sales_per_month": { "buckets": [ { "key_as_string": "2015/01/01 00:00:00", "key": 1420070400000, "doc_count": 3, "sales": { "value": 550.0 }, "cumulative_sales": { "value": 550.0 } }, { "key_as_string": "2015/02/01 00:00:00", "key": 1422748800000, "doc_count": 2, "sales": { "value": 60.0 }, "cumulative_sales": { "value": 610.0 } }, { "key_as_string": "2015/03/01 00:00:00", "key": 1425168000000, "doc_count": 2, "sales": { "value": 375.0 }, "cumulative_sales": { "value": 985.0 } } ] } } }
从结果可知,cumulative_sales的值等于上一个cumulative_sales + 当前桶的sum聚合。
对应的JAVA示例如下:
{ "aggregations":{ "date_histogram#createTime_histogram":{ "buckets":{ "2015-12-01 00:00:00":{ "key_as_string":"2015-12-01 00:00:00", "key":1448928000000, "doc_count":6, "sum#seller_num_agg":{ "value":16 }, "simple_value#Cumulative_Seller_num_agg":{ "value":16 } }, "2016-01-01 00:00:00":{ "key_as_string":"2016-03-01 00:00:00", "key":1456790400000, "doc_count":10, "sum#seller_num_agg":{ "value":11 }, "simple_value#Cumulative_Seller_num_agg":{ "value":31 } } // ... 忽略 } } } }
4、Bucket Sort Aggregation
一种父管道聚合,它对其父多桶聚合的桶进行排序。并可以指定多个排序字段。每个bucket可以根据它的_key、_count或子聚合进行排序。此外,可以设置from和size的参数,以便截断结果桶。
使用语法如下:
{ "bucket_sort": { "sort": [ {"sort_field_1": {"order": "asc"}}, {"sort_field_2": {"order": "desc"}}, "sort_field_3" ], "from": 1, "size": 3 } }
支持的参数说明如下:
- sort
定义排序结构。 - from
用与对父聚合的桶进行截取,该值之前的所有桶将忽略,也就是不参与排序,默认为0。 - size
返回的桶数。默认为父聚合的所有桶。 - gap_policy
当管道聚合遇到不存在的值,有点类似于term等聚合的(missing)时所采取的策略,可选择值为:skip、insert_zeros。 - skip:此选项将丢失的数据视为bucket不存在。它将跳过桶并使用下一个可用值继续计算。
- insert_zeros:默认使用0代替。
官方示例如下:
POST /sales/_search { "size": 0, "aggs" : { "sales_per_month" : { "date_histogram" : { "field" : "date", "interval" : "month" }, "aggs": { "total_sales": { "sum": { "field": "price" } }, "sales_bucket_sort": { "bucket_sort": { "sort": [ {"total_sales": {"order": "desc"}} ], "size": 3 } } } } } }
对应的JAVA示例如下:
public static void test_bucket_sort_Aggregation() { RestHighLevelClient client = EsClient.getClient(); try { //构建日期直方图聚合 时间间隔,示例中按月统计 DateHistogramInterval interval = new DateHistogramInterval("1M"); SearchRequest searchRequest = new SearchRequest(); searchRequest.indices("aggregations_index02"); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); AggregationBuilder aggregationBuild = AggregationBuilders.dateHistogram("createTime_histogram") .field("createTime") .dateHistogramInterval(interval) .keyed(true) .subAggregation(AggregationBuilders.sum("seller_num_agg") .field("num") ) .subAggregation(new BucketSortPipelineAggregationBuilder("seller_num_agg_sort", Arrays.asList( new FieldSortBuilder("seller_num_agg").order(SortOrder.ASC))) .from(0) .size(3)) // BucketSortPipelineAggregationBuilder(String name, List<FieldSortBuilder> sorts) .subAggregation(new CumulativeSumPipelineAggregationBuilder("Cumulative_Seller_num_agg", "seller_num_agg")) // .format("yyyy-MM-dd") // 对key的格式化 ; sourceBuilder.aggregation(aggregationBuild); sourceBuilder.size(0); sourceBuilder.query( QueryBuilders.termQuery("sellerId", 24) ); searchRequest.source(sourceBuilder); SearchResponse result = client.search(searchRequest, RequestOptions.DEFAULT); System.out.println(result); } catch (Throwable e) { e.printStackTrace(); } finally { EsClient.close(client); } }
返回值:
{ "aggregations":{ "date_histogram#createTime_histogram":{ "buckets":{ "2016-04-01 00:00:00":{ "key_as_string":"2016-04-01 00:00:00", "key":1459468800000, "doc_count":2, "sum#seller_num_agg":{ "value":2 }, "simple_value#Cumulative_Seller_num_agg":{ "value":2 } }, "2017-05-01 00:00:00":{ "key_as_string":"2017-05-01 00:00:00", "key":1493596800000, "doc_count":3, "sum#seller_num_agg":{ "value":3 }, "simple_value#Cumulative_Seller_num_agg":{ "value":5 } }, "2017-02-01 00:00:00":{ "key_as_string":"2017-02-01 00:00:00", "key":1485907200000, "doc_count":4, "sum#seller_num_agg":{ "value":4 }, "simple_value#Cumulative_Seller_num_agg":{ "value":9 } } } } }
5、Max Bucket Aggregation
与 avg类似。
6、Min Bucket Aggregation
与 avg类似。
7、Sum Bucket Aggregation
与 avg类似。
8、Stats Bucket Aggregation
与 avg类似。
本节详细介绍了ES Pipeline Aggregation 管道聚合的使用方法,重点介绍了Avg Bucket Aggregation、Percentiles Bucket Aggregation、Cumulative Sum Aggregation、Bucket Sort Aggregation、Max Bucket Aggregation、Min Bucket Aggregation、Sum Bucket Aggregation、Stats Bucket Aggregation。
原文发布时间为:2019-03-18
本文作者:丁威,《RocketMQ技术内幕》作者。
本文来自中间件兴趣圈,了解相关信息可以关注中间件兴趣圈。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Apache Zepplin使用Hive Interpreter查询
1.Hadoop a).配置core-site.xml hadoop文件core-site.xml中配置信息如下,重启HDFS <property> <name>hadoop.proxyuser.hadoop.hosts</name> <value>*</value> </property> <property> <name>hadoop.proxyuser.hadoop.groups</name> <value>*</value> </property> b).启停HDFS ## 启动HDFS ./hadoop/sbin/start-dfs.sh ## 停止HDFS ./hadoop/sbin/stop-dfs.sh c).HDFS退出安全模式 ./hadoop/bin/hdfs dfsadmin -safemode leave 2.Hive a).配置hive-site.xml 修改hive-site.xml中thrift相关配置项 ## ...
- 下一篇
Elasticsearch Index Setting一览表
索引的配置项按是否可以更改分为static属性与动态配置,所谓的静态配置即索引创建后不能修改。 1、索引静态配置 index.number_of_shards索引分片的数量。在ES层面可以通过es.index.max_number_of_shards属性设置索引最大的分片数,默认为1024,index.number_of_shards的默认值为Math.min(es.index.max_number_of_shards,5),故通常默认值为5。 index.shard.check_on_startup分片在打开之前是否应该检查该分片是否损坏。当检测到损坏时,它将阻止分片被打开。可选值:false:不检测;checksum:只检查物理结构;true:检查物理和逻辑损坏,相对比较耗CPU;fix:类同与false,7.0版本后将废弃。默认值:false。 index.codec数据存储的压缩算法,默认值为LZ4,可选择值best_compression ,比LZ4可以获得更好的压缩比(即占据较小的磁盘空间,但存储性能比LZ4低)。 index.routing_partition_size路...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Hadoop3单机部署,实现最简伪集群
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2全家桶,快速入门学习开发网站教程
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS7,CentOS8安装Elasticsearch6.8.6
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Docker安装Oracle12C,快速搭建Oracle学习环境