본문으로 건너뛰기

프로덕션 레디 이벤트 기반 아키텍처 Part 5 - Event Schema 진화와 버전 관리

시리즈 소개

Series Introduction

이 시리즈는 프로덕션 환경에서 사용할 수 있는 이벤트 기반 아키텍처를 구축하는 방법을 다룹니다.

  1. Part 1: Event Sourcing 기초
  2. Part 2: Outbox Pattern 구현
  3. Part 3: CQRS와 Read/Write 모델 분리
  4. Part 4: Saga Pattern으로 분산 트랜잭션 처리
  5. Part 5: Event Schema 진화와 버전 관리 (현재 글)

This series covers how to build event-driven architecture that can be used in production environments.

  1. Part 1: Event Sourcing Basics
  2. Part 2: Implementing Outbox Pattern
  3. Part 3: CQRS and Read/Write Model Separation
  4. Part 4: Distributed Transaction Handling with Saga Pattern
  5. Part 5: Event Schema Evolution and Versioning (current post)

스키마 진화의 필요성

이벤트 기반 시스템은 시간이 지나면서 변화합니다:

  • 새로운 필드 추가
  • 기존 필드 제거
  • 필드 타입 변경
  • 필드 이름 변경

이러한 변경이 기존 Consumer를 깨뜨리지 않도록 해야 합니다.

The Need for Schema Evolution

Event-driven systems change over time:

  • Adding new fields
  • Removing existing fields
  • Changing field types
  • Renaming fields

These changes must not break existing Consumers.

호환성 유형

1. Backward Compatibility (하위 호환성)

새로운 스키마로 이전 데이터를 읽을 수 있음

Compatibility Types

1. Backward Compatibility

New schema can read old data

Producer (v1) ──▶ [Event v1] ──▶ Consumer (v2) ✓

허용되는 변경:

  • 기본값이 있는 새 필드 추가
  • 선택적 필드 제거

2. Forward Compatibility (상위 호환성)

이전 스키마로 새로운 데이터를 읽을 수 있음

Allowed changes:

  • Adding new fields with default values
  • Removing optional fields

2. Forward Compatibility

Old schema can read new data

Producer (v2) ──▶ [Event v2] ──▶ Consumer (v1) ✓

허용되는 변경:

  • 기본값이 있는 필드 제거
  • 선택적 필드 추가

3. Full Compatibility (완전 호환성)

양방향 모두 호환

Allowed changes:

  • Removing fields with default values
  • Adding optional fields

3. Full Compatibility

Compatible in both directions

Producer (v1) ◀──▶ Consumer (v2) ✓
Producer (v2) ◀──▶ Consumer (v1) ✓

가장 안전하지만 제약이 많음.

Avro를 활용한 스키마 관리

Avro Schema 정의

Most safe but has the most restrictions.

Schema Management with Avro

Avro Schema Definition

{
"type": "record",
"name": "OrderCreated",
"namespace": "com.example.events.order",
"fields": [
{
"name": "orderId",
"type": "string"
},
{
"name": "customerId",
"type": "string"
},
{
"name": "items",
"type": {
"type": "array",
"items": {
"type": "record",
"name": "OrderItem",
"fields": [
{"name": "productId", "type": "string"},
{"name": "quantity", "type": "int"},
{"name": "price", "type": "double"}
]
}
}
},
{
"name": "totalAmount",
"type": "double"
},
{
"name": "occurredAt",
"type": {
"type": "long",
"logicalType": "timestamp-millis"
}
}
]
}

스키마 진화 예제

V1 → V2: 새 필드 추가 (하위 호환)

Schema Evolution Example

V1 → V2: Adding New Fields (Backward Compatible)

{
"type": "record",
"name": "OrderCreated",
"namespace": "com.example.events.order",
"fields": [
{"name": "orderId", "type": "string"},
{"name": "customerId", "type": "string"},
{"name": "items", "type": {"type": "array", "items": "OrderItem"}},
{"name": "totalAmount", "type": "double"},
{"name": "occurredAt", "type": {"type": "long", "logicalType": "timestamp-millis"}},
{
"name": "currency",
"type": "string",
"default": "KRW"
},
{
"name": "metadata",
"type": ["null", {"type": "map", "values": "string"}],
"default": null
}
]
}

Spring Boot + Avro 설정

Spring Boot + Avro Configuration

// build.gradle.kts
plugins {
id("com.github.davidmc24.gradle.plugin.avro") version "1.9.1"
}

dependencies {
implementation("org.apache.avro:avro:1.11.3")
implementation("io.confluent:kafka-avro-serializer:7.5.0")
}

avro {
setCreateSetters(false)
setFieldVisibility("PRIVATE")
}
# application.yml
spring:
kafka:
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
consumer:
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
properties:
schema.registry.url: http://localhost:8081
specific.avro.reader: true

Avro Producer

Avro Producer

@Service
class OrderEventProducer(
private val kafkaTemplate: KafkaTemplate<String, OrderCreated>
) {
fun publishOrderCreated(order: Order) {
val event = OrderCreated.newBuilder()
.setOrderId(order.id)
.setCustomerId(order.customerId)
.setItems(order.items.map { item ->
OrderItem.newBuilder()
.setProductId(item.productId)
.setQuantity(item.quantity)
.setPrice(item.price.toDouble())
.build()
})
.setTotalAmount(order.totalAmount.toDouble())
.setOccurredAt(Instant.now().toEpochMilli())
.setCurrency("KRW") // v2 필드
.setMetadata(null) // v2 필드
.build()

kafkaTemplate.send("order-events", order.id, event)
}
}

Avro Consumer (V1 Consumer가 V2 메시지 처리)

Avro Consumer (V1 Consumer Processing V2 Messages)

@Component
class OrderEventConsumer {

@KafkaListener(topics = ["order-events"])
fun handleOrderCreated(event: OrderCreated) {
// ko: V1 Consumer는 currency, metadata 필드를 무시
// ko: Avro가 자동으로 처리
// en: V1 Consumer ignores currency, metadata fields
// en: Avro handles this automatically
processOrder(
orderId = event.getOrderId(),
customerId = event.getCustomerId(),
items = event.getItems(),
totalAmount = event.getTotalAmount()
)
}
}

Protocol Buffers를 활용한 스키마 관리

Proto 파일 정의

Schema Management with Protocol Buffers

Proto File Definition

syntax = "proto3";

package com.example.events.order;

option java_multiple_files = true;
option java_package = "com.example.events.order";

import "google/protobuf/timestamp.proto";

message OrderCreated {
string order_id = 1;
string customer_id = 2;
repeated OrderItem items = 3;
double total_amount = 4;
google.protobuf.Timestamp occurred_at = 5;

// V2 additions
string currency = 6; // ko: 새 필드 (선택적) / en: new field (optional)
map<string, string> metadata = 7; // ko: 새 필드 (선택적) / en: new field (optional)
}

message OrderItem {
string product_id = 1;
int32 quantity = 2;
double price = 3;
}

Protobuf 진화 규칙

Protobuf Evolution Rules

// ko: 안전한 변경들:
// ko: 1. 새 필드 추가 (고유한 필드 번호 사용)
// en: Safe changes:
// en: 1. Adding new fields (using unique field numbers)
message OrderCreatedV2 {
string order_id = 1;
string customer_id = 2;
repeated OrderItem items = 3;
double total_amount = 4;
google.protobuf.Timestamp occurred_at = 5;
string currency = 6; // ko: 새 필드 / en: new field
string shipping_method = 7; // ko: 새 필드 / en: new field
}

// ko: 2. 필드를 optional에서 repeated로 변경 (scalar types)
// ko: 3. 호환되는 타입 간 변경 (int32 <-> int64)
// en: 2. Changing field from optional to repeated (scalar types)
// en: 3. Changing between compatible types (int32 <-> int64)

// ko: 위험한 변경들 (하지 말 것):
// ko: 1. 필드 번호 변경
// ko: 2. 필드 타입을 비호환 타입으로 변경
// ko: 3. required 필드 추가 (proto2)
// en: Dangerous changes (don't do):
// en: 1. Changing field numbers
// en: 2. Changing field type to incompatible type
// en: 3. Adding required fields (proto2)

Spring Boot + Protobuf

Spring Boot + Protobuf

// build.gradle.kts
plugins {
id("com.google.protobuf") version "0.9.4"
}

dependencies {
implementation("com.google.protobuf:protobuf-java:3.25.1")
implementation("io.confluent:kafka-protobuf-serializer:7.5.0")
}

protobuf {
protoc {
artifact = "com.google.protobuf:protoc:3.25.1"
}
}

Schema Registry 활용

Schema Registry 아키텍처

Using Schema Registry

Schema Registry Architecture

┌──────────────┐         ┌──────────────┐         ┌──────────────┐
│ Producer │────────▶│ Kafka │────────▶│ Consumer │
└──────┬───────┘ └──────────────┘ └──────┬───────┘
│ │
│ Register Schema Get Schema │
│ │
▼ ▼
┌─────────────────────────────────────────────────────────────────┐
│ Schema Registry │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ Subject: order-events-value │ │
│ │ ├── Version 1: OrderCreated (v1) │ │
│ │ ├── Version 2: OrderCreated (v2) + currency │ │
│ │ └── Version 3: OrderCreated (v3) + metadata │ │
│ └─────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘

Docker Compose 설정

Docker Compose Configuration

version: '3.8'
services:
schema-registry:
image: confluentinc/cp-schema-registry:7.5.0
hostname: schema-registry
depends_on:
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka:29092
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081

Schema Registry API 활용

Using Schema Registry API

@Configuration
class SchemaRegistryConfig {

@Bean
fun schemaRegistryClient(): SchemaRegistryClient {
return CachedSchemaRegistryClient(
"http://localhost:8081",
100 // max schemas to cache
)
}
}

@Service
class SchemaService(
private val schemaRegistryClient: SchemaRegistryClient
) {
fun registerSchema(subject: String, schema: Schema): Int {
return schemaRegistryClient.register(subject, schema)
}

fun getLatestSchema(subject: String): Schema {
val metadata = schemaRegistryClient.getLatestSchemaMetadata(subject)
return Schema.Parser().parse(metadata.schema)
}

fun checkCompatibility(subject: String, schema: Schema): Boolean {
return schemaRegistryClient.testCompatibility(subject, schema)
}

fun getSchemaVersions(subject: String): List<Int> {
return schemaRegistryClient.getAllVersions(subject)
}
}

호환성 설정

Compatibility Configuration

# ko: Subject 레벨 호환성 설정
# en: Subject-level compatibility configuration
curl -X PUT http://localhost:8081/config/order-events-value \
-H "Content-Type: application/vnd.schemaregistry.v1+json" \
-d '{"compatibility": "BACKWARD"}'

# ko: 가능한 호환성 수준:
# ko: - BACKWARD (기본값)
# en: Available compatibility levels:
# en: - BACKWARD (default)
# - BACKWARD_TRANSITIVE
# - FORWARD
# - FORWARD_TRANSITIVE
# - FULL
# - FULL_TRANSITIVE
# - NONE

실전 스키마 진화 전략

1. 단계별 롤아웃

Day 1: Consumer V2 배포 (V1, V2 모두 처리 가능)
Day 2: Producer V2 배포 (V2 이벤트 발행 시작)
Day 7: 모니터링 후 V1 Consumer 제거

2. Feature Flag를 활용한 점진적 전환

Practical Schema Evolution Strategies

1. Phased Rollout

Day 1: Deploy Consumer V2 (can handle both V1 and V2)
Day 2: Deploy Producer V2 (starts publishing V2 events)
Day 7: Remove V1 Consumer after monitoring

2. Gradual Transition Using Feature Flags

@Service
class OrderEventProducer(
private val kafkaTemplate: KafkaTemplate<String, GenericRecord>,
private val featureFlagService: FeatureFlagService,
private val schemaV1: Schema,
private val schemaV2: Schema
) {
fun publishOrderCreated(order: Order) {
val event = if (featureFlagService.isEnabled("use-order-event-v2")) {
createV2Event(order)
} else {
createV1Event(order)
}

kafkaTemplate.send("order-events", order.id, event)
}

private fun createV1Event(order: Order): GenericRecord {
return GenericRecordBuilder(schemaV1)
.set("orderId", order.id)
.set("customerId", order.customerId)
.set("totalAmount", order.totalAmount.toDouble())
.build()
}

private fun createV2Event(order: Order): GenericRecord {
return GenericRecordBuilder(schemaV2)
.set("orderId", order.id)
.set("customerId", order.customerId)
.set("totalAmount", order.totalAmount.toDouble())
.set("currency", order.currency)
.set("metadata", order.metadata)
.build()
}
}

3. 다중 스키마 Consumer

3. Multi-Schema Consumer

@Component
class MultiVersionOrderConsumer {

@KafkaListener(topics = ["order-events"])
fun handleOrderEvent(
@Payload record: GenericRecord,
@Header(KafkaHeaders.RECEIVED_KEY) key: String
) {
val schemaVersion = getSchemaVersion(record)

when {
schemaVersion < 2 -> handleV1(record)
schemaVersion < 3 -> handleV2(record)
else -> handleV3(record)
}
}

private fun getSchemaVersion(record: GenericRecord): Int {
return record.schema.getField("currency")?.let { 2 }
?: record.schema.getField("metadata")?.let { 3 }
?: 1
}

private fun handleV1(record: GenericRecord) {
val orderId = record.get("orderId").toString()
val totalAmount = record.get("totalAmount") as Double
// ko: V1 처리 로직 (currency 기본값 사용)
// en: V1 processing logic (using default currency)
processOrder(orderId, totalAmount, "KRW")
}

private fun handleV2(record: GenericRecord) {
val orderId = record.get("orderId").toString()
val totalAmount = record.get("totalAmount") as Double
val currency = record.get("currency")?.toString() ?: "KRW"
processOrder(orderId, totalAmount, currency)
}
}

4. Dead Letter Queue와 스키마 불일치 처리

4. Dead Letter Queue and Schema Mismatch Handling

@Component
class SchemaAwareErrorHandler(
private val deadLetterProducer: KafkaTemplate<String, ByteArray>
) : CommonErrorHandler {

override fun handleRecord(
thrownException: Exception,
record: ConsumerRecord<*, *>,
consumer: Consumer<*, *>,
container: MessageListenerContainer
) {
when (thrownException.cause) {
is SerializationException -> {
// ko: 스키마 불일치 - DLQ로 전송
// en: Schema mismatch - send to DLQ
sendToDeadLetter(record, thrownException)
}
else -> {
// ko: 다른 에러 처리
// en: Handle other errors
throw thrownException
}
}
}

private fun sendToDeadLetter(
record: ConsumerRecord<*, *>,
exception: Exception
) {
val headers = record.headers().toMutableList()
headers.add(RecordHeader("error-reason", exception.message?.toByteArray()))
headers.add(RecordHeader("original-topic", record.topic().toByteArray()))

deadLetterProducer.send(
ProducerRecord(
"${record.topic()}-dlq",
null,
record.key() as? String,
record.value() as? ByteArray,
headers
)
)
}
}

스키마 진화 체크리스트

변경 전

  • 현재 스키마와의 호환성 확인
  • Schema Registry에서 호환성 테스트
  • 모든 Consumer가 새 스키마 처리 가능한지 확인
  • 롤백 계획 수립

변경 중

  • Consumer 먼저 배포
  • 모니터링 강화
  • 점진적 트래픽 전환

변경 후

  • DLQ 모니터링
  • Consumer lag 확인
  • 에러율 모니터링

Schema Evolution Checklist

Before Changes

  • Verify compatibility with current schema
  • Test compatibility in Schema Registry
  • Confirm all Consumers can handle new schema
  • Establish rollback plan

During Changes

  • Deploy Consumer first
  • Strengthen monitoring
  • Gradual traffic transition

After Changes

  • Monitor DLQ
  • Check Consumer lag
  • Monitor error rates

정리

스키마 진화의 핵심 원칙:

원칙설명
항상 하위 호환성 유지새 Consumer가 이전 이벤트를 처리 가능해야 함
필드 번호/이름 재사용 금지삭제된 필드 번호는 영구적으로 예약
기본값 필수새 필드는 항상 기본값 포함
점진적 배포Consumer 먼저, Producer 나중에
Schema Registry 활용중앙 집중식 스키마 관리

이것으로 Event-Driven Architecture 시리즈를 마칩니다. 이 시리즈에서 다룬 패턴들을 조합하면 확장 가능하고 유지보수 가능한 마이크로서비스를 구축할 수 있습니다.

Summary

Core principles of schema evolution:

PrincipleDescription
Always maintain backward compatibilityNew Consumer must be able to process old events
Never reuse field numbers/namesDeleted field numbers are permanently reserved
Default values requiredNew fields must always include default values
Gradual deploymentConsumer first, Producer later
Use Schema RegistryCentralized schema management

This concludes the Event-Driven Architecture series. By combining the patterns covered in this series, you can build scalable and maintainable microservices.