Setting up and querying spatial coordinates in Elasticsearch
Original article: http://blog.csdn.net/yangwenbo214/article/details/54411004

I. Preparing the data

For convenience, I fabricated some simulated data in JSON format:

```json
{"timestamp":"2017-01-13T13:13:32.2516955+08:00","deviceId":"myFirstDevice","windSpeed":17,"haze":284,"city":"Beijing","lat":33.9402,"lon":116.40739}
```

I generate the simulated data with C#, roughly as follows:

```csharp
static void SendingRandomMessages()
{
    //var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
    int len = 4;
    string[] citys = { "Beijing", "Shanghai", "Guangzhou", "Shenzhen" };
    int[] avgWindSpeed = { 10, 16, 5, 7 };
    int[] avgWindSpeed1 = { 10, 16, 5, 7 };
    int[] avgHaze1 = { 200, 100, 50, 49 };
    int[] avgHaze = { 200, 100, 50, 49 };
    double[] latitude = { 39.3402, 31.23042, 23.13369, 22.54310 };
    double[] longitude = { 116.40739, 121.47370, 113.28880, 114.057860 };
    Random rand = new Random();
    while (true)
    {
        try
        {
            for (int i = 0; i < len; i++)
            {
                avgWindSpeed[i] = avgWindSpeed1[i] + rand.Next(1, 11);
                avgHaze[i] = avgHaze1[i] + rand.Next(10, 100);
                var telemetryDataPoint = new
                {
                    timestamp = DateTime.Now,
                    deviceId = "myFirstDevice",
                    windSpeed = avgWindSpeed[i],
                    haze = avgHaze[i],
                    city = citys[i],
                    lat = latitude[i],
                    lon = longitude[i]
                };
                var message = JsonConvert.SerializeObject(telemetryDataPoint);
                //eventHubClient.Send(new EventData(Encoding.UTF8.GetBytes(message)));
                Console.WriteLine("{0} > Get message: {1}", "eventHubName", message);
            }
        }
        catch (Exception exception)
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.WriteLine("{0} > Exception: {1}", DateTime.Now, exception.Message);
            Console.ResetColor();
        }
        Thread.Sleep(200);
    }
}
```

Here I send the data as messages to an Event Hub; equally valid is to write the JSON lines to a text file and have Logstash read them from there.

My goals are twofold:

- use the lat/lon coordinates in the data to plot points on a Kibana map
- use the timestamp field from the data as my search time in Kibana (by default Kibana uses @timestamp)

II. Overall approach

- lat and lon are essentially float values, so we need to design a mapping for them
- the time inside the log is essentially a string; we first have to carve out that field, then convert it with a date match

III. Worked solution
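As noted above, instead of sending to an Event Hub you can write the JSON lines to a text file for Logstash's `file` input to pick up. A minimal Python sketch of that alternative (file name, wind-speed and haze ranges are illustrative; in practice the path would be the one the Logstash config reads, e.g. `/opt/logstash/1.log`):

```python
import json
import random
from datetime import datetime, timezone, timedelta

# Same shape as the C# simulator: one JSON object per line.
cities = [
    ("Beijing",   39.3402,  116.40739),
    ("Shanghai",  31.23042, 121.47370),
    ("Guangzhou", 23.13369, 113.28880),
    ("Shenzhen",  22.54310, 114.05786),
]

def make_event(city, lat, lon):
    # ISO 8601 timestamp with a +08:00 offset, matching the sample data.
    ts = datetime.now(timezone(timedelta(hours=8))).isoformat()
    return {
        "timestamp": ts,
        "deviceId": "myFirstDevice",
        "windSpeed": 10 + random.randint(1, 10),   # base + jitter, like the C# code
        "haze": 200 + random.randint(10, 99),
        "city": city,
        "lat": lat,
        "lon": lon,
    }

# Append one event per line; Logstash's file input reads line by line.
with open("1.log", "a") as f:
    for city, lat, lon in cities:
        f.write(json.dumps(make_event(city, lat, lon)) + "\n")
```

The one-JSON-object-per-line layout is what the `json { source => "message" }` filter in the config below expects.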
1. Designing the mapping

Here is the template I use:

```json
{
  "template": "geo-*",
  "settings": {
    "index.refresh_interval": "5s"
  },
  "mappings": {
    "_default_": {
      "_all": { "enabled": true, "omit_norms": true },
      "dynamic_templates": [
        {
          "message_field": {
            "match": "message",
            "match_mapping_type": "string",
            "mapping": {
              "type": "string",
              "index": "analyzed",
              "omit_norms": true
            }
          }
        },
        {
          "string_fields": {
            "match": "*",
            "match_mapping_type": "string",
            "mapping": {
              "type": "string",
              "index": "analyzed",
              "omit_norms": true,
              "fields": {
                "raw": { "type": "string", "index": "not_analyzed", "ignore_above": 256 }
              }
            }
          }
        }
      ],
      "properties": {
        "@version": { "type": "string", "index": "not_analyzed" },
        "lonlat": { "type": "geo_point" }
      }
    }
  }
}
```

A brief explanation:

- "template": "geo-*" — every index whose name starts with geo- will have this template applied.
- "lonlat": { "type": "geo_point" } — this declares lonlat as a geo_point type, which lays the foundation for map plotting later; this is the key step.

NOTE: it seems this field can't be given certain reserved names(?); when I named it location I kept getting errors, hence lonlat.

For a more detailed introduction, see the official documentation.

2. The Logstash configuration file

```conf
input {
    file {
        path => "/opt/logstash/1.log"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}
filter {
    json {
        source => "message"
    }
    mutate {
        add_field => [ "[lonlat]", "%{lon}" ]
        add_field => [ "[lonlat]", "%{lat}" ]
    }
    date {
        match => [ "timestamp", "ISO8601" ]
        timezone => "Asia/Shanghai"
        target => "logdate"
    }
}
output {
    stdout {
        codec => rubydebug
    }
    elasticsearch {
        hosts => "wb-elk"
        index => "geo-%{+YYYY.MM.dd}"
        # template => "/opt/logstash/monster.json"
        # template_overwrite => true
    }
}
```

A brief explanation:

- json { source => "message" } parses the JSON data into individual fields.
- mutate adds the longitude and latitude values into the geo_point field (note the order: lon first, then lat, matching the [lon, lat] array form that geo_point expects).
- date { match => ... } matches the timestamp field extracted from the JSON and assigns it, as a date, to logdate.
- The commented-out lines in the output section refer to the template file. I chose to PUT the template directly instead, so I commented them out; both approaches work.

Supplement: forcing the coordinate fields to the right type and renaming them:

```conf
mutate {
    convert => { "lon" => "float" }
    convert => { "lat" => "float" }
}
mutate {
    rename => {
        "lon" => "[lonlat][lon]"
        "lat" => "[lonlat][lat]"
    }
}
```

3. Running it
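Before looking at the run output, it may help to see what the filter chain produces for each input line. A rough Python equivalent of the json/mutate/date steps (illustrative only — Logstash does this work itself, and the real add_field mutate produces string values where this sketch keeps floats):

```python
import json
from datetime import datetime

def apply_filters(message):
    # json { source => "message" }: parse the raw line into fields
    event = json.loads(message)
    # mutate add_field (lon first, then lat): builds lonlat in the
    # [lon, lat] array form that Elasticsearch's geo_point accepts
    event["lonlat"] = [event["lon"], event["lat"]]
    # date { match => ["timestamp", "ISO8601"], target => "logdate" }
    event["logdate"] = datetime.fromisoformat(event["timestamp"])
    return event

line = ('{"timestamp":"2017-01-13T13:13:32+08:00","deviceId":"myFirstDevice",'
        '"windSpeed":17,"haze":284,"city":"Beijing","lat":39.3402,"lon":116.40739}')
evt = apply_filters(line)
print(evt["lonlat"])   # [116.40739, 39.3402]
```

With the template above installed, documents shaped like this land in a geo-YYYY.MM.dd index with lonlat mapped as geo_point, ready for the Kibana map.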