Spark-DateType-Timestamp-cast-小结
title: Spark DateType/Timestamp cast 小结
date: 2018-07-19 16:47:39
tags:
- Spark
前言
在平时的 Spark 处理中常常会有把一个如 2012-12-12
这样的 date 类型转换成一个 long 的 Unix time 然后进行计算的需求.下面是一段示例代码:
val schema = StructType( Array( StructField("id", IntegerType, nullable = true), StructField("birth", DateType, nullable = true), StructField("time", TimestampType, nullable = true) )) val data = Seq( Row(1, Date.valueOf("2012-12-12"), Timestamp.valueOf("2016-09-30 03:03:00")), Row(2, Date.valueOf("2016-12-14"), Timestamp.valueOf("2016-12-14 03:03:00"))) val df = spark.createDataFrame(spark.sparkContext.parallelize(data),schema)
问题 & 解决
首先很直观的是直接把DateType cast 成 LongType, 如下:
df.select(df.col("birth").cast(LongType))
但是这样出来都是 null, 这是为什么? 答案就在 org.apache.spark.sql.catalyst.expressions.Cast
中, 先看 canCast 方法, 可以看到 DateType 其实是可以转成 NumericType 的, 然后再看下面castToLong的方法, 可以看到case DateType => buildCast[Int](_, d => null)
居然直接是个 null, 看提交记录其实这边有过反复, 然后为了和 hive 统一, 所以返回最后还是返回 null 了.
虽然 DateType 不能直接 castToLong, 但是TimestampType可以, 所以这里的解决方案就是先把 DateType cast 成 TimestampType. 但是这里又会有一个非常坑爹的问题: 时区问题.
首先明确一个问题, 就是这个放到了 spark 中的 2012-12-12 到底 UTC 还是我们当前时区? 答案是如果没有经过特殊配置, 这个2012-12-12代表的是 当前时区的 2012-12-12 00:00:00., 对应 UTC 其实是: 2012-12-11 16:00:00, 少了8小时. 这里还顺便说明了Spark 入库 Date 数据的时候是带着时区的.
然后再看DateType cast toTimestampType 的代码, 可以看到buildCast[Int](_, d => DateTimeUtils.daysToMillis(d, timeZone) * 1000)
, 这里是带着时区的, 但是 Spark SQL 默认会用当前机器的时区. 但是大家一般底层数据比如这个2016-09-30, 都是代表的 UTC 时间, 在用 Spark 处理数据的时候, 这个时间还是 UTC 时间, 只有通过 JDBC 出去的时间才会变成带目标时区的结果. 经过摸索, 这里有两种解决方案:
- 配置 Spark 的默认时区
config("spark.sql.session.timeZone", "UTC")
, 最直观. 这样直接写df.select(df.col("birth").cast(TimestampType).cast(LongType))
就可以了. - 不配置 conf, 正面刚:
df.select(from_utc_timestamp(to_utc_timestamp(df.col("birth"), TimeZone.getTimeZone("UTC").getID), TimeZone.getDefault.getID).cast(LongType))
, 可以看到各种 cast, 这是区别:
- 没有配置 UTC:
from_utc_timestamp(to_utc_timestamp(lit("2012-12-11 16:00:00"), TimeZone.getTimeZone("UTC").getID), TimeZone.getDefault.getID)
- 配置了 UTC:
from_utc_timestamp(to_utc_timestamp(lit("2012-12-12 00:00:00"), TimeZone.getTimeZone("UTC").getID), TimeZone.getDefault.getID)
多了8小时
/** * Returns true iff we can cast `from` type to `to` type. */ def canCast(from: DataType, to: DataType): Boolean = (from, to) match { case (fromType, toType) if fromType == toType => true case (NullType, _) => true case (_, StringType) => true case (StringType, BinaryType) => true case (StringType, BooleanType) => true case (DateType, BooleanType) => true case (TimestampType, BooleanType) => true case (_: NumericType, BooleanType) => true case (StringType, TimestampType) => true case (BooleanType, TimestampType) => true case (DateType, TimestampType) => true case (_: NumericType, TimestampType) => true case (StringType, DateType) => true case (TimestampType, DateType) => true case (StringType, CalendarIntervalType) => true case (StringType, _: NumericType) => true case (BooleanType, _: NumericType) => true case (DateType, _: NumericType) => true case (TimestampType, _: NumericType) => true case (_: NumericType, _: NumericType) => true ... }
private[this] def castToLong(from: DataType): Any => Any = from match { case StringType => val result = new LongWrapper() buildCast[UTF8String](_, s => if (s.toLong(result)) result.value else null) case BooleanType => buildCast[Boolean](_, b => if (b) 1L else 0L) case DateType => buildCast[Int](_, d => null) case TimestampType => buildCast[Long](_, t => timestampToLong(t)) case x: NumericType => b => x.numeric.asInstanceOf[Numeric[Any]].toLong(b) }
// TimestampConverter private[this] def castToTimestamp(from: DataType): Any => Any = from match { ... case DateType => buildCast[Int](_, d => DateTimeUtils.daysToMillis(d, timeZone) * 1000) // TimestampWritable.decimalToTimestamp ... }
/** * Given a timestamp, which corresponds to a certain time of day in the given timezone, returns * another timestamp that corresponds to the same time of day in UTC. * @group datetime_funcs * @since 1.5.0 */ def to_utc_timestamp(ts: Column, tz: String): Column = withExpr { ToUTCTimestamp(ts.expr, Literal(tz)) } /** * Given a timestamp, which corresponds to a certain time of day in UTC, returns another timestamp * that corresponds to the same time of day in the given timezone. * @group datetime_funcs * @since 1.5.0 */ def from_utc_timestamp(ts: Column, tz: String): Column = withExpr { FromUTCTimestamp(ts.expr, Literal(tz)) }
Deep dive
配置源码解读:
val SESSION_LOCAL_TIMEZONE = buildConf("spark.sql.session.timeZone").stringConf.createWithDefaultFunction(() => TimeZone.getDefault.getID)
def sessionLocalTimeZone: String = getConf(SQLConf.SESSION_LOCAL_TIMEZONE)
/** * Replace [[TimeZoneAwareExpression]] without timezone id by its copy with session local * time zone. */ case class ResolveTimeZone(conf: SQLConf) extends Rule[LogicalPlan] { private val transformTimeZoneExprs: PartialFunction[Expression, Expression] = { case e: TimeZoneAwareExpression if e.timeZoneId.isEmpty => e.withTimeZone(conf.sessionLocalTimeZone) // Casts could be added in the subquery plan through the rule TypeCoercion while coercing // the types between the value expression and list query expression of IN expression. // We need to subject the subquery plan through ResolveTimeZone again to setup timezone // information for time zone aware expressions. case e: ListQuery => e.withNewPlan(apply(e.plan)) } override def apply(plan: LogicalPlan): LogicalPlan = plan.transformAllExpressions(transformTimeZoneExprs) def resolveTimeZones(e: Expression): Expression = e.transform(transformTimeZoneExprs) } /** * Mix-in trait for constructing valid [[Cast]] expressions. */ trait CastSupport { /** * Configuration used to create a valid cast expression. */ def conf: SQLConf /** * Create a Cast expression with the session local time zone. */ def cast(child: Expression, dataType: DataType): Cast = { Cast(child, dataType, Option(conf.sessionLocalTimeZone)) } }
org.apache.spark.sql.catalyst.analysis.Analyzer#batches 可以看到有ResolveTimeZone
lazy val batches: Seq[Batch] = Seq( Batch("Resolution", fixedPoint, ResolveTableValuedFunctions :: ResolveRelations :: ResolveReferences :: ... ResolveTimeZone(conf) :: ResolvedUuidExpressions :: TypeCoercion.typeCoercionRules(conf) ++ extendedResolutionRules : _*), Batch("Post-Hoc Resolution", Once, postHocResolutionRules: _*), Batch("View", Once, AliasViewChild(conf)), Batch("Nondeterministic", Once, PullOutNondeterministic), Batch("UDF", Once, HandleNullInputsForUDF), Batch("FixNullability", Once, FixNullability), Batch("Subquery", Once, UpdateOuterReferences), Batch("Cleanup", fixedPoint, CleanupAliases) )
Test Example
对于时区理解
在不同的时区下 sql.Timestamp 对象的表现:
这里是 GMT+8:
Timestamp "2014-06-24 07:22:15.0" - fastTime = 1403565735000 - "2014-06-24T07:22:15.000+0700"
如果是 GMT+7, 会显示如下,可以看到是同一个毫秒数
Timestamp "2014-06-24 06:22:15.0" - fastTime = 1403565735000 - "2014-06-24T06:22:15.000+0700"
test("ColumnBatch") { val schema = StructType( Array( StructField("id", IntegerType, nullable = true), StructField("birth", DateType, nullable = true), StructField("time", TimestampType, nullable = true) )) val columnarBatch = ColumnarBatch.allocate(schema, MemoryMode.ON_HEAP, 1024) val c0 = columnarBatch.column(0) val c1 = columnarBatch.column(1) val c2 = columnarBatch.column(2) c0.putInt(0, 0) // 1355241600, /3600/24 s to days c1.putInt(0, 1355241600 / 3600 / 24) // microsecond c2.putLong(0, 1355285532000000L) val internal0 = columnarBatch.getRow(0) //a way converting internal row to unsafe row. //val convert = UnsafeProjection.create(schema) //val internal = convert.apply(internal0) val enc = RowEncoder.apply(schema).resolveAndBind() val row = enc.fromRow(internal0) val df = spark.createDataFrame(Lists.newArrayList(row), schema) TimeZone.setDefault(TimeZone.getTimeZone("UTC")) val tsStr0 = df.select(col("time")).head().getTimestamp(0).toString val ts0 = df.select(col("time").cast(LongType)).head().getLong(0) TimeZone.setDefault(TimeZone.getTimeZone("GMT+8")) val tsStr1 = df.select(col("time")).head().getTimestamp(0).toString val ts1 = df.select(col("time").cast(LongType)).head().getLong(0) assert(true, "2012-12-12 04:12:12.0".equals(tsStr0)) assert(true, "2012-12-12 12:12:12.0".equals(tsStr1)) // to long 之后毫秒数都是一样的 assert(true, ts0 == ts1) }
番外 : ImplicitCastInputTypes
我们自己定义了一个Expr, 要求接受两个 input 为 DateType 的参数.
case class MockExpr(d0: Expression, d1: Expression) extends BinaryExpression with ImplicitCastInputTypes { override def left: Expression = d0 override def right: Expression = d1 override def inputTypes: Seq[AbstractDataType] = Seq(DateType, DateType) override def dataType: DataType = IntegerType override def nullSafeEval(date0: Any, date1: Any): Any = { ... } }
假设我们有如下调用, 请问这个调用符合预期吗? 结论是符合的, 因为有ImplicitCastInputTypes
.
lit("2012-11-12 12:12:12.0").cast(TimestampType) lit("2012-12-12 12:12:12.0").cast(TimestampType) Column(MockExpr(tsc1.expr, tsc2.expr))
org.apache.spark.sql.catalyst.analysis.TypeCoercion.ImplicitTypeCasts
case e: ImplicitCastInputTypes if e.inputTypes.nonEmpty => val children: Seq[Expression] = e.children.zip(e.inputTypes).map { case (in, expected) => // If we cannot do the implicit cast, just use the original input. implicitCast(in, expected).getOrElse(in) } e.withNewChildren(children) def implicitCast(e: Expression, expectedType: AbstractDataType): Option[Expression] = { implicitCast(e.dataType, expectedType).map { dt => if (dt == e.dataType) e else Cast(e, dt) } }
org.apache.spark.sql.catalyst.expressions.Cast#castToDate #DateConverter
private[this] def castToDate(from: DataType): Any => Any = from match { case StringType => buildCast[UTF8String](_, s => DateTimeUtils.stringToDate(s).orNull) case TimestampType => // throw valid precision more than seconds, according to Hive. // Timestamp.nanos is in 0 to 999,999,999, no more than a second. buildCast[Long](_, t => DateTimeUtils.millisToDays(t / 1000L, timeZone)) }
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
【回放视频+PPT下载整理】Elasticsearch Meetup系列第二期
Elasticsearch Meetup系列第二期活动中,7位来自阿里巴巴、eBay、沪江、携程等知名企业的技术大咖现场分享了集群管理、架构、定制开发、业务支撑的使用经验。以下是此次活动的直播整理。 《利用Elastic Stack快速搭建SIEM系统》 分享嘉宾:Elastic架构师 Monash大学计算机硕士 吴斌 视频链接:https://yq.aliyun.com/video/play/1526 PDF下载:https://yq.aliyun.com/download/2884 《Elasticsearch diagnose and index lifecycle management services》 分享嘉宾:eBay 软件工程师丁旻奕、王佩 视频链接:https://yq.aliyun.com/video/play/1527 PDF下载:https://yq.aliyun.com/download/2886 《EYou—阿里云Elasticsearch智能优化运维工具分享》 分享嘉宾:阿里巴巴Elasticsearch工程师张家杰 视频链接:https://yq.a...
- 下一篇
Spark Catalyst 的实现分析
title: Spark Catalyst Deep Divedate: 2018-07-19 18:15:40 tags: Spark Spark Catalyst 的实现分析 转载自:Spark Catalyst 的实现分析 Spark SQL 和 Catalyst 分别对应了 SQL 执行期以及解析期的优化工作,因此 Catalyst 的理解是 Spark SQL 的第一步。在一些 Catalyst 的介绍以及讲座中,下面一张图是必出现,它描述了从 SQL 语句到最后执行 Plan 的生成过程中,除了 Spark SQL,其他 SQL 引擎的工作原理也基本一致,比如 Hive 之类的。 本文核心也是介绍 Catalyst 内部的实现,但是不是按照这张图的步骤来介绍 Catalyst 的实现原理,而是按照 SQL 给人最直接几个概念,比如 Row,Expression,Plan 来逐步介绍它们的内部实现。 看过 Spark SQL 或者 Catalyst 人都知道,相比 Spark Core 的代码,这一块真的挺复杂了,各种算法逻辑,复杂的 Scala 语法元素,以及各种性能优化,...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- Hadoop3单机部署,实现最简伪集群
- CentOS8编译安装MySQL8.0.19
- CentOS7编译安装Cmake3.16.3,解决mysql等软件编译问题
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- Eclipse初始化配置,告别卡顿、闪退、编译时间过长
- Springboot2将连接池hikari替换为druid,体验最强大的数据库连接池
- Mario游戏-低调大师作品
- CentOS6,CentOS7官方镜像安装Oracle11G