利用 Spark DataSource API 实现Rest数据源
引子
{
"status":"200",
"data":[
"id":1,
"userid":2,
"service":{
"3":{"a":1,"b":2},
"2":{"a":3,"b":2},
.....
}
]
}
{id:1,userid:2,service:3,a:1,b:2}
{id:1,userid:2,service:2,a:3,b:2}
实现目标
val df = SQLContext.getOrCreate(sc).
read.
format("driver class").//驱动程序,类似JDBC的 driver class
options(Map(....)). //你需要额外传递给驱动的参数
load("url")//资源路径
{
"name": "streaming.core.compositor.spark.source.SQLSourceCompositor",
"params": [
{
"format": "org.apache.spark.sql.execution.datasources.rest.json",
"url": "http://[your dns]/path",
"xPath": "$.data"
}
]
}
DefaultSource的实现
org.apache.spark.sql.execution.datasources.rest.json.DefaultSource
extends RelationProvider
with DataSourceRegister
- DataSourceRegister
org.apache.spark.sql.execution.datasources.rest.json
==>
restJSON
override def shortName(): String = "restJSON"
RelationProvider
def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation
SchemaRelationProvider 允许你直接传递Schema信息给BaseRelation实现。
HadoopFsRelationProvider 除了参数帮你加了path等,返回值也帮你约定成HadoopFsRelation. HadoopFsRelation 提供了和HDFS交互的大部分实现
在我们的实现里,只要实现基础的RelationProvider就好。
override def createRelation(
sqlContext: SQLContext,
//还记的DataSource的options方法么,parameters就是
//用户通过options传递过来的
parameters: Map[String, String]
): BaseRelation = {
//因为我们并需要用户提供schema
//而是从JSON格式数据自己自己推导出来的
// 所以这里有个采样率的概念
val samplingRatio = parameters.get("samplingRatio").map(_.toDouble).getOrElse(1.0)
// 还记得DataSource的 path么? 理论上是应该通过那个传递过来的,然而
//这里是直接通过potions传递过来的。
val url = parameters.getOrElse("url", "")
// 我们需要能够对通过XPATH语法抽取我们要的数据,比如
//前面的例子,我们需要能够抽取出data那个数组
val xPath = parameters.getOrElse("xPath", "$")
//这里是核心
new RestJSONRelation(None, url, xPath, samplingRatio, None)(sqlContext)
}
RestJSONRelation
先看看RestJSONRelation 的签名:
private[sql] class RestJSONRelation(
val inputRDD: Option[RDD[String]],
val url: String,
val xPath: String,
val samplingRatio: Double,
val maybeDataSchema: Option[StructType]
)(@transient val sqlContext: SQLContext)
extends BaseRelation with TableScan {
- needConversion,是否需类型转换,因为Spark SQL内部的表示是Row,里面的数据需要特定的类型,比如String会被转化成UTF8String。默认为true,官方也是说不要管他就好。
- unhandledFilters, 返回一些数据源没有办法pushdown的filter。这样解析器就知道可以在Spark内部做filter了。否则Spark 会傻傻的以为你做了过滤,然后数据计算结果就错了。
- TableScan 全表扫描
- PrunedScan 可以指定列,其他的列数据源可以不用返回
- PrunedFilteredScan 指定列,并且还可以加一些过滤条件,只返回满足条件的数据。这个也就是我们常说的数据源下沉(pushdown)操作。
- CatalystScan 和PrunedFilteredScan类似,支持列过滤,数据过滤,但是接受的过滤条件是Spark 里的Expression。 理论上会更灵活些。话说在Spark源码)里(1.6.1版本),我没有看到这个类的具体实现案例。这里我们只要实现一个简单的TableScan就可以了,因为拿的是字典数据,并不需要做过滤。
Schema推导
override def schema: StructType = dataSchema
lazy val dataSchema = .....
private def createBaseRdd(inputPaths: Array[String]): RDD[String]
//应该要再加个重试机制就更好了
private def createBaseRdd(inputPaths: Array[String]): RDD[String] = {
val url = inputPaths.head
val res = Request.Get(new URL(url).toURI).execute()
val response = res.returnResponse()
val content = EntityUtils.toString(response.getEntity)
if (response != null && response.getStatusLine.getStatusCode == 200) {
//这里是做数据抽取的,把data的数组给抽取出来
import scala.collection.JavaConversions._
val extractContent = JSONArray.fromObject(JSONPath.read(content, xPath)).
map(f => JSONObject.fromObject(f).toString).toSeq
sqlContext.sparkContext.makeRDD(extractContent)
} else {
sqlContext.sparkContext.makeRDD(Seq())
}
}
lazy val dataSchema = {
//我们也允许用户传递给我们Schema,如果没有就自己推导
val jsonSchema = maybeDataSchema.getOrElse {
InferSchema(
//拿到数据
inputRDD.getOrElse(createBaseRdd(Array(url))),
//采样率,其实就是拿sc.sample方法
samplingRatio,
sqlContext.conf.columnNameOfCorruptRecord)
}
checkConstraints(jsonSchema)
jsonSchema
}
数据获取
def buildScan(): RDD[Row] = {
JacksonParser(
inputRDD.getOrElse(createBaseRdd(Array(url))),
dataSchema, sqlContext.conf.columnNameOfCorruptRecord).asInstanceOf[RDD[Row]]
}
//这个是createBaseRDD返回的RDD[String]
//对应的String 其实是JSON格式
//针对每个分区做处理
json.mapPartitions { iter =>
val factory = new JsonFactory()
iter.flatMap { record =>
try {
//JSON的解析器
val parser = factory.createParser(record)
parser.nextToken()
//这里开始做类型转换了
convertField(factory, parser, schema) match {
case null => failedRecord(record)
case row: InternalRow => row :: Nil
case array: ArrayData =>
if (array.numElements() == 0) {
Nil
} else {
array.toArray[InternalRow](schema)
}
case _ =>
sys.error(
s"Failed to parse record $record. Please make sure that each line of the file " +
"(or each string in the RDD) is a valid JSON object or an array of JSON objects.")
}
} catch {
case _: JsonProcessingException =>
failedRecord(record)
}
}
}
private[sql] def convertField(
factory: JsonFactory,
parser: JsonParser,
schema: DataType): Any = {
import com.fasterxml.jackson.core.JsonToken._
(parser.getCurrentToken, schema) match {
case (null | VALUE_NULL, _) =>
null
case (FIELD_NAME, _) =>
parser.nextToken()
convertField(factory, parser, schema)
.....
case (START_OBJECT, st: StructType) =>
convertObject(factory, parser, st)
while (nextUntil(parser, JsonToken.END_OBJECT)) {
schema.getFieldIndex(parser.getCurrentName) match {
case Some(index) =>
row.update(index, convertField(factory, parser, schema(index).dataType))
case None =>
parser.skipChildren()
}
}
收工
val df = SQLContext.getOrCreate(sc).
read.
format("org.apache.spark.sql.execution.datasources.rest.json").//驱动程序,类似JDBC的 driver class
options(Map(
"url"->"http://[your dns]/path"
"xPath" -> "$.data"
)). //你需要额外传递给驱动的参数
load("url")//资源路径
获取到的Dataframe 你可以做任意的操作。