按照Elasticsearch API,在Java端使用ES服务需要创建Java Client。但是如果每一次连接都实例化一个client,对系统的消耗很大;即使在使用完毕之后将client close掉,由于服务器不能及时回收socket资源,极端情况下仍会导致服务器达到最大连接数。
1 import java.io.IOException;
2 import java.net.InetSocketAddress;
3 import java.util.ArrayList;
4 import java.util.HashMap;
5 import java.util.List;
6 import java.util.Map;
7 import java.util.concurrent.ConcurrentHashMap;
8
9 import org.elasticsearch.client.Client;
10 import org.elasticsearch.common.settings.Settings;
11 import org.elasticsearch.common.transport.InetSocketTransportAddress;
12 import org.elasticsearch.common.xcontent.XContentBuilder;
13 import org.elasticsearch.common.xcontent.XContentFactory;
14 import org.elasticsearch.index.mapper.Mapping;
15 import org.elasticsearch.transport.client.PreBuiltTransportClient;
16
17 import com.chinadigitalvideo.esagent.servlet.WebServiceInit;
18
19 /**
20 * Created by tgg on 16-3-17.
21 */
22 public class ClientHelper {
23 private static String ip;
24 private static int port;
25
26 private Settings setting;
27 private Mapping mapping;
28
29 private Map<String, Client> clientMap = new ConcurrentHashMap<String, Client>();
30
31 private Map<String, Integer> ips = new HashMap<String, Integer>(); // hostname port
32
33 private String clusterName = WebServiceInit.clusterName;
34
35 private ClientHelper(String ip,Integer port) {
36 init(ip,port);
37 //TO-DO 添加你需要的client到helper
38 }
39
40 public static final ClientHelper getInstance(String ipConf ,Integer portConf) {
41 ip=ipConf;
42 port=portConf;
43 return ClientHolder.INSTANCE;
44 }
45
46 private static class ClientHolder {
47 private static final ClientHelper INSTANCE = new ClientHelper(ip,port);
48 }
49
50 /**
51 * 初始化默认的client
52 */
53 public void init(String ip,int port) {
54
55 ips.put(ip, port);
56 setting =Settings.builder()
57 .put("client.transport.sniff",true)
58 .put("cluster.name",clusterName).build();
59 addClient(setting, getAllAddress(ips));
60 }
61
62 /**
63 * 获得所有的地址端口
64 *
65 * @return
66 */
67 public List<InetSocketTransportAddress> getAllAddress(Map<String, Integer> ips) {
68 List<InetSocketTransportAddress> addressList = new ArrayList<InetSocketTransportAddress>();
69 for (String ip : ips.keySet()) {
70 addressList.add(new InetSocketTransportAddress(new InetSocketAddress(ip, ips.get(ip))));
71 }
72 return addressList;
73 }
74
75 public Client getClient() {
76 return getClient(clusterName);
77 }
78
79 public Client getClient(String clusterName) {
80 return clientMap.get(clusterName);//通过集群名称得到一个Client
81 }
82
83 public void addClient(Settings setting, List<InetSocketTransportAddress> transportAddress) {
84 Client client = new PreBuiltTransportClient(setting)
85 .addTransportAddresses(transportAddress.toArray(new InetSocketTransportAddress[transportAddress.size()]));
86
87 clientMap.put(setting.get("cluster.name"), client);
88 }
89 }