Redisson을 이용한 재고 관리
예제에 대한 전체 코드는 https://github.com/minseokLim/practice/tree/main/redisson-practice 에서 확인할 수 있다.
1. 서론
인터넷에서 제품을 구매할 땐 '재고'라는 게 있다. 근데 만약 쇼핑몰에서 10개의 상품을 준비했는데 10개 이상으로 주문이 들어오면 어떻게 될까? 그야말로 대참사다...-_- 주문 시간을 본 후 가장 마지막에 주문 한 사람들부터 차례대로 전화를 해서 사정을 얘기하고 주문 취소를 해야할 것이다... 죄 없는 고객 관리팀은 욕을 먹을 것이고, 서비스에 대한 고객들의 신뢰도는 떨어질 것이다. 물론 어떻게 해서든 상품을 더 준비하는 방법도 있겠지만, 개발자의 관점에선 저 상황이 안 오는 게 가장 좋다.
앞으로 소개할 방법은 그냥 필자가 혼자 고민하고 구글링하면서 정리한 것이다. 따라서 틀린 부분이 있을 수도 있고 실무에서 하는 방법과 다를 수도 있다. 그냥 이런 생각을 할 수도 있구나, 이런 생각을 하는 애도 있구나 정도로 참고만 하면 좋을 것 같다.
2. 단순 유효성 검사만 추가하였을 때
근데 만약 그 상품이 엄청 인기가 있는 핫한 상품이라고 가정해보자. 오픈과 동시에 1초만에 털리는 포켓몬빵과 같은... 그렇다면 거의 동시에 '재고 정보'에 접근할 가능성이 높기 때문에, 단순히 현재 재고 수량이 몇 개인지 확인한 후 주문을 넣는다면 재고량보다 많은 수의 주문이 들어올 수 밖에 없다. 아래의 코드를 보자. 예제는 최대한 단순하게 구성하였다.
@Entity
class Product(
@Column(nullable = false)
val name: String,
@Column(nullable = false)
val totalCount: Int,
stockCount: Int,
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
val id: Long? = null
) {
@Column(nullable = false)
var stockCount: Int = stockCount
private set
fun applyStockCount(count: Int) {
stockCount = count
}
}
@Entity
@Table(name = "orders")
class Order(
@Column(nullable = false)
val userId: Long,
@Column(nullable = false)
val orderCount: Int,
@ManyToOne
val product: Product,
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
val id: Long? = null
)
@Service
@Transactional
class OrderService(
private val orderRepository: OrderRepository,
private val productRepository: ProductRepository
) {
fun order(userId: Long, orderCount: Int, productId: Long) {
val product = productRepository.findById(productId).orElseThrow { IllegalArgumentException() }
// 주문 수량이 재고 수량보다 많은지 확인
if (product.stockCount < orderCount) {
throw IllegalArgumentException("주문 수량이 재고 수량보다 많습니다.")
}
product.decreaseStockCount(orderCount)
orderRepository.save(Order(userId, orderCount, product))
}
}
테스트 코드
이제 이 핫한 상품에 대해 동시에 많은 수의 주문이 들어오는 상황을 테스트 코드로 구현해보자. 아래 코드에는 총 1000개의 포켓몬빵이 재고로 준비되어있다. 그리고 1000명의 사용자가 2개씩 주문을 일시적으로 하는 상황이다. 정상적인 상황이라면 주문은 500개까지만 들어와야하고, 이후에는 예외가 발생해야 한다.
@SpringBootTest
internal class OrderServiceTest {
@Autowired
private lateinit var productRepository: ProductRepository
@Autowired
private lateinit var orderService: OrderService
@Test
fun 재고보다_많은_빵을_주문() {
// 1000개의 빵이 준비되어 있다.
val productId = productRepository.save(Product("포켓몬빵", 1000, 1000)).id!!
assertThatIllegalArgumentException().isThrownBy {
runBlocking {
// 1000명이 2개씩 주문
(1..1000).map {
async(Dispatchers.IO) {
orderService.order(it.toLong(), 2, productId)
}
}.awaitAll()
}
}
}
}
1000개의 주문이 생성되고, 재고 수량도 엉망으로 업데이트가 되는 결과를 예상했다. 하지만 어째서인지 데드락이 발생했다?;;; 대체 무슨 상황인지 'show engine innodb status' 명령어를 통해 확인해봤다. (필자는 DB를 mysql을 사용하였다. 일반적으로 테스트 상황에서는 h2를 사용하겠지만, 실제 프로덕션 상황과 유사한 상황을 만들기 위해서 그렇게 하였다.)
------------------------
LATEST DETECTED DEADLOCK
------------------------
2022-05-02 05:27:01 277173974784
*** (1) TRANSACTION:
TRANSACTION 5247, ACTIVE 0 sec starting index read
mysql tables in use 1, locked 1
LOCK WAIT 5 lock struct(s), heap size 1128, 2 row lock(s), undo log entries 1
MySQL thread id 193, OS thread handle 277724763904, query id 13000 172.17.0.1 root updating
update product set name='포켓몬빵', stock_count=975 where id=1
*** (1) HOLDS THE LOCK(S):
RECORD LOCKS space id 48 page no 4 n bits 72 index PRIMARY of table `redisson`.`product` trx id 5247 lock mode S locks rec but not gap
Record lock, heap no 2 PHYSICAL RECORD: n_fields 5; compact format; info bits 0
0: len 8; hex 8000000000000001; asc ;;
1: len 6; hex 000000001478; asc x;;
2: len 7; hex 02000000f3086d; asc m;;
3: len 12; hex ed8facecbc93ebaaacebb9b5; asc ;;
4: len 4; hex 800003cf; asc ;;
*** (1) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 48 page no 4 n bits 72 index PRIMARY of table `redisson`.`product` trx id 5247 lock_mode X locks rec but not gap waiting
Record lock, heap no 2 PHYSICAL RECORD: n_fields 5; compact format; info bits 0
0: len 8; hex 8000000000000001; asc ;;
1: len 6; hex 000000001478; asc x;;
2: len 7; hex 02000000f3086d; asc m;;
3: len 12; hex ed8facecbc93ebaaacebb9b5; asc ;;
4: len 4; hex 800003cf; asc ;;
*** (2) TRANSACTION:
TRANSACTION 5250, ACTIVE 0 sec starting index read
mysql tables in use 1, locked 1
LOCK WAIT 5 lock struct(s), heap size 1128, 2 row lock(s), undo log entries 1
MySQL thread id 195, OS thread handle 277723707136, query id 12999 172.17.0.1 root updating
update product set name='포켓몬빵', stock_count=975 where id=1
*** (2) HOLDS THE LOCK(S):
RECORD LOCKS space id 48 page no 4 n bits 72 index PRIMARY of table `redisson`.`product` trx id 5250 lock mode S locks rec but not gap
Record lock, heap no 2 PHYSICAL RECORD: n_fields 5; compact format; info bits 0
0: len 8; hex 8000000000000001; asc ;;
1: len 6; hex 000000001478; asc x;;
2: len 7; hex 02000000f3086d; asc m;;
3: len 12; hex ed8facecbc93ebaaacebb9b5; asc ;;
4: len 4; hex 800003cf; asc ;;
*** (2) WAITING FOR THIS LOCK TO BE GRANTED:
RECORD LOCKS space id 48 page no 4 n bits 72 index PRIMARY of table `redisson`.`product` trx id 5250 lock_mode X locks rec but not gap waiting
Record lock, heap no 2 PHYSICAL RECORD: n_fields 5; compact format; info bits 0
0: len 8; hex 8000000000000001; asc ;;
1: len 6; hex 000000001478; asc x;;
2: len 7; hex 02000000f3086d; asc m;;
3: len 12; hex ed8facecbc93ebaaacebb9b5; asc ;;
4: len 4; hex 800003cf; asc ;;
두 트랜잭션이 S Lock을 가지고 있는 상황에서 서로 X Lock을 가져가려고 하니 데드락이 발생할 수 밖에 없었다. S Lock이 반환되어야 X Lock을 취할 수 있는데 X Lock을 얻기 전까진 S Lock을 반환하지 못하는 상황... 그렇다면 대체 누가 S Lock을 거는 걸까? 바로 외래키로 물려있는 order가 insert될 때 S Lock을 건다. 해결책은 2가지가 있다. 1) Order가 AI를 쓰지 않도록 하여 insert도 지연 실행되게 한다. 2) Product의 업데이트문을 Order가 insert되기 전 강제로 실행한다(saveAndFlush). 일단 이 예제에서는 2번을 택했다. 그렇게 하고 나니 1000개의 주문이 생성되고, 재고 수량도 엉망으로 업데이트가 되는 결과가 나왔다.
3. DB의 Exclusive Lock을 사용했을 때
위와 같은 상황이 발생하는 건 동시에 들어오는 요청이 재고 정보에 동시에 접근을 하기 때문에 발생하는 문제다. 그렇다면 동시에 접근을 못하게 하면 되지 않는가? 메서드 레벨에서 synchronized를 거는 방법을 생각할 수도 있겠지만, 서버를 2개 이상으로 구성을 하는 경우가 많기 때문에 적절하지 않다. 따라서 DB에 Exclusive Lock를 걸어서 이를 해결해보자. 아래와 같이 코드를 변경하였다. JPA에서 LockModeType.PESSIMISTIC_WRITE은 SELECT FOR UPDATE를 의미한다.
interface ProductRepository : JpaRepository<Product, Long> {
@Lock(LockModeType.PESSIMISTIC_WRITE)
override fun findById(id: Long): Optional<Product>
}
이제 이전의 테스트 코드를 실행하면 정상 동작하는 걸 확인할 수 있다. 그런데 DB에 매번 이렇게 X Lock을 거는 건 느리다. 필자의 컴퓨터를 기준으로 테스트 실행 시간이 약 5초 정도 소요된다. 프로덕션 코드에선 이보다 더한 병목도 발생할 수 있을 거라고 생각한다. 최적화가 필요해보인다. 그래서 쓰려고 하는게 바로 Redis이다.
4. Redisson을 이용한 Lock을 사용했을 때
Redis에도 Lock이라는 개념이 있다고 한다. 필자도 이번에 알았는데, 이를 편하게 사용할 수 있게 해주는 라이브러리가 있다. 바로 Redisson이다.
사용법
1) 디펜던시 추가
dependencies {
implementation("org.redisson:redisson-spring-boot-starter:3.17.1")
}
2) application.yml
redis:
mode: SINGLE
nodes:
- 'redis://localhost:6379'
3) 코드 작성
@Component
@ConfigurationProperties("redis")
class RedisProperty {
lateinit var mode: RedisMode
lateinit var nodes: List<String>
enum class RedisMode {
SINGLE, CLUSTER
}
}
@Configuration
class RedissonConfig {
@Bean
fun redissonClient(redisProperty: RedisProperty): RedissonClient {
val config = Config().apply {
when (redisProperty.mode) {
SINGLE -> this.useSingleServer().address = redisProperty.nodes[0]
CLUSTER -> this.useClusterServers().nodeAddresses = redisProperty.nodes
}
}
return Redisson.create(config)
}
}
구현
이제 기본적인 준비는 끝났다. 이 방법에 대한 대략적인 설명을 해보도록 하겠다.
일단 각 Product별 재고 수량을 레디스에 저장한다. 그리고 레디스에서 재고 수량을 읽어와서 주문 가능 여부를 판단한다. 이때, 재고 수량을 읽고, 주문 가능 여부를 판단하고, 바뀐 재고 수량을 다시 저장하는 것까지 오직 하나의 쓰레드에서만 이루어져야한다. DB에서의 Exclusive락과 동일한 역할을 하는 레디스에서의 락을 이용하는 것이다. 레디스는 메모리 디비이기 때문에 일단 RDB에 비해서 성능이 훨씬 빠를 것이다. 아래는 이를 위해 작성한 코드들이다.
interface KeyValueStore {
fun <T> getValue(key: String): Optional<T>
fun <T> save(key: String, value: T)
fun <T> executeWithLock(key: String, function: () -> T): T
}
@Component
class RedisStore(
private val redissonClient: RedissonClient
) : KeyValueStore {
companion object {
private const val LOCK_SUFFIX = ":lock"
private const val WAIT_TIME_SECONDS = 3L
private const val LEASE_TIME_SECONDS = 3L
}
override fun <T> getValue(key: String): Optional<T> {
val bucket = redissonClient.getBucket<T>(key)
if (bucket.isExists) {
return Optional.of(bucket.get())
}
return Optional.empty()
}
override fun <T> save(key: String, value: T) {
redissonClient.getBucket<T>(key).set(value)
}
override fun <T> executeWithLock(key: String, function: () -> T): T {
val lock = redissonClient.getLock(key + LOCK_SUFFIX)
try {
lock.tryLock(WAIT_TIME_SECONDS, LEASE_TIME_SECONDS, TimeUnit.SECONDS)
return function()
} finally {
unlock(lock)
}
}
private fun unlock(lock: RLock?) {
if (lock != null && lock.isLocked) {
lock.unlock()
}
}
}
object RedisKeyResolver {
private const val KEY_DELIMITER = ":"
fun getKey(id: Any, domain: String, vararg subDomains: String): String {
return listOf(domain, *subDomains, id.toString()).joinToString(KEY_DELIMITER)
}
}
위의 코드에서 핵심이 되는 메서드는 바로 'executeWithLock' 메서드이다. lock.tryLock(WAIT_TIME_SECONDS, LEASE_TIME_SECONDS, TimeUnit.SECONDS) 부분에서 락을 획득하기 전까진 function()을 실행하지 않으며, finally 부분에서 락을 반환한다.
이제 OrderService 코드에 이를 적용해보자. 적용하기 전에 ProductRepository에 걸어놨던 X Lock을 없애는 걸 잊어선 안된다. 코드는 다음과 같다.
interface ProductRepository : JpaRepository<Product, Long>
@Service
@Transactional
class OrderService(
private val orderRepository: OrderRepository,
private val productRepository: ProductRepository,
private val keyValueStore: KeyValueStore
) {
private val logger = LoggerFactory.getLogger(this::class.java)
fun order(userId: Long, orderCount: Int, productId: Long) {
val product = productRepository.findById(productId).orElseThrow { IllegalArgumentException() }
val key = RedisKeyResolver.getKey(productId, "product", "order")
keyValueStore.executeWithLock(key) {
val stockCount = keyValueStore.getValue<Int>(key).orElseGet {
product.totalCount - orderRepository.findAllByProductId(productId).count()
}
// 주문 수량이 재고 수량보다 많은지 확인
if (stockCount < orderCount) {
logger.info("재고 수량 : ${stockCount}, 주문 수량 : $orderCount")
throw IllegalArgumentException("주문 수량이 재고 수량보다 많습니다.")
}
keyValueStore.save(key, stockCount - orderCount)
orderRepository.save(Order(userId, orderCount, product))
}
}
}
이렇게 바꾸고 다시 이전의 테스트 코드를 돌리면 X-Lock을 걸었을 때와 마찬가지로 정상 동작하는 걸 확인할 수 있다. 또한 시간도 필자의 컴퓨터에서 약 3.5초 정도 걸린다(X-Lock이었을 때 5초). 실제 프로덕션 환경에서 X-Lock에 대한 Race condition이 심해질 수록 차이는 더 벌어질 수 있다고 생각한다.
RDB 동기화
Redis의 데이터는 캐시 데이터로서 활용된다. 따라서 최종 재고 데이터를 RDB에 동기화해줄 필요가 있다. 이는 꼭 동기적으로 될 필요는 없다. 동기적으로 할 경우, X-Lock에 대한 Race Condition이 발생하여 사실상 Redis를 사용하는 이유가 없어질 것이다. 따라서 아래와 같이 동기화 전용 클래스를 생성하고, OrderService의 맨 마지막에 이를 실행해준다.
@Component
class StockCountProjector(
private val keyValueStore: KeyValueStore,
private val productRepository: ProductRepository
) {
fun project(productId: Long) {
CoroutineScope(Dispatchers.Default).launch {
projectInBackground(productId)
}
}
private fun projectInBackground(productId: Long) {
val product = productRepository.findById(productId).orElseThrow { IllegalArgumentException() }
val key = RedisKeyResolver.getKey(productId, "product", "order")
val stockCount = keyValueStore.getValue<Int>(key).orElseGet {
product.totalCount - orderRepository.findAllByProductId(productId).count()
}
if (product.stockCount != stockCount) {
product.applyStockCount(stockCount)
productRepository.save(product)
}
}
}
@Service
@Transactional
class OrderService(
private val orderRepository: OrderRepository,
private val productRepository: ProductRepository,
private val keyValueStore: KeyValueStore,
private val stockCountProjector: StockCountProjector
) {
private val logger = LoggerFactory.getLogger(this::class.java)
fun order(userId: Long, orderCount: Int, productId: Long) {
val product = productRepository.findById(productId).orElseThrow { IllegalArgumentException() }
val key = RedisKeyResolver.getKey(productId, "product", "order")
keyValueStore.executeWithLock(key) {
val stockCount = keyValueStore.getValue<Int>(key).orElseGet {
product.totalCount - orderRepository.findAllByProductId(productId).count()
}
// 주문 수량이 재고 수량보다 많은지 확인
if (stockCount < orderCount) {
logger.info("재고 수량 : ${stockCount}, 주문 수량 : $orderCount")
throw IllegalArgumentException("주문 수량이 재고 수량보다 많습니다.")
}
keyValueStore.save(key, stockCount - orderCount)
orderRepository.save(Order(userId, orderCount, product))
}
stockCountProjector.project(productId)
}
}
5. 생각해볼 점
Redis와 RDB가 하나의 트랜잭션 안에 있다면, 예외가 발생할 경우 롤백도 같이 되는가?
테스트를 해본 결과, 되지 않는다. 레디스는 롤백을 지원하지 않는다고 한다(https://redis.com/blog/you-dont-need-transaction-rollbacks-in-redis). 그렇다면 만에 하나 디비에 문제가 생겨서 익셉션이 발생하고 DB만 롤백이 된다면, 레디스와 DB 사이의 값의 간극이 생길 것이다. 지금 당장 생각나는 방법은, 레디스 저장을 맨 마지막에 하는 것이다. 더 좋은 아이디어 있으면 공유 좀 부탁드리겠습니다^^; 아무튼 그걸 반영한 코드는 아래와 같다.
@Service
@Transactional
class OrderService(
private val orderRepository: OrderRepository,
private val productRepository: ProductRepository,
private val keyValueStore: KeyValueStore,
private val stockCountProjector: StockCountProjector
) {
private val logger = LoggerFactory.getLogger(this::class.java)
fun order(userId: Long, orderCount: Int, productId: Long) {
val product = productRepository.findById(productId).orElseThrow { IllegalArgumentException() }
val key = RedisKeyResolver.getKey(productId, "product", "order")
keyValueStore.executeWithLock(key) {
val stockCount = keyValueStore.getValue<Int>(key).orElseGet {
product.totalCount - orderRepository.findAllByProductId(productId).count()
}
// 주문 수량이 재고 수량보다 많은지 확인
if (stockCount < orderCount) {
logger.info("재고 수량 : ${stockCount}, 주문 수량 : $orderCount")
throw IllegalArgumentException("주문 수량이 재고 수량보다 많습니다.")
}
// 여기 순서를 바꿈
orderRepository.save(Order(userId, orderCount, product))
keyValueStore.save(key, stockCount - orderCount)
}
stockCountProjector.project(productId)
}
}
레디스 락이 풀리는 시점과 RDB 트랜잭션이 commit 되는 시점이 다르다.
레디스 락의 경우 메서드 끝 부분에서 unlock메서드를 통해 풀지만, RDB commit은 메서드가 모두 끝난 후 실행된다. 이는 스프링의 트랜잭션이 AOP를 기반으로 동작하기 때문인데, 이 때문에 데이터가 꼬일 가능성이 아주 희박하게나마 있다. 예를 들면, 1번 쓰레드가 락을 반환하고 아직 커밋은 하기 전인 상황에서, 2번 쓰레드가 락을 얻고 난 후 재고 수량을 확인하는데 하필! 그 타이밍에 redis의 값들이 모두 날아가서 재고 수량을 RDB 기반으로 확인하려고 한다면! 데이터가 꼬일 것이다...; 아주 희박하긴 하지만 발생할 수 있는 버그이기에, 만약 프로덕션 환경에서 이런 버그가 발견된다면 재현이 어려워서 원인을 찾는 것도 정말 어려울 것이다ㅠㅠ 실제로 로그를 찍어보면, Redis가 unlock 되는 시점이 transaction commit 보다 앞서는 것을 확인할 수 있다.
레디스 락에 대한 처리도 AOP로 하는 걸 생각해서 구현해봤다. 근데 Transactional도 AOP로 구현되어 있고, 내가 구현한 AOP를 Transactional 내부에서 실행되도록 해야하는데 쉽지가 않았다. 그리고 무엇보다 느렸다. 런타임에 프록시 객체를 생성하기 때문인 걸로 보인다. 이에 대해서는 필자가 더 공부가 필요해보인다.
아무튼 그래서, 돌고 돌아 그냥 원래 함수에서 트랜잭션에 대한 처리를 하기로 했다. 처리 방법은 아래와 같다. Propagation level은 Requires_new로 설정했다. 바깥에서 온 트랜잭션을 가져다쓸 경우, 어차피 또 커밋 시점이 뒤로 미뤄질 것이기 때문이다.
package com.minseoklim.redissonpractice.util
import java.util.Optional
import java.util.concurrent.TimeUnit
import org.redisson.api.RLock
import org.redisson.api.RedissonClient
import org.slf4j.LoggerFactory
import org.springframework.stereotype.Component
import org.springframework.transaction.PlatformTransactionManager
import org.springframework.transaction.TransactionDefinition
import org.springframework.transaction.TransactionManager
import org.springframework.transaction.TransactionStatus
import org.springframework.transaction.support.DefaultTransactionDefinition
@Component
class RedisStore(
private val redissonClient: RedissonClient,
private val transactionManager: PlatformTransactionManager
) : KeyValueStore {
companion object {
private const val LOCK_SUFFIX = ":lock"
private const val WAIT_TIME_SECONDS = 3L
private const val LEASE_TIME_SECONDS = 3L
}
private val logger = LoggerFactory.getLogger(this::class.java)
override fun <T> executeWithLock(key: String, function: () -> T): T {
val lock = redissonClient.getLock(key + LOCK_SUFFIX)
// transaction start
val transaction = getTransaction()
try {
lock.tryLock(WAIT_TIME_SECONDS, LEASE_TIME_SECONDS, TimeUnit.SECONDS)
return function()
} finally {
// transaction commit
transactionManager.commit(transaction)
unlock(lock)
logger.info("Redis unlocked!!!!")
}
}
private fun getTransaction(): TransactionStatus {
val transactionDefinition = DefaultTransactionDefinition()
transactionDefinition.propagationBehavior = TransactionDefinition.PROPAGATION_REQUIRES_NEW
return transactionManager.getTransaction(transactionDefinition)
}
private fun unlock(lock: RLock?) {
if (lock != null && lock.isLocked) {
lock.unlock()
}
}
}