A modern reactive CQRS microservice development framework based on DDD and Event Sourcing
![Awesome Kotlin Badge]()
Domain-Driven | Event-Driven | Test-Driven | Declarative Design | Reactive Programming | Command Query Responsibility Segregation | Event Sourcing
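What's New 🎉 🎉 🎉
Full support for OpenAPI 3
- Feature: Full support for OpenAPI 3
- Feature: Add @CommandRoute.HeaderVariable, which extracts parameters from request headers and injects them into the command body
- Feature: Add HiddenVariableConverter, which hides route variables in the command body
- Feature: Add CommandRoute.Method, which allows a custom command route Method
- Feature: Add ScanAggregateHandlerFunction, which supports the scan state aggregate API
- Dependency: Upgrade gradle to v8.2.1
- Dependency: Upgrade swagger-core to v2.2.15
- Feature: Add IdsQueryAggregateHandlerFunction, which queries state aggregates by Ids
- Feature: Add RouteHandlerFunctionRegistrar, adapted to wow-openapi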
Architecture
![Wow-Architecture]()
Event Sourcing
![Wow-EventSourcing]()
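Since the diagram is not reproduced here, the core idea of event sourcing can be sketched in plain Kotlin: state is never stored directly but derived by replaying events in order. This is a minimal sketch, not the framework's API; `AccountEvent`, `Deposited`, `Withdrawn`, `AccountState`, `onSourcing` and `replay` are hypothetical names chosen for illustration.

```kotlin
import java.math.BigDecimal

// Hypothetical events for illustration; these are not framework types.
sealed interface AccountEvent
data class Deposited(val amount: BigDecimal) : AccountEvent
data class Withdrawn(val amount: BigDecimal) : AccountEvent

// The state is derived from the event history rather than persisted directly.
data class AccountState(val balance: BigDecimal = BigDecimal.ZERO, val version: Int = 0)

// Apply one event to the current state and return the next state (a pure function).
fun AccountState.onSourcing(event: AccountEvent): AccountState = when (event) {
    is Deposited -> copy(balance = balance + event.amount, version = version + 1)
    is Withdrawn -> copy(balance = balance - event.amount, version = version + 1)
}

// Rebuild the current state by replaying every event in order.
fun replay(events: List<AccountEvent>): AccountState =
    events.fold(AccountState()) { state, event -> state.onSourcing(event) }

fun main() {
    val history = listOf(Deposited(BigDecimal(100)), Withdrawn(BigDecimal(30)))
    println(replay(history))
}
```

Because every change is an appended event, the history doubles as an audit log, and any past state can be reconstructed by replaying a prefix of the stream.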
Observability
![Wow-Observability]()
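Spring WebFlux Integration
Command route handler functions (HandlerFunction) are registered automatically, so developers only need to write the domain model to complete service development.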
![OpenAPI-Swagger]()
Test Suite: 80%+ test coverage comes easily
Given -> When -> Expect
![Wow-CI-Flow]()
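Prerequisites
- Understand Domain-Driven Design: "Implementing Domain-Driven Design" and "Domain-Driven Design: Tackling Complexity in the Heart of Software"
- Understand Command Query Responsibility Segregation (CQRS)
- Understand the Event Sourcing architecture
- Understand reactive programming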
Features
- Aggregate Modeling
    - Single Class
    - Inheritance Pattern
    - Aggregation Pattern
- Saga Modeling
- Test Suite
    - Compatibility test specification (TCK)
    - AggregateVerifier
    - SagaVerifier
- EventSourcing
    - EventStore
        - MongoDB (Recommended)
        - R2dbc
            - Database Sharding
            - Table Sharding
        - Redis
    - Snapshot
        - MongoDB
        - R2dbc
            - Database Sharding
            - Table Sharding
        - ElasticSearch
        - Redis (Recommended)
- Command wait strategy (WaitStrategy)
    - SENT: the completion signal is sent once the command has been sent successfully
    - PROCESSED: the completion signal is sent once the command has been processed
    - SNAPSHOT: the completion signal is sent once the snapshot has been generated
    - PROJECTED: the completion signal is sent once the events produced by the command have been projected
- CommandBus
    - InMemoryCommandBus
    - KafkaCommandBus (Recommended)
    - RedisCommandBus
    - LocalFirstCommandBus
- DomainEventBus
    - InMemoryDomainEventBus
    - KafkaDomainEventBus (Recommended)
    - RedisDomainEventBus
    - LocalFirstDomainEventBus
- StateEventBus
    - InMemoryStateEventBus
    - KafkaStateEventBus (Recommended)
    - RedisStateEventBus
    - LocalFirstStateEventBus
- Spring Integration
    - Spring Boot Auto Configuration
    - Automatically register CommandAggregate to RouterFunction
- Observability
- OpenApi
- WowMetadata Generator
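The command wait strategy listed above can be pictured as a caller that blocks until a chosen processing stage signals completion. The following is a minimal sketch under that assumption; `Stage`, `WaitingFor` and `signal` are hypothetical names and do not reflect the framework's actual WaitStrategy API. Error propagation, which a real implementation would need, is left out.

```kotlin
import java.util.concurrent.CompletableFuture

// The four stages named in the feature list.
enum class Stage { SENT, PROCESSED, SNAPSHOT, PROJECTED }

// Hypothetical waiter: completes only when the awaited stage is signalled.
class WaitingFor(private val stage: Stage) {
    private val future = CompletableFuture<Stage>()

    // Called by the pipeline each time the command reaches a stage.
    fun signal(reached: Stage) {
        if (reached == stage) future.complete(reached)
    }

    val isDone: Boolean get() = future.isDone
}

fun main() {
    val waiting = WaitingFor(Stage.PROCESSED)
    waiting.signal(Stage.SENT)       // not the awaited stage, still waiting
    println(waiting.isDone)
    waiting.signal(Stage.PROCESSED)  // awaited stage reached
    println(waiting.isDone)
}
```

Waiting for a later stage trades latency for a stronger guarantee: a caller waiting for PROJECTED can read its own write from the query side, whereas SENT returns as soon as the command is on the bus.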
Example
Test Suite
80%+ test coverage comes easily.
![Test Coverage]()
Given -> When -> Expect
internal class OrderTest {

    companion object {
        val SHIPPING_ADDRESS = ShippingAddress("China", "ShangHai", "ShangHai", "HuangPu", "001")
    }

    private fun mockCreateOrder(): VerifiedStage<OrderState> {
        val tenantId = GlobalIdGenerator.generateAsString()
        val customerId = GlobalIdGenerator.generateAsString()
        val orderItem = OrderItem(
            GlobalIdGenerator.generateAsString(),
            GlobalIdGenerator.generateAsString(),
            BigDecimal.valueOf(10),
            10
        )
        val orderItems = listOf(orderItem)
        val inventoryService = object : InventoryService {
            override fun getInventory(productId: String): Mono<Int> {
                return orderItems.toFlux().filter { it.productId == productId }.map { it.quantity }.last()
            }
        }
        val pricingService = object : PricingService {
            override fun getProductPrice(productId: String): Mono<BigDecimal> {
                return orderItems.toFlux().filter { it.productId == productId }.map { it.price }.last()
            }
        }
        return aggregateVerifier<Order, OrderState>(tenantId = tenantId)
            .inject(DefaultCreateOrderSpec(inventoryService, pricingService))
            .given()
            .`when`(CreateOrder(customerId, orderItems, SHIPPING_ADDRESS))
            .expectEventCount(1)
            .expectEventType(OrderCreated::class.java)
            .expectStateAggregate {
                assertThat(it.aggregateId.tenantId, equalTo(tenantId))
            }
            .expectState {
                assertThat(it.id, notNullValue())
                assertThat(it.customerId, equalTo(customerId))
                assertThat(it.address, equalTo(SHIPPING_ADDRESS))
                assertThat(it.items, equalTo(orderItems))
                assertThat(it.status, equalTo(OrderStatus.CREATED))
            }
            .verify()
    }

    /** Create order */
    @Test
    fun createOrder() {
        mockCreateOrder()
    }

    /** Create order: insufficient inventory */
    @Test
    fun createOrderWhenInventoryShortage() {
        val customerId = GlobalIdGenerator.generateAsString()
        val orderItem = OrderItem(
            GlobalIdGenerator.generateAsString(),
            GlobalIdGenerator.generateAsString(),
            BigDecimal.valueOf(10),
            10
        )
        val orderItems = listOf(orderItem)
        val inventoryService = object : InventoryService {
            override fun getInventory(productId: String): Mono<Int> {
                return orderItems.toFlux().filter { it.productId == productId }
                    /* Simulate insufficient inventory */
                    .map { it.quantity - 1 }.last()
            }
        }
        val pricingService = object : PricingService {
            override fun getProductPrice(productId: String): Mono<BigDecimal> {
                return orderItems.toFlux().filter { it.productId == productId }.map { it.price }.last()
            }
        }
        aggregateVerifier<Order, OrderState>()
            .inject(DefaultCreateOrderSpec(inventoryService, pricingService))
            .given()
            .`when`(CreateOrder(customerId, orderItems, SHIPPING_ADDRESS))
            /* Expect: inventory shortage exception. */
            .expectErrorType(InventoryShortageException::class.java)
            .expectStateAggregate {
                /* The aggregate is still uninitialized, i.e. it was not created. */
                assertThat(it.initialized, equalTo(false))
            }.verify()
    }

    /** Create order: the order price differs from the current price */
    @Test
    fun createOrderWhenPriceInconsistency() {
        val customerId = GlobalIdGenerator.generateAsString()
        val orderItem = OrderItem(
            GlobalIdGenerator.generateAsString(),
            GlobalIdGenerator.generateAsString(),
            BigDecimal.valueOf(10),
            10
        )
        val orderItems = listOf(orderItem)
        val inventoryService = object : InventoryService {
            override fun getInventory(productId: String): Mono<Int> {
                return orderItems.toFlux().filter { it.productId == productId }.map { it.quantity }.last()
            }
        }
        val pricingService = object : PricingService {
            override fun getProductPrice(productId: String): Mono<BigDecimal> {
                return orderItems.toFlux().filter { it.productId == productId }
                    /* Simulate a mismatch between the order price and the product's list price */
                    .map { it.price.plus(BigDecimal.valueOf(1)) }.last()
            }
        }
        aggregateVerifier<Order, OrderState>()
            .inject(DefaultCreateOrderSpec(inventoryService, pricingService))
            .given()
            .`when`(CreateOrder(customerId, orderItems, SHIPPING_ADDRESS))
            /* Expect: price inconsistency exception. */
            .expectErrorType(PriceInconsistencyException::class.java).verify()
    }

    private fun mockPayOrder(): VerifiedStage<OrderState> {
        val verifiedStageAfterCreateOrder = mockCreateOrder()
        val previousState = verifiedStageAfterCreateOrder.stateRoot
        val payOrder = PayOrder(
            previousState.id,
            GlobalIdGenerator.generateAsString(),
            previousState.totalAmount
        )
        return verifiedStageAfterCreateOrder
            .then()
            .given()
            /* 2. When the command is received */
            .`when`(payOrder)
            /* 3.1 Expect exactly one event to be produced */
            .expectEventCount(1)
            /* 3.2 Expect an OrderPaid event (this makes 3.1 optional) */
            .expectEventType(OrderPaid::class.java)
            /* 3.3 Expect the body of the produced event */
            .expectEventBody<OrderPaid> {
                assertThat(it.amount, equalTo(payOrder.amount))
            }
            /* 4. Expect the current aggregate state */
            .expectState {
                assertThat(it.address, equalTo(SHIPPING_ADDRESS))
                assertThat(it.paidAmount, equalTo(payOrder.amount))
                assertThat(it.status, equalTo(OrderStatus.PAID))
            }
            /* Once the test is composed, verify the expectations. */
            .verify()
    }

    /** Pay order */
    @Test
    fun payOrder() {
        mockPayOrder()
    }

    /** Pay order: overpayment */
    @Test
    fun payOrderWhenOverPay() {
        val verifiedStageAfterCreateOrder = mockCreateOrder()
        val previousState = verifiedStageAfterCreateOrder.stateRoot
        val payOrder = PayOrder(
            previousState.id,
            GlobalIdGenerator.generateAsString(),
            previousState.totalAmount.plus(
                BigDecimal.valueOf(1)
            )
        )
        verifiedStageAfterCreateOrder
            .then()
            .given()
            /* 2. Handle the PayOrder command */
            .`when`(payOrder)
            /* 3.1 Expect two events to be produced: OrderPaid and OrderOverPaid */
            .expectEventType(OrderPaid::class.java, OrderOverPaid::class.java)
            /* 3.2 Expect the state of the produced events */
            .expectEventStream {
                val itr = it.iterator()
                /* OrderPaid */
                val orderPaid = itr.next().body as OrderPaid
                assertThat(orderPaid.paid, equalTo(true))
                /* OrderOverPaid */
                val orderOverPaid = itr.next().body as OrderOverPaid
                assertThat(
                    orderOverPaid.overPay,
                    equalTo(payOrder.amount.minus(previousState.totalAmount))
                )
            }
            /* 4. Expect the current aggregate state */
            .expectState {
                assertThat(it.paidAmount, equalTo(previousState.totalAmount))
                assertThat(it.status, equalTo(OrderStatus.PAID))
            }
            .verify()
    }

    /** Ship order */
    @Test
    fun ship() {
        val verifiedStageAfterPayOrder = mockPayOrder()
        val shipOrder = ShipOrder(verifiedStageAfterPayOrder.stateRoot.id)
        verifiedStageAfterPayOrder
            .then().given()
            .`when`(shipOrder)
            .expectEventType(OrderShipped::class.java)
            /* 4. Expect the current aggregate state */
            .expectState {
                assertThat(it.status, equalTo(OrderStatus.SHIPPED))
            }
            .verify()
    }

    @Test
    fun shipGivenUnpaid() {
        val verifiedStageAfterCreateOrder = mockCreateOrder()
        val shipOrder = ShipOrder(verifiedStageAfterCreateOrder.stateRoot.id)
        verifiedStageAfterCreateOrder.then().given()
            .`when`(shipOrder)
            .expectErrorType(IllegalStateException::class.java)
            .expectState {
                /* Verify that the aggregate state has [not] changed. */
                assertThat(it.paidAmount, equalTo(BigDecimal.ZERO))
                assertThat(it.status, equalTo(OrderStatus.CREATED))
            }
            .verify()
    }

    private fun mockDeleteOrder(): VerifiedStage<OrderState> {
        val verifiedStageAfterCreateOrder = mockCreateOrder()
        return verifiedStageAfterCreateOrder.then().given()
            .`when`(DeleteAggregate)
            .expectEventType(AggregateDeleted::class.java)
            .expectStateAggregate {
                assertThat(it.deleted, equalTo(true))
            }
            .verify()
    }

    @Test
    fun deleteOrder() {
        mockDeleteOrder()
    }

    @Test
    fun deleteGivenDeleted() {
        val verifiedStageAfterDelete = mockDeleteOrder()
        verifiedStageAfterDelete.then().given()
            .`when`(DeleteAggregate)
            .expectErrorType(IllegalAccessDeletedAggregateException::class.java)
            .expectError<IllegalAccessDeletedAggregateException> {
                assertThat(it.aggregateId, equalTo(verifiedStageAfterDelete.stateAggregate.aggregateId))
            }.expectStateAggregate {
                assertThat(it.deleted, equalTo(true))
            }
            .verify()
    }
}
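To show what the Given -> When -> Expect flow in the test above does, here is a framework-free miniature. It is only a sketch under stated assumptions: `Counter`, `Add`, `Added`, `handle`, `source`, `Given` and `Expect` are hypothetical names, and unlike the real verifier this version checks each expectation eagerly instead of deferring to `verify()`.

```kotlin
// Hypothetical command, event and state for a tiny counter aggregate.
data class Add(val delta: Int)
data class Added(val delta: Int)
data class Counter(val value: Int = 0)

// Sourcing function: applies an event to the state.
fun source(state: Counter, event: Added): Counter = state.copy(value = state.value + event.delta)

// Command function: validates the command and decides which events to emit.
fun handle(state: Counter, command: Add): List<Added> {
    require(state.value + command.delta <= 10) { "counter may not exceed 10" }
    return listOf(Added(command.delta))
}

// Expect: assertions over the emitted events, the resulting state and any error.
class Expect(private val state: Counter, private val events: List<Added>, private val error: Throwable?) {
    fun expectEventCount(expected: Int): Expect = apply {
        check(events.size == expected) { "expected $expected events but got ${events.size}" }
    }
    fun expectState(assertion: (Counter) -> Unit): Expect = apply { assertion(state) }
    fun expectErrorType(type: Class<out Throwable>): Expect = apply {
        check(type.isInstance(error)) { "expected ${type.simpleName} but got $error" }
    }
}

// Given: prior events rebuild the state. When: a command is handled against it.
class Given(private val history: List<Added>) {
    fun `when`(command: Add): Expect {
        val before = history.fold(Counter(), ::source)
        return try {
            val events = handle(before, command)
            Expect(events.fold(before, ::source), events, null)
        } catch (e: IllegalArgumentException) {
            // A rejected command emits no events and leaves the state unchanged.
            Expect(before, emptyList(), e)
        }
    }
}

fun given(vararg events: Added): Given = Given(events.toList())

fun main() {
    given(Added(1)).`when`(Add(2))
        .expectEventCount(1)
        .expectState { check(it.value == 3) }
    given(Added(9)).`when`(Add(2))
        .expectErrorType(IllegalArgumentException::class.java)
        .expectState { check(it.value == 9) }
    println("all expectations met")
}
```

Because the aggregate is exercised purely through events in and events out, such tests need no database or message bus, which is what makes high coverage cheap to reach.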
Design
Aggregate Modeling
| Single Class | Inheritance Pattern | Aggregation Pattern |
|--------------|---------------------|---------------------|
| ![Single Class - Modeling]() | ![Inheritance Pattern- Modeling]() | ![Aggregation Pattern- Modeling]() |
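The three modeling styles differ in where command handling and state live. The sketch below shows one plausible reading in plain Kotlin; every class name (`SingleClassProfile`, `ProfileStateBase`, `InheritedProfile`, `ProfileState`, `ProfileAggregate`) and the `onCommand`/`onSourcing` function names are assumptions made for illustration, not the framework's required shapes.

```kotlin
// Hypothetical command and event shared by all three variants.
data class ChangeName(val name: String)
data class NameChanged(val name: String)

// Single Class: command handling and state changes live in one class.
class SingleClassProfile(val id: String) {
    var name: String = ""
        private set
    fun onCommand(command: ChangeName): NameChanged = NameChanged(command.name)
    fun onSourcing(event: NameChanged) { name = event.name }
}

// Inheritance Pattern: the state is a base class, the command aggregate extends it.
open class ProfileStateBase(val id: String) {
    var name: String = ""
        private set
    fun onSourcing(event: NameChanged) { name = event.name }
}
class InheritedProfile(id: String) : ProfileStateBase(id) {
    fun onCommand(command: ChangeName): NameChanged = NameChanged(command.name)
}

// Aggregation Pattern: the command aggregate holds the state and can only read it.
class ProfileState(val id: String) {
    var name: String = ""
        private set
    fun onSourcing(event: NameChanged) { name = event.name }
}
class ProfileAggregate(private val state: ProfileState) {
    fun onCommand(command: ChangeName): NameChanged {
        require(command.name != state.name) { "name is unchanged" }
        return NameChanged(command.name)
    }
}

fun main() {
    val state = ProfileState("p-1")
    val event = ProfileAggregate(state).onCommand(ChangeName("wow"))
    state.onSourcing(event) // state changes only by applying events
    println(state.name)
}
```

Keeping the setter private and routing every change through `onSourcing` means a command function cannot mutate state as a side effect; it can only return events.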
Load Aggregate
![Load Aggregate]()
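A common way to load an event-sourced aggregate is to start from the latest snapshot, if one exists, and replay only the events recorded after it. The sketch below assumes that approach; it is not taken from the diagram, and `Event`, `Snapshot`, `State`, `InMemoryEventStore` and `loadAggregate` are hypothetical names.

```kotlin
// Hypothetical, simplified types: each event carries its position in the stream.
data class Event(val version: Int, val delta: Int)
data class Snapshot(val version: Int, val total: Int)
data class State(val version: Int = 0, val total: Int = 0)

// Stand-in for an event store that can return the tail of a stream.
class InMemoryEventStore(private val events: List<Event>) {
    fun load(afterVersion: Int): List<Event> = events.filter { it.version > afterVersion }
}

// Start from the snapshot (or an empty state) and apply the remaining events in order.
fun loadAggregate(snapshot: Snapshot?, store: InMemoryEventStore): State {
    val start = snapshot?.let { State(it.version, it.total) } ?: State()
    return store.load(afterVersion = start.version)
        .sortedBy { it.version }
        .fold(start) { state, event ->
            // Versions must be contiguous; a gap would mean a lost event.
            check(event.version == state.version + 1) { "gap in event stream at ${event.version}" }
            State(event.version, state.total + event.delta)
        }
}

fun main() {
    val store = InMemoryEventStore(listOf(Event(1, 1), Event(2, 2), Event(3, 3)))
    println(loadAggregate(Snapshot(version = 2, total = 3), store)) // replays only version 3
    println(loadAggregate(null, store))                             // replays the full stream
}
```

Both calls arrive at the same state; the snapshot only shortens the replay.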
Aggregate State Flow
![Aggregate State Flow]()
Send Command
![Send Command]()
Command and Event Flow
![Command And Event Flow]()
Saga - OrderProcessManager (Demo)
![OrderProcessManager]()
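The diagram is not reproduced here, so the following only sketches the general saga idea: a process manager subscribes to domain events and answers with follow-up commands. The specific reaction shown (an `OrderPaid` event triggering a `ShipOrder` command) and the names `OrderEvent`, `Command` and `OrderSaga` are assumptions for illustration, not a description of the demo's actual OrderProcessManager.

```kotlin
// Hypothetical, simplified events and commands.
sealed interface OrderEvent { val orderId: String }
data class OrderCreated(override val orderId: String) : OrderEvent
data class OrderPaid(override val orderId: String) : OrderEvent

sealed interface Command
data class ShipOrder(val orderId: String) : Command

// A stateless saga: it maps each event to follow-up commands
// and never modifies aggregate state itself.
class OrderSaga {
    fun onEvent(event: OrderEvent): List<Command> = when (event) {
        is OrderCreated -> emptyList()                  // nothing to do until payment arrives
        is OrderPaid -> listOf(ShipOrder(event.orderId)) // assumed reaction
    }
}

fun main() {
    val saga = OrderSaga()
    println(saga.onEvent(OrderCreated("o-1")))
    println(saga.onEvent(OrderPaid("o-1")))
}
```

Returning commands instead of calling other aggregates directly keeps each step independently retryable and lets the command bus handle delivery.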