Production-Ready Event-Driven Architecture Part 4 - Handling Distributed Transactions with the Saga Pattern

Series Introduction

This series covers how to build an event-driven architecture that is ready for production use.

  1. Part 1: Event Sourcing Basics
  2. Part 2: Implementing the Outbox Pattern
  3. Part 3: CQRS and Read/Write Model Separation
  4. Part 4: Handling Distributed Transactions with the Saga Pattern (this post)
  5. Part 5: Event Schema Evolution and Version Management

The Problem of Distributed Transactions

In a microservices environment, a single business transaction often spans multiple services.

Example: Order Processing Flow

Create order → Check stock → Process payment → Schedule shipping

Each step is handled by a different service, typically with its own database, so a single traditional ACID transaction cannot cover the whole flow.

Introduction to the Saga Pattern

A Saga consists of a series of local transactions. Each local transaction commits in one service and triggers the next one. If a step fails, the Saga runs compensating transactions that undo the steps that have already completed.

Two Implementation Approaches

  1. Choreography: Each service publishes events and subscribes to the events of other services
  2. Orchestration: A central coordinator manages the Saga
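To make the definition concrete, here is a minimal in-memory sketch of a saga: an ordered list of steps, each with a compensation, where a failure undoes the completed steps in reverse order. The names `SagaStep` and `runSaga` are hypothetical and exist only for this illustration. A real saga would also persist its progress, which this sketch leaves out.

```kotlin
// A saga step pairs a local transaction with the compensation that undoes it.
class SagaStep(
    val name: String,
    val action: () -> Unit,
    val compensation: () -> Unit = {}
)

// Runs the steps in order. On the first failure, compensates the steps that
// already completed, in reverse order, and reports failure.
fun runSaga(steps: List<SagaStep>, log: MutableList<String>): Boolean {
    val completed = mutableListOf<SagaStep>()
    for (step in steps) {
        try {
            step.action()
            completed += step
            log += "done:${step.name}"
        } catch (e: Exception) {
            log += "failed:${step.name}"
            for (done in completed.asReversed()) {
                done.compensation()
                log += "undo:${done.name}"
            }
            return false
        }
    }
    return true
}

fun main() {
    val log = mutableListOf<String>()
    val succeeded = runSaga(
        listOf(
            SagaStep("reserve-stock", action = {}),
            SagaStep("charge-payment", action = {}),
            SagaStep("schedule-shipping", action = { error("no carrier available") })
        ),
        log
    )
    println("succeeded=$succeeded log=$log")
}
```

The failed step itself is not compensated because its local transaction never committed. Only the steps before it need to be undone.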

Choreography Approach

Architecture

┌─────────┐     ┌─────────┐     ┌─────────┐     ┌─────────┐
│  Order  │────▶│Inventory│────▶│ Payment │────▶│Shipping │
│ Service │     │ Service │     │ Service │     │ Service │
└────┬────┘     └────┬────┘     └────┬────┘     └────┬────┘
     │               │               │               │
     │ OrderCreated  │ StockReserved │PaymentCompleted│
     └───────────────┴───────────────┴───────────────┘
                     Event Bus (Kafka)

Order Service

import org.springframework.data.repository.findByIdOrNull

@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val eventPublisher: OrderEventPublisher
) {
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        val order = Order.create(
            customerId = command.customerId,
            items = command.items
        )

        orderRepository.save(order)

        // Assumes the publisher writes to the outbox table (Part 2), so the
        // event is committed atomically with the order.
        eventPublisher.publish(
            OrderCreated(
                orderId = order.id,
                customerId = order.customerId,
                items = order.items.map { OrderItemDto(it) },
                totalAmount = order.totalAmount
            )
        )

        return order
    }

    // Events from other services arrive over Kafka.
    @KafkaListener(topics = ["payment-events"])
    @Transactional
    fun onPaymentCompleted(event: PaymentCompleted) {
        val order = orderRepository.findByIdOrNull(event.orderId)
            ?: throw OrderNotFoundException(event.orderId)

        order.markAsPaid()
        orderRepository.save(order)
    }

    @KafkaListener(topics = ["inventory-events"])
    @Transactional
    fun onStockReservationFailed(event: StockReservationFailed) {
        val order = orderRepository.findByIdOrNull(event.orderId)
            ?: throw OrderNotFoundException(event.orderId)

        order.cancel("Stock reservation failed: ${event.reason}")
        orderRepository.save(order)

        eventPublisher.publish(OrderCancelled(order.id, event.reason))
    }
}

Inventory Service

@Service
class InventoryService(
    private val inventoryRepository: InventoryRepository,
    private val stockReservationRepository: StockReservationRepository,
    private val eventPublisher: InventoryEventPublisher
) {
    @KafkaListener(topics = ["order-events"])
    @Transactional
    fun onOrderCreated(event: OrderCreated) {
        try {
            // Validate every item before reserving anything, so that a failed
            // check cannot leave a partial reservation behind.
            val stock = event.items.map { item ->
                val inventory = inventoryRepository.findByProductId(item.productId)
                    ?: throw ProductNotFoundException(item.productId)

                if (inventory.availableQuantity < item.quantity) {
                    throw InsufficientStockException(item.productId)
                }

                item to inventory
            }

            val reservations = stock.map { (item, inventory) ->
                inventory.reserve(item.quantity)
                inventoryRepository.save(inventory)

                StockReservation(
                    productId = item.productId,
                    orderId = event.orderId,
                    quantity = item.quantity
                )
            }

            // Persist the reservations so the compensation below can find them.
            stockReservationRepository.saveAll(reservations)

            eventPublisher.publish(
                StockReserved(
                    orderId = event.orderId,
                    reservations = reservations
                )
            )
        } catch (e: Exception) {
            eventPublisher.publish(
                StockReservationFailed(
                    orderId = event.orderId,
                    reason = e.message ?: "Unknown error"
                )
            )
        }
    }

    @KafkaListener(topics = ["payment-events"])
    @Transactional
    fun onPaymentFailed(event: PaymentFailed) {
        // Compensating transaction: release the reserved stock
        val reservations = stockReservationRepository.findByOrderId(event.orderId)

        reservations.forEach { reservation ->
            val inventory = inventoryRepository.findByProductId(reservation.productId)
                ?: throw ProductNotFoundException(reservation.productId)
            inventory.releaseReservation(reservation.quantity)
            inventoryRepository.save(inventory)
        }

        stockReservationRepository.deleteAll(reservations)

        eventPublisher.publish(
            StockReleased(orderId = event.orderId)
        )
    }
}

Payment Service

@Service
class PaymentService(
    private val paymentRepository: PaymentRepository,
    private val paymentGateway: PaymentGateway,
    private val orderClient: OrderClient,
    private val eventPublisher: PaymentEventPublisher
) {
    @KafkaListener(topics = ["inventory-events"])
    @Transactional
    fun onStockReserved(event: StockReserved) {
        try {
            val order = orderClient.getOrder(event.orderId)

            // The gateway call is outside the database transaction. Kafka can
            // redeliver this event, so the charge should be idempotent per order.
            val paymentResult = paymentGateway.charge(
                customerId = order.customerId,
                amount = order.totalAmount
            )

            val payment = Payment(
                orderId = event.orderId,
                amount = order.totalAmount,
                transactionId = paymentResult.transactionId,
                status = PaymentStatus.COMPLETED
            )
            paymentRepository.save(payment)

            eventPublisher.publish(
                PaymentCompleted(
                    orderId = event.orderId,
                    transactionId = paymentResult.transactionId
                )
            )
        } catch (e: PaymentException) {
            eventPublisher.publish(
                PaymentFailed(
                    orderId = event.orderId,
                    reason = e.message ?: "Payment failed"
                )
            )
        }
    }
}
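The event chain across the three services can be traced without Kafka or Spring. The following is a simplified sketch, not the services above: `EventBus` is a hypothetical synchronous stand-in for the broker, the event classes carry only the fields the sketch needs, and `wireServices`, `stock`, and `creditLimit` are made-up names. It shows the compensation path in which a payment failure causes the inventory handler to release the reservation.

```kotlin
// Simplified event types that mirror the names used in the services above.
sealed interface Event { val orderId: String }
data class OrderCreated(override val orderId: String, val quantity: Int, val amount: Int) : Event
data class StockReserved(override val orderId: String, val amount: Int) : Event
data class StockReservationFailed(override val orderId: String) : Event
data class PaymentCompleted(override val orderId: String) : Event
data class PaymentFailed(override val orderId: String) : Event
data class StockReleased(override val orderId: String) : Event

// A tiny synchronous bus standing in for Kafka: every handler sees every event.
class EventBus {
    private val handlers = mutableListOf<(Event) -> Unit>()
    val history = mutableListOf<Event>()

    fun subscribe(handler: (Event) -> Unit) { handlers += handler }

    fun publish(event: Event) {
        history += event
        handlers.toList().forEach { it(event) }
    }
}

fun wireServices(bus: EventBus, stock: Int, creditLimit: Int) {
    var available = stock
    val reserved = mutableMapOf<String, Int>()

    // Inventory: reserve on OrderCreated, release on PaymentFailed.
    bus.subscribe { event ->
        when (event) {
            is OrderCreated ->
                if (available >= event.quantity) {
                    available -= event.quantity
                    reserved[event.orderId] = event.quantity
                    bus.publish(StockReserved(event.orderId, event.amount))
                } else {
                    bus.publish(StockReservationFailed(event.orderId))
                }
            is PaymentFailed -> {
                available += reserved.remove(event.orderId) ?: 0
                bus.publish(StockReleased(event.orderId))
            }
            else -> Unit
        }
    }

    // Payment: charge on StockReserved.
    bus.subscribe { event ->
        if (event is StockReserved) {
            if (event.amount <= creditLimit) {
                bus.publish(PaymentCompleted(event.orderId))
            } else {
                bus.publish(PaymentFailed(event.orderId))
            }
        }
    }
}

fun main() {
    val bus = EventBus()
    wireServices(bus, stock = 5, creditLimit = 100)
    bus.publish(OrderCreated("o-1", quantity = 2, amount = 500))
    bus.history.forEach(::println)
}
```

No component in this sketch knows the whole flow. The order of events emerges from which handler reacts to which event, which is also why the overall flow is hard to see in a choreographed saga.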

Pros and Cons of Choreography

Pros                   | Cons
-----------------------+------------------------------------------
Loose coupling         | Difficult to understand the overall flow
Simple implementation  | Risk of circular dependencies
Service autonomy       | Difficult to debug
Scalability            | Complex testing

Orchestration Approach

Architecture

                    ┌─────────────────┐
                    │      Saga       │
                    │  Orchestrator   │
                    └────────┬────────┘
                             │
         ┌───────────────────┼───────────────────┐
         │                   │                   │
         ▼                   ▼                   ▼
    ┌─────────┐         ┌─────────┐         ┌─────────┐
    │  Order  │         │Inventory│         │ Payment │
    │ Service │         │ Service │         │ Service │
    └─────────┘         └─────────┘         └─────────┘

Saga State Machine

enum class OrderSagaState {
    STARTED,
    STOCK_RESERVING,
    STOCK_RESERVED,
    STOCK_RESERVATION_FAILED,
    PAYMENT_PROCESSING,
    PAYMENT_COMPLETED,
    PAYMENT_FAILED,
    SHIPPING_SCHEDULED,
    COMPLETED,
    COMPENSATING,
    COMPENSATED,
    FAILED
}

enum class OrderSagaEvent {
    START,
    STOCK_RESERVE_SUCCESS,
    STOCK_RESERVE_FAIL,
    PAYMENT_SUCCESS,
    PAYMENT_FAIL,
    SHIPPING_SUCCESS,
    SHIPPING_FAIL,
    COMPENSATE_COMPLETE
}

@Entity
@Table(name = "order_saga")
class OrderSaga(
    @Id
    val sagaId: String = UUID.randomUUID().toString(),

    @Column(nullable = false)
    val orderId: String,

    @Enumerated(EnumType.STRING)
    var state: OrderSagaState = OrderSagaState.STARTED,

    @Column(columnDefinition = "TEXT")
    var context: String = "{}", // Saga context stored as JSON

    val createdAt: Instant = Instant.now(),
    var updatedAt: Instant = Instant.now()
)

Saga Orchestrator

import org.springframework.data.repository.findByIdOrNull

@Service
class OrderSagaOrchestrator(
    private val sagaRepository: OrderSagaRepository,
    private val inventoryClient: InventoryClient,
    private val paymentClient: PaymentClient,
    private val shippingClient: ShippingClient,
    private val orderClient: OrderClient,
    private val objectMapper: ObjectMapper
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @Transactional
    fun startSaga(orderId: String): OrderSaga {
        val saga = OrderSaga(orderId = orderId)
        sagaRepository.save(saga)

        processNextStep(saga)
        return saga
    }

    @Transactional
    fun handleEvent(sagaId: String, event: OrderSagaEvent, payload: Any? = null) {
        val saga = sagaRepository.findByIdOrNull(sagaId)
            ?: throw SagaNotFoundException(sagaId)

        val newState = transition(saga.state, event)
        saga.state = newState
        saga.updatedAt = Instant.now()

        // Store the payload under the event name so compensation can read it later.
        if (payload != null) {
            val context = objectMapper.readTree(saga.context) as ObjectNode
            context.set<ObjectNode>(event.name, objectMapper.valueToTree<JsonNode>(payload))
            saga.context = objectMapper.writeValueAsString(context)
        }

        sagaRepository.save(saga)

        processNextStep(saga)
    }

    private fun transition(currentState: OrderSagaState, event: OrderSagaEvent): OrderSagaState {
        return when (currentState to event) {
            OrderSagaState.STARTED to OrderSagaEvent.START ->
                OrderSagaState.STOCK_RESERVING

            OrderSagaState.STOCK_RESERVING to OrderSagaEvent.STOCK_RESERVE_SUCCESS ->
                OrderSagaState.STOCK_RESERVED

            OrderSagaState.STOCK_RESERVING to OrderSagaEvent.STOCK_RESERVE_FAIL ->
                OrderSagaState.STOCK_RESERVATION_FAILED

            OrderSagaState.STOCK_RESERVED to OrderSagaEvent.START ->
                OrderSagaState.PAYMENT_PROCESSING

            OrderSagaState.PAYMENT_PROCESSING to OrderSagaEvent.PAYMENT_SUCCESS ->
                OrderSagaState.PAYMENT_COMPLETED

            OrderSagaState.PAYMENT_PROCESSING to OrderSagaEvent.PAYMENT_FAIL ->
                OrderSagaState.COMPENSATING

            OrderSagaState.PAYMENT_COMPLETED to OrderSagaEvent.SHIPPING_SUCCESS ->
                OrderSagaState.COMPLETED

            OrderSagaState.PAYMENT_COMPLETED to OrderSagaEvent.SHIPPING_FAIL ->
                OrderSagaState.COMPENSATING

            OrderSagaState.COMPENSATING to OrderSagaEvent.COMPENSATE_COMPLETE ->
                OrderSagaState.COMPENSATED

            else -> throw InvalidStateTransitionException(currentState, event)
        }
    }

    private fun processNextStep(saga: OrderSaga) {
        when (saga.state) {
            OrderSagaState.STARTED -> {
                handleEvent(saga.sagaId, OrderSagaEvent.START)
            }

            OrderSagaState.STOCK_RESERVING -> {
                reserveStock(saga)
            }

            OrderSagaState.STOCK_RESERVED -> {
                handleEvent(saga.sagaId, OrderSagaEvent.START)
            }

            OrderSagaState.STOCK_RESERVATION_FAILED -> {
                // Nothing was reserved, so there is nothing to compensate.
                logger.info("Saga failed at stock reservation: ${saga.sagaId}")
                markOrderAsFailed(saga)
            }

            OrderSagaState.PAYMENT_PROCESSING -> {
                processPayment(saga)
            }

            OrderSagaState.PAYMENT_COMPLETED -> {
                scheduleShipping(saga)
            }

            OrderSagaState.COMPENSATING -> {
                compensate(saga)
            }

            OrderSagaState.COMPLETED -> {
                logger.info("Saga completed successfully: ${saga.sagaId}")
            }

            OrderSagaState.COMPENSATED -> {
                logger.info("Saga compensated: ${saga.sagaId}")
                markOrderAsFailed(saga)
            }

            else -> {
                logger.warn("No action for state: ${saga.state}")
            }
        }
    }

    // Spring applies @Async and @Transactional through proxies, so they take
    // effect only when a method is called from another bean. For the steps to
    // run asynchronously, move them to a separate bean or dispatch them as messages.
    @Async
    fun reserveStock(saga: OrderSaga) {
        try {
            val order = orderClient.getOrder(saga.orderId)
            val result = inventoryClient.reserveStock(
                ReserveStockRequest(
                    orderId = saga.orderId,
                    items = order.items
                )
            )
            handleEvent(saga.sagaId, OrderSagaEvent.STOCK_RESERVE_SUCCESS, result)
        } catch (e: Exception) {
            logger.error("Stock reservation failed for saga: ${saga.sagaId}", e)
            handleEvent(saga.sagaId, OrderSagaEvent.STOCK_RESERVE_FAIL, e.message)
        }
    }

    @Async
    fun processPayment(saga: OrderSaga) {
        try {
            val order = orderClient.getOrder(saga.orderId)
            val result = paymentClient.processPayment(
                ProcessPaymentRequest(
                    orderId = saga.orderId,
                    customerId = order.customerId,
                    amount = order.totalAmount
                )
            )
            handleEvent(saga.sagaId, OrderSagaEvent.PAYMENT_SUCCESS, result)
        } catch (e: Exception) {
            logger.error("Payment failed for saga: ${saga.sagaId}", e)
            handleEvent(saga.sagaId, OrderSagaEvent.PAYMENT_FAIL, e.message)
        }
    }

    @Async
    fun scheduleShipping(saga: OrderSaga) {
        try {
            val result = shippingClient.scheduleShipping(
                ScheduleShippingRequest(orderId = saga.orderId)
            )
            handleEvent(saga.sagaId, OrderSagaEvent.SHIPPING_SUCCESS, result)
        } catch (e: Exception) {
            logger.error("Shipping scheduling failed for saga: ${saga.sagaId}", e)
            handleEvent(saga.sagaId, OrderSagaEvent.SHIPPING_FAIL, e.message)
        }
    }

    @Async
    fun compensate(saga: OrderSaga) {
        logger.info("Starting compensation for saga: ${saga.sagaId}")

        val context = objectMapper.readTree(saga.context)

        // Run the compensating transactions in reverse order
        try {
            // Compensate the payment (if one was made)
            if (context.has("PAYMENT_SUCCESS")) {
                val paymentInfo = context.get("PAYMENT_SUCCESS")
                paymentClient.refund(
                    RefundRequest(
                        orderId = saga.orderId,
                        transactionId = paymentInfo.get("transactionId").asText()
                    )
                )
            }

            // Compensate the stock reservation (if one was made)
            if (context.has("STOCK_RESERVE_SUCCESS")) {
                inventoryClient.releaseStock(
                    ReleaseStockRequest(orderId = saga.orderId)
                )
            }

            handleEvent(saga.sagaId, OrderSagaEvent.COMPENSATE_COMPLETE)
        } catch (e: Exception) {
            logger.error("Compensation failed for saga: ${saga.sagaId}", e)
            // A failed compensation requires manual intervention
            markForManualIntervention(saga, e)
        }
    }

    private fun markOrderAsFailed(saga: OrderSaga) {
        orderClient.updateOrderStatus(saga.orderId, "FAILED")
    }

    private fun markForManualIntervention(saga: OrderSaga, error: Exception) {
        saga.state = OrderSagaState.FAILED
        sagaRepository.save(saga)
        // Raise an alert or notification to request manual intervention
    }
}
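The transition logic of the orchestrator is a pure function of the current state and the incoming event, so it can be checked in isolation from Spring and the remote clients. The following is a sketch under that assumption: it restates the same transitions as a lookup table, using shortened hypothetical names (`SagaState`, `SagaEvent`, `transitions`, `replay`) so that it stands alone.

```kotlin
enum class SagaState {
    STARTED, STOCK_RESERVING, STOCK_RESERVED, STOCK_RESERVATION_FAILED,
    PAYMENT_PROCESSING, PAYMENT_COMPLETED, COMPENSATING, COMPENSATED, COMPLETED
}

enum class SagaEvent {
    START, STOCK_RESERVE_SUCCESS, STOCK_RESERVE_FAIL, PAYMENT_SUCCESS,
    PAYMENT_FAIL, SHIPPING_SUCCESS, SHIPPING_FAIL, COMPENSATE_COMPLETE
}

// Every legal (state, event) pair and the state it leads to.
val transitions: Map<Pair<SagaState, SagaEvent>, SagaState> = mapOf(
    (SagaState.STARTED to SagaEvent.START) to SagaState.STOCK_RESERVING,
    (SagaState.STOCK_RESERVING to SagaEvent.STOCK_RESERVE_SUCCESS) to SagaState.STOCK_RESERVED,
    (SagaState.STOCK_RESERVING to SagaEvent.STOCK_RESERVE_FAIL) to SagaState.STOCK_RESERVATION_FAILED,
    (SagaState.STOCK_RESERVED to SagaEvent.START) to SagaState.PAYMENT_PROCESSING,
    (SagaState.PAYMENT_PROCESSING to SagaEvent.PAYMENT_SUCCESS) to SagaState.PAYMENT_COMPLETED,
    (SagaState.PAYMENT_PROCESSING to SagaEvent.PAYMENT_FAIL) to SagaState.COMPENSATING,
    (SagaState.PAYMENT_COMPLETED to SagaEvent.SHIPPING_SUCCESS) to SagaState.COMPLETED,
    (SagaState.PAYMENT_COMPLETED to SagaEvent.SHIPPING_FAIL) to SagaState.COMPENSATING,
    (SagaState.COMPENSATING to SagaEvent.COMPENSATE_COMPLETE) to SagaState.COMPENSATED
)

// Looks up the next state and rejects any pair that is not in the table.
fun transition(state: SagaState, event: SagaEvent): SagaState =
    transitions[state to event]
        ?: throw IllegalStateException("No transition from $state on $event")

// Applies a sequence of events to a saga that begins in STARTED.
fun replay(events: List<SagaEvent>): SagaState =
    events.fold(SagaState.STARTED, ::transition)

fun main() {
    val happyPath = listOf(
        SagaEvent.START, SagaEvent.STOCK_RESERVE_SUCCESS, SagaEvent.START,
        SagaEvent.PAYMENT_SUCCESS, SagaEvent.SHIPPING_SUCCESS
    )
    println(replay(happyPath))
}
```

Keeping the table as data makes it straightforward to enumerate the legal transitions in a test or to render them for documentation.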

Saga Step Definition (A More Elegant Approach)

// SagaDefinition is assumed to be a small builder DSL supplied by the project
// or by a saga framework. It is not part of Spring.
@Configuration
class OrderSagaDefinition(
    private val inventoryClient: InventoryClient,
    private val paymentClient: PaymentClient,
    private val shippingClient: ShippingClient
) {

    @Bean
    fun orderSaga(): SagaDefinition<OrderSagaData> {
        return SagaDefinition.builder<OrderSagaData>()
            .step("reserve-stock")
                .invokeParticipant { data -> inventoryClient.reserveStock(data.orderId, data.items) }
                .withCompensation { data -> inventoryClient.releaseStock(data.orderId) }
            .step("process-payment")
                .invokeParticipant { data -> paymentClient.charge(data.orderId, data.amount) }
                .withCompensation { data -> paymentClient.refund(data.orderId) }
            .step("schedule-shipping")
                .invokeParticipant { data -> shippingClient.schedule(data.orderId) }
                .withCompensation { data -> shippingClient.cancel(data.orderId) }
            .build()
    }
}

data class OrderSagaData(
    val orderId: String,
    val customerId: String,
    val items: List<OrderItem>,
    val amount: BigDecimal
)

Pros and Cons of Orchestration

Pros                                 | Cons
-------------------------------------+--------------------------
Easy to understand the overall flow  | Centralization
Easy debugging                       | Single point of failure
Clear responsibilities               | Orchestrator complexity
Easy testing                         | Increased coupling

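Compensating Transaction Strategies

Semantic Lock

A semantic lock marks a record as tentatively held while a saga is in flight. Here a stock reservation stays RESERVED until the saga confirms or releases it, and an expiry time keeps an abandoned saga from holding stock indefinitely.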

@Entity
class StockReservation(
    @Id
    val id: String = UUID.randomUUID().toString(),
    val productId: String,
    val orderId: String,
    val quantity: Int,
    @Enumerated(EnumType.STRING)
    var status: ReservationStatus = ReservationStatus.RESERVED,
    val expiresAt: Instant = Instant.now().plus(Duration.ofMinutes(15))
)

enum class ReservationStatus {
    RESERVED, CONFIRMED, RELEASED, EXPIRED
}

// Declared inside a Spring bean. releaseReservation (not shown) is assumed to
// return the quantity to available stock and update the reservation status.
@Scheduled(fixedDelay = 60000)
fun releaseExpiredReservations() {
    val expired = reservationRepository.findExpiredReservations(Instant.now())
    expired.forEach { reservation ->
        releaseReservation(reservation)
    }
}
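The expiry rule itself can be written as a pure function that takes the current time as a parameter, which makes it testable without a scheduler or a database. This is a minimal sketch under that assumption. The `Reservation` data class and the `expireOverdue` function are hypothetical stand-ins for the entity and repository query above.

```kotlin
import java.time.Instant

enum class ReservationStatus { RESERVED, CONFIRMED, RELEASED, EXPIRED }

data class Reservation(
    val orderId: String,
    val quantity: Int,
    val status: ReservationStatus,
    val expiresAt: Instant
)

// Marks every RESERVED entry whose expiry time has passed as EXPIRED and
// returns the updated list together with the quantity freed for other orders.
// CONFIRMED and RELEASED entries are left untouched.
fun expireOverdue(reservations: List<Reservation>, now: Instant): Pair<List<Reservation>, Int> {
    var freed = 0
    val updated = reservations.map { reservation ->
        if (reservation.status == ReservationStatus.RESERVED && !reservation.expiresAt.isAfter(now)) {
            freed += reservation.quantity
            reservation.copy(status = ReservationStatus.EXPIRED)
        } else {
            reservation
        }
    }
    return updated to freed
}

fun main() {
    val now = Instant.parse("2024-01-01T00:15:00Z")
    val reservations = listOf(
        Reservation("o-1", 2, ReservationStatus.RESERVED, now.minusSeconds(60)),
        Reservation("o-2", 3, ReservationStatus.RESERVED, now.plusSeconds(60)),
        Reservation("o-3", 4, ReservationStatus.CONFIRMED, now.minusSeconds(60))
    )
    val (updated, freed) = expireOverdue(reservations, now)
    println("freed=$freed statuses=${updated.map { it.status }}")
}
```

Passing `now` in, rather than calling `Instant.now()` inside the function, is a deliberate choice so that tests can use fixed timestamps.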

Compensating Transaction Log

enum class CompensationStatus {
    PENDING, COMPLETED, FAILED
}

@Entity
@Table(name = "compensation_log")
class CompensationLog(
    @Id
    val id: String = UUID.randomUUID().toString(),
    val sagaId: String,
    val stepName: String,
    val compensationData: String,
    @Enumerated(EnumType.STRING)
    var status: CompensationStatus = CompensationStatus.PENDING,
    val createdAt: Instant = Instant.now(),
    var executedAt: Instant? = null
)

@Service
class CompensationExecutor(
    private val compensationLogRepository: CompensationLogRepository
) {
    @Scheduled(fixedDelay = 10000)
    @Transactional
    fun executePendingCompensations() {
        val pending = compensationLogRepository.findByStatus(CompensationStatus.PENDING)

        pending.forEach { log ->
            try {
                // executeCompensation (not shown) is assumed to dispatch on log.stepName
                executeCompensation(log)
                log.status = CompensationStatus.COMPLETED
                log.executedAt = Instant.now()
            } catch (e: Exception) {
                log.status = CompensationStatus.FAILED
                // Retry logic or an alert goes here
            }
            compensationLogRepository.save(log)
        }
    }
}
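The executor above marks a compensation FAILED on the first error and leaves the retry logic open. One possible shape for that logic, sketched here under the assumption that a bounded number of attempts is acceptable, is to keep the entry PENDING until a limit is reached. `CompensationTask`, `runPass`, and `maxAttempts` are hypothetical names. The attempts counter would need a matching column on the log entity.

```kotlin
enum class CompensationStatus { PENDING, COMPLETED, FAILED }

class CompensationTask(
    val id: String,
    var attempts: Int = 0,
    var status: CompensationStatus = CompensationStatus.PENDING
)

// One polling pass over the tasks. A task that throws stays PENDING so a later
// pass retries it. After maxAttempts failures it is marked FAILED for manual follow-up.
fun runPass(
    tasks: List<CompensationTask>,
    maxAttempts: Int,
    execute: (CompensationTask) -> Unit
) {
    for (task in tasks.filter { it.status == CompensationStatus.PENDING }) {
        task.attempts++
        try {
            execute(task)
            task.status = CompensationStatus.COMPLETED
        } catch (e: Exception) {
            if (task.attempts >= maxAttempts) {
                task.status = CompensationStatus.FAILED
            }
        }
    }
}

fun main() {
    var calls = 0
    val refund = CompensationTask("refund-o-1")
    // The simulated gateway fails twice, then succeeds on the third call.
    repeat(3) {
        runPass(listOf(refund), maxAttempts = 3) {
            calls++
            if (calls < 3) error("gateway unavailable")
        }
    }
    println("${refund.id}: ${refund.status} after ${refund.attempts} attempts")
}
```

Retrying only makes sense when the compensation is idempotent, because a call that timed out may already have taken effect on the remote side.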

Monitoring and Observability

Saga Status Query API

import org.springframework.data.repository.findByIdOrNull

@RestController
@RequestMapping("/api/sagas")
class SagaController(
    private val sagaRepository: OrderSagaRepository
) {
    @GetMapping("/{sagaId}")
    fun getSaga(@PathVariable sagaId: String): ResponseEntity<SagaStatusResponse> {
        val saga = sagaRepository.findByIdOrNull(sagaId)
            ?: throw SagaNotFoundException(sagaId)

        return ResponseEntity.ok(
            SagaStatusResponse(
                sagaId = saga.sagaId,
                orderId = saga.orderId,
                state = saga.state.name,
                context = saga.context,
                createdAt = saga.createdAt,
                updatedAt = saga.updatedAt
            )
        )
    }

    // Lists sagas page by page, optionally filtered by state.
    @GetMapping
    fun listSagas(
        @RequestParam(required = false) state: OrderSagaState?,
        pageable: Pageable
    ): ResponseEntity<Page<SagaStatusResponse>> {
        val sagas = if (state != null) {
            sagaRepository.findByState(state, pageable)
        } else {
            sagaRepository.findAll(pageable)
        }

        return ResponseEntity.ok(sagas.map { SagaStatusResponse(it) })
    }
}

Summary

Saga Pattern Selection Guide:

Situation                             | Recommended Approach
--------------------------------------+---------------------
Simple flow (2-3 services)            | Choreography
Complex flow                          | Orchestration
Service autonomy is important         | Choreography
Observability/debugging is important  | Orchestration
Frequent flow changes                 | Orchestration

In the next post, we will cover Event Schema evolution and version management.