您现在的位置是:首页 > 文章详情

大数据应用之HBase数据插入性能优化之多线程并行插入测试案例

日期:2013-10-09点击:582

一、引言:

上篇文章提起关于HBase插入性能优化设计到的五个参数,从参数配置的角度给大家提供了一个性能测试环境的实验代码。根据网友的反馈,基于单线程的模式实现的数据插入毕竟有限。通过个人实测,在我的虚拟机环境下,单线程插入数据的值约为4w/s。集群指标是:CPU双核1.83,虚拟机512M内存,集群部署单点模式。本文给出了基于多线程并发模式的,测试代码案例和实测结果,希望能给大家一些启示:

二、源程序:

 1 import org.apache.hadoop.conf.Configuration;  2 import org.apache.hadoop.hbase.HBaseConfiguration;  3 import java.io.BufferedReader;  4 import java.io.File;  5 import java.io.FileNotFoundException;  6 import java.io.FileReader;  7 import java.io.IOException;  8 import java.util.ArrayList;  9 import java.util.List;  10 import java.util.Random;  11  12 import org.apache.hadoop.conf.Configuration;  13 import org.apache.hadoop.hbase.HBaseConfiguration;  14 import org.apache.hadoop.hbase.client.HBaseAdmin;  15 import org.apache.hadoop.hbase.client.HTable;  16 import org.apache.hadoop.hbase.client.HTableInterface;  17 import org.apache.hadoop.hbase.client.HTablePool;  18 import org.apache.hadoop.hbase.client.Put;  19  20 public class HBaseImportEx {  21 static Configuration hbaseConfig = null;  22 public static HTablePool pool = null;  23 public static String tableName = "T_TEST_1";  24 static{  25 //conf = HBaseConfiguration.create();  26 Configuration HBASE_CONFIG = new Configuration();  27 HBASE_CONFIG.set("hbase.master", "192.168.230.133:60000");  28 HBASE_CONFIG.set("hbase.zookeeper.quorum", "192.168.230.133");  29 HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181");  30 hbaseConfig = HBaseConfiguration.create(HBASE_CONFIG);  31  32 pool = new HTablePool(hbaseConfig, 1000);  33  }  34 /*  35  * Insert Test single thread  36  * */  37 public static void SingleThreadInsert()throws IOException  38  {  39 System.out.println("---------开始SingleThreadInsert测试----------");  40 long start = System.currentTimeMillis();  41 //HTableInterface table = null;  42 HTable table = null;  43 table = (HTable)pool.getTable(tableName);  44 table.setAutoFlush(false);  45 table.setWriteBufferSize(24*1024*1024);  46 //构造测试数据  47 List<Put> list = new ArrayList<Put>();  48 int count = 10000;  49 byte[] buffer = new byte[350];  50 Random rand = new Random();  51 for(int i=0;i<count;i++)  52  {  53 Put put = new Put(String.format("row %d",i).getBytes());  54  rand.nextBytes(buffer);  55 put.add("f1".getBytes(), null, buffer);  56 //wal=false  57 put.setWriteToWAL(false);  58  list.add(put);  59 if(i%10000 == 0)  60  {  61  table.put(list);  62  list.clear();  63  table.flushCommits();  64  }  65  }  66 long stop = System.currentTimeMillis();  67 //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count);  68  69 System.out.println("插入数据:"+count+"共耗时:"+ (stop - start)*1.0/1000+"s");  70  71 System.out.println("---------结束SingleThreadInsert测试----------");  72  }  73 /*  74  * 多线程环境下线程插入函数  75  *  76  * */  77 public static void InsertProcess()throws IOException  78  {  79 long start = System.currentTimeMillis();  80 //HTableInterface table = null;  81 HTable table = null;  82 table = (HTable)pool.getTable(tableName);  83 table.setAutoFlush(false);  84 table.setWriteBufferSize(24*1024*1024);  85 //构造测试数据  86 List<Put> list = new ArrayList<Put>();  87 int count = 10000;  88 byte[] buffer = new byte[256];  89 Random rand = new Random();  90 for(int i=0;i<count;i++)  91  {  92 Put put = new Put(String.format("row %d",i).getBytes());  93  rand.nextBytes(buffer);  94 put.add("f1".getBytes(), null, buffer);  95 //wal=false  96 put.setWriteToWAL(false);  97  list.add(put);  98 if(i%10000 == 0)  99  { 100  table.put(list); 101  list.clear(); 102  table.flushCommits(); 103  } 104  } 105 long stop = System.currentTimeMillis(); 106 //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count); 107 108 System.out.println("线程:"+Thread.currentThread().getId()+"插入数据:"+count+"共耗时:"+ (stop - start)*1.0/1000+"s"); 109  } 110 111 112 /* 113  * Mutil thread insert test 114  * */ 115 public static void MultThreadInsert() throws InterruptedException 116  { 117 System.out.println("---------开始MultThreadInsert测试----------"); 118 long start = System.currentTimeMillis(); 119 int threadNumber = 10; 120 Thread[] threads=new Thread[threadNumber]; 121 for(int i=0;i<threads.length;i++) 122  { 123 threads[i]= new ImportThread(); 124  threads[i].start(); 125  } 126 for(int j=0;j< threads.length;j++) 127  { 128  (threads[j]).join(); 129  } 130 long stop = System.currentTimeMillis(); 131 132 System.out.println("MultThreadInsert:"+threadNumber*10000+"共耗时:"+ (stop - start)*1.0/1000+"s"); 133 System.out.println("---------结束MultThreadInsert测试----------"); 134  } 135 136 /** 137  * @param args 138 */ 139 public static void main(String[] args) throws Exception{ 140 // TODO Auto-generated method stub 141 //SingleThreadInsert();  142  MultThreadInsert(); 143 144 145  } 146 147 public static class ImportThread extends Thread{ 148 public void HandleThread() 149  { 150 //this.TableName = "T_TEST_1"; 151 152 153  } 154 // 155 public void run(){ 156 try{ 157  InsertProcess(); 158  } 159 catch(IOException e){ 160  e.printStackTrace(); 161 }finally{ 162  System.gc(); 163  } 164  } 165  } 166 167 }

三、说明

1.线程数设置需要根据本集群硬件参数,实际测试得出。否则线程过多的情况下,总耗时反而是下降的。

2.单笔提交数对性能的影响非常明显,需要在自己的环境下,找到最理想的数值,这个需要与单条记录的字节数相关。

四、测试结果

---------开始MultThreadInsert测试----------

线程:8插入数据:10000共耗时:1.328s
线程:16插入数据:10000共耗时:1.562s
线程:11插入数据:10000共耗时:1.562s
线程:10插入数据:10000共耗时:1.812s
线程:13插入数据:10000共耗时:2.0s
线程:17插入数据:10000共耗时:2.14s
线程:14插入数据:10000共耗时:2.265s
线程:9插入数据:10000共耗时:2.468s
线程:15插入数据:10000共耗时:2.562s
线程:12插入数据:10000共耗时:2.671s
MultThreadInsert:100000共耗时:2.703s
---------结束MultThreadInsert测试----------

 备注:该技术专题讨论正在群Hadoop高级交流群:293503507同步直播中,敬请关注。


作者:张子良
出处:http://www.cnblogs.com/hadoopdev
本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

原文链接:https://yq.aliyun.com/articles/438598
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章