mysql到MPPDB自动刷表
代码实例 package oa.epoint.com.mysql_mpp; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.ResultSetMetaData; import java.sql.SQLException; import java.sql.Statement; public class AutoMysqltoMPP { private static String MYSQLUSERNAME = "root"; private static String MYSQLPASSWORD = "Gepoint"; private static String MYSQLDRIVER = "com.mysql.jdbc.Driver"; private static String MYSQLURL = "jdbc:mysql://100.2.5.221:3307/dep_fr_db"; private static String MYSQLDATABASE = "dep_fr_db"; private static String MPPDRIVER = "com.MPP.jdbc.Driver"; private static String MPPURL = "jdbc:MPP://100.2.5.1:5258/"; private static String MPPUSERNAME = "mpp"; private static String MPPPASSWORD = "h3c"; Connection mysqlconn = null; Statement mysqlpstm = null; ResultSet mysqlrs = null; Connection mppconn = null; Statement mppstm = null; ResultSet mpprs = null; String sql1 = " "; String sql2 = " "; String sql3 = " "; String sql4 = " "; String sql5 = " "; String sql6 = " "; public static void main(String[] args) throws Exception { AutoMysqltoMPP aidth = new AutoMysqltoMPP(); aidth.getMYSQLConnection(); aidth.MYSQLReleaseResource(); aidth.getMPPConnection(); aidth.MPPReleaseResource(); aidth.CreateMPPTable(); // aidth.ImportDataToMPP(); System.out.println("程序已经执行完毕!请去waterdrop验证结果吧!!"); } public void CreateMPPTable() { mysqlconn = getMYSQLConnection(); mppconn = getMPPConnection(); try { mppstm = mppconn.createStatement(); mysqlpstm = mysqlconn.createStatement(); int i = 0; String sql = "SELECT table_schema\r\n" + " ,table_name\r\n" + " ,(\r\n" + " CASE \r\n" + " WHEN ORDINAL_POSITION = mincol\r\n" + " AND ORDINAL_POSITION < maxcol\r\n" + " THEN CONCAT (\"create table \"\r\n" + " ,table_schema\r\n" + " ,\".\"\r\n" + " ,table_name\r\n" + " ,\"(`\"\r\n" + " ,column_name\r\n" + " ,\"` \"\r\n" + " ,COLUMN_TYPE\r\n" + " ,\",\"\r\n" + " )\r\n" + " WHEN ORDINAL_POSITION = mincol\r\n" + " AND ORDINAL_POSITION = maxcol\r\n" + " THEN CONCAT (\"create table \"\r\n" + " ,table_schema\r\n" + " ,\".\"\r\n" + " ,table_name\r\n" + " ,\"(`\"\r\n" + " ,column_name\r\n" + " ,\"` \"\r\n" + " ,COLUMN_TYPE\r\n" + " ,\");\"\r\n" + " )\r\n" + " WHEN ORDINAL_POSITION > mincol\r\n" + " AND ORDINAL_POSITION < maxcol\r\n" + " THEN CONCAT (\r\n" + " \"`\"\r\n" + " ,column_name\r\n" + " ,\"` \"\r\n" + " ,COLUMN_TYPE\r\n" + " ,\",\"\r\n" + " )\r\n" + " WHEN ORDINAL_POSITION = maxcol\r\n" + " THEN CONCAT (\r\n" + " \"`\"\r\n" + " ,column_name\r\n" + " ,\"` \"\r\n" + " ,COLUMN_TYPE\r\n" + " ,\");\"\r\n" + " )\r\n" + " END\r\n" + " ) AS statement\r\n" + " ,ORDINAL_POSITION\r\n" + " ,maxcol\r\n" + " ,mincol\r\n" + "FROM (\r\n" + " SELECT b.table_schema,b.table_name,b.ORDINAL_POSITION,b.column_name,\r\n" + " (case\r\n" + " when column_type = 'timestamp' then 'datetime'\r\n" + " when column_type = 'bit(1)' then 'int(1)'\r\n" + " else\r\n" + " column_type\r\n" + " end ) AS column_type\r\n" + " ,a.maxcol\r\n" + " ,a.mincol\r\n" + " FROM (\r\n" + " SELECT table_schema\r\n" + " ,table_name\r\n" + " ,max(ORDINAL_POSITION) maxcol\r\n" + " ,min(ORDINAL_POSITION) mincol\r\n" + " FROM information_schema.COLUMNS\r\n" + " GROUP BY table_schema\r\n" + " ,table_name\r\n" + " ) a\r\n" + " JOIN (\r\n" + " SELECT table_schema\r\n" + " ,table_name\r\n" + " ,ORDINAL_POSITION\r\n" + " ,column_name\r\n" + " ,COLUMN_TYPE\r\n" + " FROM information_schema.COLUMNS\r\n" + " ORDER BY table_schema\r\n" + " ,table_name\r\n" + " ,ORDINAL_POSITION ASC\r\n" + " ) b ON a.table_schema = b.table_schema\r\n" + " AND a.table_name = b.table_name\r\n" + " ) c\r\n" + "WHERE table_schema = '"+MYSQLDATABASE+"'"; mysqlrs = mysqlpstm.executeQuery(sql); while (mysqlrs.next()) { sql1 = mysqlrs.getString(3); sql2 = sql2 + sql1; } sql3 = "create database IF NOT EXISTS " + MYSQLDATABASE; mppstm.execute(sql3); System.out.println("-------------------建mpp表,表结构的语句为:" + sql2); String[] sqls=sql2.split(";"); for (String m : sqls) { mppstm.execute(m); } System.out.println("----------------------------------------建mpp表已结束!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); mppstm.close(); mysqlpstm.close(); } catch (SQLException e) { e.printStackTrace(); } finally { MYSQLReleaseResource(); MPPReleaseResource(); } } public void ImportDataToMPP() { mysqlconn = getMYSQLConnection(); mppconn = getMPPConnection(); String sql = "select table_name from user_tables where num_rows > 0 order by table_name asc"; int i = 0; try { mysqlpstm = mysqlconn.createStatement(); mysqlrs = mysqlpstm.executeQuery(sql); mppstm = mppconn.createStatement(); while (mysqlrs.next()) { i = i + 1; String table_name = mysqlrs.getString("table_name").replaceAll("\\$", ""); String sql7 = "insert into " + MYSQLDATABASE + "." + table_name + " select * from " + MYSQLDATABASE + "_ex." + table_name; System.out.println("现在插入第"+i+"个表:"+sql7); mppstm.execute(sql7); } } catch (SQLException e) { e.printStackTrace(); } finally { MYSQLReleaseResource(); MPPReleaseResource(); } } public Connection getMYSQLConnection() { try { Class.forName(MYSQLDRIVER); mysqlconn = DriverManager.getConnection(MYSQLURL, MYSQLUSERNAME, MYSQLPASSWORD); } catch (ClassNotFoundException e) { throw new RuntimeException("class not find !", e); } catch (SQLException e) { throw new RuntimeException("get connection error!", e); } return mysqlconn; } public void MYSQLReleaseResource() { if (mysqlrs != null) { try { mysqlrs.close(); } catch (SQLException e) { e.printStackTrace(); } } if (mysqlpstm != null) { try { mysqlpstm.close(); } catch (SQLException e) { e.printStackTrace(); } } if (mysqlconn != null) { try { mysqlconn.close(); } catch (SQLException e) { e.printStackTrace(); } } } public Connection getMPPConnection() { try { Class.forName(MPPDRIVER); mppconn = DriverManager.getConnection(MPPURL, MPPUSERNAME, MPPPASSWORD); } catch (ClassNotFoundException e) { throw new RuntimeException("class not find !", e); } catch (SQLException e) { throw new RuntimeException("get connection error!", e); } return mppconn; } public void MPPReleaseResource() { if (mpprs != null) { try { mpprs.close(); } catch (SQLException e) { e.printStackTrace(); } } if (mppstm != null) { try { mppstm.close(); } catch (SQLException e) { e.printStackTrace(); } } if (mppconn != null) { try { mppconn.close(); } catch (SQLException e) { e.printStackTrace(); } } } }