MapReduce InputFormat——DBInputFormat
一、背景
为了方便MapReduce直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInputFormat和DBOutputFormat两个类。通过
DBInputFormat类把数据库表数据读入到HDFS,根据DBOutputFormat类把MapReduce产生的结果集导入到数据库表中。
二、技术细节
1、DBInputFormat(Mysql为例),先创建表:
CREATE TABLE studentinfo ( id INTEGER NOT NULL PRIMARY KEY, name VARCHAR(32) NOT NULL);2、由于0.20版本对DBInputFormat和DBOutputFormat支持不是很好,该例用了0.19版本来说明这两个类的用法。3、DBInputFormat用法如下:
- public class DBInput {
- // DROP TABLE IF EXISTS `hadoop`.`studentinfo`;
- // CREATE TABLE studentinfo (
- // id INTEGER NOT NULL PRIMARY KEY,
- // name VARCHAR(32) NOT NULL);
- public static class StudentinfoRecord implements Writable, DBWritable {
- int id;
- String name;
- public StudentinfoRecord() {
- }
- public void readFields(DataInput in) throws IOException {
- this.id = in.readInt();
- this.name = Text.readString(in);
- }
- public void write(DataOutput out) throws IOException {
- out.writeInt(this.id);
- Text.writeString(out, this.name);
- }
- public void readFields(ResultSet result) throws SQLException {
- this.id = result.getInt(1);
- this.name = result.getString(2);
- }
- public void write(PreparedStatement stmt) throws SQLException {
- stmt.setInt(1, this.id);
- stmt.setString(2, this.name);
- }
- public String toString() {
- return new String(this.id + " " + this.name);
- }
- }
- public class DBInputMapper extends MapReduceBase implements
- Mapper<LongWritable, StudentinfoRecord, LongWritable, Text> {
- public void map(LongWritable key, StudentinfoRecord value,
- OutputCollector<LongWritable, Text> collector, Reporter reporter)
- throws IOException {
- collector.collect(new LongWritable(value.id), new Text(value
- .toString()));
- }
- }
- public static void main(String[] args) throws IOException {
- JobConf conf = new JobConf(DBInput.class);
- DistributedCache.addFileToClassPath(new Path(
- "/lib/mysql-connector-java-5.1.0-bin.jar"), conf);
- conf.setMapperClass(DBInputMapper.class);
- conf.setReducerClass(IdentityReducer.class);
- conf.setMapOutputKeyClass(LongWritable.class);
- conf.setMapOutputValueClass(Text.class);
- conf.setOutputKeyClass(LongWritable.class);
- conf.setOutputValueClass(Text.class);
- conf.setInputFormat(DBInputFormat.class);
- FileOutputFormat.setOutputPath(conf, new Path("/hua01"));
- DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
- "jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
- String[] fields = { "id", "name" };
- DBInputFormat.setInput(conf, StudentinfoRecord.class, "studentinfo",
- null, "id", fields);
- JobClient.runJob(conf);
- }
- }
a)StudnetinfoRecord类的变量为表字段,实现Writable和DBWritable两个接口。
实现Writable的方法:
- public void readFields(DataInput in) throws IOException {
- this.id = in.readInt();
- this.name = Text.readString(in);
- }
- public void write(DataOutput out) throws IOException {
- out.writeInt(this.id);
- Text.writeString(out, this.name);
- }
实现DBWritable的方法:
- public void readFields(ResultSet result) throws SQLException {
- this.id = result.getInt(1);
- this.name = result.getString(2);
- }
- public void write(PreparedStatement stmt) throws SQLException {
- stmt.setInt(1, this.id);
- stmt.setString(2, this.name);
- }
b)读入Mapper的value类型是StudnetinfoRecord。
c)配置如何连入数据库,读出表studentinfo数据。
- DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
- "jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
- String[] fields = { "id", "name" };
- DBInputFormat.setInput(conf, StudentinfoRecord.class, "studentinfo", null, "id", fields);
4、DBOutputFormat用法如下:
- public class DBOutput {
- public static class StudentinfoRecord implements Writable, DBWritable {
- int id;
- String name;
- public StudentinfoRecord() {
- }
- public void readFields(DataInput in) throws IOException {
- this.id = in.readInt();
- this.name = Text.readString(in);
- }
- public void write(DataOutput out) throws IOException {
- out.writeInt(this.id);
- Text.writeString(out, this.name);
- }
- public void readFields(ResultSet result) throws SQLException {
- this.id = result.getInt(1);
- this.name = result.getString(2);
- }
- public void write(PreparedStatement stmt) throws SQLException {
- stmt.setInt(1, this.id);
- stmt.setString(2, this.name);
- }
- public String toString() {
- return new String(this.id + " " + this.name);
- }
- }
- public static class MyReducer extends MapReduceBase implements
- Reducer<LongWritable, Text, StudentinfoRecord, Text> {
- public void reduce(LongWritable key, Iterator<Text> values,
- OutputCollector<StudentinfoRecord, Text> output, Reporter reporter)
- throws IOException {
- String[] splits = values.next().toString().split("/t");
- StudentinfoRecord r = new StudentinfoRecord();
- r.id = Integer.parseInt(splits[0]);
- r.name = splits[1];
- output.collect(r, new Text(r.name));
- }
- }
- public static void main(String[] args) throws IOException {
- JobConf conf = new JobConf(DBOutput.class);
- conf.setInputFormat(TextInputFormat.class);
- conf.setOutputFormat(DBOutputFormat.class);
- FileInputFormat.setInputPaths(conf, new Path("/hua/hua.bcp"));
- DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
- "jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
- DBOutputFormat.setOutput(conf, "studentinfo", "id", "name");
- conf.setMapperClass(org.apache.hadoop.mapred.lib.IdentityMapper.class);
- conf.setReducerClass(MyReducer.class);
- JobClient.runJob(conf);
- }
- }
a)StudnetinfoRecord类的变量为表字段,实现Writable和DBWritable两个接口,同.DBInputFormat的StudnetinfoRecord类。
b)输出Reducer的key/value类型是StudnetinfoRecord。
c)配置如何连入数据库,输出结果到表studentinfo。
- DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver",
- "jdbc:mysql://192.168.3.244:3306/hadoop", "hua", "hadoop");
- DBOutputFormat.setOutput(conf, "studentinfo", "id", "name");
三、总结
运行MapReduce时候报错:java.io.IOException: com.mysql.jdbc.Driver,一般是由于程序找不到mysql驱动包。解决方法是让每个
tasktracker运行MapReduce程序时都可以找到该驱动包。
添加包有两种方式:
1.在每个节点下的${HADOOP_HOME}/lib下添加该包。重启集群,一般是比较原始的方法。
2.a)把包传到集群上: hadoop fs -put mysql-connector-java-5.1.0- bin.jar /lib
b)在mr程序提交job前,添加语句:istributedCache.addFileToClassPath(new Path("/lib/mysql- connector-java- 5.1.0-bin.jar"), conf);
3、虽然API用的是0.19的,但是使用0.20的API一样可用,只是会提示方法已过时而已。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
MapReduce InputFormat之FileInputFormat
一:简单认识InputFormat类 InputFormat主要用于描述输入数据的格式,提供了以下两个功能: 1)、数据切分,按照某个策略将输入数据且分成若干个split,以便确定Map Task的个数即Mapper的个数,在MapReduce框架中,一个split就意味着需要一个Map Task; 2)为Mapper提供输入数据,即给定一个split,(使用其中的RecordReader对象)将之解析为一个个的key/value键值对。 下面我们先来看以下1.0版本中的老的InputFormat接口: Java代码 publicinterfaceInputFormat<K,V>{ //获取所有的split分片 publicInputSplit[]getSplits(JobConfjob,intnumSplits)throwsIOException; //获取读取split的RecordReader对象,实际上是由RecordReader对象将 //split解析成一个个的key/value对儿 publicRecordReader<K,V>getRec...
- 下一篇
手机如何连接VMware虚拟机中的服务器
手机如何连接VMware虚拟机中的服务器 由于没有服务器,于是在自己的虚拟机中CentOS 7中安装hadoop中,用手机看是否能否登陆hadoop管理界面 环境 android手机 Windows 10 x64笔记本 VMware Workstation 12 安装了hadoop的CentOS 7 x64 说明: hadoop的管理端口为50070和8088 虚拟机ip为192.168.248.148 实施步骤 开放主机的防火墙端口 控制面板->系统和安全->Windows防火墙->高级设置 虚拟机端口转发 由于虚拟机采用的是Nat联网方式,菜单->编辑->虚拟网络编辑器->更改设置 选择VMnet8网络,点击NAT设置添加端口转发规则 笔记本开启wifi,手机连接 手机通过wifi连接笔记本 笔记本打开运行,输入cmd,执行命令ipconfig,查看开启wifi的无线网卡的ip:192.168.191.1 等一下用该ip登陆 手机登录 手机浏览器输入192.168.191.1:8088即可 注意: 此时虚拟机的服务端口也需要在虚拟机中的防火墙规则...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
-
Docker使用Oracle官方镜像安装(12C,18C,19C)
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- CentOS8编译安装MySQL8.0.19
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- MySQL8.0.19开启GTID主从同步CentOS8
- CentOS7,8上快速安装Gitea,搭建Git服务器
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
推荐阅读
最新文章
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- Hadoop3单机部署,实现最简伪集群
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- MySQL8.0.19开启GTID主从同步CentOS8
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS7,CentOS8安装Elasticsearch6.8.6