基于 DDD、EventSourcing 的现代响应式 CQRS 架构微服务开发框架
![Awesome Kotlin Badge]()
领域驱动 | 事件驱动 | 测试驱动 | 声明式设计 | 响应式编程 | 命令查询职责分离 | 事件溯源
🎉 更新内容 🎉
新增 10 秒钟快速构建基于 Wow 框架的 DDD 项目模板
- 特性: 新增
AggregateRouteSpecFactoryProvider API
- 特性: 增强自定义
RouteHandlerFunctionFactory API
- 依赖: 更新
gradle 版本 v8.5
- 依赖: 更新插件
detekt 版本 v1.23.4
- 依赖: 更新插件
ksp 版本 v1.9.21-1.0.15
- 重构: 使用 CosId 的
HostAddressSupplier 替换 InetUtils,移除 Spring-Cloud-Commons 依赖
- 案例: 新增解冻账户(
UnfreezeAccount)命令 (经典 DDD 银行转账案例(JAVA))
- 修复:
AggregateTracingHandlerFunction 路由不匹配的问题
- 特性: 新增 10 秒钟快速构建基于 Wow 框架的 DDD 项目模板
| 模块 |
说明 |
| api |
API 层,定义聚合命令(Command)、领域事件(Domain Event)以及查询视图模型(Query View Model)。充当各个模块之间通信的“发布语言”,同时提供详细的 API 文档,助力开发者理解和使用接口。 |
| domain |
领域层,包含聚合根和业务约束的实现。聚合根充当领域模型的入口点,负责协调领域对象的操作,确保业务规则的正确执行。业务约束包括领域对象的验证规则、领域事件的处理等。模块内附有详细的领域模型文档,助力团队深入了解业务逻辑。 |
| server |
宿主服务,应用程序的启动点。负责整合其他模块,并提供应用程序的入口。涉及配置依赖项、连接数据库、启动 API 服务等任务。此外,server 模块提供了容器化部署的支持,包括 Docker 构建镜像和 Kubernetes 部署文件,简化了部署过程。 |
| code-coverage-report |
测试覆盖率,用于生成详细的测试覆盖率报告,以及验证覆盖率是否符合要求。帮助开发团队了解项目测试的全面性和质量。 |
| dependencies |
依赖项管理,这个模块负责管理项目的依赖关系,确保各个模块能够正确地引用和使用所需的外部库和工具。 |
| bom |
项目的 BOM(Bill of Materials) |
| libs.versions.toml |
依赖版本配置文件,明确了项目中各个库的版本,方便团队协作和保持版本的一致性。 |
| deploy |
Kubernetes 部署文件,提供了在 Kubernetes 上部署应用程序所需的配置文件,简化了部署过程。 |
| Dockerfile |
server Docker 构建镜像,通过 Dockerfile 文件定义了应用程序的容器化构建步骤,方便部署和扩展。 |
| document |
项目文档,包括 UML 图和上下文映射图,为团队成员提供了对整个项目结构和业务逻辑的清晰理解。 |
架构图
![Wow-Architecture]()
性能测试 (Example)
- 测试代码:Example
- 测试场景:加入购物车、下单
- 命令发送等待模式(
WaitStrategy):SENT、PROCESSED
部署
测试报告
加入购物车
WaitStrategy: SENT
![SENT]()
WaitStrategy: PROCESSED
![PROCESSED]()
下单
WaitStrategy: SENT
![SENT]()
WaitStrategy: PROCESSED
![PROCESSED]()
事件源
![Wow-EventSourcing]()
可观测性
![Wow-Observability]()
OpenAPI (Spring WebFlux 集成)
自动注册 命令 路由处理函数 (HandlerFunction) ,开发人员仅需编写领域模型,即可完成服务开发。
![Wow-Spring-WebFlux-Integration]()
测试套件:80%+ 的测试覆盖率轻而易举
Given -> When -> Expect .
![Wow-CI-Flow]()
前置条件
- 理解 领域驱动设计:《实现领域驱动设计》、《领域驱动设计:软件核心复杂性应对之道》
- 理解 命令查询职责分离(CQRS)
- 理解 事件源架构
- 理解 响应式编程
特性
- Aggregate Modeling
- Single Class
- Inheritance Pattern
- Aggregation Pattern
- Saga Modeling
- Test Suite
- 兼容性测试规范(TCK)
-
AggregateVerifier
-
SagaVerifier
- EventSourcing
- EventStore
- MongoDB (Recommend)
- R2dbc
- Database Sharding
- Table Sharding
- Redis
- Snapshot
- MongoDB
- R2dbc
- Database Sharding
- Table Sharding
- ElasticSearch
- Redis (Recommend)
- 命令等待策略(
WaitStrategy)
-
SENT : 命令发送成功后发送完成信号
-
PROCESSED : 命令处理完成后发送完成信号
-
SNAPSHOT : 快照生成完成后发送完成信号
-
PROJECTED : 命令产生的事件被投影后发送完成信号
- CommandBus
-
InMemoryCommandBus
-
KafkaCommandBus (Recommend)
-
RedisCommandBus
-
LocalFirstCommandBus
- DomainEventBus
-
InMemoryDomainEventBus
-
KafkaDomainEventBus (Recommend)
-
RedisDomainEventBus
-
LocalFirstDomainEventBus
- StateEventBus
-
InMemoryStateEventBus
-
KafkaStateEventBus (Recommend)
-
RedisStateEventBus
-
LocalFirstStateEventBus
- Spring 集成
- Spring Boot Auto Configuration
- Automatically register
CommandAggregate to RouterFunction
- 可观测性
- OpenAPI
-
WowMetadata Generator
Example
Example
单元测试套件
80%+ 的测试覆盖率轻而易举。
![Test Coverage]()
Given -> When -> Expect .
Aggregate Unit Test (AggregateVerifier)
Aggregate Test
internal class OrderTest {
private fun mockCreateOrder(): VerifiedStage<OrderState> {
val tenantId = GlobalIdGenerator.generateAsString()
val customerId = GlobalIdGenerator.generateAsString()
val orderItem = OrderItem(
GlobalIdGenerator.generateAsString(),
GlobalIdGenerator.generateAsString(),
BigDecimal.valueOf(10),
10,
)
val orderItems = listOf(orderItem)
val inventoryService = object : InventoryService {
override fun getInventory(productId: String): Mono<Int> {
return orderItems.filter { it.productId == productId }.map { it.quantity }.first().toMono()
}
}
val pricingService = object : PricingService {
override fun getProductPrice(productId: String): Mono<BigDecimal> {
return orderItems.filter { it.productId == productId }.map { it.price }.first().toMono()
}
}
return aggregateVerifier<Order, OrderState>(tenantId = tenantId)
.inject(DefaultCreateOrderSpec(inventoryService, pricingService))
.given()
.`when`(CreateOrder(customerId, orderItems, SHIPPING_ADDRESS, false))
.expectEventCount(1)
.expectEventType(OrderCreated::class.java)
.expectStateAggregate {
assertThat(it.aggregateId.tenantId, equalTo(tenantId))
}
.expectState {
assertThat(it.id, notNullValue())
assertThat(it.customerId, equalTo(customerId))
assertThat(it.address, equalTo(SHIPPING_ADDRESS))
assertThat(it.items, equalTo(orderItems))
assertThat(it.status, equalTo(OrderStatus.CREATED))
}
.verify()
}
/** * 创建订单 */
@Test
fun createOrder() {
mockCreateOrder()
}
@Test
fun createOrderGivenEmptyItems() {
val customerId = GlobalIdGenerator.generateAsString()
aggregateVerifier<Order, OrderState>()
.inject(mockk<CreateOrderSpec>(), "createOrderSpec")
.given()
.`when`(CreateOrder(customerId, listOf(), SHIPPING_ADDRESS, false))
.expectErrorType(IllegalArgumentException::class.java)
.expectStateAggregate {
/* * 该聚合对象处于未初始化状态,即该聚合未创建成功. */
assertThat(it.initialized, equalTo(false))
}.verify()
}
/** * 创建订单-库存不足 */
@Test
fun createOrderWhenInventoryShortage() {
val customerId = GlobalIdGenerator.generateAsString()
val orderItem = OrderItem(
GlobalIdGenerator.generateAsString(),
GlobalIdGenerator.generateAsString(),
BigDecimal.valueOf(10),
10,
)
val orderItems = listOf(orderItem)
val inventoryService = object : InventoryService {
override fun getInventory(productId: String): Mono<Int> {
return orderItems.filter { it.productId == productId }
/* * 模拟库存不足 */
.map { it.quantity - 1 }.first().toMono()
}
}
val pricingService = object : PricingService {
override fun getProductPrice(productId: String): Mono<BigDecimal> {
return orderItems.filter { it.productId == productId }.map { it.price }.first().toMono()
}
}
aggregateVerifier<Order, OrderState>()
.inject(DefaultCreateOrderSpec(inventoryService, pricingService))
.given()
.`when`(CreateOrder(customerId, orderItems, SHIPPING_ADDRESS, false))
/* * 期望:库存不足异常. */
.expectErrorType(InventoryShortageException::class.java)
.expectStateAggregate {
/* * 该聚合对象处于未初始化状态,即该聚合未创建成功. */
assertThat(it.initialized, equalTo(false))
}.verify()
}
/** * 创建订单-下单价格与当前价格不一致 */
@Test
fun createOrderWhenPriceInconsistency() {
val customerId = GlobalIdGenerator.generateAsString()
val orderItem = OrderItem(
GlobalIdGenerator.generateAsString(),
GlobalIdGenerator.generateAsString(),
BigDecimal.valueOf(10),
10,
)
val orderItems = listOf(orderItem)
val inventoryService = object : InventoryService {
override fun getInventory(productId: String): Mono<Int> {
return orderItems.filter { it.productId == productId }.map { it.quantity }.first().toMono()
}
}
val pricingService = object : PricingService {
override fun getProductPrice(productId: String): Mono<BigDecimal> {
return orderItems.filter { it.productId == productId }
/* * 模拟下单价格、商品定价不一致 */
.map { it.price.plus(BigDecimal.valueOf(1)) }.first().toMono()
}
}
aggregateVerifier<Order, OrderState>()
.inject(DefaultCreateOrderSpec(inventoryService, pricingService))
.given()
.`when`(CreateOrder(customerId, orderItems, SHIPPING_ADDRESS, false))
/* * 期望:价格不一致异常. */
.expectErrorType(PriceInconsistencyException::class.java).verify()
}
}
Saga Unit Test (SagaVerifier)
Saga Test
class CartSagaTest {
@Test
fun onOrderCreated() {
val orderItem = OrderItem(
GlobalIdGenerator.generateAsString(),
GlobalIdGenerator.generateAsString(),
BigDecimal.valueOf(10),
10,
)
sagaVerifier<CartSaga>()
.`when`(
mockk<OrderCreated> {
every {
customerId
} returns "customerId"
every {
items
} returns listOf(orderItem)
every {
fromCart
} returns true
},
)
.expectCommandBody<RemoveCartItem> {
assertThat(it.id, equalTo("customerId"))
assertThat(it.productIds, hasSize(1))
assertThat(it.productIds.first(), equalTo(orderItem.productId))
}
.verify()
}
}
设计
聚合建模
| Single Class |
Inheritance Pattern |
Aggregation Pattern |
![Single Class - Modeling]() |
![Inheritance Pattern- Modeling]() |
![Aggregation Pattern- Modeling]() |
加载聚合
![Load Aggregate]()
聚合状态流
![Aggregate State Flow]()
发送命令
![Send Command]()
命令与事件流
![Command And Event Flow]()
Saga - OrderProcessManager (Demo)
![OrderProcessManager]()