Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@
import java.time.format.DateTimeParseException;
import java.util.Locale;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import com.dabom.messaging.kafka.contract.KafkaConsumerGroups;
import com.dabom.messaging.kafka.contract.KafkaEventTypes;
import com.dabom.messaging.kafka.contract.KafkaTopics;
import com.dabom.messaging.kafka.event.dto.usage.UsagePayload;
import com.dabom.messaging.kafka.metrics.KafkaMetrics;
import com.project.domain.policy.service.helper.PolicyConstraintWarmupHelper;
import com.project.domain.usage.service.dto.UsageUpdateResult;
import com.project.domain.usage.service.helper.UsageEventPublisher;
Expand All @@ -30,30 +35,29 @@ public class UsageSyncServiceImpl implements UsageSyncService {
private static final String EMPTY_APP_ID = "";

private final RedisKeyGenerator redisKeyGenerator;

// Redis Warmup Service
private final UsageRedisWarmupHelper usageRedisWarmupHelper;
private final PolicyConstraintWarmupHelper policyConstraintWarmupHelper;

// Lua Script 실행기
private final UsageLuaExecutor usageLuaExecutor;

// 이벤트 발행기
private final UsageEventPublisher usageEventPublisher;

// 로그 정리기
private final LogSanitizer logSanitizer;
private final KafkaMetrics kafkaMetrics;

// usage-event dedup TTL
@Value("${app.kafka.dedup.usage-ttl-seconds}")
private long dedupTtlSeconds;

@Override
public void syncUsage(String eventId, String eventTime, UsagePayload payload) {

Long familyId = payload.familyId();
Long customerId = payload.customerId();
long usageBytes = payload.bytesUsed();

// 1) eventTime 해석 + 월 키 기준 계산
LocalDateTime resolvedEventDateTime = resolveEventDateTime(eventTime);
LocalDate eventMonth = resolvedEventDateTime.toLocalDate().withDayOfMonth(1);

// Redis Key 생성
// 2) Lua 실행에 필요한 Redis 생성
String infoKey = redisKeyGenerator.generateFamilyInfoKey(familyId);
String remainingKey = redisKeyGenerator.generateFamilyRemainingKey(familyId);
String monthlyKey =
Expand All @@ -62,8 +66,9 @@ public void syncUsage(String eventId, String eventTime, UsagePayload payload) {
String constraintsKey =
redisKeyGenerator.generateFamilyCustomerConstraintsKey(familyId, customerId);
String alertsKey = redisKeyGenerator.generateFamilyAlertsKey(familyId);
String dedupKey = redisKeyGenerator.generateUsageEventDedupKey(eventId);

// Warmup
// 3) Redis warmup 보장
boolean familyInfoRedisWarmup =
usageRedisWarmupHelper.ensureFamilyInfoCached(familyId, infoKey);
boolean familyRemainingRedisWarmup =
Expand All @@ -83,7 +88,7 @@ public void syncUsage(String eventId, String eventTime, UsagePayload payload) {
String currentHhmm = resolvedEventDateTime.format(HHMM_FORMATTER);
String normalizedAppId = normalizeAppId(payload.appId());

// Lua Script 실행 + 결과 파싱
// 4) Lua로 정책 판정 + 사용량 반영 + dedup 검사 수행
UsageUpdateResult parsed =
usageLuaExecutor.execute(
new UsageLuaExecutor.UsageLuaCommand(
Expand All @@ -92,31 +97,45 @@ public void syncUsage(String eventId, String eventTime, UsagePayload payload) {
monthlyKey,
constraintsKey,
alertsKey,
dedupKey,
usageBytes,
currentHhmm,
normalizedAppId),
normalizedAppId,
dedupTtlSeconds),
eventId);
log.debug(
"Usage Synced: family={}, customer={}, status={}",
familyId,
customerId,
parsed.status());

// 이벤트 전파
// 5) duplicate면 후속 publish 없이 종료
if (parsed.duplicate()) {
kafkaMetrics.incrementDedupHit(
KafkaTopics.USAGE_EVENTS,
KafkaConsumerGroups.DABOM_PROCESSOR_USAGE_MAIN,
KafkaEventTypes.DATA_USAGE);
log.info(
"Skip duplicated usage event. eventId={}, familyId={}, customerId={}",
logSanitizer.sanitize(eventId),
familyId,
customerId);
return;
}

// 6) downstream 이벤트 발행
usageEventPublisher.publish(
new UsageEventPublisher.UsageEventContext(eventId, eventTime, payload, parsed));
}

private LocalDateTime resolveEventDateTime(String eventTime) {
// producer가 전달한 eventTime이 있으면 우선 사용한다.
if (eventTime != null && !eventTime.isBlank()) {
try {
return LocalDateTime.parse(eventTime);
} catch (DateTimeParseException ignored) {
log.debug("Failed to parse eventTime: {}", logSanitizer.sanitize(eventTime));
}
}
// eventTime이 없거나 파싱 실패 시 서버 현재 시각으로 보정한다.
return LocalDateTime.now(TimeConstants.ASIA_SEOUL);
}

Expand All @@ -125,6 +144,7 @@ private String normalizeAppId(String appId) {
return EMPTY_APP_ID;
}

// app 차단 정책 키와 비교할 수 있게 정규화
String normalized = appId.trim().toLowerCase(Locale.ROOT);
return normalized.isEmpty() ? EMPTY_APP_ID : normalized;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,6 @@ public record UsageUpdateResult(
String status,
long monthlyUsed,
double userRatio,
long monthlyLimit) {}
long monthlyLimit,
// usage-event duplicate 여부
boolean duplicate) {}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ public class UsageLuaExecutor {

private final StringRedisTemplate redisTemplate;
private final RedisScript<List<Object>> usageUpdateScript;

private final LogSanitizer logSanitizer;

// Usage Lua 스크립트를 실행하고 결과를 도메인 DTO로 변환한다.
// usage Lua 실행 + 결과 파싱
public UsageUpdateResult execute(UsageLuaCommand command, String eventId) {
List<Object> result =
redisTemplate.execute(
Expand All @@ -32,10 +31,12 @@ public UsageUpdateResult execute(UsageLuaCommand command, String eventId) {
command.remainingKey(),
command.monthlyKey(),
command.constraintsKey(),
command.alertsKey()),
command.alertsKey(),
command.dedupKey()),
String.valueOf(command.usageBytes()),
command.currentHhmm(),
command.appId());
command.appId(),
String.valueOf(command.dedupTtlSeconds()));

if (result == null || result.isEmpty()) {
log.error("Usage update script returned null. eventId={}", eventId);
Expand All @@ -45,9 +46,9 @@ public UsageUpdateResult execute(UsageLuaCommand command, String eventId) {
return parseScriptResult(result, eventId);
}

// lua script 결과 파싱
// Lua 결과 파싱
private UsageUpdateResult parseScriptResult(List<Object> result, String eventId) {
if (result.size() < 6) {
if (result.size() < 7) {
log.error(
"Usage update script returned invalid result. eventId={}, result={}",
logSanitizer.sanitize(eventId),
Expand All @@ -67,19 +68,24 @@ private UsageUpdateResult parseScriptResult(List<Object> result, String eventId)
: Double.parseDouble(userRatioObj.toString());

long monthlyLimit = ((Number) result.get(5)).longValue();
// 마지막 값은 duplicate 여부
boolean duplicate = ((Number) result.get(6)).longValue() == 1L;

return new UsageUpdateResult(
totalUsed, remaining, status, monthlyUsed, userRatio, monthlyLimit);
totalUsed, remaining, status, monthlyUsed, userRatio, monthlyLimit, duplicate);
}

// Lua 실행에 필요한 키/인자를 한 번에 전달하기 위한 내부 커맨드 객체
// Lua 실행에 필요한 인자 묶음
public record UsageLuaCommand(
String infoKey,
String remainingKey,
String monthlyKey,
String constraintsKey,
String alertsKey,
// usage-event 중복 검사 키
String dedupKey,
long usageBytes,
String currentHhmm,
String appId) {}
String appId,
long dedupTtlSeconds) {}
}
12 changes: 12 additions & 0 deletions src/main/java/com/project/global/util/RedisKeyGenerator.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ public class RedisKeyGenerator {
private static final String KEY_SEPARATOR = ":";
private static final String FAMILY_KEY_PREFIX = "family";
private static final String POLICY_EVENT_DEDUP_KEY_PREFIX = "event:dedup:policy";
private static final String USAGE_EVENT_DEDUP_KEY_PREFIX = "event:dedup:usage";
private static final DateTimeFormatter MONTH_SUFFIX_FORMATTER =
DateTimeFormatter.ofPattern("yyyyMM");

// 가족 알림 상태 키
public String generateFamilyAlertsKey(Long familyId) {
return FAMILY_KEY_PREFIX
+ KEY_SEPARATOR
Expand All @@ -24,14 +26,17 @@ public String generateFamilyAlertsKey(Long familyId) {
+ "THRESHOLD";
}

// 가족 quota 정보 키
public String generateFamilyInfoKey(Long familyId) {
return FAMILY_KEY_PREFIX + KEY_SEPARATOR + familyId + KEY_SEPARATOR + "info";
}

// 가족 잔여 데이터 키
public String generateFamilyRemainingKey(Long familyId) {
return FAMILY_KEY_PREFIX + KEY_SEPARATOR + familyId + KEY_SEPARATOR + "remaining";
}

// 고객 월별 사용량 키
public String generateFamilyCustomerMonthlyUsageKey(
Long familyId, Long customerId, LocalDate eventMonth) {
return FAMILY_KEY_PREFIX
Expand All @@ -49,6 +54,7 @@ public String generateFamilyCustomerMonthlyUsageKey(
+ eventMonth.format(MONTH_SUFFIX_FORMATTER);
}

// 고객 정책 제약 키
public String generateFamilyCustomerConstraintsKey(Long familyId, Long customerId) {
return FAMILY_KEY_PREFIX
+ KEY_SEPARATOR
Expand All @@ -61,7 +67,13 @@ public String generateFamilyCustomerConstraintsKey(Long familyId, Long customerI
+ "constraints";
}

// policy 이벤트 dedup 키
public String generatePolicyEventDedupKey(String eventId, Long customerId) {
return POLICY_EVENT_DEDUP_KEY_PREFIX + KEY_SEPARATOR + eventId + KEY_SEPARATOR + customerId;
}

// usage-event dedup 키
public String generateUsageEventDedupKey(String eventId) {
return USAGE_EVENT_DEDUP_KEY_PREFIX + KEY_SEPARATOR + eventId;
}
}
1 change: 1 addition & 0 deletions src/main/resources/application.yml
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ app:
kafka:
dedup:
policy-ttl-seconds: ${KAFKA_POLICY_DEDUP_TTL_SECONDS:3600}
usage-ttl-seconds: ${KAFKA_USAGE_DEDUP_TTL_SECONDS:1800}

server:
port: ${SERVER_PORT:8080}
Expand Down
Loading
Loading