怎样在 Akka Persistence 中实现分页查询
在 Akka Persistence 中,数据都缓存在服务内存(状态),后端存储的都是一些持久化的事件日志,没法使用类似 SQL 一样的 DSL 来进行分页查询。利用 Akka Streams 和 Actor 我们可以通过编码的方式来实现分页查询的效果,而且这个分页查询还是分步式并行的……
EventSourcedBehavior
Akka Persistence的EventSourcedBehavior
里实现了CQRS模型,通过commandHandler
与eventHandler
解耦了命令处理与事件处理。commandHandler
处理传入的命令并返回一个事件,并可选择将这个事件持久化;若事件需要持久化,则事件将被传给eventHandler
处理,eventHandler
处理完事件后将返回一个“新的”状态(也可以不更新,直接返回原状态)。
def apply[Command, Event, State](
persistenceId: PersistenceId,
emptyState: State,
commandHandler: (State, Command) => Effect[Event, State],
eventHandler: (State, Event) => State): EventSourcedBehavior[Command, Event, State]
建模
以我们习惯的数据库表建模来说,我们会有以下一张表:
create table t_config
(
data_id varchar(64),
namespace varchar(64) not null,
config_type varchar(32) not null,
content text not null,
constraint t_config_pk primary key (namespace, data_id)
);
create index t_config_idx_data_id on t_config (data_id);
ConfigManager
actor 可以看作 t_config
表,它的 entityId
就是 namespace
, State 里保存了所有记录的主键值(ConfigManagerState
),这就相当于 t_config
表的 t_config_idx_data_id
索引。
而 ConfigEntity
actor 可看作 t_config
表里存储的记录,每个 actor 实例就是一行记录。它的 entityId
由 namespace
+ data_id
组成,这就相当于 t_config
表的 t_config_pk
复合主键。 这里我们定义两个 EventSourcedBehavior
:
ConfigManager
:拥有所有配置ID列表,并作为 State 保存在 EventSourcedBehaviorConfigEntity
: 拥有每个配置数据,并作为 State 保存在 EventSourcedBehavior
实现
这里先贴出 ConfigManager
和 ConfigEntity
的部分代码,接下来再详解怎样实现分页查询。
ConfigManager
object ConfigManager {
sealed trait Command extends CborSerializable
sealed trait Event extends CborSerializable
sealed trait Response extends CborSerializable
final case class Query(dataId: Option[String], configType: Option[String], page: Int, size: Int) extends Command
final case class ReplyCommand(in: AnyRef, replyTo: ActorRef[Response]) extends Command
private final case class InternalResponse(replyTo: ActorRef[Response], response: Response) extends Command
case class ConfigResponse(status: Int, message: String = "", data: Option[AnyRef] = None) extends Response
final case class ConfigManagerState(dataIds: Vector[String] = Vector()) extends CborSerializable
val TypeKey: EntityTypeKey[Command] = EntityTypeKey("ConfigManager")
}
import ConfigManager._
class ConfigManager private (namespace: String, context: ActorContext[Command]) {
private implicit val system = context.system
private implicit val timeout: Timeout = 5.seconds
import context.executionContext
private val configEntity = ConfigEntity.init(context.system)
def eventSourcedBehavior(): EventSourcedBehavior[Command, Event, ConfigManagerState] =
EventSourcedBehavior(
PersistenceId.of(TypeKey.name, namespace),
ConfigManagerState(), {
case (state, ReplyCommand(in, replyTo)) =>
replyCommandHandler(state, replyTo, in)
case (_, InternalResponse(replyTo, response)) =>
Effect.reply(replyTo)(response)
},
eventHandler)
private def processPageQuery(
state: ConfigManagerState,
replyTo: ActorRef[Response],
in: Query): Effect[Event, ConfigManagerState] = {
val offset = if (in.page > 0) (in.page - 1) * in.size else 0
val responseF = if (offset < state.dataIds.size) {
Source(state.dataIds)
.filter(dataId => in.dataId.forall(v => v.contains(dataId)))
.mapAsync(20) { dataId =>
configEntity.ask[Option[ConfigState]](replyTo =>
ShardingEnvelope(dataId, ConfigEntity.Query(in.configType, replyTo)))
}
.collect { case Some(value) => value }
.drop(offset)
.take(in.size)
.runWith(Sink.seq)
.map(items => ConfigResponse(IntStatus.OK, data = Some(items)))
} else {
Future.successful(ConfigResponse(IntStatus.NOT_FOUND))
}
context.pipeToSelf(responseF) {
case Success(value) => InternalResponse(replyTo, value)
case Failure(e) => InternalResponse(replyTo, ConfigResponse(IntStatus.INTERNAL_ERROR, e.getLocalizedMessage))
}
Effect.none
}
}
ConfigEntity
object ConfigEntity {
case class ConfigState(namespace: String, dataId: String, configType: String, content: String)
sealed trait Command extends CborSerializable
sealed trait Event extends CborSerializable
final case class Query(configType: Option[String], replyTo: ActorRef[Option[ConfigState]]) extends Command
final case class ConfigEntityState(config: Option[ConfigState] = None) extends CborSerializable
val TypeKey: EntityTypeKey[Command] = EntityTypeKey("ConfigEntity")
}
import ConfigEntity._
class ConfigEntity private (namespace: String, dataId: String, context: ActorContext[Command]) {
def eventSourcedBehavior(): EventSourcedBehavior[Command, Event, ConfigEntityState] =
EventSourcedBehavior(PersistenceId.of(TypeKey.name, dataId), ConfigEntityState(), commandHandler, eventHandler)
def commandHandler(state: ConfigEntityState, command: Command): Effect[Event, ConfigEntityState] = command match {
case Query(configType, replyTo) =>
state.config match {
case None =>
Effect.reply(replyTo)(None)
case Some(config) =>
val resp = if (configType.forall(v => config.configType.contains(v))) Some(config) else None
Effect.reply(replyTo)(resp)
}
}
}
ConfigManager#processPageQuery
函数实现了大部分的分页查询逻辑(有部分逻辑需要由 ConfigEntity
处理)。
val offset = if (in.page > 0) (in.page - 1) * in.size else 0
val responseF = if (offset < state.dataIds.size) {
// process paging
} else {
Future.successful(ConfigResponse(IntStatus.OK, data = Some(Nil)))
}
这里首先获取实际的分页数据偏移量 offset
,再于 ConfigManager
状态里保存的 dataIds
的大小进行判断,若 offset
< state.dataIds.size
则我们进行分页逻辑,否则直接返回一个空列表给前端。
Source(state.dataIds)
.filter(dataId => in.dataId.forall(v => v.contains(dataId)))
.mapAsync(20) { dataId =>
configEntity.ask[Option[ConfigState]](replyTo =>
ShardingEnvelope(s"$namespace@$dataId", ConfigEntity.Query(in.configType, replyTo)))
}
.collect { case Some(value) => value }
.drop(offset)
.take(in.size)
.runWith(Sink.seq)
.map(items => ConfigResponse(IntStatus.OK, data = Some(items)))
这个 Akka Streams 流即是分页处理的主要实现,若是SQL的话,它类似:
select * from t_config where data_id like '%"in.dataId"%' offset "offset" limit "in.size"
.mapAsync
在流执行流程中起了20个并发的异步操作,将委托每个匹配的 ConfigEntity
(由s"$namespace@$dataId"
生成entityId
)执行 config_type
字段的查询。这样,完整的SQL语句类似:
select * from t_config where data_id like '%"in.dataId"%' and change_type = "in.changeType" offset "offset" limit "in.size"
ConfigEntity
对 change_type
部分的查询逻辑实现如下:
case Query(configType, replyTo) =>
state.config match {
case None =>
Effect.reply(replyTo)(None)
case Some(config) =>
val resp = if (configType.forall(v => config.configType.contains(v))) Some(config) else None
Effect.reply(replyTo)(resp)
}
若in.configType
为空,既不需要判断 change_type
这个字段,直接返回 Some(config)
即可,而这时的SQL语句类似:
select * from t_config where data_id like '%"in.dataId"%' and true offset "offset" limit "in.size"
Tip这里有个小技巧,对于 Option[T]
字段的判断,直接使用了 .forall
方法,它等价于:
option match {
case Some(x) => p(x)
case None => true
}
小结
完整代码可在此 https://github.com/yangbajing/yangbajing-blog/tree/master/src/main/scala/blog/persistence/config 找到。

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
-
上一篇
转载:小时候我们都想当科学家,只有他做到了——对话阿里云 MVP朱祺
简介:对朱祺我是好奇的。初次交流时,仅限于一个积极活跃、涉猎广泛的印象,拥抱新技术的传统业者。后来读过他许多文章,发现在很多领域,几乎是“凡我所及,必有朱祺”。我非常惊讶,一个人是怎样用有限精力投入到无限研究中的,顿觉此人Young & Powerful,迫不及待想要了解更多。 以下为朱祺的专访内容,欢迎收看(约3分钟)。 当爱好成为职业 大数据本身也是我的业余爱好之一,所以并不会枯燥,也不会计较精力投入问题,我差不多除了睡觉都在研究这些。进行各个行业领域研究的时候,首先想一下自己需要得出的结论目标方向,然后去分析下结论会是什么,没有方向确立的话经常想着想着会偏题,所以目标先明确再去思考其中的过程,最终得出结论。迅速了解跨界行业的最好办法是和不同行业的朋友聊天,因为书本上的知识有时会和实际情况有差异,因此和朋友讨论是最好的办法。另外我想说的就是阿里云 MVP是我最为珍视的头衔,当然也有责任去推动数据变革。有一个项目比较有意思。之前网上有观点说特征分析如果做得好,不用了解生产过程也能设计出很好的算法。于是我把自己实验数据集的标签全部删掉,用PCA(主成分分析算法)又试了一下,准确...
-
下一篇
Rook快速上手——Ceph三位一体存储
快速上手 官网地址:https://rook.io/ 项目地址:https://github.com/rook/rook 安装集群 准备osd存储介质 硬盘符号 大小 作用 sdb 50GB OSD Data sdc 50GB OSD Data sdd 50GB OSD Data sde 50GB OSD Metadata > 安装前使用命令lvm lvs,lvm vgs和lvm pvs检查上述硬盘是否已经被使用,若已经使用需要删除,且确保硬盘上不存在分区和文件系统 确保开启内核rbd模块并安装lvm2 modprobe rbd yum install -y lvm2 安装operator git clone --single-branch --branch release-1.2 https://github.com/rook/rook.git cd rook/cluster/examples/kubernetes/ceph kubectl create -f common.yaml kubectl create -f operator.yaml 安装ceph集群 --- ap...
相关文章
文章评论
共有0条评论来说两句吧...
文章二维码
点击排行
推荐阅读
最新文章
- SpringBoot2整合Redis,开启缓存,提高访问速度
- SpringBoot2整合MyBatis,连接MySql数据库做增删改查操作
- SpringBoot2整合Thymeleaf,官方推荐html解决方案
- SpringBoot2初体验,简单认识spring boot2并且搭建基础工程
- Docker快速安装Oracle11G,搭建oracle11g学习环境
- Dcoker安装(在线仓库),最新的服务器搭配容器使用
- SpringBoot2配置默认Tomcat设置,开启更多高级功能
- Docker使用Oracle官方镜像安装(12C,18C,19C)
- SpringBoot2全家桶,快速入门学习开发网站教程
- CentOS7,8上快速安装Gitea,搭建Git服务器