이 글은 https://docs.microsoft.com/en-us/azure/architecture/patterns/cqrs 을 기반으로 작성되었으나 필자의 생각과 별도의 코드가 추가되었다.

 

CQRS란 Command and Query Responsibility Segregation 의 약자로, 데이터 저장소로부터의 읽기와 업데이트 작업을 분리하는 패턴을 말한다. CQRS를 사용하면, 어플리케이션의 퍼포먼스, 확장성, 보안성을 극대화할 수 있다. 또한 CQRS 패턴을 통해 만들어진 시스템의 유연성을 바탕으로, 시간이 지나면서 지속적으로 시스템을 발전시켜나갈 수 있으며, 여러 요청으로부터 들어온 복수의 업데이트 명령들에 대한 충돌도 방지할 수 있다.

 

발생하던 문제점들

전통적인 아키텍처에서는, 데이터베이스에서 데이터를 조회하고 업데이트하는데 같은 데이터 모델이 사용되었다. 간단한 CRUD 작업에 대해서라면, 이것은 문제 없이 동작한다. 하지만 좀 더 복잡한 어플리케이션에서 이러한 접근 방식은 유지 보수를 어렵게 만들 수 있다. 예를 들면, 어플리케이션은 데이터 조회 시, 각기 다른 형태의 DTO를 반환하는 매우 다양한 쿼리들을 수행할 수 있다. 각각 다른 형태의 DTO들에 대해 객체 매핑을 하는 것은 복잡해질 수 있다. 또한 데이터를 쓰거나 업데이트를 할 때는, 복잡한 유효성 검사와 비즈니스 로직이 수행되어야 한다. 결과적으로 이 모든 걸 하나의 데이터 모델이 한다면, 너무 많은 것을 수행하는 복잡한 모델이 되는 것이다.

또한 읽기와 쓰기의 부하는 보통 같지 않다. 따라서 각각에 대해 다른 성능이 요구된다.

 

그밖에도 전통적인 아키텍처에서는 다음과 같은 문제점들이 있다.

  • 읽기와 쓰기 작업에서 사용되는 데이터 표현들이 서로 일치하지 않는 경우가 많다. 그로 인해 일부 작업에서는 필요하지 않은 추가적인 컬럼이나 속성의 업데이트가 이뤄져야 한다.
  • 동일한 데이터 세트에 대해 병렬로 작업이 수행될 때 데이터 경합이 발생할 수 있다.
  • 정보 조회를 위해 요구되는 복잡한 쿼리로 인해 성능에 부정적인 영향을 줄 수 있다.
  • 하나의 데이터 모델이 읽기와 쓰기를 모두 수행하기 때문에, 보안 관리가 복잡해질 수 있다. (예를 들면 사용자 데이터의 경우 비밀번호가 노출되선 안된다)

 

해결책

CQRS는 읽기와 쓰기를 각각 다른 모델로 분리한다. 명령(Command)을 통해 데이터를 쓰고, 쿼리(Query)를 통해 데이터를 읽는다.

  • 명령(Command)은 데이터 중심적이 아니라 수행할 작업 중심이 되어야 한다. 예를 들면 '호텔룸의 상태를 예약됨으로 변경한다'가 아니라 '호텔 룸 예약'과 같이 생성되어야 한다.
  • 명령(Command)은 보통 동기적으로 처리되기보단, 비동기적으로 큐에 쌓인 후 수행된다.
  • 쿼리(Query)는 데이터 베이스를 결코 수정하지 않는다. 쿼리(Query)는 어떠한 도메인 로직도 캡슐화하지 않은 DTO만을 반환한다.

읽기/쓰기 모델들은 서로 격리될 수 있다. 이렇게 읽기/쓰기 모델을 분리하는 것은 어플리케이션 디자인과 구현을 더욱 간단하게 만들어준다. 하지만 CQRS 코드는, ORM 툴을 통해 DB 스키마로부터 자동으로 생성되도록 할 수는 없다는 단점이 있다.

 

좀 더 확실한 격리를 위해, 물리적으로 읽기와 쓰기를 분리할 수 있다. 예를 들면 읽기 DB의 경우, 복잡한 조인문이나 ORM 매핑을 방지하기 위해 materialized view 를 가지는, 조회에 최적화된 별도의 DB 스키마를 가질 수 있다. 단지 다른 DB 스키마가 아니라 아예 다른 타입의 데이터 저장소를 사용할 수도 있다. 예를 들면 쓰기는 RDBMS를 사용하고, 읽기의 경우 몽고디비와 같은 NoSql을 사용하는 것이다.

 

만약 이와 같이 별도의 읽기/쓰기 데이터 저장소가 사용된다면, 반드시 동기화가 이뤄져야 한다. 보통 이는 쓰기 모델이 DB에 수정사항이 발생할 때마다 이벤트를 발행함으로써 이뤄진다. 이때 DB 업데이트와 이벤트 발행은 반드시 하나의 트랜잭션 안에서 이뤄져야 한다.

 

읽기 저장소는 단순히 쓰기 저장소의 레플리카일 수도 있고, 완전히 다른 구조를 가질 수도 있다. 어찌 되었건 읽기와 쓰기를 분리하는 것은 각각의 부하에 알맞게 스케일링을 하는 것을 가능하게 해 준다. 예를 들면, 보통 읽기 저장소가 쓰기보다 훨씬 더 많은 부하를 받게 된다.

 

CQRS의 장점은 아래와 같다.

  • 독립적인 스케일링 : CQRS는 읽기와 쓰기 각각에 대해 독립적으로 스케일링을 하는 것을 가능하게 해준다. 이는 훨씬 더 적은 Lock 경합이 발생하는 것을 가능하게 한다.
  • 최적화된 데이터 스키마 : 읽기 저장소는 쿼리에 최적화된 스키마를 사용할 수 있고, 쓰기 저장소는 쓰기에 최적화된 스키마를 사용할 수 있다.
  • 보안 : 읽기와 쓰기를 분리함으로써 보안 관리가 용이해진다.
  • 관심사 분리 : 읽기와 쓰기에 대한 관심사 분리는, 시스템의 유지 보수를 더 쉽게 해 주고 유연하게 해 준다. 대부분의 복잡한 비즈니스 로직은 쓰기 모델에 들어가고, 상대적으로 읽기 모델은 간단해진다.
  • 간단한 쿼리 : 읽기 저장소의 materialized view를 통해, 복잡한 조인문을 사용하지 않을 수 있다.

 

구현 이슈

CQRS를 구현하기 위해선 다음과 같은 어려움이 있다.

  • 복잡성 : CQRS의 기본 아이디어는 간단하다. 하지만 이 패턴은 만약 이벤트 소싱 패턴을 포함할 경우, 더 복잡해질 수 있다.
  • 메세징 : 메세징이 CQRS의 필수요소는 아니지만, 명령(Command)을 수행하고 업데이트 이벤트를 발행하는 것이 보편적인 사용법이다. 이 경우 어플리케이션은 반드시 메세지 실패나 중복 메세지와 같은 것들에 대한 처리를 해야 한다. 
  • 데이터 일관성 : 만약 읽기/쓰기 저장소가 분리된다면, 읽기 데이터가 최신의 데이터가 아닐 수도 있다. 읽기 저장소에는 쓰기 저장소의 변경 사항들이 반영되어야 하는데, 이에는 딜레이가 생기기 때문이다.

 

CQRS를 사용해야 할 때

CQRS는 다음과 같은 경우에 사용을 고려해볼 수 있다.

  • 많은 사용자가 동일한 데이터에 병렬로 액세스하는 도메인. CQRS는 도메인 레벨에서의 병합 충돌을 최소화할 수 있도록 충분히 세분화된 명령(Command)를 정의하는 것을 가능하게 해 준다.
  • 복잡한 프로세스나 도메인 모델을 통해 가이드되는 작업 기반 사용자 인터페이스. 쓰기 모델은 비즈니스 로직, 유효성 검사 등을 모두 가진 완전한 명령(Command) 처리 기능을 가진다. 쓰기 모델은 관련된 객체들의 집합을 하나의 단위(DDD에서의 aggregate)로 다룰 수 있고, 이 객체들이 항상 일관된 상태를 가지도록 보장할 수 있다. 읽기 모델의 경우 비즈니스 로직이나 유효성 검사 같은 것 없이, 오직 DTO만 반환한다. 읽기 모델은 최종적으로 쓰기 모델과 일치하게 된다. (딜레이는 있을 수 있음)
  • 데이터 읽기의 성능이 데이터 쓰기의 성능과 별도로 조정이 가능해야 할 때. 특히 읽기의 수가 쓰기의 수보다 훨씬 많을 때. 이 경우, 읽기 모델은 스케일 아웃을 하고, 쓰기 모델은 적은 수로 유지할 수 있다. 적은 수의 쓰기 모델은 병향 충돌의 가능성을 최소화해준다.
  • 한 팀은 쓰기 모델에 대한 복잡한 도메인 모델에만 집중해야 하고, 다른 한 팀은 사용자 인터페이스에 대한 읽기 모델에만 집중해야 할 때
  • 시스템이 시간이 지남에 따라 계속해서 진화하고, 여러 버전을 가질 수 있으며, 정기적으로 바뀔 수 있는 경우
  • 다른 시스템과의 통합, 특히 이벤트 소싱과 결합할 때. 서브시스템의 일시적 장애가 다른 시스템에 영향을 줘선 안된다. 

다음과 같은 경우에는 CQRS 사용이 권장되지 않는다.

  • 도메인과 비즈니스 로직이 간단할 때
  • 단순한 CRUD일 때

 

이벤트 소싱과 CQRS 패턴

CQRS 패턴은 이벤트 소싱 패턴과 자주 같이 쓰인다. CQRS 기반 시스템은 분리된 읽기/쓰기 모델을 가지고, 이들은 각자의 작업에 최적화되어있으며 대부분 물리적으로도 다른 저장소에 위치한다. 이벤트 소싱 패턴과 CQRS를 같이 사용할 때는, 이벤트 저장소가 쓰기 모델이 되며, 이것이 메인 저장소가 된다. 읽기 모델은 일반적으로 매우 역정규화된 materialized view를 제공한다. 이 뷰들은 어플리케이션의 요구사항에 최적화되어있으며, 이로 인해 조회 성능을 극대화한다.

 

특정 시점의 실제 데이터를 사용하는 것 대신 쓰기 저장소로서 이벤트 스트림을 사용하는 것은, 하나의 aggregate에 대한 병합 충돌을 방지해주고 성능과 확장성을 극대화시켜준다. 이벤트들은 비동기적으로 읽기 저장소의 materialized view들을 생성하고 변경하는 데 사용된다.

 

이벤트 저장소가 공식 메인 저장소이기 때문에, 시스템이 변경되거나 단순히 읽기 모델이 변경되었을 때, materialized view를 삭제하고 과거의 모든 이벤트를 다시 실행하여 새로운 형태로 생성하는 것도 가능하다. materialized view는 사실상 읽기 전용 캐시라고 보면 된다.

 

CQRS와 이벤트 소싱을 같이 사용할 때는 다음 사항들을 고려해야 한다.

  • 읽기/쓰기 저장소가 분리되어있는 시스템에서는, 이벤트가 실행되어 저장소가 수정되기 전 약간의 딜레이가 있을 수 있다.
  • 이 패턴은 시스템의 복잡성을 증가시킨다. CQRS의 복잡성은 성공적인 구현을 더 어렵게 만들 수도 있다. 하지만 이벤트 소싱 방식을 사용하면 도메인 모델링을 더 쉽게 할 수도 있고, 데이터 변경 이벤트들이 보존되기 때문에 뷰들을 리빌딩하는 것도 쉽게 할 수 있다.
  • 읽기 모델에서 materialized view를 만들어내는 데는 오랜 실행 시간이나 리소스가 필요할 수 있다. 특히 오랜 기간 동안의 데이터 합계나 분석 자료와 같은 것이 필요할 땐 더더욱 그렇다. 이럴 경우, 일정 간격으로 데이터의 스냅샷을 만들어냄으로써 (예: 특정 액션의 총합, 한 엔티티의 현재 상태) 해결할 수 있다.

 

CQRS 예제 구현

여태까지 공부한 내용들을 바탕으로 CQRS 예제를 만들어 보았다. https://www.baeldung.com/cqrs-event-sourcing-java를 참고하였으나 필자의 입맛대로 조금씩 바꾸어 재구성하였다.

사실 코드 중 계속해서 좀 거슬리는(?) 부분들이 보일 수 있다. 중복코드도 있고 enum을 써야할 곳에 String을 쓴 곳도 있고 말이다. CQRS를 체험해보는 것이 주된 목적인 만큼, 좀 거슬리는 부분들은 대충 넘어가도록 하자...^^;;;;

스프링 & 코틀린으로 구현되었으며, CQRS와 이벤트 소싱 패턴을 같이 사용하였다. 전체 소스 코드는 https://github.com/minseokLim/practice/tree/main/cqrs-practice에서 확인할 수 있다.

 

예제에서는 사용자를 생성하고 수정하며, 사용자의 연락처, 주소를 조회하는 기능을 구현할 것이다. 기반이 되는 도메인 클래스는 아래와 같다.

class User(
    val userId: String,
    val name: String,
    var contacts: MutableSet<Contact> = mutableSetOf(),
    var addresses: MutableSet<Address> = mutableSetOf()
)

data class Contact(
    val type: String,
    val detail: String
)

data class Address(
    val city: String,
    val state: String,
    val postalCode: String
)

 

CQRS는 명령(Command)과 조회(Query)를 분리함으로써, 읽기와 쓰기 모델을 분리하는 것이다. 먼저 사용자 생성과 수정에 대한 처리를 해보도록 하자.

 

사용자 생성/수정에 대한 명령(Command)

명령(Command)에 대한 코드는 아래와 같다. 사용자를 생성할 때 연락처와 주소를 함께 집어넣을 수도 있을 것이고, 수정 시 이름을 바꾸는 경우가 발생할 수도 있을 것이다. 하지만 이 예제에서는 아래와 같이 생성하고 수정한다고 가정해보자. 실제 비즈니스 케이스에서도 이처럼 다양한 요구사항에 대한 세분화된 명령(Command)을 생성할 수 있다.

class CreateUserCommand(
    val userId: String,
    val name: String
)

class UpdateUserCommand(
    val userId: String,
    val contacts: Set<Contact>,
    val addresses: Set<Address>
)

 

사용자 이벤트

이 예제에서는 이벤트 소싱 패턴을 사용할 것이다. 즉, 이벤트 저장소가 쓰기 저장소인 동시에 메인 저장소가 될 것이다. 이벤트에 대한 코드는 아래와 같이 구성하였다.

abstract class Event {
    val id = UUID.randomUUID()
    val createdDate = LocalDateTime.now()
}

abstract class UserEvent(
    val userId: String
) : Event()

class UserCreatedEvent(
    userId: String,
    val name: String
) : UserEvent(userId)

class UserContactAddedEvent(
    userId: String,
    val type: String,
    val detail: String
) : UserEvent(userId)

class UserContactRemovedEvent(
    userId: String,
    val type: String,
    val detail: String
) : UserEvent(userId)

class UserAddressAddedEvent(
    userId: String,
    val city: String,
    val state: String,
    val postalCode: String
) : UserEvent(userId)

class UserAddressRemovedEvent(
    userId: String,
    val city: String,
    val state: String,
    val postalCode: String
) : UserEvent(userId)

 

이 이벤트를 저장하는 EventStore라는 클래스를 아래와 같이 생성하였다. 실제 구현 시에는 카프카와 같은 MQ가 사용되겠지만, 이 예제에서는 간단하게 메모리를 사용하였다. 맵이 멀티쓰레드로부터 안전하지 않겠지만 이 예제에서는 그런 세세한 것까지 신경쓰진 않았다.

@Component
class EventStore {
    // userId를 키로 가진 Map
    private val userEventStore = mutableMapOf<String, MutableList<Event>>()

    fun addUserEvent(event: UserEvent) {
        userEventStore[event.userId]?.add(event) ?: userEventStore.put(event.userId, mutableListOf(event))
    }

    fun addUserEvents(events: Collection<UserEvent>) {
        if (events.isNotEmpty()) {
            val userId = events.first().userId
            userEventStore[userId]?.addAll(events) ?: userEventStore.put(userId, ArrayList(events))
        }
    }

    fun getUserEvents(userId: String): List<Event> {
        return userEventStore[userId] ?: emptyList()
    }
}

 

사용자 생성/수정

이벤트 소싱 패턴에서 사용자를 생성/수정한다는 것은 결국 해당 이벤트를 이벤트 저장소에 저장하는 것이 될 것이다. DDD에서 나오는 Aggregate라는 용어를 사용하여 아래와 같이 구현하였다.

UserUtility를 보면, 사용자의 현재 상태를 알기 위해 매번 모든 이벤트를 처음부터 다시 실행하는데, 실제 프로덕션 코드에서 이렇게 하면 정말 많은 부하가 생길 것이다. 보통은 정기적으로 스냅샷을 찍는다거나 RDB를 추가적으로 쓴다거나 하지 않을까... 이건 사실 실무에서 내가 안해봐서 잘 모르겠다.

@Component
class UserAggregate(
    private val eventStore: EventStore
) {
    fun handleCreateUserCommand(command: CreateUserCommand): List<Event> {
        val event = UserCreatedEvent(command.userId, command.name)
        eventStore.addUserEvent(event)
        return listOf(event)
    }

    fun handleUpdateUserCommand(command: UpdateUserCommand): List<Event> {
        val user = UserUtility.recreateUserState(eventStore, command.userId)
        val events = mutableListOf<UserEvent>()

        val userContactAddedEvents = command.contacts
            .filter { it !in user.contacts }
            .map { UserContactAddedEvent(command.userId, it.type, it.detail) }
        events.addAll(userContactAddedEvents)

        val userContactRemovedEvents = user.contacts
            .filter { it !in command.contacts }
            .map { UserContactRemovedEvent(command.userId, it.type, it.detail) }
        events.addAll(userContactRemovedEvents)

        val userAddressAddedEvents = command.addresses
            .filter { it !in user.addresses }
            .map { UserAddressAddedEvent(command.userId, it.city, it.state, it.postalCode) }
        events.addAll(userAddressAddedEvents)

        val userAddressRemovedEvents = user.addresses
            .filter { it !in command.addresses }
            .map { UserAddressRemovedEvent(command.userId, it.city, it.state, it.postalCode) }
        events.addAll(userAddressRemovedEvents)

        eventStore.addUserEvents(events)

        return events
    }
}

object UserUtility {
    fun recreateUserState(store: EventStore, userId: String): User {
        val events = store.getUserEvents(userId)

        if (events.isEmpty() || events[0] !is UserCreatedEvent) {
            throw IllegalArgumentException()
        }

        val userCreatedEvent = events[0] as UserCreatedEvent
        val user = User(userCreatedEvent.userId, userCreatedEvent.name)

        for (i in 1 until events.size) {
            when (val event = events[i]) {
                is UserAddressAddedEvent -> {
                    val address = Address(event.city, event.state, event.postalCode)
                    user.addresses.add(address)
                }
                is UserAddressRemovedEvent -> {
                    val address = Address(event.city, event.state, event.postalCode)
                    user.addresses.remove(address)
                }
                is UserContactAddedEvent -> {
                    val contact = Contact(event.type, event.detail)
                    user.contacts.add(contact)
                }
                is UserContactRemovedEvent -> {
                    val contact = Contact(event.type, event.detail)
                    user.contacts.remove(contact)
                }
            }
        }

        return user
    }
}

 

사용자의 연락처, 주소 쿼리(Query)

사용자는 복수의 연락처들과 주소를 가지고 있다. 특정 사용자의 연락처 타입별 모든 연락처를 조회해야하는 비즈니스 요구사항이 있다고 가정해보자. (예를 들면, 휴대폰 타입의 연락처만 다 조회해와야한다.) 마찬가지로 특정 사용자의 주소 중 경기도에 속한 주소만 가져와야한다고 가정해보자. 쿼리(Query) 클래스는 아래와 같다.

class ContactByTypeQuery(
    val userId: String,
    val type: String
)

class AddressByStateQuery(
    val userId: String,
    val state: String
)

 

이러한 비즈니스 요구사항에서 일반적인 RDB를 쓴다면, 일단 사용자와 연락처/주소가 일대다 관계이기 때문에 별도의 테이블로 구성이 될 것이고 join문이 사용될 것이 때문에, 이에 따른 부하가 생길 것이다. CQRS는 특정 비즈니스 요구사항에 최적화된 읽기 저장소를 갖는 것을 가능하게 해준다. 실제 구현 시에는 몽고디비나 RDB의 materialized view를 사용하겠으나, 이 예제에서는 간단하게 메모리를 사용하겠다.

@Repository
class UserReadRepository {
    val userContactStore = mutableMapOf<String, UserContact>()
    val userAddressStore = mutableMapOf<String, UserAddress>()

    fun getUserContact(userId: String): Optional<UserContact> {
        return Optional.ofNullable(userContactStore[userId])
    }

    fun getUserAddress(userId: String): Optional<UserAddress> {
        return Optional.ofNullable(userAddressStore[userId])
    }
}

class UserContact(
    val contactByType: MutableMap<String, MutableSet<Contact>> = mutableMapOf()
)

class UserAddress(
    val addressByState: MutableMap<String, MutableSet<Address>> = mutableMapOf()
)

 

사용자의 연락처, 주소 조회

아래와 같이 조회할 수 있다.

@Component
class UserProjection(
    private val readRepository: UserReadRepository
) {
    fun handleContactByTypeQuery(query: ContactByTypeQuery): Optional<Set<Contact>> {
        val userContact = readRepository.getUserContact(query.userId).orElseThrow()
        return Optional.ofNullable(userContact.contactByType[query.type])
    }

    fun handleAddressByStateQuery(query: AddressByStateQuery): Optional<Set<Address>> {
        val userAddress = readRepository.getUserAddress(query.userId).orElseThrow()
        return Optional.ofNullable(userAddress.addressByState[query.state])
    }
}

 

쓰기 저장소와 읽기 저장소 동기화

이제 쓰기 저장소의 이벤트들이 지속적으로 읽기 저장소에 동기화가 이뤄진다면, CQRS 구현 완성이다! 먼저 Event객체들을 읽기 저장소에 반영해주는 Projector 클래스를 아래와 같이 구현하였다.

@Component
class UserProjector(
    private val readRepository: UserReadRepository
) {
    fun project(event: Event) {
        when (event) {
            is UserCreatedEvent -> apply(event)
            is UserContactAddedEvent -> apply(event)
            is UserContactRemovedEvent -> apply(event)
            is UserAddressAddedEvent -> apply(event)
            is UserAddressRemovedEvent -> apply(event)
        }
    }

    private fun apply(event: UserCreatedEvent) {
        readRepository.userContactStore[event.userId] = UserContact()
        readRepository.userAddressStore[event.userId] = UserAddress()
    }

    private fun apply(event: UserContactAddedEvent) {
        val userContact = readRepository.getUserContact(event.userId).orElse(UserContact())
        val contacts = userContact.contactByType[event.type] ?: mutableSetOf()
        contacts.add(Contact(event.type, event.detail))
        userContact.contactByType[event.type] = contacts
        readRepository.userContactStore[event.userId] = userContact
    }

    private fun apply(event: UserContactRemovedEvent) {
        val userContact = readRepository.getUserContact(event.userId).orElse(UserContact())
        val contacts = userContact.contactByType[event.type] ?: mutableSetOf()
        contacts.remove(Contact(event.type, event.detail))
    }

    private fun apply(event: UserAddressAddedEvent) {
        val userAddress = readRepository.getUserAddress(event.userId).orElse(UserAddress())
        val addresses = userAddress.addressByState[event.state] ?: mutableSetOf()
        addresses.add(Address(event.city, event.state, event.postalCode))
        userAddress.addressByState[event.state] = addresses
        readRepository.userAddressStore[event.userId] = userAddress
    }

    private fun apply(event: UserAddressRemovedEvent) {
        val userAddress = readRepository.getUserAddress(event.userId).orElse(UserAddress())
        val addresses = userAddress.addressByState[event.state] ?: mutableSetOf()
        addresses.remove(Address(event.city, event.state, event.postalCode))
    }
}

 

동기화가 이뤄지는 시점은, 쓰기 저장소에 변경이 이뤄졌을 때일 것이다. 따라서 EventStore 클래스를 아래와 같이 변경하였다.

@Component
class EventStore(
    private val userProjector: UserProjector
) {
    private val userEventStore = mutableMapOf<String, MutableList<Event>>()
    private val queue: Queue<Event> = LinkedList() // 나름 카프카처럼(?) 큐를 써봤다.

    fun addUserEvent(event: UserEvent) {
        userEventStore[event.userId]?.add(event) ?: userEventStore.put(event.userId, mutableListOf(event))
        queue.offer(event)
        project()
    }

    fun addUserEvents(events: Collection<UserEvent>) {
        if (events.isNotEmpty()) {
            val userId = events.first().userId
            userEventStore[userId]?.addAll(events) ?: userEventStore.put(userId, ArrayList(events))
            queue.addAll(events)
            project()
        }
    }

    fun getUserEvents(userId: String): List<Event> {
        return userEventStore[userId] ?: emptyList()
    }

    // 백그라운드에서 비동기로 실행된다.
    private fun project() {
        CoroutineScope(Dispatchers.Default).launch {
            while (queue.isNotEmpty()) {
                val polledEvent = queue.poll()
                userProjector.project(polledEvent)
            }
        }
    }
}

 

통합 테스트

구현이 정상적으로 이뤄졌는지 확인하기 위해 아래와 같이 테스트 코드를 작성하였다. 'Thread.sleep(5000)' 과 같은, 테스트 코드에서는 있어서는 안되는 정말 바람직하지 않은 코드가 포함되어 있으나;; 이 예제의 목적은 CQRS & 이벤트 소싱 패턴 체험이라는 걸 기억하고 양해바란다^^;;;;

@SpringBootTest
internal class UserServiceTest {
    @Autowired
    private lateinit var userService: UserService

    @Test
    fun 통합테스트() {
        val userId = "minseoklim"

        사용자_생성_요청(userId, "임민석")

        val contacts = setOf(Contact("휴대폰", "01012345678"), Contact("휴대폰", "01111111111"))
        val addresses = setOf(Address("부천", "경기도", "1234"), Address("파주", "경기도", "0123"))
        사용자_수정_요청(userId, contacts, addresses)

        val newContacts = setOf(Contact("휴대폰", "01077777777"), Contact("휴대폰", "01111111111"))
        val newAddresses = setOf(Address("부천", "경기도", "1234"), Address("화성", "경기도", "3232"))
        사용자_수정_요청(userId, newContacts, newAddresses)

        사용자_수정됨(userId, "휴대폰", newContacts, "경기도", newAddresses)
    }

    private fun 사용자_생성_요청(userId: String, name: String) {
        userService.createUser(userId, name)
    }

    private fun 사용자_수정_요청(userId: String, contacts: Set<Contact>, addresses: Set<Address>) {
        userService.updateUser(userId, contacts, addresses)
    }

    private fun 사용자_수정됨(
        userId: String,
        contactType: String,
        newContacts: Set<Contact>,
        state: String,
        newAddresses: Set<Address>
    ) {
        // 읽기 저장소 반영이 비동기로 되다보니 좀 기다려야 확인을 할 수 있음
        Thread.sleep(5000)

        val foundContacts = userService.getContactsByType(userId, contactType).get()
        assert(foundContacts == newContacts)

        val foundAddresses = userService.getAddressesByState(userId, state).get()
        assert(foundAddresses == newAddresses)
    }
}

 

 

※ 참고

https://docs.microsoft.com/en-us/azure/architecture/patterns/cqrs

 

CQRS pattern - Azure Architecture Center

Learn how to segregate operations that read data from those that update data, using the CQRS (Command and Query Responsibility Segregation) pattern.

docs.microsoft.com

https://www.baeldung.com/cqrs-event-sourcing-java

 

'공부' 카테고리의 다른 글

Redisson을 이용한 재고 관리  (0) 2022.05.02

+ Recent posts