大数据应用之HBase数据插入性能优化之多线程并行插入测试案例
一、引言:
上篇文章提起关于HBase插入性能优化设计到的五个参数,从参数配置的角度给大家提供了一个性能测试环境的实验代码。根据网友的反馈,基于单线程的模式实现的数据插入毕竟有限。通过个人实测,在我的虚拟机环境下,单线程插入数据的值约为4w/s。集群指标是:CPU双核1.83,虚拟机512M内存,集群部署单点模式。本文给出了基于多线程并发模式的,测试代码案例和实测结果,希望能给大家一些启示:
二、源程序:
1 import org.apache.hadoop.conf.Configuration; 2 import org.apache.hadoop.hbase.HBaseConfiguration; 3 import java.io.BufferedReader; 4 import java.io.File; 5 import java.io.FileNotFoundException; 6 import java.io.FileReader; 7 import java.io.IOException; 8 import java.util.ArrayList; 9 import java.util.List; 10 import java.util.Random; 11 12 import org.apache.hadoop.conf.Configuration; 13 import org.apache.hadoop.hbase.HBaseConfiguration; 14 import org.apache.hadoop.hbase.client.HBaseAdmin; 15 import org.apache.hadoop.hbase.client.HTable; 16 import org.apache.hadoop.hbase.client.HTableInterface; 17 import org.apache.hadoop.hbase.client.HTablePool; 18 import org.apache.hadoop.hbase.client.Put; 19 20 public class HBaseImportEx { 21 static Configuration hbaseConfig = null; 22 public static HTablePool pool = null; 23 public static String tableName = "T_TEST_1"; 24 static{ 25 //conf = HBaseConfiguration.create(); 26 Configuration HBASE_CONFIG = new Configuration(); 27 HBASE_CONFIG.set("hbase.master", "192.168.230.133:60000"); 28 HBASE_CONFIG.set("hbase.zookeeper.quorum", "192.168.230.133"); 29 HBASE_CONFIG.set("hbase.zookeeper.property.clientPort", "2181"); 30 hbaseConfig = HBaseConfiguration.create(HBASE_CONFIG); 31 32 pool = new HTablePool(hbaseConfig, 1000); 33 } 34 /* 35 * Insert Test single thread 36 * */ 37 public static void SingleThreadInsert()throws IOException 38 { 39 System.out.println("---------开始SingleThreadInsert测试----------"); 40 long start = System.currentTimeMillis(); 41 //HTableInterface table = null; 42 HTable table = null; 43 table = (HTable)pool.getTable(tableName); 44 table.setAutoFlush(false); 45 table.setWriteBufferSize(24*1024*1024); 46 //构造测试数据 47 List<Put> list = new ArrayList<Put>(); 48 int count = 10000; 49 byte[] buffer = new byte[350]; 50 Random rand = new Random(); 51 for(int i=0;i<count;i++) 52 { 53 Put put = new Put(String.format("row %d",i).getBytes()); 54 rand.nextBytes(buffer); 55 put.add("f1".getBytes(), null, buffer); 56 //wal=false 57 put.setWriteToWAL(false); 58 list.add(put); 59 if(i%10000 == 0) 60 { 61 table.put(list); 62 list.clear(); 63 table.flushCommits(); 64 } 65 } 66 long stop = System.currentTimeMillis(); 67 //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count); 68 69 System.out.println("插入数据:"+count+"共耗时:"+ (stop - start)*1.0/1000+"s"); 70 71 System.out.println("---------结束SingleThreadInsert测试----------"); 72 } 73 /* 74 * 多线程环境下线程插入函数 75 * 76 * */ 77 public static void InsertProcess()throws IOException 78 { 79 long start = System.currentTimeMillis(); 80 //HTableInterface table = null; 81 HTable table = null; 82 table = (HTable)pool.getTable(tableName); 83 table.setAutoFlush(false); 84 table.setWriteBufferSize(24*1024*1024); 85 //构造测试数据 86 List<Put> list = new ArrayList<Put>(); 87 int count = 10000; 88 byte[] buffer = new byte[256]; 89 Random rand = new Random(); 90 for(int i=0;i<count;i++) 91 { 92 Put put = new Put(String.format("row %d",i).getBytes()); 93 rand.nextBytes(buffer); 94 put.add("f1".getBytes(), null, buffer); 95 //wal=false 96 put.setWriteToWAL(false); 97 list.add(put); 98 if(i%10000 == 0) 99 { 100 table.put(list); 101 list.clear(); 102 table.flushCommits(); 103 } 104 } 105 long stop = System.currentTimeMillis(); 106 //System.out.println("WAL="+wal+",autoFlush="+autoFlush+",buffer="+writeBuffer+",count="+count); 107 108 System.out.println("线程:"+Thread.currentThread().getId()+"插入数据:"+count+"共耗时:"+ (stop - start)*1.0/1000+"s"); 109 } 110 111 112 /* 113 * Mutil thread insert test 114 * */ 115 public static void MultThreadInsert() throws InterruptedException 116 { 117 System.out.println("---------开始MultThreadInsert测试----------"); 118 long start = System.currentTimeMillis(); 119 int threadNumber = 10; 120 Thread[] threads=new Thread[threadNumber]; 121 for(int i=0;i<threads.length;i++) 122 { 123 threads[i]= new ImportThread(); 124 threads[i].start(); 125 } 126 for(int j=0;j< threads.length;j++) 127 { 128 (threads[j]).join(); 129 } 130 long stop = System.currentTimeMillis(); 131 132 System.out.println("MultThreadInsert:"+threadNumber*10000+"共耗时:"+ (stop - start)*1.0/1000+"s"); 133 System.out.println("---------结束MultThreadInsert测试----------"); 134 } 135 136 /** 137 * @param args 138 */ 139 public static void main(String[] args) throws Exception{ 140 // TODO Auto-generated method stub 141 //SingleThreadInsert(); 142 MultThreadInsert(); 143 144 145 } 146 147 public static class ImportThread extends Thread{ 148 public void HandleThread() 149 { 150 //this.TableName = "T_TEST_1"; 151 152 153 } 154 // 155 public void run(){ 156 try{ 157 InsertProcess(); 158 } 159 catch(IOException e){ 160 e.printStackTrace(); 161 }finally{ 162 System.gc(); 163 } 164 } 165 } 166 167 }
三、说明
1.线程数设置需要根据本集群硬件参数,实际测试得出。否则线程过多的情况下,总耗时反而是下降的。
2.单笔提交数对性能的影响非常明显,需要在自己的环境下,找到最理想的数值,这个需要与单条记录的字节数相关。
四、测试结果
---------开始MultThreadInsert测试----------
线程:8插入数据:10000共耗时:1.328s
线程:16插入数据:10000共耗时:1.562s
线程:11插入数据:10000共耗时:1.562s
线程:10插入数据:10000共耗时:1.812s
线程:13插入数据:10000共耗时:2.0s
线程:17插入数据:10000共耗时:2.14s
线程:14插入数据:10000共耗时:2.265s
线程:9插入数据:10000共耗时:2.468s
线程:15插入数据:10000共耗时:2.562s
线程:12插入数据:10000共耗时:2.671s
MultThreadInsert:100000共耗时:2.703s
---------结束MultThreadInsert测试----------
备注:该技术专题讨论正在群Hadoop高级交流群:293503507同步直播中,敬请关注。
作者:张子良
出处:http://www.cnblogs.com/hadoopdev
本文版权归作者所有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
远程提交Map/Reduce任务
1. 将开发好MR代码打包成jar。添加到distributed cache中。 Xml代码 bin/hadoopfs-copyFromLocal/root/stat-analysis-mapred-1.0-SNAPSHOT.jar/user/root/lib bin/hadoop fs -copyFromLocal /root/stat-analysis-mapred-1.0-SNAPSHOT.jar /user/root/lib 2. 在服务器端创建和你客户端一模一样的用户。创建目录 /tmp/hadoop-root/stagging/用户 3. 客户端提交job的代码 Java代码 Configurationconf=HBaseConfiguration.create(); conf.set("hbase.zookeeper.quorum","node.tracker1"); conf.set("fs.default.name","hdfs://node.tracker1:9000/hbase"); conf.set("mapred.job.tracker","...
- 下一篇
Riak学习(3):Riak对比HBase(转)
文章转自:http://blog.nosqlfan.com/html/4081.html 文章来自Riak官方wiki,是一篇Riak与HBase的对比文章。Riak官方的对比通常都做得很中肯,并不刻意偏向自家产品。本文也是一样。 对比的Riak版本是1.1.x,HBase是0.94.x。 大方面对比 Riak 与 HBase 都是基于Apache 2.0 licensed 发布 Riak 的实现是基于Amazon 的 Dynamo 论文,HBase 是基于 Google 的 BigTable Riak 主要用 Erlang 写成,包括一部分的C,而 HBase 是用 Java 写的。 功能性对比 功能点 Riak HBase 数据模型 Riak 通过bucket作为命名空间,存储 Key-Value 信息 Buckets, Keys, and Values HBase 按预先定义好的 column family 结构来存储数据(每一条数据有一个key以及若干个列属性值组成,每列的数据都有自己的版本信息)。HBase 中的数据是按列进行有序存储的(不像关系型数据库中按行存储)。 HBa...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- CentOS8安装Docker,最新的服务器搭配容器使用
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Docker安装Oracle12C,快速搭建Oracle学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Red5直播服务器,属于Java语言的直播服务器
- Windows10,CentOS7,CentOS8安装Nodejs环境
- CentOS7,8上快速安装Gitea,搭建Git服务器
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- SpringBoot2全家桶,快速入门学习开发网站教程
- Docker使用Oracle官方镜像安装(12C,18C,19C)