ElasticSearch实战-编码实践
1.概述
前面在《ElasticSearch实战-入门》中给大家分享如何搭建这样一个集群,在完成集群的搭建后,今天给大家分享如何实现对应的业务功能模块,下面是今天的分享内容,目录如下所示:
- 编码实践
- 效果预览
- 总结
2.编码实践
由于 ES 集群支持 Restful 接口,我们可以直接通过 Java 来调用 Restful 接口来查询我们需要的数据结果,并将查询到的结果在在我们的业务界面可视化出来。我们知道在 ES 集群的 Web 管理界面有这样一个入口,如下图所示:
我们可以在此界面的入口中拼接 JSON 字符串来查询我们想要的结果,下面,我们通过 Java 的 API 去调用 Restful 接口来查询我们想要的结果。
2.1 字符串拼接实现
接着,我们去实现要查询的核心代码,具体内容实现如下所示:
public String buildQueryString(Map<String, Object> param) throws ParseException { SimpleDateFormat dfs = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); StringBuilder builder = new StringBuilder("{\"query\":{\"bool\":{\"must\":["); if (param.get("msgType") != null) { Integer msgType = (int) param.get("msgType") == 0 ? 2 : 1; builder.append("{\"term\":{\"msg_type\":").append(msgType).append("}}"); }if (param.get("start") != null && param.get("end") != null) { String start = String.valueOf(dfs.parse(param.get("start").toString()).getTime()).substring(0, 10); String end = String.valueOf(dfs.parse(param.get("end").toString()).getTime()).substring(0, 10); builder.append(",{\"range\":{\"itime\":{\"from\":" + start + ",\"to\":" + end + "}}}"); } if (param.get("receiverValue") != null) { builder.append(",{\"wildcard\":{\"receiver_value\":\"*").append(param.get("receiverValue")).append("*\"}}"); } builder.append("],\"must_not\":[],\"should\":[]}}"); builder.append(",\"sort\":[{\"itime\":\"desc\"}],\"facets\":{}"); builder.append(",\"from\": ").append(param.get("startIndex")).append(",\"size\": ").append(param.get("pageSize")).append("}"); LOG.info("API Query -> " + builder.toString()); return builder.toString(); }
2.2 查询实现核心代码
接着是实现查询的核心代码,具体内容实现如下所示:
public SerachResponse<ApiSent> querySent(Map<String, Object> param) { SerachResponse<ApiSent> search_result = null; try { long time = System.currentTimeMillis(); ResponseWrapper wrapper = httpUtils.sendJson(configService.loadConfig(Configure.API_SENT), buildQueryString(param)); if (wrapper.responseCode == HttpStatus.SC_OK) { search_result = _gson.fromJson(wrapper.responseContent, new TypeToken<SerachResponse<ApiSent>>() { }.getType()); LOG.info(String.format("API query ES spent time=%sms", (System.currentTimeMillis() - time))); return search_result; } else { LOG.info(String.format("API query ES spent time=%sms", (System.currentTimeMillis() - time))); LOG.error(String.format("api sent request es server response not 200,response=%s,exception=%s", wrapper.error, wrapper.exceptionString)); } } catch (Exception ex) { LOG.error(String.format("parsed es sent data exception.", ex)); } return search_result; }
Configure类 public class Configure { public static final String API_SENT = "API_SENT"; }
2.3 DAO层获取 ES 集群的连接信息
public class ConfigService { private static Log logger = LogFactory.getLog(ConfigService.class); @Autowired private ConfigDao configDao; @Cacheable("sysConfigCache") public String loadConfig(String type) { String value = configDao.getConfig(type); logger.info(String.format("Load Config,type=%s,value=%s", type, value)); return value; } }
ConfigDao接口
public interface ConfigDao { String getConfig(String type); }
其对应的实现内容如下所示:
<select id="getConfig" parameterType="String" resultType="String"> select value from t_system_config where type=#{type} </select>
DB库存储的 ES 连接信息,如下图所示:
2.4 HTTP 接口的实现代码
关于 HttpUtils 的代码实现较为简单,这里直接附上代码的实现内容,如下所示:
- IHttpUtils 接口
public interface IHttpUtils { public ResponseWrapper sendJson(String url, String content); }
- HttpUtils 类实现接口
public class HttpUtils implements IHttpUtils { private static Logger LOG = Logger.getLogger(HttpUtils.class.getName()); protected static Gson _gson = new Gson(); protected void initSSL() { try { TrustManager[] tmCerts = new javax.net.ssl.TrustManager[1]; tmCerts[0] = new SimpleTrustManager(); javax.net.ssl.SSLContext sc = javax.net.ssl.SSLContext.getInstance("SSL"); sc.init(null, tmCerts, null); javax.net.ssl.HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory()); HostnameVerifier hv = new SimpleHostnameVerifier(); HttpsURLConnection.setDefaultHostnameVerifier(hv); } catch (Exception e) { LOG.error("init SSL exception.", e); } } @Override public ResponseWrapper sendJson(String url, String content) { return sendJson(url, content, null); } @Override public ResponseWrapper sendJson(String url, String content, String authCode) { return sendRequest(url, content, METHOD_POST, authCode, CONTENT_TYPE_JSON); } public ResponseWrapper sendRequest(String url, String content, String method, String authCode, String contentType) { LOG.info("Send request to - " + url + ", with content - " + content); HttpURLConnection conn = null; OutputStream out = null; StringBuffer sb = new StringBuffer(); cn.jpush.utils.ResponseWrapper wrapper = new ResponseWrapper(); try { if (StringUtils.isSSL(url)) { initSSL(); } if (METHOD_GET.equals(method)) { if (!Strings.isNullOrEmpty(content)) url += "?" + content; } URL aUrl = new URL(url); wrapper.address = aUrl.getHost(); conn = (HttpURLConnection) aUrl.openConnection(); conn.setConnectTimeout(DEFAULT_CONNECTION_TIMEOUT); conn.setReadTimeout(DEFAULT_SOCKET_TIMEOUT); conn.setUseCaches(false); conn.setRequestMethod(method); conn.setRequestProperty("Connection", "Keep-Alive"); conn.setRequestProperty("Accept-Charset", CHARSET); conn.setRequestProperty("Charset", CHARSET); conn.setRequestProperty("Authorization", authCode); conn.setRequestProperty("Send-Source", "portal"); conn.setRequestProperty("Content-Type", contentType); if (METHOD_POST.equals(method)) { conn.setDoOutput(true); byte[] data = content.getBytes(CHARSET); conn.setRequestProperty("Content-Length", String.valueOf(data.length)); out = conn.getOutputStream(); out.write(data); out.flush(); } else { conn.setDoOutput(false); } int status = conn.getResponseCode(); InputStream in = null; if (status == 200) { in = conn.getInputStream(); } else { in = conn.getErrorStream(); } InputStreamReader reader = new InputStreamReader(in, CHARSET); char[] buff = new char[1024]; int len; while ((len = reader.read(buff)) > 0) { sb.append(buff, 0, len); } String responseContent = sb.toString(); wrapper.responseCode = status; wrapper.responseContent = responseContent; String quota = conn.getHeaderField(RATE_LIMIT_QUOTA); String remaining = conn.getHeaderField(RATE_LIMIT_Remaining); String reset = conn.getHeaderField(RATE_LIMIT_Reset); wrapper.setRateLimit(quota, remaining, reset); if (status == 200) { LOG.debug("Succeed to get response - 200 OK"); LOG.debug("Response Content - " + responseContent); } else if (status > 200 && status < 400) { LOG.warn("Normal response but unexpected - responseCode:" + status + ", responseContent:" + responseContent); } else { LOG.warn("Got error response - responseCode:" + status + ", responseContent:" + responseContent); switch (status) { case 400: LOG.error("Your request params is invalid. Please check them according to error message."); wrapper.setErrorObject(); break; case 401: LOG.error("Authentication failed! Please check authentication params according to docs."); wrapper.setErrorObject(); break; case 403: LOG.error("Request is forbidden! Maybe your is listed in blacklist?"); wrapper.setErrorObject(); break; case 410: LOG.error("Request resource is no longer in service. Please according to notice on official website."); wrapper.setErrorObject(); case 429: LOG.error("Too many requests! Please review your request quota."); wrapper.setErrorObject(); break; case 500: case 502: case 503: case 504: LOG.error("Seems encountered server error. Maybe is in maintenance? Please retry later."); break; default: LOG.error("Unexpected response."); } } } catch (SocketTimeoutException e) { if (e.getMessage().contains(KEYWORDS_READ_TIMED_OUT)) { LOG.error(KEYWORDS_READ_TIMED_OUT, e); } wrapper.exceptionString = e.getMessage(); } catch (IOException e) { LOG.error(KEYWORDS_CONNECT_TIMED_OUT, e); wrapper.exceptionString = e.getMessage(); } finally { if (null != out) { try { out.close(); } catch (IOException e) { LOG.error("Failed to close stream.", e); } } if (null != conn) { conn.disconnect(); } } LOG.info(String.format("Send Response to - %s, Response Wrapper - %s", url, wrapper)); return wrapper; } }
3.截图预览
下面给大家附上一张业务界面可视化的数据结果预览图,如下图所示:
上图为我发送的测试数据,通过收集模块将我发送的数据收集并存储到 ES 集群,通过接口代码将这部分数据可视化到业务界面进行展示。
4.总结
总体来说,ES 集群从搭建部署到编码实现都较为简单,在使用 JSON 字符串拼接查询时需要细心点,后续有时间可以为大家分享下 ES 的查询的效率,及其他方面的性能指标。
5.结束语
这篇博客就和大家分享到这里,如果大家在研究学习的过程当中有什么问题,可以加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
ElasticSearch实战-入门
1.概述 今天接着《ElasticSearch实战-日志监控平台》一文来给大家分享后续的学习,在《ElasticSearch实战-日志监控平台》中给大家介绍一个日志监控平台的架构方案,接下来给大家分享如何去搭建部署这样一个平台,给大家做一个入门介绍。下面是今天的分享目录: 搭建部署 Elastic 套件 运行集群 截图预览 下面开始今天的内容分享。 2.搭建部署 Elastic 套件 搭建 Elastic 套件较为简单,下面我们开始去搭建部署相关套件,首先我们准备必要的环境。 2.1 基础软件 大家可以 Elastic 的官方网站下载对应的安装包,地址如下所示: [下载地址] 另外,一个基础环境就是需要用到 JDK,ES 集群依赖 JDK,地址如下所示: [下载地址] 2.2Logstash 部署 这里我们将 Logstash 的服务部署在中心节点中,其核心配置文件如下所示: central.conf input { redis { host => "10.211.55.18" port => 6379 type => "redis-input" data_type ...
- 下一篇
MapReduce业务 - 图片关联计算
1.概述 最近在和人交流时谈到数据相似度和数据共性问题,而刚好在业务层面有类似的需求,今天和大家分享这类问题的解决思路,分享目录如下所示: 业务背景 编码实践 预览截图 下面开始今天的内容分享。 2.业务背景 目前有这样一个背景,在一大堆数据中,里面存放着图片的相关信息,如下图所示: 上图只是给大家列举的一个示例数据格式,第一列表示自身图片,第二、第三......等列表示与第一列相关联的图片信息。那么我们从这堆数据中如何找出他们拥有相同图片信息的图片。 2.1 实现思路 那么,我们在明确了上述需求后,下面我们来分析它的实现思路。首先,我们通过上图所要实现的目标结果,其最终计算结果如下所示: pic_001pic_002 pic_003,pic_004,pic_005 pic_001pic_003 pic_002,pic_005 pic_001pic_004 pic_002,pic_005 pic_001pic_005 pic_002,pic_003,pic_004 ...... 结果如上所示,找出两两图片之间的共性图片,结果未列完整,只是列举了部分,具体结果大家可以参考截图预览的相关信...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- 设置Eclipse缩进为4个空格,增强代码规范
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- Windows10,CentOS7,CentOS8安装Nodejs环境
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- CentOS关闭SELinux安全模块
- CentOS6,7,8上安装Nginx,支持https2.0的开启
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果