본문으로 건너뛰기

프로덕션 레디 이벤트 기반 아키텍처 Part 3 - CQRS와 Read/Write 모델 분리

시리즈 소개

Series Introduction

이 시리즈는 프로덕션 환경에서 사용할 수 있는 이벤트 기반 아키텍처를 구축하는 방법을 다룹니다.

  1. Part 1: Event Sourcing 기초
  2. Part 2: Outbox Pattern 구현
  3. Part 3: CQRS와 Read/Write 모델 분리 (현재 글)
  4. Part 4: Saga Pattern으로 분산 트랜잭션 처리
  5. Part 5: Event Schema 진화와 버전 관리

This series covers how to build production-ready event-driven architecture.

  1. Part 1: Event Sourcing Basics
  2. Part 2: Implementing the Outbox Pattern
  3. Part 3: CQRS and Read/Write Model Separation (current post)
  4. Part 4: Distributed Transaction Handling with Saga Pattern
  5. Part 5: Event Schema Evolution and Versioning

CQRS란?

CQRS(Command Query Responsibility Segregation)는 읽기(Query)와 쓰기(Command)를 분리하는 패턴입니다.

왜 CQRS인가?

전통적인 CRUD 모델의 한계:

  • 읽기와 쓰기의 요구사항이 다름
  • 복잡한 쿼리를 위해 여러 테이블 조인 필요
  • 읽기 최적화와 쓰기 최적화가 충돌
  • 스케일링이 어려움

CQRS의 장점:

  • 읽기/쓰기 각각에 최적화된 모델 사용 가능
  • 독립적인 스케일링
  • 복잡한 도메인 로직과 쿼리 로직 분리
  • Event Sourcing과의 자연스러운 결합

What is CQRS?

CQRS (Command Query Responsibility Segregation) is a pattern that separates read (Query) and write (Command) operations.

Why CQRS?

Limitations of the traditional CRUD model:

  • Read and write requirements are different
  • Complex queries require joining multiple tables
  • Read optimization and write optimization conflict
  • Scaling is difficult

Benefits of CQRS:

  • Can use optimized models for read/write separately
  • Independent scaling
  • Separation of complex domain logic and query logic
  • Natural integration with Event Sourcing

아키텍처 개요

Architecture Overview

                    ┌─────────────────┐
│ Client │
└────────┬────────┘

┌─────────────────┴─────────────────┐
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Command │ │ Query │
│ Service │ │ Service │
└──────┬──────┘ └──────┬──────┘
│ │
▼ ▼
┌─────────────┐ ┌─────────────┐
│ Write │ │ Read │
│ Model │─────Events────────▶│ Model │
│ (PostgreSQL)│ │(Elasticsearch)│
└─────────────┘ └─────────────┘

구현

Command Side

Command 정의

Implementation

Command Side

Command Definition

// Commands
sealed interface OrderCommand {
val orderId: String
}

data class CreateOrderCommand(
override val orderId: String = UUID.randomUUID().toString(),
val customerId: String,
val items: List<OrderItemDto>
) : OrderCommand

data class ShipOrderCommand(
override val orderId: String,
val shippingAddress: String,
val trackingNumber: String
) : OrderCommand

data class CancelOrderCommand(
override val orderId: String,
val reason: String
) : OrderCommand

Command Handler

Command Handler

@Service
class OrderCommandHandler(
private val orderRepository: OrderRepository,
private val eventPublisher: ApplicationEventPublisher
) {
@Transactional
fun handle(command: CreateOrderCommand): String {
val order = Order.create(
orderId = command.orderId,
customerId = command.customerId,
items = command.items.map { it.toOrderItem() }
)

orderRepository.save(order)

// Publish domain events
order.getPendingEvents().forEach { event ->
eventPublisher.publishEvent(event)
}
order.clearPendingEvents()

return order.id
}

@Transactional
fun handle(command: ShipOrderCommand) {
val order = orderRepository.findById(command.orderId)
?: throw OrderNotFoundException(command.orderId)

order.ship(command.shippingAddress, command.trackingNumber)
orderRepository.save(order)

order.getPendingEvents().forEach { event ->
eventPublisher.publishEvent(event)
}
order.clearPendingEvents()
}

@Transactional
fun handle(command: CancelOrderCommand) {
val order = orderRepository.findById(command.orderId)
?: throw OrderNotFoundException(command.orderId)

order.cancel(command.reason)
orderRepository.save(order)

order.getPendingEvents().forEach { event ->
eventPublisher.publishEvent(event)
}
order.clearPendingEvents()
}
}

Command Controller

Command Controller

@RestController
@RequestMapping("/api/orders")
class OrderCommandController(
private val commandHandler: OrderCommandHandler
) {
@PostMapping
fun createOrder(@RequestBody request: CreateOrderRequest): ResponseEntity<OrderIdResponse> {
val orderId = commandHandler.handle(
CreateOrderCommand(
customerId = request.customerId,
items = request.items
)
)
return ResponseEntity
.created(URI.create("/api/orders/$orderId"))
.body(OrderIdResponse(orderId))
}

@PostMapping("/{orderId}/ship")
fun shipOrder(
@PathVariable orderId: String,
@RequestBody request: ShipOrderRequest
): ResponseEntity<Unit> {
commandHandler.handle(
ShipOrderCommand(
orderId = orderId,
shippingAddress = request.shippingAddress,
trackingNumber = request.trackingNumber
)
)
return ResponseEntity.ok().build()
}

@PostMapping("/{orderId}/cancel")
fun cancelOrder(
@PathVariable orderId: String,
@RequestBody request: CancelOrderRequest
): ResponseEntity<Unit> {
commandHandler.handle(
CancelOrderCommand(
orderId = orderId,
reason = request.reason
)
)
return ResponseEntity.ok().build()
}
}

Query Side

Read Model

Query Side

Read Model

@Document(indexName = "orders")
data class OrderReadModel(
@Id
val orderId: String,
val customerId: String,
val customerName: String,
val status: String,
val items: List<OrderItemReadModel>,
val totalAmount: BigDecimal,
val itemCount: Int,
val shippingAddress: String?,
val trackingNumber: String?,
val createdAt: Instant,
val updatedAt: Instant
)

data class OrderItemReadModel(
val productId: String,
val productName: String,
val quantity: Int,
val price: BigDecimal
)

Event Handler (Projector)

Event Handler (Projector)

@Component
class OrderProjector(
private val orderReadModelRepository: OrderReadModelRepository,
private val customerService: CustomerService,
private val productService: ProductService
) {
private val logger = LoggerFactory.getLogger(javaClass)

@EventListener
@Async
fun on(event: OrderCreated) {
logger.info("Projecting OrderCreated: ${event.aggregateId}")

val customer = customerService.getCustomer(event.customerId)
val products = productService.getProducts(event.items.map { it.productId })

val readModel = OrderReadModel(
orderId = event.aggregateId,
customerId = event.customerId,
customerName = customer.name,
status = "CREATED",
items = event.items.map { item ->
val product = products.find { it.id == item.productId }!!
OrderItemReadModel(
productId = item.productId,
productName = product.name,
quantity = item.quantity,
price = item.price
)
},
totalAmount = event.totalAmount,
itemCount = event.items.size,
shippingAddress = null,
trackingNumber = null,
createdAt = event.occurredAt,
updatedAt = event.occurredAt
)

orderReadModelRepository.save(readModel)
}

@EventListener
@Async
fun on(event: OrderShipped) {
logger.info("Projecting OrderShipped: ${event.aggregateId}")

orderReadModelRepository.findById(event.aggregateId).ifPresent { order ->
val updated = order.copy(
status = "SHIPPED",
shippingAddress = event.shippingAddress,
trackingNumber = event.trackingNumber,
updatedAt = event.occurredAt
)
orderReadModelRepository.save(updated)
}
}

@EventListener
@Async
fun on(event: OrderCancelled) {
logger.info("Projecting OrderCancelled: ${event.aggregateId}")

orderReadModelRepository.findById(event.aggregateId).ifPresent { order ->
val updated = order.copy(
status = "CANCELLED",
updatedAt = event.occurredAt
)
orderReadModelRepository.save(updated)
}
}
}

Query Service

Query Service

@Service
class OrderQueryService(
private val orderReadModelRepository: OrderReadModelRepository
) {
fun findById(orderId: String): OrderReadModel? {
return orderReadModelRepository.findById(orderId).orElse(null)
}

fun findByCustomerId(customerId: String, pageable: Pageable): Page<OrderReadModel> {
return orderReadModelRepository.findByCustomerId(customerId, pageable)
}

fun searchOrders(criteria: OrderSearchCriteria, pageable: Pageable): Page<OrderReadModel> {
return orderReadModelRepository.search(criteria, pageable)
}

fun getOrderStatistics(customerId: String): OrderStatistics {
val orders = orderReadModelRepository.findByCustomerId(customerId)
return OrderStatistics(
totalOrders = orders.size,
totalAmount = orders.sumOf { it.totalAmount },
averageOrderAmount = orders.map { it.totalAmount }
.takeIf { it.isNotEmpty() }
?.let { amounts -> amounts.reduce { a, b -> a + b } / amounts.size.toBigDecimal() }
?: BigDecimal.ZERO,
ordersByStatus = orders.groupBy { it.status }.mapValues { it.value.size }
)
}
}

Query Controller

Query Controller

@RestController
@RequestMapping("/api/orders")
class OrderQueryController(
private val queryService: OrderQueryService
) {
@GetMapping("/{orderId}")
fun getOrder(@PathVariable orderId: String): ResponseEntity<OrderReadModel> {
val order = queryService.findById(orderId)
?: throw OrderNotFoundException(orderId)
return ResponseEntity.ok(order)
}

@GetMapping
fun searchOrders(
@RequestParam(required = false) customerId: String?,
@RequestParam(required = false) status: String?,
@RequestParam(required = false) fromDate: Instant?,
@RequestParam(required = false) toDate: Instant?,
pageable: Pageable
): ResponseEntity<Page<OrderReadModel>> {
val criteria = OrderSearchCriteria(
customerId = customerId,
status = status,
fromDate = fromDate,
toDate = toDate
)
return ResponseEntity.ok(queryService.searchOrders(criteria, pageable))
}

@GetMapping("/customers/{customerId}/statistics")
fun getCustomerStatistics(
@PathVariable customerId: String
): ResponseEntity<OrderStatistics> {
return ResponseEntity.ok(queryService.getOrderStatistics(customerId))
}
}

Elasticsearch Repository

Elasticsearch Repository

interface OrderReadModelRepository : ElasticsearchRepository<OrderReadModel, String> {

fun findByCustomerId(customerId: String, pageable: Pageable): Page<OrderReadModel>

fun findByCustomerId(customerId: String): List<OrderReadModel>

fun findByStatus(status: String, pageable: Pageable): Page<OrderReadModel>

@Query("""
{
"bool": {
"must": [
{"match": {"customerId": "?0"}}
],
"filter": [
{"range": {"createdAt": {"gte": "?1", "lte": "?2"}}}
]
}
}
""")
fun findByCustomerIdAndDateRange(
customerId: String,
fromDate: Instant,
toDate: Instant,
pageable: Pageable
): Page<OrderReadModel>
}

@Repository
class OrderReadModelRepositoryCustomImpl(
private val elasticsearchOperations: ElasticsearchOperations
) : OrderReadModelRepositoryCustom {

override fun search(criteria: OrderSearchCriteria, pageable: Pageable): Page<OrderReadModel> {
val queryBuilder = BoolQuery.Builder()

criteria.customerId?.let {
queryBuilder.must(MatchQuery.of { q -> q.field("customerId").query(it) }._toQuery())
}

criteria.status?.let {
queryBuilder.must(MatchQuery.of { q -> q.field("status").query(it) }._toQuery())
}

if (criteria.fromDate != null || criteria.toDate != null) {
val rangeQuery = RangeQuery.of { r ->
r.field("createdAt")
criteria.fromDate?.let { r.gte(JsonData.of(it.toString())) }
criteria.toDate?.let { r.lte(JsonData.of(it.toString())) }
r
}
queryBuilder.filter(rangeQuery._toQuery())
}

val searchQuery = NativeQuery.builder()
.withQuery(queryBuilder.build()._toQuery())
.withPageable(pageable)
.build()

val hits = elasticsearchOperations.search(searchQuery, OrderReadModel::class.java)

return PageImpl(
hits.searchHits.map { it.content },
pageable,
hits.totalHits
)
}
}

Eventual Consistency 처리

CQRS에서 읽기 모델은 쓰기 모델과 최종적으로 일관성을 가집니다. 이를 처리하는 전략:

Handling Eventual Consistency

In CQRS, the read model is eventually consistent with the write model. Strategies to handle this:

1. Optimistic UI Update

1. Optimistic UI Update

// Frontend: React example
const createOrder = async (orderData: CreateOrderRequest) => {
// Optimistically add to local state
const tempId = generateTempId();
setOrders(prev => [...prev, { ...orderData, id: tempId, status: 'CREATING' }]);

try {
const response = await api.post('/orders', orderData);
// Replace temp order with real one
setOrders(prev => prev.map(o =>
o.id === tempId ? { ...o, id: response.data.orderId, status: 'CREATED' } : o
));
} catch (error) {
// Remove temp order on failure
setOrders(prev => prev.filter(o => o.id !== tempId));
throw error;
}
};

2. Polling for Consistency

2. Polling for Consistency

@RestController
class OrderQueryController(
private val queryService: OrderQueryService
) {
@GetMapping("/{orderId}")
fun getOrder(
@PathVariable orderId: String,
@RequestParam(required = false, defaultValue = "false") waitForConsistency: Boolean,
@RequestParam(required = false, defaultValue = "5000") timeoutMs: Long
): ResponseEntity<OrderReadModel> {
if (waitForConsistency) {
return waitForOrder(orderId, timeoutMs)
}

val order = queryService.findById(orderId)
?: throw OrderNotFoundException(orderId)
return ResponseEntity.ok(order)
}

private fun waitForOrder(orderId: String, timeoutMs: Long): ResponseEntity<OrderReadModel> {
val startTime = System.currentTimeMillis()

while (System.currentTimeMillis() - startTime < timeoutMs) {
queryService.findById(orderId)?.let {
return ResponseEntity.ok(it)
}
Thread.sleep(100)
}

throw OrderNotFoundException(orderId)
}
}

3. Version-based Consistency Check

3. Version-based Consistency Check

@Document(indexName = "orders")
data class OrderReadModel(
@Id
val orderId: String,
val version: Long, // Add version field
// ... other fields
)

@Component
class OrderProjector {
@EventListener
@Async
fun on(event: OrderCreated) {
val readModel = OrderReadModel(
orderId = event.aggregateId,
version = 1,
// ...
)
orderReadModelRepository.save(readModel)
}

@EventListener
@Async
fun on(event: OrderShipped) {
orderReadModelRepository.findById(event.aggregateId).ifPresent { order ->
val updated = order.copy(
status = "SHIPPED",
version = order.version + 1,
// ...
)
orderReadModelRepository.save(updated)
}
}
}

Read Model Rebuild

이벤트 소싱과 결합된 CQRS의 장점 중 하나는 Read Model을 언제든지 재구축할 수 있다는 것입니다.

Read Model Rebuild

One of the advantages of CQRS combined with event sourcing is that you can rebuild the Read Model at any time.

@Service
class ReadModelRebuilder(
private val eventStore: EventStore,
private val orderReadModelRepository: OrderReadModelRepository,
private val orderProjector: OrderProjector
) {
@Async
fun rebuildOrderReadModels() {
logger.info("Starting read model rebuild...")

// Clear existing read models
orderReadModelRepository.deleteAll()

// Replay all events
val aggregateIds = eventStore.getAllAggregateIds("Order")

aggregateIds.forEach { aggregateId ->
val events = eventStore.getEvents(aggregateId)
events.forEach { event ->
when (event) {
is OrderCreated -> orderProjector.on(event)
is OrderShipped -> orderProjector.on(event)
is OrderCancelled -> orderProjector.on(event)
}
}
}

logger.info("Read model rebuild completed for ${aggregateIds.size} aggregates")
}
}

정리

CQRS 적용 시 고려사항:

장점단점
읽기/쓰기 독립적 최적화복잡성 증가
독립적 스케일링최종적 일관성 처리 필요
유연한 쿼리 모델인프라 비용 증가
Event Sourcing과 자연스러운 결합러닝 커브

다음 글에서는 분산 트랜잭션을 처리하는 Saga Pattern을 다루겠습니다.

Summary

Considerations when applying CQRS:

AdvantagesDisadvantages
Independent optimization for read/writeIncreased complexity
Independent scalingNeed to handle eventual consistency
Flexible query modelIncreased infrastructure costs
Natural integration with Event SourcingLearning curve

In the next post, we will cover the Saga Pattern for handling distributed transactions.