您现在的位置是:首页 > 文章详情

MapReduce InputFormat——DBInputFormat

日期:2015-11-29点击:445

一、背景

     为了方便MapReduce直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInputFormat和DBOutputFormat两个类。通过

DBInputFormat类把数据库表数据读入到HDFS,根据DBOutputFormat类把MapReduce产生的结果集导入到数据库表中。

二、技术细节

1、DBInputFormat(Mysql为例),先创建表:

CREATE TABLE studentinfo ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(32) NOT NULL);
2、由于0.20版本对DBInputFormat和DBOutputFormat支持不是很好,该例用了0.19版本来说明这两个类的用法。
3、DBInputFormat用法如下:
[java] view plain copy
  1. public class DBInput {  
  2.    // DROP TABLE IF EXISTS `hadoop`.`studentinfo`;  
  3.    // CREATE TABLE studentinfo (  
  4.    // id INTEGER NOT NULL PRIMARY KEY,  
  5.    // name VARCHAR(32) NOT NULL);  
  6.   
  7.    public static class StudentinfoRecord implements Writable, DBWritable {  
  8.      int id;  
  9.      String name;  
  10.      public StudentinfoRecord() {  
  11.   
  12.      }  
  13.      public void readFields(DataInput in) throws IOException {  
  14.         this.id = in.readInt();  
  15.         this.name = Text.readString(in);  
  16.      }  
  17.      public void write(DataOutput out) throws IOException {  
  18.         out.writeInt(this.id);  
  19.         Text.writeString(out, this.name);  
  20.      }  
  21.      public void readFields(ResultSet result) throws SQLException {  
  22.         this.id = result.getInt(1);  
  23.         this.name = result.getString(2);  
  24.      }  
  25.      public void write(PreparedStatement stmt) throws SQLException {  
  26.         stmt.setInt(1this.id);  
  27.         stmt.setString(2this.name);  
  28.      }  
  29.      public String toString() {  
  30.         return new String(this.id + " " + this.name);  
  31.      }  
  32.    }  
  33.    public class DBInputMapper extends MapReduceBase implements  
  34.         Mapper<LongWritable, StudentinfoRecord, LongWritable, Text> {  
  35.      public void map(LongWritable key, StudentinfoRecord value,  
  36.           OutputCollector<LongWritable, Text> collector, Reporter reporter)  
  37.           throws IOException {  
  38.         collector.collect(new LongWritable(value.id), new Text(value  
  39.              .toString()));  
  40.      }  
  41.    }  
  42.    public static void main(String[] args) throws IOException {  
  43.      JobConf conf = new JobConf(DBInput.class);  
  44.      DistributedCache.addFileToClassPath(new Path(  
  45.           "/lib/mysql-connector-java-5.1.0-bin.jar"), conf);  
  46.        
  47.      conf.setMapperClass(DBInputMapper.class);  
  48.      conf.setReducerClass(IdentityReducer.class);  
  49.   
  50.      conf.setMapOutputKeyClass(LongWritable.class);  
  51.      conf.setMapOutputValueClass(Text.class);  
  52.      conf.setOutputKeyClass(LongWritable.class);  
  53.      conf.setOutputValueClass(Text.class);  
  54.        
  55.      conf.setInputFormat(DBInputFormat.class);  
  56.      FileOutputFormat.setOutputPath(conf, new Path("/hua01"));  
  57.      DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",  
  58.           "jdbc:mysql://192.168.3.244:3306/hadoop""hua""hadoop");  
  59.      String[] fields = { "id""name" };  
  60.      DBInputFormat.setInput(conf, StudentinfoRecord.class"studentinfo",  
  61.  null"id", fields);  
  62.   
  63.      JobClient.runJob(conf);  
  64.    }  
  65. }  

a)StudnetinfoRecord类的变量为表字段,实现Writable和DBWritable两个接口。

实现Writable的方法:

[java]  view plain copy
  1. public void readFields(DataInput in) throws IOException {  
  2.        this.id = in.readInt();  
  3.        this.name = Text.readString(in);  
  4.     }  
  5.     public void write(DataOutput out) throws IOException {  
  6.        out.writeInt(this.id);  
  7.        Text.writeString(out, this.name);  
  8.     }  

实现DBWritable的方法:

[java]  view plain copy
  1. public void readFields(ResultSet result) throws SQLException {  
  2.         this.id = result.getInt(1);  
  3.         this.name = result.getString(2);  
  4.      }  
  5.      public void write(PreparedStatement stmt) throws SQLException {  
  6.         stmt.setInt(1this.id);  
  7.         stmt.setString(2this.name);  
  8.      }  

b)读入Mapper的value类型是StudnetinfoRecord。

c)配置如何连入数据库,读出表studentinfo数据。

[java]  view plain copy
  1. DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",  
  2.           "jdbc:mysql://192.168.3.244:3306/hadoop""hua""hadoop");  
  3.      String[] fields = { "id""name" };  
  4.      DBInputFormat.setInput(conf, StudentinfoRecord.class"studentinfo",  null"id", fields);  


4、DBOutputFormat用法如下:
[java]  view plain copy
  1. public class DBOutput {  
  2.   
  3.    public static class StudentinfoRecord implements Writable,  DBWritable {  
  4.      int id;  
  5.      String name;  
  6.      public StudentinfoRecord() {  
  7.   
  8.      }  
  9.      public void readFields(DataInput in) throws IOException {  
  10.         this.id = in.readInt();  
  11.         this.name = Text.readString(in);  
  12.      }  
  13.      public void write(DataOutput out) throws IOException {  
  14.         out.writeInt(this.id);  
  15.         Text.writeString(out, this.name);  
  16.      }  
  17.      public void readFields(ResultSet result) throws SQLException {  
  18.         this.id = result.getInt(1);  
  19.         this.name = result.getString(2);  
  20.      }  
  21.      public void write(PreparedStatement stmt) throws SQLException {  
  22.         stmt.setInt(1this.id);  
  23.         stmt.setString(2this.name);  
  24.      }  
  25.      public String toString() {  
  26.         return new String(this.id + " " + this.name);  
  27.      }  
  28.    }  
  29.      
  30.    public static class MyReducer extends MapReduceBase implements  
  31.         Reducer<LongWritable, Text, StudentinfoRecord, Text> {  
  32.      public void reduce(LongWritable key, Iterator<Text> values,  
  33.           OutputCollector<StudentinfoRecord, Text> output, Reporter  reporter)  
  34.           throws IOException {  
  35.         String[] splits = values.next().toString().split("/t");  
  36.         StudentinfoRecord r = new StudentinfoRecord();  
  37.         r.id = Integer.parseInt(splits[0]);  
  38.         r.name = splits[1];  
  39.         output.collect(r, new Text(r.name));  
  40.      }  
  41.    }  
  42.   
  43.    public static void main(String[] args) throws IOException {  
  44.      JobConf conf = new JobConf(DBOutput.class);  
  45.      conf.setInputFormat(TextInputFormat.class);  
  46.      conf.setOutputFormat(DBOutputFormat.class);  
  47.   
  48.      FileInputFormat.setInputPaths(conf, new Path("/hua/hua.bcp"));  
  49.      DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",  
  50.           "jdbc:mysql://192.168.3.244:3306/hadoop""hua""hadoop");  
  51.      DBOutputFormat.setOutput(conf, "studentinfo""id""name");  
  52.   
  53.   conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);  
  54.      conf.setReducerClass(MyReducer.class);  
  55.   
  56.      JobClient.runJob(conf);  
  57.    }  
  58.   
  59. }  

a)StudnetinfoRecord类的变量为表字段,实现Writable和DBWritable两个接口,同.DBInputFormat的StudnetinfoRecord类。

b)输出Reducer的key/value类型是StudnetinfoRecord。

c)配置如何连入数据库,输出结果到表studentinfo。

[java]  view plain copy
  1. DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",  
  2.           "jdbc:mysql://192.168.3.244:3306/hadoop""hua""hadoop");  
  3.      DBOutputFormat.setOutput(conf, "studentinfo""id""name");  

三、总结

      运行MapReduce时候报错:java.io.IOException: com.mysql.jdbc.Driver,一般是由于程序找不到mysql驱动包。解决方法是让每个

tasktracker运行MapReduce程序时都可以找到该驱动包。

添加包有两种方式:

1.在每个节点下的${HADOOP_HOME}/lib下添加该包。重启集群,一般是比较原始的方法。

2.a)把包传到集群上: hadoop fs -put mysql-connector-java-5.1.0- bin.jar /lib

b)在mr程序提交job前,添加语句:istributedCache.addFileToClassPath(new Path("/lib/mysql- connector-java- 5.1.0-bin.jar"), conf);

3、虽然API用的是0.19的,但是使用0.20的API一样可用,只是会提示方法已过时而已。

原文链接:https://yq.aliyun.com/articles/413130
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章