시리즈 소개
Series Introduction
이 시리즈는 프로덕션 환경에서 사용할 수 있는 이벤트 기반 아키텍처를 구축하는 방법을 다룹니다.
- Part 1: Event Sourcing 기초 (현재 글)
- Part 2: Outbox Pattern 구현
- Part 3: CQRS와 Read/Write 모델 분리
- Part 4: Saga Pattern으로 분산 트랜잭션 처리
- Part 5: Event Schema 진화와 버전 관리
This series covers how to build event-driven architecture that can be used in production environments.
- Part 1: Event Sourcing Basics (this post)
- Part 2: Implementing the Outbox Pattern
- Part 3: CQRS and Read/Write Model Separation
- Part 4: Distributed Transaction Handling with Saga Pattern
- Part 5: Event Schema Evolution and Version Management
Event Sourcing이란?
Event Sourcing은 애플리케이션의 상태를 일련의 이벤트로 저장하는 패턴입니다. 현재 상태를 직접 저장하는 대신, 상태 변경을 일으킨 모든 이벤트를 순서대로 저장합니다.
왜 Event Sourcing인가?
전통적인 CRUD 방식의 문제점:
- 현재 상태만 알 수 있고, 어떻게 그 상태에 도달했는지 알 수 없음
- 감사(Audit) 로그를 별도로 관리해야 함
- 비즈니스 인사이트를 얻기 어려움
Event Sourcing의 장점:
- 완전한 감사 추적(Audit Trail)
- 시간 여행 디버깅 가능
- 이벤트 재생을 통한 상태 복구
- 비즈니스 이벤트 기반의 분석 가능
What is Event Sourcing?
Event Sourcing is a pattern that stores the state of an application as a series of events. Instead of directly storing the current state, it stores all events that caused state changes in order.
Why Event Sourcing?
Problems with traditional CRUD approach:
- You can only know the current state, not how it reached that state
- Audit logs must be managed separately
- Difficult to gain business insights
Benefits of Event Sourcing:
- Complete audit trail
- Time-travel debugging capability
- State recovery through event replay
- Business event-based analytics
핵심 개념
1. Event Store
이벤트를 저장하는 저장소입니다. 이벤트는 불변(immutable)이며, append-only로 저장됩니다.
Core Concepts
1. Event Store
A storage for events. Events are immutable and stored in an append-only manner.
@Entity
@Table(name = "event_store")
class StoredEvent(
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
val id: Long? = null,
@Column(nullable = false)
val aggregateId: String,
@Column(nullable = false)
val aggregateType: String,
@Column(nullable = false)
val eventType: String,
@Column(columnDefinition = "TEXT", nullable = false)
val payload: String,
@Column(nullable = false)
val version: Long,
@Column(nullable = false)
val occurredAt: Instant = Instant.now()
)
2. Aggregate Root
도메인 모델의 진입점으로, 이벤트를 발생시키고 적용하는 역할을 합니다.
2. Aggregate Root
The entry point of the domain model, responsible for generating and applying events.
abstract class AggregateRoot {
@Transient
private val pendingEvents = mutableListOf<DomainEvent>()
var version: Long = 0
protected set
protected fun applyChange(event: DomainEvent) {
applyEvent(event)
pendingEvents.add(event)
}
protected abstract fun applyEvent(event: DomainEvent)
fun getPendingEvents(): List<DomainEvent> = pendingEvents.toList()
fun clearPendingEvents() {
pendingEvents.clear()
}
fun loadFromHistory(events: List<DomainEvent>) {
events.forEach { event ->
applyEvent(event)
version++
}
}
}
3. Domain Event
비즈니스에서 발생한 사실을 나타내는 불변 객체입니다.
3. Domain Event
An immutable object that represents a fact that occurred in the business domain.
interface DomainEvent {
val aggregateId: String
val occurredAt: Instant
}
data class OrderCreated(
override val aggregateId: String,
val customerId: String,
val items: List<OrderItem>,
val totalAmount: BigDecimal,
override val occurredAt: Instant = Instant.now()
) : DomainEvent
data class OrderShipped(
override val aggregateId: String,
val shippingAddress: String,
val trackingNumber: String,
override val occurredAt: Instant = Instant.now()
) : DomainEvent
실제 구현 예제: 주문 시스템
Order Aggregate
Practical Implementation Example: Order System
Order Aggregate
class Order private constructor() : AggregateRoot() {
lateinit var id: String
private set
lateinit var customerId: String
private set
var status: OrderStatus = OrderStatus.CREATED
private set
var items: List<OrderItem> = emptyList()
private set
var totalAmount: BigDecimal = BigDecimal.ZERO
private set
companion object {
fun create(
orderId: String,
customerId: String,
items: List<OrderItem>
): Order {
val order = Order()
val totalAmount = items.sumOf { it.price * it.quantity.toBigDecimal() }
order.applyChange(
OrderCreated(
aggregateId = orderId,
customerId = customerId,
items = items,
totalAmount = totalAmount
)
)
return order
}
fun fromHistory(events: List<DomainEvent>): Order {
val order = Order()
order.loadFromHistory(events)
return order
}
}
fun ship(shippingAddress: String, trackingNumber: String) {
require(status == OrderStatus.CREATED) {
"Order must be in CREATED status to ship"
}
applyChange(
OrderShipped(
aggregateId = id,
shippingAddress = shippingAddress,
trackingNumber = trackingNumber
)
)
}
override fun applyEvent(event: DomainEvent) {
when (event) {
is OrderCreated -> {
id = event.aggregateId
customerId = event.customerId
items = event.items
totalAmount = event.totalAmount
status = OrderStatus.CREATED
}
is OrderShipped -> {
status = OrderStatus.SHIPPED
}
}
}
}
enum class OrderStatus {
CREATED, SHIPPED, DELIVERED, CANCELLED
}
Event Store Repository
Event Store Repository
interface EventStoreRepository : JpaRepository<StoredEvent, Long> {
fun findByAggregateIdOrderByVersionAsc(aggregateId: String): List<StoredEvent>
@Query("SELECT MAX(e.version) FROM StoredEvent e WHERE e.aggregateId = :aggregateId")
fun findLatestVersion(aggregateId: String): Long?
}
@Service
class EventStore(
private val repository: EventStoreRepository,
private val objectMapper: ObjectMapper
) {
fun saveEvents(
aggregateId: String,
aggregateType: String,
events: List<DomainEvent>,
expectedVersion: Long
) {
val currentVersion = repository.findLatestVersion(aggregateId) ?: 0
if (currentVersion != expectedVersion) {
throw OptimisticLockingException(
"Expected version $expectedVersion but found $currentVersion"
)
}
events.forEachIndexed { index, event ->
val storedEvent = StoredEvent(
aggregateId = aggregateId,
aggregateType = aggregateType,
eventType = event::class.simpleName!!,
payload = objectMapper.writeValueAsString(event),
version = expectedVersion + index + 1
)
repository.save(storedEvent)
}
}
fun getEvents(aggregateId: String): List<DomainEvent> {
return repository.findByAggregateIdOrderByVersionAsc(aggregateId)
.map { deserializeEvent(it) }
}
private fun deserializeEvent(storedEvent: StoredEvent): DomainEvent {
val eventClass = when (storedEvent.eventType) {
"OrderCreated" -> OrderCreated::class.java
"OrderShipped" -> OrderShipped::class.java
else -> throw IllegalArgumentException("Unknown event type: ${storedEvent.eventType}")
}
return objectMapper.readValue(storedEvent.payload, eventClass)
}
}
Order Repository (Event Sourced)
Order Repository (Event Sourced)
@Repository
class OrderRepository(private val eventStore: EventStore) {
fun save(order: Order) {
val events = order.getPendingEvents()
if (events.isEmpty()) return
eventStore.saveEvents(
aggregateId = order.id,
aggregateType = "Order",
events = events,
expectedVersion = order.version
)
order.clearPendingEvents()
}
fun findById(orderId: String): Order? {
val events = eventStore.getEvents(orderId)
if (events.isEmpty()) return null
return Order.fromHistory(events)
}
}
Snapshotting
이벤트가 많아지면 Aggregate를 복원하는 데 시간이 오래 걸립니다. Snapshotting은 특정 시점의 상태를 저장하여 복원 시간을 단축합니다.
Snapshotting
As events accumulate, restoring an Aggregate takes longer. Snapshotting reduces restoration time by saving the state at specific points.
@Entity
@Table(name = "snapshots")
class Snapshot(
@Id
val aggregateId: String,
@Column(nullable = false)
val aggregateType: String,
@Column(columnDefinition = "TEXT", nullable = false)
val payload: String,
@Column(nullable = false)
val version: Long,
@Column(nullable = false)
val createdAt: Instant = Instant.now()
)
@Service
class SnapshotStore(
private val snapshotRepository: SnapshotRepository,
private val objectMapper: ObjectMapper
) {
private val snapshotFrequency = 10 // 10개의 이벤트마다 스냅샷 생성
fun shouldCreateSnapshot(version: Long): Boolean {
return version % snapshotFrequency == 0L
}
fun saveSnapshot(aggregateId: String, aggregate: AggregateRoot) {
val snapshot = Snapshot(
aggregateId = aggregateId,
aggregateType = aggregate::class.simpleName!!,
payload = objectMapper.writeValueAsString(aggregate),
version = aggregate.version
)
snapshotRepository.save(snapshot)
}
fun getLatestSnapshot(aggregateId: String): Pair<Order, Long>? {
return snapshotRepository.findById(aggregateId)
.map {
val order = objectMapper.readValue(it.payload, Order::class.java)
order to it.version
}
.orElse(null)
}
}
Projection
이벤트를 기반으로 읽기 최적화된 뷰를 만드는 것을 Projection이라고 합니다.
Projection
Creating read-optimized views based on events is called Projection.
@Entity
@Table(name = "order_summary")
class OrderSummary(
@Id
val orderId: String,
val customerId: String,
val status: String,
val totalAmount: BigDecimal,
val itemCount: Int,
val createdAt: Instant,
var lastModifiedAt: Instant
)
@Component
class OrderProjection(
private val orderSummaryRepository: OrderSummaryRepository
) {
@EventHandler
fun on(event: OrderCreated) {
val summary = OrderSummary(
orderId = event.aggregateId,
customerId = event.customerId,
status = "CREATED",
totalAmount = event.totalAmount,
itemCount = event.items.size,
createdAt = event.occurredAt,
lastModifiedAt = event.occurredAt
)
orderSummaryRepository.save(summary)
}
@EventHandler
fun on(event: OrderShipped) {
orderSummaryRepository.findById(event.aggregateId)
.ifPresent { summary ->
summary.status = "SHIPPED"
summary.lastModifiedAt = event.occurredAt
orderSummaryRepository.save(summary)
}
}
}
정리
Event Sourcing은 다음과 같은 상황에서 특히 유용합니다:
- 감사 추적이 중요한 금융, 의료 시스템
- 이벤트 기반 분석이 필요한 경우
- 복잡한 비즈니스 로직을 가진 도메인
- 시간에 따른 상태 변화 추적이 필요한 경우
다음 글에서는 Event Sourcing과 함께 자주 사용되는 Outbox Pattern을 다루겠습니다.
Summary
Event Sourcing is particularly useful in the following situations:
- Financial and healthcare systems where audit trails are critical
- When event-based analytics are needed
- Domains with complex business logic
- When tracking state changes over time is required
In the next post, we will cover the Outbox Pattern, which is frequently used together with Event Sourcing.