일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | ||||
4 | 5 | 6 | 7 | 8 | 9 | 10 |
11 | 12 | 13 | 14 | 15 | 16 | 17 |
18 | 19 | 20 | 21 | 22 | 23 | 24 |
25 | 26 | 27 | 28 | 29 | 30 | 31 |
- join
- 오블완
- SQL
- 스터디
- select
- 카카오코테
- 혼공챌린지
- 혼공파
- 안드로이드스튜디오
- groupby
- 혼공단
- 자료구조
- 안드로이드
- Kotlin
- 정처기
- 자바
- CS
- 알고리즘
- 정보처리기사
- 코틀린
- Android
- 티스토리챌린지
- MySQL
- 프로그래머스
- 코테
- java
- 기술면접
- 인프런
- Til
- doitandroid
- Today
- Total
Welcome! Everything is fine.
[Spring] Redis ZSET + RabbitMQ 적용해 이벤트 오픈 알림 구현하기 본문
기존 코드의 문제점
기존 이벤트 API는 오픈 알림이 구현되지 않았고,
단순 DB 기반 스케줄러로 1분마다 오픈 시간을 체크해 상태를 변경하는 구조였다.
하지만 지금과 같은 구조는 다음과 같은 단점이 있다.
- 데이터베이스 부하 증가
- 스케줄러는 매 분마다 WHERE open_at <= NOW() 조건으로 모든 이벤트를 조회해야 한다.
- 기존 코드는 매 분마다 위와 같은 조건으로 풀스캔하고 있다.
- 인덱스를 적용하더라도 유저 수와 이벤트 수가 많아질수록 쿼리 부하가 커지고, 인덱싱만으로는 처리 한계가 생긴다.
- 이벤트 오픈 비동기 전파 필요성
- 만약 알림 도메인과 직접적인 연결을 한다면?
- 두 서비스가 서로의 생명주기와 내부 구조에 영향을 받을 수 있다.
- 직접적인 연결은 유지보수를 어렵게 하고, 장애 전파 가능성을 높인다.
- MQ를 사용하면 이벤트 서비스는 오픈 시 메시지만 발행하고, 알림 도메인은 메시지를 수신해 독립적으로 처리함으로써 구조적으로 느슨한 연결과 안정적인 전송이 가능하다.
- 만약 알림 도메인과 직접적인 연결을 한다면?
기능 구현
이벤트 생성 시 Redis ZSET 등록
기존 createEvent()에서 아래 코드를 추가했다.
- openEpoch : LocalDateTime 타입인 openAt 값을 UTC 기준 초 단위 숫자(Epoch Time) 로 변환한 것. → Redis ZSET에서는 score(정렬 기준)가 double 숫자여야 하고, 시간 비교를 위해 초 단위 숫자가 가장 직관적이기 때문에 이와 같은 방식으로 변환.
- ZSet에 event:open 을 key로, 이벤트의 ID를 value로 넣고 이벤트의 오픈 시간을 정렬 기준으로 등록했다.
이벤트 오픈 시간 확인 후 상태 변경 - openEventsIfDue()
기존 코드에서는 스케줄러를 통해 openEventsIfDue()가 실행되면 다음과 같은 흐름으로 진행됐다.
[1분마다 실행되는 스케줄러]
↓
[DB 조회]
- 이벤트 테이블에서
상태가 READY이고 open_at ≤ 현재시간인 이벤트 전체 조회
↓
[DB 상태 변경]
- 해당 이벤트의 상태를 OPENED로 변경
↓
[유저 알림 생성]
- 참여 유저 목록을 조회하여
- 알림을 직접 동기적으로 생성 (for-loop 돌면서 NotificationService 호출)
↓
[끝]
- 유저에게는 알림이 전달되지만,
- 동기 처리로 트랜잭션 지연 및 서비스 결합 발생 가능
개선된 openEventsIfDue()의 처리 흐름은 다음과 같다.
[1분마다 실행되는 스케줄러]
↓
[Redis ZSET 조회]
- event:open ZSET에서
score(=오픈 시간) ≤ now 인 ID들만 조회
↓
[각 ID별로 처리 반복]
↓
[1. 이벤트 엔티티 조회 및 상태 확인]
↓
[2. 상태가 READY면 OPENED로 변경]
↓
[3. MQ에 EventOpenMessage 발행]
↓
[4. Redis ZSET에서 해당 ID 제거]
↓
[끝]
- 실제 알림은 Consumer가 비동기로 처리
리팩토링한 openEventsIfDue() 인데, 예약된 이벤트의 오픈 시간이 도달하면
1) 상태를 변경하고,
2) 메시지를 MQ로 발행해 후속 처리를 비동기 전파하는 핵심 메서드라고 할 수 있다.
- rangeByScore(0, now)로 오픈 시간이 도달한 이벤트 ID만 선별 조회→ DB 쿼리 없이 오픈 대기 이벤트만 O(logN)으로 조회 가능 (= score 값이 0 이상 now 이하인 요소들의 value들(ID들)만 조회)
- 이벤트가 아직 READY 상태인지 확인 → READY 가 아닐 경우 건너뛰고, READY 상태라면 OPENED 로 상태 변경
- 오픈된 이벤트 정보를 MQ로 발행 → 알림/로그/통계 등은 직접 처리하지 않고, Consumer가 비동기로 처리
- 성공적으로 처리된 이벤트는 ZSET에서 제거
EventOpenProducer
위에서 메세지 발행을 하기 전 EventOpenProducer 클래스를 만들어야 한다.
해당 클래는 이벤트가 오픈되는 시점에 해당 이벤트 정보를 RabbitMQ에 메시지로 발행하는 역할을 한다.
💡 이벤트 오픈 메시지를 fanout exchange로 발행한 이유
현재 이벤트가 등록되고, 이벤트가 열리면 발행은 모든 유저에게 하되, Customer 쪽에서 알림 수신 설정을 한 유저에게만 알림을 보내도록 했다. 사용자 설정과 같은 동적인 조건에 따라 메시지를 처리해야 하는 경우, Consumer 측에서 이러한 로직을 구현하는 것이 유연성과 확장성 측면에서 유리할 수 있다고 한다.
🤔 왜 routingKey는 빈 문자열("")로 설정하는가?
FanoutExchange는 메시지를 받으면 연결된 모든 큐에 브로드캐스트하기 때문에 라우팅 키가 필요하지 않다.
이벤트 오픈 시간에 맞춰 테스트 한 후 로그를 보니 이벤트 오픈 메세지가 정상적으로 발행되었다.
EventOpenConsumer
이제 발행을 했으면 메세지를 수신해야 한다.
EventOpenConsumer 클래스는 RabbitMQ를 통해 전송된 이벤트 오픈 메시지를 수신하고, 유저에게 알림을 생성하는 클래스다.
전체적인 흐름은 다음과 같다.
- RabbitMQ의 Fanout Exchange로부터 메시지 수신
- 유효성 검사 → 잘못된 메시지는 무시
- 알림 설정이 활성화된 유저만 조회
- 각 유저에게 알림 생성
- 실패 시 로깅 처리
- @RabbitListener 기반 메시지 수신 : consume()
- RabbitMQ로부터 EVENT_OPEN_QUEUE에 수신된 메시지를 자동으로 처리
- 메시지 유효성 검증 : isValid()
- 필수 필드(eventId, storeId)가 없는 메시지는 로그 경고 후 무시
- 알림 설정 유저만 선별 조회 : getUsers()
- userService.getUsersWithAlarmEnabled() 을 통해 알림을 활성화한 유저만 필터링
- 유저 목록 조회 실패 시 emptyList() 반환
- 알림 전송 + 예외 대응 : sendNotification() ⇒ 추후 MQ 기반 알림 전송 분리 적용 고려
- 알림 요청 객체(NotificationRequestDto) 생성 후 서비스 호출
MQ 메시지 발행 및 소비가 정상적으로 수행되고 있는 것을 RabbitMQ 대시보드의 Message Rates 그래프로 확인한 결과이다.
RabbitConfig - Queue, Exchange, Binding 설정
RabbitConfig에 이벤트 오픈 관련 설정 등록한다.
- eventOpenQueue() : 이벤트 오픈 시 전체 유저에게 브로드캐스트할 큐 생성
- 큐 이름: EVENT_OPEN_QUEUE
- durable = true: 메시지 브로커 재시작 후에도 큐 유지
- 큐의 내구성을 보장하여 서비스 중단 없이 안정적인 운영 가능
- eventOpenExchange() : Fanout 방식의 브로드캐스트 Exchange 정의
- Exchange란 메세지를 어떤 큐로 보낼지 결정하는 중간 허브를 말한다.
- Exchange 이름: EVENT_OPEN_EXCHANGE
- 메시지를 받은 모든 큐로 동시에 복제 전송
- durable = true, autoDelete = false: 안정성과 명시적 해제를 모두 고려
- eventOpenBinding() : 큐와 익스체인지 연결
- eventOpenQueue()를 eventOpenExchange()에 바인딩
- Fanout 방식이므로 라우팅 키 불필요
'Spring' 카테고리의 다른 글
[트러블슈팅] 순환 참조 오류(Circular Dependency) (0) | 2025.04.16 |
---|---|
Spring Security + JWT 사용하기 (0) | 2025.03.15 |