Linkis JDBC模块设计介绍
目录
01
—
背景介绍
02
—
使用介绍
(1)引入依赖模块
<dependency>
<groupId>com.webank.wedatasphere.linkis</groupId>
<artifactId>linkis-ujes-jdbc</artifactId>
<version>0.9.1</version>
</dependency>
(2)建立测试类
public static void main(String[] args) throws SQLException, ClassNotFoundException {
//1. 加载驱动类:
Class.forName("com.webank.wedatasphere.linkis.ujes.jdbc.UJESSQLDriver");
//2. 获得连接:jdbc:linkis://gatewayIP:gatewayPort 帐号和密码对应前端的帐号密码
Connection connection = DriverManager.getConnection("jdbc:linkis://127.0.0.1:9001","username","password");
//3. 创建statement 和执行查询
Statement st= connection.createStatement();
ResultSet rs=st.executeQuery("show tables");
//4.处理数据库的返回结果(使用ResultSet类)
while (rs.next()) {
ResultSetMetaData metaData = rs.getMetaData();
for (int i = 1; i <= metaData.getColumnCount(); i++) {
System.out.print(metaData.getColumnName(i) + ":" +metaData.getColumnTypeName(i)+": "+ rs.getObject(i) + " ");
}
System.out.println();
}
//关闭资源
rs.close();
st.close();
connection.close();
}
图 2-1 Linkis JDBC 任务执行结果
03
—
模块设计方案
UJESSQLDriver
UJESSQLConnection
UJESSQLStatement
UJESSQLPreStatement
UJESSQLResultSet
static {
try {
DriverManager.registerDriver(new UJESSQLDriver());
} catch (SQLException e) {
Logger logger = LoggerFactory.getLogger(UJESSQLDriver.class);
logger.info("Load driver failed",e);
}
}
(2)JDBC连接器UJESSQLConnection
conn = (UJESSQLConnection) DriverManager
.getConnection("jdbc:linkis://hostname:port","username","password")
override def connect(url: String, info: Properties): Connection = if(acceptsURL(url)) {
val props = if(info != null) info else newProperties
props.putAll(parseURL(url))
val ujesClient =UJESClientFactory.getUJESClient(props)
new UJESSQLConnection(ujesClient, props)
} else throw newUJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url: " + url)
private def parseURL(url: String): Properties= {
val props = new Properties
//add an entry to get url
props.setProperty("URL", url)
url match {
case URL_REGEX(host, port, db, params)=>
if(StringUtils.isNotBlank(host))props.setProperty(HOST, host)
if(StringUtils.isNotBlank(port))props.setProperty(PORT, port.substring(1))
if(StringUtils.isNotBlank(db)&& db.length > 1) props.setProperty(DB_NAME, db.substring(1))
if(StringUtils.isNotBlank(params)&& params.length > 1) {
val _params = params.substring(1)
val kvs =_params.split(PARAM_SPLIT).map(_.split(KV_SPLIT)).filter {
case Array(USER, value) =>
props.setProperty(USER, value)
false
case Array(PASSWORD, value) =>
props.setProperty(PASSWORD,value)
false
case Array(key, _) =>
if(StringUtils.isBlank(key)) {
throw newUJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url for params: " +url)
} else true
case _ => throw newUJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url for params: " +url)
}
props.setProperty(PARAMS,kvs.map(_.mkString(KV_SPLIT)).mkString(PARAM_SPLIT))
}
case _ => throw newUJESSQLException(UJESSQLErrorCode.BAD_URL, "bad url: " + url)
}
props
}
def getUJESClient(props: Properties):UJESClient = {
val host = props.getProperty(HOST)
val port = props.getProperty(PORT)
val serverUrl = if(StringUtils.isNotBlank(port))s"http://$host:$port" else "http://" + host
if(ujesClients.containsKey(serverUrl))ujesClients.get(serverUrl)
else serverUrl.intern synchronized {
if(ujesClients.containsKey(serverUrl))return ujesClients.get(serverUrl)
val ujesClient =createUJESClient(serverUrl, props)
ujesClients.put(serverUrl, ujesClient)
ujesClient
}
}
(3)执行对象UJESSQLStatement/UJESSQLPreStatement
//获取执行对象
UJESSQLStatementstatement = (UJESSQLStatementCon) conn.createStatement;
//获取预执行对象
UJESSQLPrepareStatementpreStatement = (UJESSQLPrepareStatement) conn.prePareStatement;
override defexecute(sql: String): Boolean = throwWhenClosed {
var parsedSQL = sql
//预执行hook,转换不支持的sql语句
JDBCDriverPreExecutionHook.getPreExecutionHooks.foreach{
preExecution =>
parsedSQL = preExecution.callPreExecutionHook(parsedSQL)
}
//获取linkis的job执行器,创建用于执行的action任务
val action =JobExecuteAction.builder().setEngineType(EngineType.SPARK).addExecuteCode(parsedSQL)
.setCreator(ujesSQLConnection.creator).setUser(ujesSQLConnection.user)
if(ujesSQLConnection.variableMap.nonEmpty)action.setVariableMap(JavaConversions.mapAsJavaMap(ujesSQLConnection.variableMap))
//提交SQL任务到ujes客户端执行
jobExecuteResult =ujesSQLConnection.ujesClient.execute(action.build())
queryEnd = false
//job状态检测
var status =ujesSQLConnection.ujesClient.status(jobExecuteResult)
val atMost = if(queryTimeout > 0) Duration(queryTimeout,TimeUnit.MILLISECONDS) else Duration.Inf
if(!status.isCompleted)Utils.tryThrow{
Utils.waitUntil(() =>{
status =ujesSQLConnection.ujesClient.status(jobExecuteResult)
status.isCompleted ||closed
}, atMost, 100, 10000)
} {
case t: TimeoutException=>
if(queryTimeout >0) clearQuery()
newUJESSQLException(UJESSQLErrorCode.QUERY_TIMEOUT, "query has beentimeout!").initCause(t)
case t => t
}
if(!closed) {
var jobInfo =ujesSQLConnection.ujesClient.getJobInfo(jobExecuteResult)
if(status.isFailed)throw new ErrorException(jobInfo.getRequestPersistTask.getErrCode,jobInfo.getRequestPersistTask.getErrDesc)
val jobInfoStatus =jobInfo.getJobStatus
if(!jobInfoStatus.equals("Succeed")) Utils.tryThrow{
Utils.waitUntil(()=> {
jobInfo = ujesSQLConnection.ujesClient.getJobInfo(jobExecuteResult)
val state =jobInfo.getJobStatus match{
case"Failed" | "Cancelled" | "Timeout" |"Succeed" => true
case _ => false
}
state || closed
}, atMost, 100, 10000)
} {
case t:TimeoutException =>
if(queryTimeout >0) clearQuery()
newUJESSQLException(UJESSQLErrorCode.QUERY_TIMEOUT, "query has beentimeout!").initCause(t)
case t => t
}
//获取结果集
val resultSetList =jobInfo.getResultSetList(ujesSQLConnection.ujesClient)
queryEnd = true
if(resultSetList !=null) {
resultSet = newUJESSQLResultSet(resultSetList, this, maxRows, fetchSize)
true
} else false
} else throw newUJESSQLException(UJESSQLErrorCode.STATEMENT_CLOSED, "Statement isclosed.")
}
private def init(): Unit = {
resultSetResultInit()
metaDataInit()
resultSetInit()
}
private def resultSetResultInit(): Unit = {
if (path == null) path =getResultSetPath(resultSetList)
val user =connection.getProps.getProperty("user")
if(StringUtils.isNotBlank(path)){
val resultAction =ResultSetAction.builder.setUser(user).setPath(path).build()
resultSetResult =connection.ujesClient.resultSet(resultAction)
}
}
private def metaDataInit(): Unit = {
if ( null ==resultSetResult ){
return
}
metaData =resultSetResult.getMetadata.asInstanceOf[util.List[util.Map[String, String]]]
for(cursor <- 1 tometaData.size()){
val col =metaData.get(cursor - 1)
resultSetMetaData.setColumnNameProperties(cursor,col.get("columnName"))
resultSetMetaData.setDataTypeProperties(cursor,col.get("dataType"))
resultSetMetaData.setCommentPropreties(cursor,col.get("comment"))
}
}
private def resultSetInit(): Unit = {
if ( null ==resultSetResult ){
return
}
resultSetRow =resultSetResult.getFileContent.asInstanceOf[util.ArrayList[util.ArrayList[String]]]
}
override def next(): Boolean = {
if(metaData == null)init()
currentRowCursor += 1
if(null == resultSetRow ||currentRowCursor > resultSetRow.size()-1) false
else{
updateCurrentRow(currentRowCursor)
true
}
}
override def getString(columnIndex: Int): String = {
val any = getColumnValue(columnIndex)
if(wasNull()) {
throw newUJESSQLException(UJESSQLErrorCode.RESULTSET_ROWERROR, "Type is null")
}else{
any.asInstanceOf[String]
}
}
(5)错误码方案
public enum UJESSQLErrorCode {
BAD_URL(80000,"badurl"),
NOSUPPORT_DRIVER(80001,"this method not supported in driver"),
NOSUPPORT_CONNECTION(80002,"this method not supported in connection"),
NOSUPPORT_STATEMENT(80003,"this method not supported instatement"),
CONNECTION_CLOSED(80004,"Connection is closed!"),
STATEMENT_CLOSED(80005,"statement is closed!"),
SCHEMA_EMPTY(80006,"schemais empty!"),
SCHEMA_FAILED(80007,"Get schema failed!"),
QUERY_TIMEOUT(80008,"query has been timeout!"),
FILETYPE_ERROR(80009,"file type error"),
METADATATYPE_ERROR(80010,"metadata type error"),
NOSUPPORT_METADATA(80011, "thismethod not supported in DatabaseMetaData"),
NOPERMITION(80012,"This user has no permission to read thisfile!"),
PARAMS_NOT_FOUND(80013,"Parameter not found"),
ERRORINFO_FROM_JOBINFO(80014,"get errorinfo from jobInfo"),
RESULTSET_ROWERROR(80015,"rowmessage error"),
NOSUPPORT_RESULTSET(80016,"this method not supported inresultSet"),
RESULTSET_NULL(80017,"resultset is null,try to run next() firstlyto init ResultSet and MetaData"),
PREPARESTATEMENT_TYPEERROR(80018,"parameter type error"),
METADATA_EMPTY(80019,"data is empty")
;
private String msg;
private int code;
UJESSQLErrorCode(intcode,String msg) {
this.code = code;
this.msg = msg;
}
public String getMsg() {
return msg;
}
public int getCode() {
return code;
}
}
04
—
实现方案总结
05
—
参考文献
WeDataSphere,BIG DATA MADE EASY.
用心做一个有温度的开源社区
~欢迎关注~
扫码关注我们
微信号公众号 : WeDataSphere
GitHub:WeDataSphere
如果喜欢我们的产品或文章,请给我们的GitHub点上你宝贵的star和fork哦~~
本文分享自微信公众号 - WeDataSphere(gh_273e85fce73b)。
如有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一起分享。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
在 Android 11 上使用第三方 SDK 向其他应用分享文件的注意事项
隐私和安全 是我们设计 Android 的核心,随着每一个新版本的发布,我们都会加大这一方面的投入。Android 11 也持续在这些领域取得重要进展。 Android 11 (API 级别 30) 进一步增强了平台功能,为外部存储中的应用和用户数据提供了更好的保护。从 Android 11 开始,使用 分区存储模式 的应用即使拥有 READ_EXTERNAL_STORAGE 权限,也无法再访问外部存储中的任何其他应用的 专属目录 中的文件。 受到这一限制的影响,如果您的应用通过第三方 SDK 分享文件 (例如分享照片给另一应用上的好友) 到其他应用,而这只是将自己专属目录中的文件路径传递给目标应用,目标应用将无法读取该文件。部分分享 SDK 可能没有正确处理这种情形,没有显式报错而只是静默失败 (对方用户都没有意识到这次分享),影响开发者和用户的体验。 如果您的应用需要在 Android 11 上向其他应用分享文件,请检查您使用的第三方分享 SDK 是否支持通过 FileProvider 进行分享 (现在大部分常用的第三方分享 SDK 均已支持)。通过 FileProvider,您的...
- 下一篇
熬夜整理最全面的HTML字符实体,很下饭
HTML字符实体 HTML字符实体(Character Entities),转义字符串(Escape Sequence) 为什么要用转义字符串? HTML中<,>,&等有特殊含义(<,>,用于链接签,&用于转义),不能直接使用。这些符号是不显示在我们最终看到的网页里的,那如果我们希望在网页中显示这些符号,该怎么办呢? 这就要说到HTML转义字符串(Escape Sequence)了。 转义字符串(Escape Sequence)也称字符实体(Character Entity)。在HTML中,定义转义字符串的原因有两个:第一个原因是像“<”和“>”这类符号已经用来表示HTML标签,因此就不能直接当作文本中的符号来使用。为了在HTML文档中使用这些符号,就需要定义它的转义字符串。当解释程序遇到这类字符串时就把它解释为真实的字符。在输入转义字符串时,要严格遵守字母大小写的规则。第二个原因是,有些字符在ASCII字符集中没有定义,因此需要使用转义字符串来表示。 转义字符串的组成 转义字符串(Escape Sequence),即字符实体(Cha...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- CentOS6,CentOS7官方镜像安装Oracle11G
- CentOS关闭SELinux安全模块
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- CentOS8安装Docker,最新的服务器搭配容器使用
- SpringBoot2全家桶,快速入门学习开发网站教程
- Windows10,CentOS7,CentOS8安装Nodejs环境
- Docker安装Oracle12C,快速搭建Oracle学习环境
- 2048小游戏-低调大师作品
- CentOS7,CentOS8安装Elasticsearch6.8.6