Play For Scala 开发指南 - 第10章 MongoDB 开发
为什么选择 MongoDB?
在 Reactive 越来越流行的今天,传统阻塞式的数据库驱动已经无法满足Reactive应用的需要了,为此我们将目光转向新诞生的数据库新星 MongoDB 。MongoDB 从诞生以来就争议不断,总结一下主要有以下几点:
-
Schemaless
-
默认忽略错误
-
默认关闭认证
-
曾经的数据丢失问题
其实Schemaless
和不支持事务
是技术选型时的决定,不应该受到吐槽,主要看是否满足业务需求以及团队的喜好,没什么可争议的。至于默认忽略错误
也是无稽之谈,对于那些非关键数据,MongoDB为你提供了一个Fire and Forget
模式,可以显著提高系统性能,并且几乎所有的MongoDB驱动都默认关闭了这个模式,如果需要你可以手动打开。默认关闭认证
并不是不支持认证
,只是为了方便快速原型,如果你敢在线上裸奔MongoDB,我只能默默地为你点根蜡烛。数据丢失
问题已经成为历史,曾经在网上广为流传的两篇关于MongoDB数据丢失问题(1, 2), 经过分布式系统安全性测试组织JEPSEN最新的测试分析表明,MongoDB 3.4.0已经解决了这些问题。
聊完争议,我们来看看MongoDB有哪些优点:
-
简单易用
-
异步数据库驱动
-
全栈Json,统一前后台
-
半结构化数据结构,避免多表查询,避免多文档事务
-
基于单文档的高性能原子操作
-
支持跨数据库的多文档事务
-
Schemaless,方便快速原型
-
支持集群,MapReduce
-
支持GridFS,易用的分布式文件系统
-
支持基于ChangeStream的实时应用
其中异步数据库驱动
最为吸引人,该技术是实现 Reactive 应用的基石。
如何进行 MongoDB 开发 ?
目前有如下三个基于 Scala 开发的 MongoDB 驱动可供选择:
Mongo Scala Driver 是 MongoDB 官方维护的 Scala 驱动,该驱动底层基于官方的 Java 驱动,在此基础上提供了一层很薄的 Scala 包装。Mongo Scala Driver 提供了一套基于 Java 的 Bson Api,无法与 Play Json 集成。另外 Mongo Scala Driver 并没有实现 Reactive Streams 规范,而是实现了一套与 Reactive Streams 类似的 Reactive Api,即 Observable, Subscription 和 Observer。另外 Mongo Scala Driver 的数据库操作默认返回 Observable 类型,如果你忘记了调用 toFuture 方法,或是没有消费返回数据,则数据库操作实际上并不会被执行,在开发中很容易引入一些Bug。
ReactiveMongo 是 Play Framework 团队成员私下维护的项目,似乎并没有得到官方的支持。该项目基于 Akka 和 Netty 重新实现了 MongoDB 通信协议,并且基于 Scala 实现了一套原生的 Bson Api。该项目提供了一个 Play 模块,实现了 Bson 和 Json 的自动转换。ReactiveMongo 主要有三个问题,一是版本更新不够及时,无法跟上 MongoDB 的更新节奏;二是可能存在安全隐患,容易造成生产事故,详情参考:issue#721。三是语法过于繁琐,向开发者暴露了太多细节,例如批量插入操作:
val docs = seq.map(c => implicitly[statChatCol.ImplicitlyDocumentProducer](c.toStatChat)) collection.bulkInsert(false)(docs: _*)
让开发者编写类似implicitly[statChatCol.ImplicitlyDocumentProducer]
这样的代码似乎不太合适。
由于 Reactive Mongo 的种种问题,最终诞生了 Play Mongo。Play Mongo 是由 PlayScala 社区为 Play Framework 开发的 MongoDB 模块, 该项目基于 MongoDB 官方的 Scala 驱动,并且提供了更多的实用功能,例如,
-
更简洁多样的数据库交方式
-
自动识别模型类(Model),自动编解码
-
自动完成 JsValue 和 BsonValue 互转
-
更方便的 GridFS 交互
-
Change Stream 转 Akka Stream.
-
支持关联查询(Relationship Query)
Play Mongo 基于官方驱动开发,可以为开发者提供最佳的稳定性,并能及时跟进 MongoDB 的版本升级。另外 Play Mongo 不会过多关注底层驱动的实现细节,而是将关注点放在与 Play Framework 的集成上,可以为开发者提供更舒适的开发体验。本文将采用 Play Mongo 讲述 MongoDB 的开发细节。
Play Mongo 开发入门
Play Mongo 只是为我们提供了数据访问层,我们还需要基于访问层构建模型层。关于模型层的设计,我们可以选择贫血模型、充血模型以及应对复杂业务的领域模型。关于模型层的设计,我们将会在“第四部分 Play 框架开发实战”中继续讨论。为了方便阐述,我们这里选择最简单的贫血模型,即模型层只包含数据,不包含任何的业务逻辑实现。
添加依赖
打开 Play 项目,编辑 build.sbt
,添加如下依赖,
libraryDependencies += "cn.playscala" % "play-mongo_2.12" % "0.3.0" addCompilerPlugin("org.scalamacros" % "paradise" % "2.1.1" cross CrossVersion.full)
打开 conf/application.conf
, 添加数据库连接,
mongodb.uri = "mongodb://user:password@host:port/dbName?authMode=scram-sha1"
定义模型层
我们建议在定义 Model 类时要显式声明 _id 属性,该属性为 MongoDB 的默认主键,如果没有,在插入时会自动生成。下面代码定义了一个 Person 类:
package models @Entity("common-person") case class Person(_id: String, name: String, age: Int)
@Entity 注解参数用于指定关联的 mongodb collection 名称, 如果未指定,则默认为 Model 类名称。 作为约定,Model 类使用 _id 字段作为唯一标识, 该字段同时也是 mongodb collection 的默认主键。
模型层编解码
在应用启动时指定模型层(models)的包路径,编辑app/Module
类,
class Module extends AbstractModule { override def configure() = { Mongo.setModelsPackage("models") } }
Mongo.setModelsPackage方法将会查找指定包路径下的所有Case Class,自动生成驱动所需的编解码器。需要注意的是,这些编解码器是驱动私有的,外界无法共享。我们仍然需要定义全局共享的隐式 Format 对象:
import play.api.libs.json.Format package object models { implicit val personFormat = Json.format[Person] }
如果有很多的 Case Class,则需要逐个定义,编写起来还是挺麻烦的。我们可以使用 @JsonFormat 宏注解,通过一行代码为所有 Case Class 生成相应的隐式 Format 对象:
import cn.playscala.mongo.codecs.macrocodecs.JsonFormat package object models { @JsonFormat("models") implicit val formats = ??? }
由于这些隐式的 Format 对象是在模型层的包对象(package object)中创建的,所以使用时无需显式导入,编译器会自动加载。
依赖注入
至此,我们便可以将 Mongo 实例注入到任意需要的地方:
@Singleton class Application @Inject()(cc: ControllerComponents, mongo: Mongo) extends AbstractController(cc) {}
模型类和Collection
模型类使用 @Entity 注解标注, 一个模型类实例表示 mongodb collection 中的一个文档, 一个 mongodb collection 在概念上类似于关系数据库的一张表。
@Entity("common-user") case class User(_id: String, name: String, password: String, addTime: Instant)
@Entity 注解参数用于指定关联的 mongodb collection 名称, 如果未指定,则默认为 Model 类名称。 作为约定,模型类使用 _id 字段作为唯一标识, 该字段同时也是 mongodb collection 的默认主键。
我们可以通过两种方式访问 mongodb collection, 第一种方式是使用模型类,
mongo.find[User]().list().map{ users => ... }
这里的参数类型 User 不仅用于指定关联的 mongodb collection, 而且用于指明返回的结果类型。 这意味着查询操作将会在 common-user collection 上执行, 并且返回的结果类型是 User。 需要注意的是,在该方式下无法改变返回的结果类型。
第二种方式是使用 mongo.collection 方法,
mongo.collection("common-user").find[User]().list().map{ users => }
在这里, find 方法上的参数类型 User 仅仅用于指定返回的结果类型, 我们可以通过更改该参数类型设置不同的返回结果类型,
mongo.collection("common-user").find[JsObject]().list().map{ jsObjList => } mongo.collection("common-user").find[User](Json.obj("userType" -> "common")).list().map{ commonUsers => }
当然,我们也可以使用 model 类指定关联的 mongodb collection,
mongo.collection[User].find[User]().list().map{ user => }
第1个参数类型 User 用于指定关联的 mongodb collection, 第2个参数类型 User 用于指定返回的结果类型。 我们仍然可以通过改变第2个参数类型从而改变返回的结果类型。
常见操作
以下示例代码默认执行了 import play.api.libs.json.Json._
导入, 所以 Json.obj()
可以被简写为 obj()
。
创建操作
// 插入 Model mongo.insert[User](User("0", "joymufeng", "123456", Instant.now)) // 插入 Json val jsObj = obj("_id" -> "0", "name" -> "joymufeng", "password" -> "123456", "addTime" -> Instant.now) mongo.collection[User].insert(jsObj) mongo.collection("common-user").insert(jsObj)
更新操作
mongo.updateById[User]("0", obj("$set" -> obj("password" -> "123321"))) mongo.updateOne[User](obj("_id" -> "0"), obj("$set" -> obj("password" -> "123321"))) mongo.collection[User].updateById("0", obj("$set" -> obj("password" -> "123321"))) mongo.collection[User].updateOne(obj("_id" -> "0"), obj("$set" -> obj("password" -> "123321"))) mongo.collection("common-user").updateById("0", obj("$set" -> obj("password" -> "123321"))) mongo.collection("common-user").updateOne(obj("_id" -> "0"), obj("$set" -> obj("password" -> "123321")))
查询操作
mongo.findById[User]("0") // Future[Option[User]] mongo.find[User](obj("_id" -> "0")).first // Future[Option[User]] mongo.collection[User].findById[User]("0") // Future[Option[User]] mongo.collection[User].find[User](obj("_id" -> "0")).first // Future[Option[User]] mongo.collection[User].findById[JsObject]("0") // Future[Option[JsObject]] mongo.collection[User].find[JsObject](obj("_id" -> "0")).first // Future[Option[JsObject]] mongo.collection("common-user").findById[User]("0") // Future[Option[User]] mongo.collection("common-user").find[User](obj("_id" -> "0")).first // Future[Option[User]] mongo.collection("common-user").findById[JsObject]("0") // Future[Option[JsObject]] mongo.collection("common-user").find[JsObject](obj("_id" -> "0")).first // Future[Option[JsObject]]
删除操作
mongo.deleteById[User]("0") mongo.deleteOne[User](obj("_id" -> "0")) mongo.collection[User].deleteById("0") mongo.collection[User].deleteOne(obj("_id" -> "0")) mongo.collection("common-user").deleteById("0") mongo.collection("common-user").deleteOne(obj("_id" -> "0"))
上传和下载文件
// Upload and get the fileId mongo.gridFSBucket.uploadFromFile("image.jpg", "image/jpg", new File("./image.jpg")).map{ fileId => Ok(fileId) } // Download file by fileId mongo.gridFSBucket.findById("5b1183fed3ba643a3826325f").map{ case Some(file) => Ok.chunked(file.stream.toSource) .as(file.getContentType) case None => NotFound }
Change Stream
我们可以通过 toSource
方法将 Change Stream 转换成 Akka Source,之后便会有趣很多。例如下面的代码拥有如下几个功能:
-
将从 Change Stream 接收到的元素进行缓冲,以方便批处理,当满足其中一个条件时便结束缓冲向后传递:
-
缓冲满10个元素
-
缓冲时间超过了1000毫秒
-
-
对缓冲后的元素进行流控,每秒只允许通过1个元素
mongo .collection[User] .watch() .fullDocument .toSource .groupedWithin(10, 1000.millis) .throttle(elements = 1, per = 1.second, maximumBurst = 1, ThrottleMode.shaping) .runForeach{ seq => // ... }
关联查询操作
@Entity("common-article") case class Article(_id: String, title: String, content: String, authorId: String) @Entity("common-author") case class Author(_id: String, name: String) mongo.find[Article].fetch[Author]("authorId").list().map{ _.map{ t => val (article, author) = t } }
对于满足查询条件的每一个 article , 将会根据匹配条件article.authorId == author._id
拉取关联的 author。
小结
MongoDB自2009发布以来,产品和社区都已经非常成熟,已经有商业公司在云上提供MongoDB服务。除此之外,MongoDB不仅方便开发,而且容易维护,普通的开发人员利用自带的mongodump
和mongorestore
命令便可进行备份、恢复操作。当然更重要的是,利用MongoDB的异步驱动以及ChangeStreams,我们可以开发高性能的实时应用。
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
Kafka 跨集群同步方案
该方案解决Kafka跨集群同步、创建Kafka集群镜像等相关问题,主要使用Kafka内置的MirrorMaker工具实现。 Kafka镜像即已有Kafka集群的副本。下图展示如何使用MirrorMaker工具创建从源Kafka集群(source cluster)到目标Kafka集群(target cluster)的镜像。该工具通过Kafka consumer从源Kafka集群消费数据,然后通过一个内置的Kafka producer将数据重新推送到目标Kafka集群。 一、如何创建镜像 使用MirrorMaker创建镜像是比较简单的,搭建好目标Kafka集群后,只需要启动mirror-maker程序即可。其中,一个或多个consumer配置文件、一个producer配置文件是必须的,whitelist、blacklist是可选的。在consumer的配置中指定源Kafka集群的Zookeeper,在producer的配置中指定目标集群的Zookeeper(或者broker.list)。 kafka-run-class.shkafka.tools.MirrorMaker–consume...
- 下一篇
Akka向设备组添加Actor注册《thirteen》译
我们已经完成了设备级别的注册支持,现在我们必须在组级别实现它。在注册时,小组成员还有更多工作要做,包括: 通过将注册请求转发给现有设备actor或通过创建新actor并转发消息来处理注册请求。 跟踪组中存在哪些设备Actor,并在组停止时从组中删除它们。 处理注册请求 设备组Actor必须将请求转发给现有子项,或者应创建一个。要通过设备ID查找子actor,我们将使用Map <String,ActorRef>。 我们还希望保留请求的原始发件人的ID,以便我们的设备角色可以直接回复。这可以通过使用forward而不是tell运算符来实现。两者之间的唯一区别是,forward会保留原始发件人,而tell会将发件人设置为当前的actor。就像我们的设备actor一样,我们确保不会响应错误的组ID。将以下内容添加到源文件中: Full source at GitHub 正如我们对设备所做的那样,我们测试了这个新功能。我们还测试了返回两个不同ID的Actor实际上是不同的,我们还尝试记录每个设备的温度读数,以查看Actor是否正在响应。 Full source at GitHub 如...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- Windows10,CentOS7,CentOS8安装MongoDB4.0.16
- CentOS6,CentOS7官方镜像安装Oracle11G
- Jdk安装(Linux,MacOS,Windows),包含三大操作系统的最全安装
- SpringBoot2编写第一个Controller,响应你的http请求并返回结果
- SpringBoot2更换Tomcat为Jetty,小型站点的福音
- CentOS7安装Docker,走上虚拟化容器引擎之路
- CentOS7编译安装Gcc9.2.0,解决mysql等软件编译问题
- Linux系统CentOS6、CentOS7手动修改IP地址