기존 DB Polling 방식을 사용하던 알림 도메인 스케줄러를 리팩토링 하기로 했다.
이유 - DB polling 방식의 한계점과 아키텍처 구성에 대해다.
한계점
- 조회 주기 고정 - 정해진 시간에 따라 전체적으로 동작하기 때문에 실시간성이 떨어진다.
- DB I/O 부하 - 스케줄 진행시 조건에 맞는 모든 알림 데이터를 쿼리해야 한다.
- 트래픽 증가에 대한 병목 - 대량 데이터 처리시 조회와 처리가 지연된다.
- 스케일 아웃이 복잡해짐 - 스케줄러를 다중으로 구성할 때 중복 처리 방지가 필요하다(ex) 분산락)
- 특정 시간대 트래픽 증가 - 특정 시간에만 트래픽이 발생한다.
- 순차 처리에 의한 시간 보장 불가 - 스케줄러 주기에 의존하기 때문에 대량 데이터를 처리할 시 실시간성을 보장하기 어려울 수 있다.
- 장애 복구 및 유실시 어려움 - 실패시 재처리 로직이나 손실 데이터 경우 로직이 복잡해지고 DB에 부하가 많아진다.
- 실시간 알림과 스케줄러 알림 트래픽 병목 - 실시간 알림을 보내는 로직과 스케줄러가 실행될 때 트래픽에 병복이 생긴다. 스케룰러를 통해 실시간 알림 처리가 지연될 수 있다.
위와 같은 한계점이 있고 서버 부하와 DB 부하에 초점을 맞춰 부하 테스트를 통해 성능을 분석하고 DB 폴링 방식에서 탐색 범위와 개선해 어떤 부분이 개선 되었는지를 확인하자.
개선 기대 효과
- 실시간 트리거 - 레디스 TTL과 ZSET 기반 딜레이 큐를 활용해 예약된 시간에 발송
- DB 조회 부하 제거 - 스케줄러로 인한 DB 부하 제거
- 고성능 처리 - DB 폴링방식 대비 안정적인 처리량 기대
- 결합 감소 - 레디스와 카프카로 인한 처리로 독립적인 구독과 처리가 가능
- 재처리 유연성 확보 - 카프카로 DLQ나 재시도를 추가해 유연성 증가 기대
- 확장성 증가 - consumer의 증가로 스케일 아웃 용이
- 지연 시간 제어 - TTL 도달 시점에 정확한 예약 트리거 가능
- 장애 대응 향상 - RDB에도 백업하고 kafka를 통한 로그 기반 처리로 유실을 방지
- 분산 처리 - 메시지 큐 기반 분산 처리로 동시성 제어가 용이
개선 기술 및 계획
구성요소 | 역할 |
Redis Delay Queue | 예약 알림의 "예약 시점"까지 대기시킴 (TTL or ZSET score 기반) |
Kafka Producer | 예약 시간 도달 시 Kafka에 알림 발송 이벤트 전송 |
Kafka Consumer | 알림 서비스가 이벤트를 받아 포맷팅 후 발송 |
Notification Service | 실제 메시지 전송, 상태 저장, 실패 처리 |
Monitoring (Micrometer 등) | 전송 성공/실패율, 딜레이 시간 등 메트릭 수집 및 시각화 |
테스트 계획
기본적으로 대량의 데이터를 미리 넣어 놓은 후 각각 케이스를 테스트.
변경 전
- 5만 건 등록 후 스케줄러 동작 시 → TPS, latency 측정
- DB 쿼리 부하 확인 (EXPLAIN ANALYZE)
- 스케줄러 주기 → 누락/지연 케이스 파악
변경 후
- 5만 건 Redis에 적재 + TTL 10초
- Kafka에 메시지 전송 시간, 처리 성공률, 소비 지연 측정
- Redis와 Kafka의 CPU, Memory, TPS, 처리 시간 비교
DB 폴링 방식 - JMeter 테스트 결과
@Slf4j
@RequiredArgsConstructor
@Component
public class NotificationScheduler {
private final NotificationRepository notificationRepository;
private final NotificationFormatterStrategySelector formatterSelector;
private final NotificationSenderStrategySelector sendSelector;
private final NotificationParamExtractor paramExtractor;
private final NotificationMetrics metrics;
@Scheduled(cron = "0 */1 * * * *")
@Transactional
public void sendScheduledNotifications() {
metrics.recordSchedulerExecution(() -> {
LocalDateTime now = LocalDateTime.now();
List<Notification> notifications = notificationRepository
.findByStatusAndScheduledTimeLessThanEqualAndDeletedByIsNull(
NotificationStatus.PENDING, now);
metrics.recordFetchedScheduledCount(notifications.size());
for (Notification notification : notifications) {
Timer.Sample sample = metrics.startSendTimer();
try {
scheduleProcess(notification, sample);
} catch (Exception e) {
log.error("스케줄링 알림 전송 실패: {}", notification.getId(), e);
notification.modifyNotificationStatusIsFailed();
metrics.incrementSendFail();
}
}
});
}
private void scheduleProcess(Notification notification, Sample sample) {
NotificationFormatterStrategy strategy = formatterSelector.select(
notification.getNotificationType());
Map<String, String> params = paramExtractor.extract(notification);
NotificationTemplate formattedMessage = strategy.format(params);
NotificationSenderStrategy senderStrategy = sendSelector.select(
notification.getNotificationMethod());
senderStrategy.send(notification.getUserId(), formattedMessage);
notification.modifyNotificationStatusIsSent();
metrics.incrementSendSuccess();
metrics.recordSendLatency(sample, "scheduled");
}
}
DB plan
1차 3만건 부하
Seq Scan on p_notification (cost=0.00..1955.00 rows=1 width=168) (actual time=17.809..17.809 rows=0 loops=1)
Filter: ((deleted_by IS NULL) AND ((status)::text = 'PENDING'::text) AND (scheduled_time <= now()))
Rows Removed by Filter: 30000
Planning Time: 2.784 ms
Execution Time: 17.837 ms
항목 | 설명 |
실행 방식 | Seq Scan (전체 테이블 스캔) |
필터 조건 | status = 'PENDING' AND scheduled_time <= now() AND deleted_by IS NULL |
결과 행 수 | 0개 (조건 만족하는 데이터 없음) |
스캔된 행 수 | 30,000개 |
실행 시간 | 17.8ms |
2차 5만건
Seq Scan on p_notification (cost=0.00..4480.00 rows=1 width=168) (actual time=31.948..31.949 rows=0 loops=1)
Filter: ((deleted_by IS NULL) AND ((status)::text = 'PENDING'::text) AND (scheduled_time <= now()))
Rows Removed by Filter: 80000
Planning Time: 4.871 ms
Execution Time: 32.003 ms
sql 실행에만 32ms, 실제 처리 끝까지는 약 45초정도 걸렸다.
부하 테스트를 위해 15만건의 데이터를 넣어 놓고 추가적으로 실행할 5만건의 데이터를 추가
전체 탐색을 하기 때문에 기존 데이터가 많을수록 지연될거라 판단.
Gather (cost=1000.00..8352.43 rows=1 width=168) (actual time=87.202..90.326 rows=0 loops=1)
Workers Planned: 2
Workers Launched: 2
-> Parallel Seq Scan on p_notification (cost=0.00..7352.33 rows=1 width=168) (actual time=66.749..66.749 rows=0 loops=3)
Filter: ((deleted_by IS NULL) AND ((status)::text = 'PENDING'::text) AND (scheduled_time <= now()))
Rows Removed by Filter: 66667
Planning Time: 3.849 ms
Execution Time: 90.423 ms
실행에만 90ms가 걸린 것을 확인
그라파나를 통한 실행 부하를 모니터링 해보자.
총 실행 시간 - 3.67이고 41분 15초에 3.73으로 늘어났다. 이는 1분이 넘게 걸린다는 것이다.
모든 알림이 생성수와 실행과 성공에 맞물리게 됨으로 모두 성공인 것은 알수 있다.
지연 정적 평균은 0.0000509이다.
전송 지연의 최대 값은 0.177이다.
할당한 메모리 총량 - 164 mib
- allocated: 객체 생성량 (JVM에서 메모리 얼마나 할당했는가?)
- promoted: Old Gen으로 간 양 (GC 후 살아남아 승격된 객체 양)
promoted은 계속 증가한다면 Old Gen이 계속 차고 있다는 의미로 GC 튜닝이 필요할 수 있다.
System CPU Usage은 시스템 전체 CPU 사용률
Process CPU Usage은 JVM 프로세스 자체가 사용하는 CPU 비율이다.
JVM은 시스템 전체에서 10%, System CPU Usage의 20%만 사용 중이다.
위는 이후로 1분씩 끊어 5만건 테스트를 진행한 부분이다. 테스트에서 보다싶이 갈수록 지연도 길어지고 처리 평균시간, 실행 시간도 높아진다. 지금은 쌓이는 데이터를 보이다보니 안 보일 수 있지만 점점 데이터가 많아질수록 처리 성능이 낮아진다.
Redis + kafka - JMeter 테스트 결과
@Override
@Transactional
public CreateNotificationInfo createNotification(CreateNotificationCommand command) {
metrics.incrementCreate();
CreateNotificationInfo createNotificationInfo = CreateNotificationInfo
.from(notificationRepository.save(command.toEntity()));
notificationRepository.addToDelayQueue(
createNotificationInfo.notificationUuid(), createNotificationInfo.scheduledTime());
return createNotificationInfo;
}
@Repository
@RequiredArgsConstructor
@Slf4j
public class NotificationRedisRepositoryImpl implements NotificationRedisRepository{
private final RedisTemplate<String, String> redisTemplate;
private final DefaultRedisScript<List> popDueScript;
@Value("${redis-key}")
private String delayQueue;
@Override
public void addToDelayQueue(String notificationUuId, LocalDateTime scheduledTime) {
long score = scheduledTime.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
redisTemplate.opsForZSet().add(delayQueue, notificationUuId, score);
}
@Override
public List<String> popDueNotifications(int maxCount) {
long now = System.currentTimeMillis();
// 스크립트를 실행하고 결과를 String 리스트로 받음
List<String> result = redisTemplate.execute(
popDueScript,
List.of(delayQueue),
String.valueOf(now), String.valueOf(maxCount)
);
return validResult(result);
}
private static List<String> validResult(List<String> result) {
// 결과가 null 이거나 빈 리스트일 경우, 빈 리스트를 반환
if (result == null || result.isEmpty()) {
log.debug("처리한 알림이 없거나 스크립트에 문제가 생겼습니다.");
return List.of();
}
try {
// 정상적으로 결과를 String 리스트로 반환
return result;
} catch (Exception e) {
log.error("처리 중 오류 발생 {}", result, e);
return List.of();
}
}
}
@Configuration
public class RedisLuaScriptConfig {
//KEYS[1] : 조회할 ZSET 키 (예: "notification:delay-queue")
//ARGV[1] : 현재 시간 (millis 기준), 즉 이 시간 이하의 항목만 조회
//ARGV[2] : 최대 몇 개까지 가져올지 설정 (LIMIT 개수)
//결과는 현재 시간까지 도달한 알림들만 최대 ARGV[2]개 가져옴
//위에서 가져온 results 안의 value들을 루프 돌면서
//ZSET에서 제거(ZREM) —> 중복 발송 방지
@Bean
public DefaultRedisScript<List> addToDelayQueueScript() {
DefaultRedisScript<List> script = new DefaultRedisScript<>();
script.setScriptText(
"local results = redis.call('ZRANGEBYSCORE', KEYS[1], 0, ARGV[1], 'LIMIT', 0, ARGV[2]) " +
"for i, v in ipairs(results) do redis.call('ZREM', KEYS[1], v) end " +
"return results"
);
script.setResultType(List.class);
return script;
}
}
@Scheduled(fixedDelay = 1000)
@Transactional
public void pollAndDispatch() {
List<String> notificationIds = notificationRepository.popDueNotifications(MAX_DISPATCH_COUNT);
if (notificationIds.isEmpty()) {
log.debug("발송할 알림이 없습니다.");
return;
}
metrics.recordSchedulerExecution(() -> {
List<Notification> notifications = notificationRepository
.findByNotificationUuidIn(notificationIds);
metrics.recordFetchedScheduledCount(notifications.size());
for (Notification notification : notifications) {
publisher.publish(NotificationScheduleSendEvent.from(notification));
}
});
}
}
@Slf4j
@Component
@RequiredArgsConstructor
public class KafkaNotificationProducer implements NotificationEventPublisher {
private final KafkaTemplate<String, NotificationEvent> kafkaTemplate;
private final String notificationTopic;
@Override
public void publish(NotificationEvent event) {
kafkaTemplate.send(notificationTopic, event);
}
private static void logEvent(NotificationEvent notificationEvent) {
log.info("Published notification event {}", notificationEvent.eventType().name());
}
}
//리스너
@KafkaListener(
topics = "notification-event",
containerFactory = "ScheduleSendNotificationEventKafkaListenerContainerFactory"
)
public void handleNotificationSend(NotificationScheduleSendEvent event) {
notificationService.consumerScheduleSendNotification(event);
}
@Override
@Transactional
public void consumerScheduleSendNotification(NotificationScheduleSendEvent event) {
Timer.Sample sample = metrics.startSendTimer();
try {
NotificationFormatterStrategy strategy =
formatterSelector.select(NotificationType.valueOf(event.payload().notificationType()));
Map<String, String> params = paramExtractor.extractEvent(event);
NotificationTemplate formattedMessage = strategy.format(params);
NotificationSenderStrategy senderStrategy = sendSelector.select(
NotificationMethod.valueOf(event.payload().notificationMethod()));
senderStrategy.send(event.payload().userId(), formattedMessage);
Notification notification = findByNotification(event.payload().notificationUuid());
notification.modifyNotificationStatusIsSent();
log.info("수정 완료 :{} ", notification.getStatus());
metrics.incrementSendSuccess();
metrics.recordSendLatency(sample, "scheduled");
} catch (Exception e) {
log.error("알림 발송 실패: {}", event.payload().notificationUuid(), e);
metrics.incrementSendFail();
}
}
이전과 달리 DB 폴링 방식이 아니고 레디스 ZSET 구조에 RDB저장과 동시에 데이터를 저장한다.
레디스 딜레이 큐에 저장시 score를 스케줄 시간 기준으로 설정하고 스케줄링 실행시 스케줄링 시간 기준 이전 score의 값을 가져오고 알림을 실행하도록 List에 저장해 반환하고 삭제한다.
꺼내오고 실행 할 값을 약 1천건으로 제약했고 이는 성능을 측정하며 유동적으로 조정할 수 있고 너무 클시 성능에 영향이 간다.
레디스 딜레이큐에 알람 정보를 같이 넣어 두고 꺼낼 때 역직렬화를 한다면 지금보다 더 나은 성능을 기대할 수 있을 것이다.
1000건을 꺼내서 db에서 꺼내 오고 카프카를 통해 비동기 발행을 하고 알림을 실행처리한다.
위와 같은 흐름으로 진행한다. 다음으로는 성능을 확인해보자.
DB에는 40만건 이상 이미 데이터가 축적된 상태이다.
각 지표를 확인하면
알림 최대 지연율은 0.117
평균 알림 지연 시간은 최대 0.00131
알림 생성 부분에 성공한 알림 수 가 5만건이 안 되는데 이는 테스트용으로 알림의 한 채널에만 전송을 했는데 슬랙 측에서 Too Many Requests from POST가 중간에 발생하기 때문이다.
이전 DB 폴링 방식에서는 모두 성공한 것 같이 보였지만 5만건 중 실제로 도착한 알림은 10%도 되지 않았다.
사실 지표로 지연 시간의 의미는 크지 않으나 자원 사용량과 한계점에 대한 초점을 두었다.
JVM은 시스템 전체에서 8%, System CPU Usage의 20%만 사용 중이다.
기존 방식 대비 2%의 효율을 얻었다.
메모리 부분에서 가장 큰 효율이 나타났다. 기존 28mib 가량 사용하던 방식에서 현재 219kib를 사용한다. 이는 GC의 발동이 적다는 것이고 자원 사용 효율이 많이 상승했다.
다음으로 주목할 부분은 커넥션 풀이다.
DB 폴링 방식을 사용할 때에는 데이터 수량이 늘고 탐색 범위가 늘어날수록 커넥션 사용이 밀려 30만건이 있는 상태에서 테스트 했을 때 최대 8개의 커넥션을 사용중이기도 했다. 하지만 현재 방식은 최대 1개만을 사용하고 반환하는 것을 확인할 수 있다. 지금은 물론 단일 인스턴스기 때문이지만 이전 방식에서는 하나의 스케줄을 실행할 때 여러 커넥션음 점유 했었다. 실제 사용자들이 스케줄 실행에도 실행한다 해도 이전보다 크게 성능상 이점을 가져갈 수 있을 것이다.
문제점 발견
기존 DB Polling 방식에서 알림 성공 처리가 제대로 되지 않고 성능 체크와 스케일 아웃 고려시 한계점 발생
문제점 해결 방안
Redis Delay Queue와 kafka 사용으로 고정된 시간이 아닌 지속적 처리와 비동기 처리 진행 방향 설정
결론
기존 DB Polling 방식에서의 성능과 한계점인 조회 주기 고정, DB I/O 부하,트래픽 증가에 대한 병목, 스케일 아웃이 복잡도 증가가 대표적으로 있었다.
조회 주기 고정을 Redis delayQueue와 Kafka를 통해 비동기 처리를 진행하도록 해서 지속적인 조회와 처리로 해결
DB I/O 부하에서 탐색 범위가 점점 커지던 부분을 redis를 통해 꺼내와 처리하게끔 수정으로 탐색과 탐색에 따른 일시적으로 크던 DB I/O
해결
트래픽 증가에 대한 병목은 주기적 처리로 인해 별도의 특정 전체적인 알림 발송을 제외한 시간대에 병목을 해결
스케일 아웃에 대한 복잡도 증가는 기존 DB Polling 방식에서는 동시성도 고려해야하고 수정도 서로 선점하면 안 되고 락이나 데드락도 고려해야하고 이렇게 되면 성능적으로 저하가 심해진다. 현재는 원자적으로 실행하는 루아 스크립트를 사용해서 값을 가져와서 수정하고 값을 꺼낼 때 다중 인스턴스로 구성하게 될 때 분산락만 고려하면 된다.
성능도 5만건 기준으로 DB안에 데이터가 점점 많아질 때 실행 계획이 점점 시간이 늘어나고 알림 전송 지연률도 점점 증가하는 것을 확인할 수 있었다. Redis delayQueue와 Kafka를 통한 처리를 통해 비교적 낮은 지연률을 확인할 수 있었고 특별히 주목할 만한 성능차이는 메모리 부분과 GC의 성능 차이에 주목할만 하다. Old Gen으로 간 양이 약 131배 향상 된 것을 확인할 수 있었다.
한계점을 해결하고 차후 스케일 아웃에도 용이하게 구성했다.
'spring' 카테고리의 다른 글
DDD 4계층 구조와 역방향 의존성 (0) | 2025.05.15 |
---|---|
spring 요청처리 흐름과 흐름에 따른 에러처리 문제 (0) | 2025.05.12 |
TIL 코드 수정(비동기) Mono (1) | 2025.03.31 |
TIL Saga 패턴 사전 공부 (0) | 2025.03.29 |
TIL git conflict (1) | 2025.03.27 |