Production-Ready Event-Driven Architecture Part 2 - Outbox Pattern Implementation

Series Introduction

This series covers how to build event-driven architectures that are production-ready.

  1. Part 1: Event Sourcing Fundamentals
  2. Part 2: Outbox Pattern Implementation (current post)
  3. Part 3: CQRS and Read/Write Model Separation
  4. Part 4: Distributed Transaction Handling with Saga Pattern
  5. Part 5: Event Schema Evolution and Versioning

The Problem: Dual Write Issue

Let's examine a common problem that occurs in microservices:

@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        // 1. Save to the database
        val order = orderRepository.save(Order.create(command))

        // 2. Publish the event to Kafka
        kafkaTemplate.send("order-events", OrderCreatedEvent(order))

        return order
    }
}

Problems with this code:

  • DB save succeeds but Kafka send fails → data inconsistency
  • Kafka send succeeds but transaction rolls back → event published but no data exists
  • Atomic writes to two systems are impossible

Solution: Transactional Outbox Pattern

The Outbox Pattern stores events in an Outbox table within the same database transaction, and a separate process delivers them to the message broker.

Architecture

┌─────────────────────────────────────────────────────┐
│                    Application                      │
│  ┌─────────────┐      ┌─────────────────────────┐   │
│  │   Order     │      │      Outbox Table       │   │
│  │   Table     │      │   (Same Transaction)    │   │
│  └─────────────┘      └─────────────────────────┘   │
└─────────────────────────────────────────────────────┘
                          │
                          ▼
                 ┌─────────────────┐
                 │  Outbox Relay   │
                 │  (Polling or    │
                 │      CDC)       │
                 └─────────────────┘
                          │
                          ▼
                 ┌─────────────────┐
                 │      Kafka      │
                 └─────────────────┘

Implementation Method 1: Polling Publisher

Outbox Table Design

@Entity
@Table(name = "outbox_events")
class OutboxEvent(
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    val id: Long? = null,

    @Column(nullable = false)
    val aggregateType: String,

    @Column(nullable = false)
    val aggregateId: String,

    @Column(nullable = false)
    val eventType: String,

    @Column(columnDefinition = "TEXT", nullable = false)
    val payload: String,

    @Column(nullable = false)
    val createdAt: Instant = Instant.now(),

    @Enumerated(EnumType.STRING)
    var status: OutboxStatus = OutboxStatus.PENDING,

    var processedAt: Instant? = null
)

enum class OutboxStatus {
    PENDING, PROCESSED, FAILED
}

Service Layer Modification

@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val outboxRepository: OutboxRepository,
    private val objectMapper: ObjectMapper
) {
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        // 1. Save the Order
        val order = orderRepository.save(Order.create(command))

        // 2. Save the event to the Outbox within the same transaction
        val event = OrderCreatedEvent(
            orderId = order.id,
            customerId = order.customerId,
            items = order.items,
            totalAmount = order.totalAmount
        )

        outboxRepository.save(
            OutboxEvent(
                aggregateType = "Order",
                aggregateId = order.id,
                eventType = "OrderCreated",
                payload = objectMapper.writeValueAsString(event)
            )
        )

        return order
    }
}

Polling Publisher

@Component
class OutboxPollingPublisher(
    private val outboxRepository: OutboxRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @Scheduled(fixedDelay = 1000) // runs every second
    @Transactional
    fun publishPendingEvents() {
        val pendingEvents = outboxRepository.findByStatusOrderByCreatedAtAsc(
            OutboxStatus.PENDING,
            PageRequest.of(0, 100)
        )

        pendingEvents.forEach { event ->
            try {
                val topic = "${event.aggregateType.lowercase()}-events"

                kafkaTemplate.send(topic, event.aggregateId, event.payload)
                    .get() // synchronous send to confirm delivery

                event.status = OutboxStatus.PROCESSED
                event.processedAt = Instant.now()
                outboxRepository.save(event)

                logger.info("Published event: ${event.eventType} for ${event.aggregateId}")
            } catch (e: Exception) {
                logger.error("Failed to publish event: ${event.id}", e)
                event.status = OutboxStatus.FAILED
                outboxRepository.save(event)
            }
        }
    }
}

Limitations of the Polling Approach

  • Latency: Delay equivalent to polling interval
  • Load: Overhead from continuous DB polling
  • Duplicates: Possible duplicate sends during failure scenarios
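
The duplicate-send risk grows further when multiple poller instances run in parallel. One common mitigation on PostgreSQL is to claim pending rows with `FOR UPDATE SKIP LOCKED`, so concurrent pollers never pick up the same batch. A minimal sketch, assuming Spring Data JPA and PostgreSQL; the method name `lockPendingBatch` is hypothetical:

```kotlin
interface OutboxRepository : JpaRepository<OutboxEvent, Long> {

    // Locks the claimed rows for the duration of the current transaction;
    // concurrent poller instances skip locked rows instead of re-publishing
    // the same events.
    @Query(
        value = """
            SELECT * FROM outbox_events
            WHERE status = 'PENDING'
            ORDER BY created_at
            LIMIT :limit
            FOR UPDATE SKIP LOCKED
        """,
        nativeQuery = true
    )
    fun lockPendingBatch(@Param("limit") limit: Int): List<OutboxEvent>
}
```

The publisher must call this inside the same `@Transactional` scope, so the row locks are held until the status update commits. This reduces duplicates but does not eliminate them (a crash after sending but before committing can still resend), so consumers still need idempotency.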

Implementation Method 2: Debezium CDC (Change Data Capture)

Debezium captures database changes in real-time and streams them to Kafka.

Docker Compose Configuration

version: '3.8'
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_DB: orderdb
      POSTGRES_USER: postgres
      POSTGRES_PASSWORD: postgres
    command:
      - "postgres"
      - "-c"
      - "wal_level=logical" # required for CDC
    ports:
      - "5432:5432"

  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  connect:
    image: debezium/connect:2.4
    depends_on:
      - kafka
      - postgres
    ports:
      - "8083:8083"
    environment:
      BOOTSTRAP_SERVERS: kafka:29092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      STATUS_STORAGE_TOPIC: connect_statuses

Debezium Connector Configuration

{
  "name": "outbox-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "orderdb",
    "topic.prefix": "orderdb",
    "plugin.name": "pgoutput",
    "table.include.list": "public.outbox_events",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "transforms.outbox.table.field.event.id": "id",
    "transforms.outbox.table.field.event.key": "aggregate_id",
    "transforms.outbox.table.field.event.type": "event_type",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.route.by.field": "aggregate_type",
    "transforms.outbox.route.topic.replacement": "${routedByValue}-events",
    "transforms.outbox.table.fields.additional.placement": "aggregate_type:header:aggregateType"
  }
}

Connector Registration

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type: application/json" \
  -d @outbox-connector.json
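
Kafka Connect also exposes standard REST endpoints for verifying the registration; `GET /connectors` and `GET /connectors/{name}/status` report the registered connectors and their task state:

```shell
# List registered connectors
curl -s http://localhost:8083/connectors

# Confirm that the connector and its task are in the RUNNING state
curl -s http://localhost:8083/connectors/outbox-connector/status
```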

Debezium Outbox Event Router

Debezium's Outbox Event Router automatically routes records from the outbox table to the appropriate topics.

// Outbox table structure (for Debezium)
@Entity
@Table(name = "outbox_events")
class OutboxEvent(
    @Id
    val id: UUID = UUID.randomUUID(),

    @Column(name = "aggregate_type", nullable = false)
    val aggregateType: String,

    @Column(name = "aggregate_id", nullable = false)
    val aggregateId: String,

    @Column(name = "event_type", nullable = false)
    val eventType: String,

    @Column(columnDefinition = "JSONB", nullable = false)
    val payload: String
)

Service Layer (CDC Approach)

@Service
class OrderService(
    private val orderRepository: OrderRepository,
    private val outboxRepository: OutboxRepository,
    private val objectMapper: ObjectMapper
) {
    @Transactional
    fun createOrder(command: CreateOrderCommand): Order {
        val order = orderRepository.save(Order.create(command))

        // Saving to the Outbox is enough: Debezium forwards it to Kafka automatically
        outboxRepository.save(
            OutboxEvent(
                aggregateType = "Order",
                aggregateId = order.id,
                eventType = "OrderCreated",
                payload = objectMapper.writeValueAsString(
                    OrderCreatedEvent(order)
                )
            )
        )

        return order
    }
}

Guaranteeing Exactly-Once Semantics

Consumer-Side Idempotency

@Component
class OrderEventConsumer(
    private val processedEventRepository: ProcessedEventRepository
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @KafkaListener(topics = ["order-events"])
    @Transactional
    fun handleOrderEvent(
        @Payload payload: String,
        @Header(KafkaHeaders.RECEIVED_KEY) key: String,
        @Header("id") eventId: String
    ) {
        // Check whether this event has already been processed
        if (processedEventRepository.existsById(eventId)) {
            logger.info("Event already processed: $eventId")
            return
        }

        // Process the event
        processEvent(payload)

        // Record completion
        processedEventRepository.save(ProcessedEvent(eventId, Instant.now()))
    }
}

@Entity
@Table(name = "processed_events")
class ProcessedEvent(
    @Id
    val eventId: String,
    val processedAt: Instant
)
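
Note that the `existsById` check is not atomic when several consumers handle the same event concurrently. Because `eventId` is the primary key of `processed_events`, the insert itself can serve as the guard instead: flushing it eagerly surfaces a duplicate as a constraint violation, which rolls back the whole transaction. A sketch of that variant (an assumption, not the code above):

```kotlin
@KafkaListener(topics = ["order-events"])
@Transactional
fun handleOrderEvent(
    @Payload payload: String,
    @Header("id") eventId: String
) {
    // Insert-first idempotency: the primary-key constraint on event_id rejects
    // duplicates. saveAndFlush forces the INSERT immediately, so a concurrent
    // duplicate fails here and the transaction (including processEvent below)
    // rolls back instead of running twice.
    processedEventRepository.saveAndFlush(ProcessedEvent(eventId, Instant.now()))

    processEvent(payload)
}
```

A production handler would then catch the constraint violation in the listener's error handler and acknowledge the record as already processed rather than retrying it forever.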

Outbox Table Cleanup

Cleanup job to prevent the Outbox table from growing indefinitely:

@Component
class OutboxCleaner(
    private val outboxRepository: OutboxRepository
) {
    private val logger = LoggerFactory.getLogger(javaClass)

    @Scheduled(cron = "0 0 2 * * *") // daily at 2 AM
    @Transactional
    fun cleanOldEvents() {
        val cutoffTime = Instant.now().minus(Duration.ofDays(7))
        val deletedCount = outboxRepository.deleteByStatusAndCreatedAtBefore(
            OutboxStatus.PROCESSED,
            cutoffTime
        )
        logger.info("Deleted $deletedCount old outbox events")
    }
}

Performance Optimization

Batch Processing

@Component
class BatchOutboxPublisher(
    private val outboxRepository: OutboxRepository,
    private val kafkaTemplate: KafkaTemplate<String, String>
) {
    @Scheduled(fixedDelay = 500)
    @Transactional
    fun publishBatch() {
        val events = outboxRepository.findPendingEvents(limit = 1000)

        if (events.isEmpty()) return

        val futures = events.map { event ->
            val topic = "${event.aggregateType.lowercase()}-events"
            kafkaTemplate.send(topic, event.aggregateId, event.payload)
        }

        // Wait for all sends to complete
        futures.forEach { it.get() }

        // Batch update
        outboxRepository.markAsProcessed(events.map { it.id!! })
    }
}
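
The examples above call several `OutboxRepository` methods without showing their declarations. One possible shape, assuming Spring Data JPA and PostgreSQL — the exact signatures of the custom methods `findPendingEvents` and `markAsProcessed` are assumptions:

```kotlin
interface OutboxRepository : JpaRepository<OutboxEvent, Long> {

    // Derived query used by OutboxPollingPublisher.
    fun findByStatusOrderByCreatedAtAsc(
        status: OutboxStatus,
        pageable: Pageable
    ): List<OutboxEvent>

    // Used by BatchOutboxPublisher; a native LIMIT keeps the batch bounded.
    @Query(
        value = "SELECT * FROM outbox_events WHERE status = 'PENDING' ORDER BY created_at LIMIT :limit",
        nativeQuery = true
    )
    fun findPendingEvents(@Param("limit") limit: Int): List<OutboxEvent>

    // Bulk status flip used by BatchOutboxPublisher.
    @Modifying
    @Query(
        value = "UPDATE outbox_events SET status = 'PROCESSED', processed_at = now() WHERE id IN (:ids)",
        nativeQuery = true
    )
    fun markAsProcessed(@Param("ids") ids: List<Long>): Int

    // Derived delete used by OutboxCleaner; returns the number of rows removed.
    fun deleteByStatusAndCreatedAtBefore(status: OutboxStatus, cutoff: Instant): Long
}
```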

Summary

The Outbox Pattern removes the dual-write problem by making the state change and the event record atomic. The table below compares the two implementation approaches:

| Characteristic              | Polling          | CDC (Debezium)    |
|-----------------------------|------------------|-------------------|
| Latency                     | Polling interval | Near real-time    |
| Implementation complexity   | Low              | Medium            |
| Infrastructure requirements | Low              | Requires Debezium |
| DB load                     | Medium           | Low               |
| Scalability                 | Limited          | High              |

In the next post, we will cover how to separate read and write models using the CQRS pattern.