ElasticSearch API for JAVA 学习笔记
Client Client是一个类,通过这个类可以实现对ES集群的各种操作:Index, Get, Delete , Search,以及对ES集群的管理任务。 Client的构造需要基于TransportClient TransportClient TransportClient可以远程连接ES集群,通过一个传输模块,但是它不真正的连接到集群,只是获取集群的一个或多个初始传输地址,在每次请求动作时,才真正连接到ES集群。 Settings Settings类主要是在启动Client之前,配置一些属性参数,主要配置集群名称cluster.name,还有其他参数: client.transport.sniff 是否为传输client添加嗅探功能 client.transport.ignore_cluster_name 设为true,忽略连接节点的集群名称验证 client.transport.ping_timeout 设置ping节点时的时间限,默认5s client.transport.nodes_sampler_interval 设置sample/ping nodes listed 间隔时间,默认5s 1 2 3 4 5 6 7 8 9 10 11 12 13 //通过Settings类设置属性参数 Settingssettings=Settings.settingsBuilder().put( "cluster.name" , "index-name" ).build(); //启动Client Clientclient=TransportClient.builder().settings(settings).build(). addTransportAddress( new InetSocketTransportAddress(InetAddress.getByName( "192.168.xxx.xxx" ), 9300 )); //如果不需要设置参数,直接如下 /*Clientclient=TransportClient.builder().build(). addTransportAddress(newInetSocketTransportAddress(InetAddress.getByName("192.168.xxx.xxx"),9300));*/ //关闭Clinet client.close(); Document API 主要分为以下类:Index API , Get API , Delete API , Update API, Multi Get API, Bulk API es中的增删改查 Index API可以索引一个典型的JSON文档到指定的索引中,并且可以使它可以检索。 产生JSON JSON产生可以有以下几种方式: 手动拼接一个JSON字符串 使用Map 使用第三方库,比如Jackson 使用内置的XContentFactory.jsonBuilder() 每种类型都会转换为byte[],因此如果对象已经是这种形式,可以直接使用,jsonBuilder是一个高度优化了的JSON产生器,它直接构造byte[] 通过下边的代码讲解四种方法:index-api, get-api, delete-api, update-api 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 /** *es-api的方法学习: *1.prepareIndex方法:索引数据到ElasticSearch *2.prepareGet方法:获取信息 *3.prepareDelete方法:删除信息 *4.update方法:更新信息 *4.1upsert:在使用update方法时: *a:针对文档不存在的情况时,做出index数据的操作,update无效; *b:如果文档存在,那么index数据操作无效,update有效; */ public static void main(String[]args) throws IOException,InterruptedException,ExecutionException{ //通过Settings类设置属性参数 Settingssettings=Settings.settingsBuilder().put( "cluster.name" , "myApp" ).build(); //启动Client Clientclient= null ; try { client=TransportClient.builder().settings(settings).build(). addTransportAddress( new InetSocketTransportAddress(InetAddress.getByName( "101.200.124.27" ), 9300 )); } catch (UnknownHostExceptione){ e.printStackTrace(); } //执行操作 SimpleDateFormatdf= new SimpleDateFormat( "yyyy-MM-dd" ); XContentBuilderjsonBuilder=XContentFactory.jsonBuilder() .startObject() .field( "user" , "yuchen" ) .field( "interest" , "readingbook" ) .field( "insert_time" ,df.format( new Date())) .endObject(); //1.prepareIndex方法:索引数据到ElasticSearch IndexResponseresponse=client.prepareIndex( "index-test" , "weibo" , "4" ) .setSource(jsonBuilder) .get(); String_index=response.getIndex(); String_type=response.getType(); String_id=response.getId(); long _version=response.getVersion(); boolean created=response.isCreated(); System.out.println(_index+ "" +_type+ "" +_id+ "" +_version+ "" +created); //2.prepareGet方法:获取信息 GetResponsegetResponse=client.prepareGet( "index-test" , "weibo" , "1" ).get(); System.out.println(getResponse.getSourceAsString()); //3.prepareDelete方法:删除信息 DeleteResponsedeleteResponse=client.prepareDelete( "index-test" , "weibo" , "4" ).get(); System.out.println(deleteResponse.isFound()); //4.update方法:更新信息 UpdateRequestupdateRequest= new UpdateRequest(); updateRequest.index( "index-test" ); updateRequest.type( "weibo" ); updateRequest.id( "1" ); updateRequest.doc(XContentFactory.jsonBuilder().startObject().field( "interest" , "music" ).endObject()); UpdateResponseupdateResponse=client.update(updateRequest).get(); System.out.println(updateResponse.isCreated()); //update方法:可以为已有的文档添加新的字段 UpdateResponseupdateResponse2=client.prepareUpdate( "index-test" , "weibo" , "1" ) .setDoc(XContentFactory.jsonBuilder() .startObject() .field( "interest2" , "reading" ) .endObject()).get(); System.out.println(updateResponse2.isCreated()); //4.1upsert:在使用update方法时,a:针对文档不存在的情况时,做出index数据的操作,update无效; //b:如果文档存在,那么index数据操作无效,update有效; //先构建一个IndexRequest IndexRequestindexRequest= new IndexRequest( "index-test" , "weibo" , "14" ); indexRequest.source(XContentFactory.jsonBuilder() .startObject() .field( "user" , "yuchen2" ) .field( "interest" , "eating" ) .field( "insert_time" ,df.format( new Date())) .endObject()); //再构建一个UpdateRequest,并用IndexRequest关联 UpdateRequestupdateRequest3= new UpdateRequest( "index-test" , "weibo" , "14" ); updateRequest3.doc(XContentFactory.jsonBuilder() .startObject() .field( "interest2" , "love" ) .endObject() ).upsert(indexRequest); client.update(updateRequest3).get(); if (client!= null ){ client.close(); } } 批量操作 Multi Get Api 和 Bulk Api可进行批量的增删改查 使用Multi Get Api 批量获取: 1 2 3 4 5 6 7 8 9 10 //1.Muti-getApi //可以指定单个id,也在index,type下指定一个id-list;也可以指定别的index/type MultiGetResponsemultiGetResponse=client.prepareMultiGet() .add( "index-test" , "weibo" , "1" ) //指定单个id .add( "index-test" , "weibo" , "11" , "13" , "14" ) //指定一个id-list .add( "index-other" , "news" , "1" , "3" ).get(); //指定别的index/type for (MultiGetItemResponseitem:multiGetResponse){ GetResponseresponse=item.getResponse(); System.out.println(response.getSourceAsString()); } Bulk Api批量增加: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 //2.BulkApi:可以进行批量index和批量删除操作 //2.1批量增加 BulkRequestBuilderbulkRequest=client.prepareBulk(); bulkRequest.add(client.prepareIndex( "index-test" , "weibo" , "20" ) .setSource(XContentFactory.jsonBuilder() .startObject() .field( "user" , "yuchen20" ) .field( "postDate" , new Date()) .field( "message" , "tryingoutElasticsearch" ) .endObject() ) ); bulkRequest.add(client.prepareIndex( "index-test" , "weibo" , "21" ) .setSource(XContentFactory.jsonBuilder() .startObject() .field( "user" , "yuchen21" ) .field( "postDate" , new Date()) .field( "message" , "tryingoutElasticsearch" ) .endObject() ) ); BulkResponsebulkResponse=bulkRequest.get(); if (bulkResponse.hasFailures()){ //... } Bulk Api批量删除: 1 2 3 4 5 6 7 8 9 10 11 //2.2批量删除 BulkRequestBuilderbulkRequest=client.prepareBulk(); bulkRequest.add(client.prepareDelete( "index-test" , "weibo" , "20" ) ); bulkRequest.add(client.prepareDelete( "index-test" , "weibo" , "21" ) ); BulkResponsebulkResponse=bulkRequest.get(); if (bulkResponse.hasFailures()){ System.out.println( "bulkerror:" +bulkResponse.buildFailureMessage()); } Bulk Api 批量更新 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 //2.3批量更新 BulkRequestBuilderbulkRequest=client.prepareBulk(); bulkRequest.add(client.prepareUpdate( "index-test" , "weibo" , "11" ).setDoc(XContentFactory .jsonBuilder().startObject() .field( "country" , "China" ) //新添加字段 .endObject() ) ); bulkRequest.add(client.prepareUpdate( "index-test" , "weibo" , "13" ).setDoc(XContentFactory .jsonBuilder().startObject() .field( "user" , "yuchen13" ) //更新字段 .endObject() ) ); BulkResponsebulkResponse=bulkRequest.get(); if (bulkResponse.hasFailures()){ System.out.println( "bulkerror:" +bulkResponse.buildFailureMessage()); } BulkProcessor设置批量请求的属性 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 //BulkProcessor BulkProcessorbulkProcessor=BulkProcessor.builder(client, new BulkProcessor.Listener(){ @Override public void beforeBulk( long arg0,BulkRequestarg1){ //批量执行前做的事情 System.out.println( "bulkapiactionstarting..." ); } @Override public void afterBulk( long arg0,BulkRequestarg1,Throwablearg2){ System.out.println( "exception:buklapiactionending...:" +arg2.getMessage()); } @Override public void afterBulk( long arg0,BulkRequestarg1,BulkResponsearg2){ //正常执行完毕后... System.out.println( "normal:buklapiactionending..." ); } }) //设置多种条件,对批量操作进行限制,达到限制中的任何一种触发请求的批量提交 .setBulkActions( 1000 ) //设置批量操作一次性执行的action个数,根据请求个数批量提交 //.setBulkSize(newByteSizeValue(1,ByteSizeUnit.KB))//设置批量提交请求的大小允许的最大值 //.setFlushInterval(TimeValue.timeValueMillis(100))//根据时间周期批量提交请求 //.setConcurrentRequests(1)//设置允许并发请求的数量 //设置请求失败时的补偿措施,重复请求3次 //.setBackoffPolicy(BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100),3)) .build(); for ( int i= 0 ;i< 100000 ;i++){ bulkProcessor.add( new IndexRequest( "index-test" , "weibo2" , "" +i).source( XContentFactory .jsonBuilder() .startObject() .field( "name" , "yuchen" +i) .field( "interest" , "love" +i) .endObject())); } bulkProcessor.awaitClose( 5 ,TimeUnit.MINUTES); //释放bulkProcessor资源 System.out.println( "loadsucceed!" ); 默认的参数: sets bulkActions to1000 sets bulkSize to5mb does not set flushInterval sets concurrentRequests to 1 sets backoffPolicy to an exponential backoff with 8 retries and a start delay of 50ms. The total wait time is roughly 5.1 seconds. 参考地址: http://blog.csdn.net/wuyzhen_csdn/article/details/52381697 本文转自yunlielai51CTO博客,原文链接:http://blog.51cto.com/4925054/2084251,如需转载请自行联系原作者