From 1fa39c70869680e19bd6c1ff405ccb66604a0fbe Mon Sep 17 00:00:00 2001 From: ChoiSeungeon Date: Thu, 12 Mar 2026 11:13:49 +0900 Subject: [PATCH 1/6] =?UTF-8?q?DABOM-408=20chore:=20=EC=A4=91=EB=B3=B5=20?= =?UTF-8?q?=EB=B0=A9=EC=A7=80=20redis=20key=20TTL=20=EC=84=A4=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/resources/application.yml | 1 + src/test/resources/application-test.yml | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index a6bc53b..a8fea80 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -45,6 +45,7 @@ app: kafka: dedup: policy-ttl-seconds: ${KAFKA_POLICY_DEDUP_TTL_SECONDS:3600} + usage-ttl-seconds: ${KAFKA_USAGE_DEDUP_TTL_SECONDS:1800} server: port: ${SERVER_PORT:8080} diff --git a/src/test/resources/application-test.yml b/src/test/resources/application-test.yml index c7d1250..92e58de 100644 --- a/src/test/resources/application-test.yml +++ b/src/test/resources/application-test.yml @@ -30,3 +30,8 @@ spring: producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + +app: + kafka: + dedup: + usage-ttl-seconds: 60 From 4e698d3b7a7cc6601df9fd80a2d6cae6a974e17c Mon Sep 17 00:00:00 2001 From: ChoiSeungeon Date: Thu, 12 Mar 2026 11:14:41 +0900 Subject: [PATCH 2/6] =?UTF-8?q?DABOM-408=20feat:=20eventId=20=EA=B8=B0?= =?UTF-8?q?=EC=A4=80=20=EC=A4=91=EB=B3=B5=EC=B2=98=EB=A6=AC=20=EB=B0=A9?= =?UTF-8?q?=EC=A7=80=20=EB=A1=9C=EC=A7=81=20=EC=B6=94=EA=B0=80?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../usage/service/UsageSyncServiceImpl.java | 50 +++++++--- .../usage/service/dto/UsageUpdateResult.java | 4 +- .../service/helper/UsageLuaExecutor.java | 24 +++-- .../global/util/RedisKeyGenerator.java | 12 +++ src/main/resources/lua/usage_update.lua | 98 +++++++++++-------- 5 files changed, 122 insertions(+), 66 deletions(-) diff --git a/src/main/java/com/project/domain/usage/service/UsageSyncServiceImpl.java b/src/main/java/com/project/domain/usage/service/UsageSyncServiceImpl.java index 15ac1e0..8391ea9 100644 --- a/src/main/java/com/project/domain/usage/service/UsageSyncServiceImpl.java +++ b/src/main/java/com/project/domain/usage/service/UsageSyncServiceImpl.java @@ -6,9 +6,14 @@ import java.time.format.DateTimeParseException; import java.util.Locale; +import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service; +import com.dabom.messaging.kafka.contract.KafkaConsumerGroups; +import com.dabom.messaging.kafka.contract.KafkaEventTypes; +import com.dabom.messaging.kafka.contract.KafkaTopics; import com.dabom.messaging.kafka.event.dto.usage.UsagePayload; +import com.dabom.messaging.kafka.metrics.KafkaMetrics; import com.project.domain.policy.service.helper.PolicyConstraintWarmupHelper; import com.project.domain.usage.service.dto.UsageUpdateResult; import com.project.domain.usage.service.helper.UsageEventPublisher; @@ -30,19 +35,16 @@ public class UsageSyncServiceImpl implements UsageSyncService { private static final String EMPTY_APP_ID = ""; private final RedisKeyGenerator redisKeyGenerator; - - // Redis Warmup Service private final UsageRedisWarmupHelper usageRedisWarmupHelper; private final PolicyConstraintWarmupHelper policyConstraintWarmupHelper; - - // Lua Script 실행기 private final UsageLuaExecutor usageLuaExecutor; - - // 이벤트 발행기 private final UsageEventPublisher usageEventPublisher; - - // 로그 정리기 private final LogSanitizer logSanitizer; + private final KafkaMetrics kafkaMetrics; + + // usage-event dedup TTL + @Value("${app.kafka.dedup.usage-ttl-seconds}") + private long dedupTtlSeconds; @Override public void syncUsage(String eventId, String eventTime, UsagePayload payload) { @@ -50,10 +52,12 @@ public void syncUsage(String eventId, String eventTime, UsagePayload payload) { Long familyId = payload.familyId(); Long customerId = payload.customerId(); long usageBytes = payload.bytesUsed(); + + // 1) eventTime 해석 + 월 키 기준 계산 LocalDateTime resolvedEventDateTime = resolveEventDateTime(eventTime); LocalDate eventMonth = resolvedEventDateTime.toLocalDate().withDayOfMonth(1); - // Redis Key 생성 + // 2) Lua 실행에 필요한 Redis 키 생성 String infoKey = redisKeyGenerator.generateFamilyInfoKey(familyId); String remainingKey = redisKeyGenerator.generateFamilyRemainingKey(familyId); String monthlyKey = @@ -62,8 +66,9 @@ public void syncUsage(String eventId, String eventTime, UsagePayload payload) { String constraintsKey = redisKeyGenerator.generateFamilyCustomerConstraintsKey(familyId, customerId); String alertsKey = redisKeyGenerator.generateFamilyAlertsKey(familyId); + String dedupKey = redisKeyGenerator.generateUsageEventDedupKey(eventId); - // Warmup + // 3) Redis warmup 보장 boolean familyInfoRedisWarmup = usageRedisWarmupHelper.ensureFamilyInfoCached(familyId, infoKey); boolean familyRemainingRedisWarmup = @@ -83,7 +88,7 @@ public void syncUsage(String eventId, String eventTime, UsagePayload payload) { String currentHhmm = resolvedEventDateTime.format(HHMM_FORMATTER); String normalizedAppId = normalizeAppId(payload.appId()); - // Lua Script 실행 + 결과 파싱 + // 4) Lua로 정책 판정 + 사용량 반영 + dedup 검사 수행 UsageUpdateResult parsed = usageLuaExecutor.execute( new UsageLuaExecutor.UsageLuaCommand( @@ -92,9 +97,11 @@ public void syncUsage(String eventId, String eventTime, UsagePayload payload) { monthlyKey, constraintsKey, alertsKey, + dedupKey, usageBytes, currentHhmm, - normalizedAppId), + normalizedAppId, + dedupTtlSeconds), eventId); log.debug( "Usage Synced: family={}, customer={}, status={}", @@ -102,13 +109,26 @@ public void syncUsage(String eventId, String eventTime, UsagePayload payload) { customerId, parsed.status()); - // 이벤트 전파 + // 5) duplicate면 후속 publish 없이 종료 + if (parsed.duplicate()) { + kafkaMetrics.incrementDedupHit( + KafkaTopics.USAGE_EVENTS, + KafkaConsumerGroups.DABOM_PROCESSOR_USAGE_MAIN, + KafkaEventTypes.DATA_USAGE); + log.info( + "Skip duplicated usage event. eventId={}, familyId={}, customerId={}", + logSanitizer.sanitize(eventId), + familyId, + customerId); + return; + } + + // 6) downstream 이벤트 발행 usageEventPublisher.publish( new UsageEventPublisher.UsageEventContext(eventId, eventTime, payload, parsed)); } private LocalDateTime resolveEventDateTime(String eventTime) { - // producer가 전달한 eventTime이 있으면 우선 사용한다. if (eventTime != null && !eventTime.isBlank()) { try { return LocalDateTime.parse(eventTime); @@ -116,7 +136,6 @@ private LocalDateTime resolveEventDateTime(String eventTime) { log.debug("Failed to parse eventTime: {}", logSanitizer.sanitize(eventTime)); } } - // eventTime이 없거나 파싱 실패 시 서버 현재 시각으로 보정한다. return LocalDateTime.now(TimeConstants.ASIA_SEOUL); } @@ -125,6 +144,7 @@ private String normalizeAppId(String appId) { return EMPTY_APP_ID; } + // app 차단 정책 키와 비교할 수 있게 정규화 String normalized = appId.trim().toLowerCase(Locale.ROOT); return normalized.isEmpty() ? EMPTY_APP_ID : normalized; } diff --git a/src/main/java/com/project/domain/usage/service/dto/UsageUpdateResult.java b/src/main/java/com/project/domain/usage/service/dto/UsageUpdateResult.java index 399384a..237605a 100644 --- a/src/main/java/com/project/domain/usage/service/dto/UsageUpdateResult.java +++ b/src/main/java/com/project/domain/usage/service/dto/UsageUpdateResult.java @@ -6,4 +6,6 @@ public record UsageUpdateResult( String status, long monthlyUsed, double userRatio, - long monthlyLimit) {} + long monthlyLimit, + // usage-event duplicate 여부 + boolean duplicate) {} diff --git a/src/main/java/com/project/domain/usage/service/helper/UsageLuaExecutor.java b/src/main/java/com/project/domain/usage/service/helper/UsageLuaExecutor.java index 3af665b..e89529d 100644 --- a/src/main/java/com/project/domain/usage/service/helper/UsageLuaExecutor.java +++ b/src/main/java/com/project/domain/usage/service/helper/UsageLuaExecutor.java @@ -19,10 +19,9 @@ public class UsageLuaExecutor { private final StringRedisTemplate redisTemplate; private final RedisScript> usageUpdateScript; - private final LogSanitizer logSanitizer; - // Usage Lua 스크립트를 실행하고 결과를 도메인 DTO로 변환한다. + // usage Lua 실행 + 결과 파싱 public UsageUpdateResult execute(UsageLuaCommand command, String eventId) { List result = redisTemplate.execute( @@ -32,10 +31,12 @@ public UsageUpdateResult execute(UsageLuaCommand command, String eventId) { command.remainingKey(), command.monthlyKey(), command.constraintsKey(), - command.alertsKey()), + command.alertsKey(), + command.dedupKey()), String.valueOf(command.usageBytes()), command.currentHhmm(), - command.appId()); + command.appId(), + String.valueOf(command.dedupTtlSeconds())); if (result == null || result.isEmpty()) { log.error("Usage update script returned null. eventId={}", eventId); @@ -45,9 +46,9 @@ public UsageUpdateResult execute(UsageLuaCommand command, String eventId) { return parseScriptResult(result, eventId); } - // lua script 결과 파싱 + // Lua 결과 파싱 private UsageUpdateResult parseScriptResult(List result, String eventId) { - if (result.size() < 6) { + if (result.size() < 7) { log.error( "Usage update script returned invalid result. eventId={}, result={}", logSanitizer.sanitize(eventId), @@ -67,19 +68,24 @@ private UsageUpdateResult parseScriptResult(List result, String eventId) : Double.parseDouble(userRatioObj.toString()); long monthlyLimit = ((Number) result.get(5)).longValue(); + // 마지막 값은 duplicate 여부 + boolean duplicate = ((Number) result.get(6)).longValue() == 1L; return new UsageUpdateResult( - totalUsed, remaining, status, monthlyUsed, userRatio, monthlyLimit); + totalUsed, remaining, status, monthlyUsed, userRatio, monthlyLimit, duplicate); } - // Lua 실행에 필요한 키/인자를 한 번에 전달하기 위한 내부 커맨드 객체 + // Lua 실행에 필요한 인자 묶음 public record UsageLuaCommand( String infoKey, String remainingKey, String monthlyKey, String constraintsKey, String alertsKey, + // usage-event 중복 검사 키 + String dedupKey, long usageBytes, String currentHhmm, - String appId) {} + String appId, + long dedupTtlSeconds) {} } diff --git a/src/main/java/com/project/global/util/RedisKeyGenerator.java b/src/main/java/com/project/global/util/RedisKeyGenerator.java index 9040e67..ba914c1 100644 --- a/src/main/java/com/project/global/util/RedisKeyGenerator.java +++ b/src/main/java/com/project/global/util/RedisKeyGenerator.java @@ -11,9 +11,11 @@ public class RedisKeyGenerator { private static final String KEY_SEPARATOR = ":"; private static final String FAMILY_KEY_PREFIX = "family"; private static final String POLICY_EVENT_DEDUP_KEY_PREFIX = "event:dedup:policy"; + private static final String USAGE_EVENT_DEDUP_KEY_PREFIX = "event:dedup:usage"; private static final DateTimeFormatter MONTH_SUFFIX_FORMATTER = DateTimeFormatter.ofPattern("yyyyMM"); + // 가족 알림 상태 키 public String generateFamilyAlertsKey(Long familyId) { return FAMILY_KEY_PREFIX + KEY_SEPARATOR @@ -24,14 +26,17 @@ public String generateFamilyAlertsKey(Long familyId) { + "THRESHOLD"; } + // 가족 quota 정보 키 public String generateFamilyInfoKey(Long familyId) { return FAMILY_KEY_PREFIX + KEY_SEPARATOR + familyId + KEY_SEPARATOR + "info"; } + // 가족 잔여 데이터 키 public String generateFamilyRemainingKey(Long familyId) { return FAMILY_KEY_PREFIX + KEY_SEPARATOR + familyId + KEY_SEPARATOR + "remaining"; } + // 고객 월별 사용량 키 public String generateFamilyCustomerMonthlyUsageKey( Long familyId, Long customerId, LocalDate eventMonth) { return FAMILY_KEY_PREFIX @@ -49,6 +54,7 @@ public String generateFamilyCustomerMonthlyUsageKey( + eventMonth.format(MONTH_SUFFIX_FORMATTER); } + // 고객 정책 제약 키 public String generateFamilyCustomerConstraintsKey(Long familyId, Long customerId) { return FAMILY_KEY_PREFIX + KEY_SEPARATOR @@ -61,7 +67,13 @@ public String generateFamilyCustomerConstraintsKey(Long familyId, Long customerI + "constraints"; } + // policy 이벤트 dedup 키 public String generatePolicyEventDedupKey(String eventId, Long customerId) { return POLICY_EVENT_DEDUP_KEY_PREFIX + KEY_SEPARATOR + eventId + KEY_SEPARATOR + customerId; } + + // usage-event dedup 키 + public String generateUsageEventDedupKey(String eventId) { + return USAGE_EVENT_DEDUP_KEY_PREFIX + KEY_SEPARATOR + eventId; + } } diff --git a/src/main/resources/lua/usage_update.lua b/src/main/resources/lua/usage_update.lua index 7830fbf..f434a6b 100644 --- a/src/main/resources/lua/usage_update.lua +++ b/src/main/resources/lua/usage_update.lua @@ -3,51 +3,80 @@ -- KEYS[3]: family:{fid}:customer:{uid}:usage:monthly:{yyyyMM} -- KEYS[4]: family:{fid}:customer:{uid}:constraints -- KEYS[5]: family:{fid}:alert:THRESHOLD (prefix) +-- KEYS[6]: event:dedup:usage:{eventId} -- ARGV[1]: usageBytes -- ARGV[2]: currentHHmm (e.g. 2230) -- ARGV[3]: normalizedAppId +-- ARGV[4]: dedupTtlSeconds local usageBytes = tonumber(ARGV[1]) local currentHHmm = tonumber(ARGV[2] or '0') local appId = ARGV[3] or '' - --- 1. [Check] 개인 제약 조건 로딩 -local constraints_array = redis.call('HGETALL', KEYS[4]) -local constraints = {} -for i = 1, #constraints_array, 2 do - constraints[constraints_array[i]] = constraints_array[i+1] -end - -local monthlyLimitStr = constraints['LIMIT:DATA:MONTHLY'] +local dedupTtlSeconds = tonumber(ARGV[4] or '0') local monthlyLimit = -1 -if monthlyLimitStr then monthlyLimit = tonumber(monthlyLimitStr) end -local currentMonthly = tonumber(redis.call('GET', KEYS[3]) or '0') - --- (공통 반환 함수) -local function getResult(status, currentMonthlyUsed) +-- 공통 반환 형식 +local function getResult(status, currentMonthlyUsed, duplicate) local totalLimit = tonumber(redis.call('HGET', KEYS[1], 'totalQuota') or '0') local currentRemaining = tonumber(redis.call('GET', KEYS[2]) or totalLimit) local totalUsed = totalLimit - currentRemaining local userRatio = 0 if totalLimit > 0 then userRatio = currentMonthlyUsed / totalLimit end - return {totalUsed, currentRemaining, status, currentMonthlyUsed, userRatio, monthlyLimit} + return { + totalUsed, + currentRemaining, + status, + currentMonthlyUsed, + userRatio, + monthlyLimit, + duplicate and 1 or 0 + } end --- 1. [Block] 완전 차단 여부 +-- 1. usage-event dedup 검사 +if dedupTtlSeconds > 0 then + local firstSeen = redis.call('SET', KEYS[6], '1', 'NX', 'EX', dedupTtlSeconds) + if not firstSeen then + local duplicatedMonthly = tonumber(redis.call('GET', KEYS[3]) or '0') + local constraintsArrayOnDuplicate = redis.call('HGETALL', KEYS[4]) + local constraintsOnDuplicate = {} + for i = 1, #constraintsArrayOnDuplicate, 2 do + constraintsOnDuplicate[constraintsArrayOnDuplicate[i]] = + constraintsArrayOnDuplicate[i + 1] + end + + local duplicatedLimitStr = constraintsOnDuplicate['LIMIT:DATA:MONTHLY'] + if duplicatedLimitStr then monthlyLimit = tonumber(duplicatedLimitStr) end + + return getResult("DUPLICATE", duplicatedMonthly, true) + end +end + +-- 2. 고객 정책 제약 로딩 +local constraintsArray = redis.call('HGETALL', KEYS[4]) +local constraints = {} +for i = 1, #constraintsArray, 2 do + constraints[constraintsArray[i]] = constraintsArray[i + 1] +end + +local monthlyLimitStr = constraints['LIMIT:DATA:MONTHLY'] +if monthlyLimitStr then monthlyLimit = tonumber(monthlyLimitStr) end + +local currentMonthly = tonumber(redis.call('GET', KEYS[3]) or '0') + +-- 3. 차단/제한 조건 검사 if constraints['BLOCK:ACCESS'] == "1" then - return getResult("MANUAL", currentMonthly) + return getResult("MANUAL", currentMonthly, false) end if appId ~= '' and constraints['BLOCK:APP:' .. appId] == "1" then - return getResult("APP_BLOCK", currentMonthly) + return getResult("APP_BLOCK", currentMonthly, false) end local blockStart = nil local blockEnd = nil --- BLOCK:TIME = "HHMM-HHMM" 해당 포멧을 기준으로 파싱 local blockTimeRange = constraints['BLOCK:TIME'] if blockTimeRange then local dashPos = string.find(blockTimeRange, "-", 1, true) @@ -59,32 +88,26 @@ if blockTimeRange then end end --- 2. [Block] 시간 차단 여부 if blockStart and blockEnd then if blockStart < blockEnd then - -- same-day window, e.g. 0900~1800 if currentHHmm >= blockStart and currentHHmm < blockEnd then - return getResult("TIME_BLOCK", currentMonthly) + return getResult("TIME_BLOCK", currentMonthly, false) end elseif blockStart > blockEnd then - -- overnight window, e.g. 2200~0700 if currentHHmm >= blockStart or currentHHmm < blockEnd then - return getResult("TIME_BLOCK", currentMonthly) + return getResult("TIME_BLOCK", currentMonthly, false) end else - -- start == end means full-day block - return getResult("TIME_BLOCK", currentMonthly) + return getResult("TIME_BLOCK", currentMonthly, false) end end --- 3. [Limit] 개인 월간 한도 초과 여부 if monthlyLimit ~= -1 then if (currentMonthly + usageBytes) > monthlyLimit then - return getResult("MONTHLY_LIMIT_EXCEEDED", currentMonthly) + return getResult("MONTHLY_LIMIT_EXCEEDED", currentMonthly, false) end end --- 4. [Quota] 가족 잔여량 부족 여부 local currentRemaining = tonumber(redis.call('GET', KEYS[2])) if currentRemaining == nil then local totalLimit = tonumber(redis.call('HGET', KEYS[1], 'totalQuota') or '0') @@ -92,16 +115,14 @@ if currentRemaining == nil then end if currentRemaining < usageBytes then - return getResult("FAMILY_QUOTA_EXCEEDED", currentMonthly) + return getResult("FAMILY_QUOTA_EXCEEDED", currentMonthly, false) end --- 5. 가족 잔여량 차감 +-- 4. 사용량 반영 local newRemaining = redis.call('DECRBY', KEYS[2], usageBytes) - --- 6. 개인 월간 사용량 증가 local newMonthly = redis.call('INCRBY', KEYS[3], usageBytes) --- 7. [Result] 상태 결정 +-- 5. 경고 상태 계산 local limitStr = redis.call('HGET', KEYS[1], 'totalQuota') local totalLimit = tonumber(limitStr or '0') local status = "NORMAL" @@ -114,7 +135,6 @@ else ratio = newRemaining / totalLimit end - -- 현재 미도달 경고 중 가장 작은 임계 local alertLevel = nil if ratio < 0.1 then alertLevel = "10" @@ -127,25 +147,21 @@ else status = "WARNING_50" end - -- 경고 상태이면 중복 체크 + -- 같은 임계치 알림은 한 번만 발행되게 보호 if alertLevel then local alertKey = KEYS[5] .. ":" .. alertLevel - -- 이미 알림을 보냈는지 확인 local isSent = redis.call('EXISTS', alertKey) if isSent == 1 then - -- 이미 보낸 레벨이면 상태를 NORMAL로 덮어씀 status = "NORMAL" else - -- 아직 안보냈으면 알림 상태 기록 (PUBLISHED) redis.call('SET', alertKey, "PUBLISHED") end end end --- 8. 최종 반환 local totalUsed = totalLimit - newRemaining local userRatio = 0 if totalLimit > 0 then userRatio = newMonthly / totalLimit end -return {totalUsed, newRemaining, status, newMonthly, userRatio, monthlyLimit} +return {totalUsed, newRemaining, status, newMonthly, userRatio, monthlyLimit, 0} From 6653c07c4e75cae895e53dd4bb00469a1f0ff628 Mon Sep 17 00:00:00 2001 From: ChoiSeungeon Date: Thu, 12 Mar 2026 11:15:08 +0900 Subject: [PATCH 3/6] =?UTF-8?q?DABOM-408=20test:=20=EC=A4=91=EB=B3=B5=20?= =?UTF-8?q?=EC=B2=98=EB=A6=AC=20=EB=B0=A9=EC=A7=80=20=EC=B2=98=EB=A6=AC=20?= =?UTF-8?q?=EB=A1=9C=EC=A7=81=20=EC=B6=94=EA=B0=80=EB=A1=9C=20=EC=9D=B8?= =?UTF-8?q?=ED=95=9C=20=ED=85=8C=EC=8A=A4=ED=8A=B8=20=EC=BD=94=EB=93=9C=20?= =?UTF-8?q?=EC=88=98=EC=A0=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../service/UsageSyncServiceImplTest.java | 50 +++++++++++++++++-- .../helper/UsageEventPublisherTest.java | 9 ++-- .../service/helper/UsageLuaExecutorTest.java | 41 +++++++++++++-- 3 files changed, 86 insertions(+), 14 deletions(-) diff --git a/src/test/java/com/project/domain/usage/service/UsageSyncServiceImplTest.java b/src/test/java/com/project/domain/usage/service/UsageSyncServiceImplTest.java index db3902d..33b4ad7 100644 --- a/src/test/java/com/project/domain/usage/service/UsageSyncServiceImplTest.java +++ b/src/test/java/com/project/domain/usage/service/UsageSyncServiceImplTest.java @@ -21,8 +21,13 @@ import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.springframework.test.util.ReflectionTestUtils; +import com.dabom.messaging.kafka.contract.KafkaConsumerGroups; +import com.dabom.messaging.kafka.contract.KafkaEventTypes; +import com.dabom.messaging.kafka.contract.KafkaTopics; import com.dabom.messaging.kafka.event.dto.usage.UsagePayload; +import com.dabom.messaging.kafka.metrics.KafkaMetrics; import com.project.domain.policy.service.helper.PolicyConstraintWarmupHelper; import com.project.domain.usage.service.dto.UsageUpdateResult; import com.project.domain.usage.service.helper.UsageEventPublisher; @@ -42,9 +47,11 @@ class UsageSyncServiceImplTest { @Mock private UsageLuaExecutor usageLuaExecutor; @Mock private UsageEventPublisher usageEventPublisher; @Mock private LogSanitizer logSanitizer; + @Mock private KafkaMetrics kafkaMetrics; @BeforeEach void setUp() { + ReflectionTestUtils.setField(usageSyncServiceImpl, "dedupTtlSeconds", 60L); lenient() .when(logSanitizer.sanitize(nullable(String.class))) .thenAnswer( @@ -62,10 +69,10 @@ void syncUsage_SuccessFlow() { LocalDate eventMonth = LocalDate.of(2026, 3, 1); UsagePayload payload = new UsagePayload(100L, 1L, "appId", 1024L, Map.of()); - stubCommon(100L, 1L, eventMonth); + stubCommon(100L, 1L, eventMonth, eventId); UsageUpdateResult luaResult = - new UsageUpdateResult(5000L, 5000L, "NORMAL", 1000L, 0.1, 10000L); + new UsageUpdateResult(5000L, 5000L, "NORMAL", 1000L, 0.1, 10000L, false); given(usageLuaExecutor.execute(any(UsageLuaExecutor.UsageLuaCommand.class), eq(eventId))) .willReturn(luaResult); @@ -81,12 +88,15 @@ void syncUsage_SuccessFlow() { assertEquals("monthlyKey", command.monthlyKey()); assertEquals("constraintsKey", command.constraintsKey()); assertEquals("alertsKey", command.alertsKey()); + assertEquals("event:dedup:usage:evt_1", command.dedupKey()); assertEquals(1024L, command.usageBytes()); assertEquals("1234", command.currentHhmm()); assertEquals("appid", command.appId()); + assertEquals(60L, command.dedupTtlSeconds()); verify(usageEventPublisher, times(1)) .publish(any(UsageEventPublisher.UsageEventContext.class)); + verify(kafkaMetrics, never()).incrementDedupHit(any(), any(), any()); } @Test @@ -97,9 +107,11 @@ void syncUsage_NormalizesAppIdBeforeLuaExecution() { LocalDate eventMonth = LocalDate.of(2026, 3, 1); UsagePayload payload = new UsagePayload(100L, 1L, " Com.YouTube.App ", 1024L, Map.of()); - stubCommon(100L, 1L, eventMonth); + stubCommon(100L, 1L, eventMonth, eventId); given(usageLuaExecutor.execute(any(UsageLuaExecutor.UsageLuaCommand.class), eq(eventId))) - .willReturn(new UsageUpdateResult(5000L, 5000L, "APP_BLOCK", 1000L, 0.1, 10000L)); + .willReturn( + new UsageUpdateResult( + 5000L, 5000L, "APP_BLOCK", 1000L, 0.1, 10000L, false)); usageSyncServiceImpl.syncUsage(eventId, eventTime, payload); @@ -126,6 +138,8 @@ void syncUsage_WarmupFailed() { given(redisKeyGenerator.generateFamilyCustomerConstraintsKey(100L, 1L)) .willReturn("constraintsKey"); given(redisKeyGenerator.generateFamilyAlertsKey(100L)).willReturn("alertsKey"); + given(redisKeyGenerator.generateUsageEventDedupKey(eventId)) + .willReturn("event:dedup:usage:" + eventId); given(usageRedisWarmupHelper.ensureFamilyInfoCached(100L, "family:100:info")) .willReturn(false); @@ -140,7 +154,31 @@ void syncUsage_WarmupFailed() { verify(usageEventPublisher, never()).publish(any()); } - private void stubCommon(long familyId, long customerId, LocalDate eventMonth) { + @Test + @DisplayName("중복 이벤트면 publish를 생략하고 dedup metric만 기록한다") + void syncUsage_DuplicateSkipsPublish() { + String eventId = "evt_dup"; + String eventTime = "2026-03-04T12:34:56"; + LocalDate eventMonth = LocalDate.of(2026, 3, 1); + UsagePayload payload = new UsagePayload(100L, 1L, "appId", 1024L, Map.of()); + + stubCommon(100L, 1L, eventMonth, eventId); + given(usageLuaExecutor.execute(any(UsageLuaExecutor.UsageLuaCommand.class), eq(eventId))) + .willReturn( + new UsageUpdateResult( + 5000L, 5000L, "DUPLICATE", 1000L, 0.1, 10000L, true)); + + usageSyncServiceImpl.syncUsage(eventId, eventTime, payload); + + verify(usageEventPublisher, never()).publish(any()); + verify(kafkaMetrics, times(1)) + .incrementDedupHit( + eq(KafkaTopics.USAGE_EVENTS), + eq(KafkaConsumerGroups.DABOM_PROCESSOR_USAGE_MAIN), + eq(KafkaEventTypes.DATA_USAGE)); + } + + private void stubCommon(long familyId, long customerId, LocalDate eventMonth, String eventId) { given(redisKeyGenerator.generateFamilyInfoKey(familyId)) .willReturn("family:" + familyId + ":info"); given(redisKeyGenerator.generateFamilyRemainingKey(familyId)) @@ -152,6 +190,8 @@ private void stubCommon(long familyId, long customerId, LocalDate eventMonth) { given(redisKeyGenerator.generateFamilyCustomerConstraintsKey(familyId, customerId)) .willReturn("constraintsKey"); given(redisKeyGenerator.generateFamilyAlertsKey(familyId)).willReturn("alertsKey"); + given(redisKeyGenerator.generateUsageEventDedupKey(eventId)) + .willReturn("event:dedup:usage:" + eventId); given( usageRedisWarmupHelper.ensureFamilyInfoCached( familyId, "family:" + familyId + ":info")) diff --git a/src/test/java/com/project/domain/usage/service/helper/UsageEventPublisherTest.java b/src/test/java/com/project/domain/usage/service/helper/UsageEventPublisherTest.java index 201331c..e045e0b 100644 --- a/src/test/java/com/project/domain/usage/service/helper/UsageEventPublisherTest.java +++ b/src/test/java/com/project/domain/usage/service/helper/UsageEventPublisherTest.java @@ -39,7 +39,7 @@ class UsageEventPublisherTest { void publish_Normal() { UsagePayload payload = new UsagePayload(100L, 1L, "app", 1024L, Map.of()); UsageUpdateResult result = - new UsageUpdateResult(5000L, 5000L, "NORMAL", 1000L, 0.1, 10000L); + new UsageUpdateResult(5000L, 5000L, "NORMAL", 1000L, 0.1, 10000L, false); UsageEventPublisher.UsageEventContext ctx = new UsageEventPublisher.UsageEventContext( "evt_1", "2026-02-24T12:00:00", payload, result); @@ -67,7 +67,7 @@ void publish_Normal() { void publish_Warning() { UsagePayload payload = new UsagePayload(100L, 1L, "app", 1024L, Map.of()); UsageUpdateResult result = - new UsageUpdateResult(9000L, 1000L, "WARNING_10", 2000L, 0.2, 10000L); + new UsageUpdateResult(9000L, 1000L, "WARNING_10", 2000L, 0.2, 10000L, false); UsageEventPublisher.UsageEventContext ctx = new UsageEventPublisher.UsageEventContext( "evt_2", "2026-02-24T12:00:00", payload, result); @@ -91,7 +91,8 @@ void publish_Warning() { void publish_Blocked() { UsagePayload payload = new UsagePayload(100L, 1L, "app", 1024L, Map.of()); UsageUpdateResult result = - new UsageUpdateResult(8000L, 2000L, "MONTHLY_LIMIT_EXCEEDED", 10001L, 1.0, 10000L); + new UsageUpdateResult( + 8000L, 2000L, "MONTHLY_LIMIT_EXCEEDED", 10001L, 1.0, 10000L, false); UsageEventPublisher.UsageEventContext ctx = new UsageEventPublisher.UsageEventContext( "evt_3", "2026-02-24T12:00:00", payload, result); @@ -113,7 +114,7 @@ void publish_Blocked() { void publish_AppBlock_SkipsPersistAndRealtime() { UsagePayload payload = new UsagePayload(100L, 1L, "app", 1024L, Map.of()); UsageUpdateResult result = - new UsageUpdateResult(8000L, 2000L, "APP_BLOCK", 10001L, 1.0, 10000L); + new UsageUpdateResult(8000L, 2000L, "APP_BLOCK", 10001L, 1.0, 10000L, false); UsageEventPublisher.UsageEventContext ctx = new UsageEventPublisher.UsageEventContext( "evt_4", "2026-02-24T12:00:00", payload, result); diff --git a/src/test/java/com/project/domain/usage/service/helper/UsageLuaExecutorTest.java b/src/test/java/com/project/domain/usage/service/helper/UsageLuaExecutorTest.java index 6cb275a..772de21 100644 --- a/src/test/java/com/project/domain/usage/service/helper/UsageLuaExecutorTest.java +++ b/src/test/java/com/project/domain/usage/service/helper/UsageLuaExecutorTest.java @@ -55,9 +55,11 @@ void execute_ParseSuccess() { "monthlyKey", "constraintsKey", "alertsKey", + "event:dedup:usage:evt_1", 1024L, "2230", - "com.youtube.app"); + "com.youtube.app", + 60L); given( redisTemplate.execute( @@ -65,8 +67,9 @@ void execute_ParseSuccess() { anyList(), any(Object.class), any(Object.class), + any(Object.class), any(Object.class))) - .willReturn(List.of(5000L, 5000L, "NORMAL", 1000L, 0.1, 10000L)); + .willReturn(List.of(5000L, 5000L, "NORMAL", 1000L, 0.1, 10000L, 0L)); UsageUpdateResult result = usageLuaExecutor.execute(command, "evt_1"); @@ -76,7 +79,8 @@ void execute_ParseSuccess() { anyList(), eq("1024"), eq("2230"), - eq("com.youtube.app")); + eq("com.youtube.app"), + eq("60")); assertEquals(5000L, result.totalUsed()); assertEquals(5000L, result.remaining()); @@ -84,19 +88,44 @@ void execute_ParseSuccess() { assertEquals(1000L, result.monthlyUsed()); assertEquals(0.1, result.userRatio()); assertEquals(10000L, result.monthlyLimit()); + assertEquals(false, result.duplicate()); + } + + @Test + @DisplayName("duplicate 플래그를 파싱한다") + void execute_ParseDuplicateFlag() { + UsageLuaExecutor.UsageLuaCommand command = + new UsageLuaExecutor.UsageLuaCommand( + "a", "b", "c", "d", "e", "dup", 1L, "0000", "", 60L); + given( + redisTemplate.execute( + eq(usageUpdateScript), + anyList(), + any(Object.class), + any(Object.class), + any(Object.class), + any(Object.class))) + .willReturn(List.of(100L, 900L, "DUPLICATE", 50L, 0.05, -1L, 1L)); + + UsageUpdateResult result = usageLuaExecutor.execute(command, "evt_dup"); + + assertEquals(true, result.duplicate()); + assertEquals("DUPLICATE", result.status()); } @Test @DisplayName("Lua 결과가 null이면 예외를 던진다") void execute_NullResult() { UsageLuaExecutor.UsageLuaCommand command = - new UsageLuaExecutor.UsageLuaCommand("a", "b", "c", "d", "e", 1L, "0000", ""); + new UsageLuaExecutor.UsageLuaCommand( + "a", "b", "c", "d", "e", "dup", 1L, "0000", "", 60L); given( redisTemplate.execute( eq(usageUpdateScript), anyList(), any(Object.class), any(Object.class), + any(Object.class), any(Object.class))) .willReturn(null); @@ -107,13 +136,15 @@ void execute_NullResult() { @DisplayName("Lua 결과 길이가 부족하면 예외를 던진다") void execute_InvalidResultSize() { UsageLuaExecutor.UsageLuaCommand command = - new UsageLuaExecutor.UsageLuaCommand("a", "b", "c", "d", "e", 1L, "0000", ""); + new UsageLuaExecutor.UsageLuaCommand( + "a", "b", "c", "d", "e", "dup", 1L, "0000", "", 60L); given( redisTemplate.execute( eq(usageUpdateScript), anyList(), any(Object.class), any(Object.class), + any(Object.class), any(Object.class))) .willReturn(List.of(1L, 2L, "NORMAL")); From 9d8cd54907e38268480f6a65f953a51e0cf82c84 Mon Sep 17 00:00:00 2001 From: ChoiSeungeon Date: Thu, 12 Mar 2026 11:17:59 +0900 Subject: [PATCH 4/6] =?UTF-8?q?DABOM-408=20style:=20checkStyle=20=EC=A0=81?= =?UTF-8?q?=EC=9A=A9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../project/domain/usage/service/UsageSyncServiceImplTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/test/java/com/project/domain/usage/service/UsageSyncServiceImplTest.java b/src/test/java/com/project/domain/usage/service/UsageSyncServiceImplTest.java index 33b4ad7..35fde01 100644 --- a/src/test/java/com/project/domain/usage/service/UsageSyncServiceImplTest.java +++ b/src/test/java/com/project/domain/usage/service/UsageSyncServiceImplTest.java @@ -165,8 +165,7 @@ void syncUsage_DuplicateSkipsPublish() { stubCommon(100L, 1L, eventMonth, eventId); given(usageLuaExecutor.execute(any(UsageLuaExecutor.UsageLuaCommand.class), eq(eventId))) .willReturn( - new UsageUpdateResult( - 5000L, 5000L, "DUPLICATE", 1000L, 0.1, 10000L, true)); + new UsageUpdateResult(5000L, 5000L, "DUPLICATE", 1000L, 0.1, 10000L, true)); usageSyncServiceImpl.syncUsage(eventId, eventTime, payload); From 198bb5f3ecb193ffddb9d0f4e2e670cc7b328b36 Mon Sep 17 00:00:00 2001 From: ChoiSeungeon Date: Thu, 12 Mar 2026 11:26:46 +0900 Subject: [PATCH 5/6] =?UTF-8?q?DABOM-408=20test:=20=EC=A4=91=EB=B3=B5?= =?UTF-8?q?=EB=90=9C=20eq()=20=EC=A0=9C=EA=B1=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../domain/usage/service/UsageSyncServiceImplTest.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/test/java/com/project/domain/usage/service/UsageSyncServiceImplTest.java b/src/test/java/com/project/domain/usage/service/UsageSyncServiceImplTest.java index 35fde01..a16c6e3 100644 --- a/src/test/java/com/project/domain/usage/service/UsageSyncServiceImplTest.java +++ b/src/test/java/com/project/domain/usage/service/UsageSyncServiceImplTest.java @@ -172,9 +172,9 @@ void syncUsage_DuplicateSkipsPublish() { verify(usageEventPublisher, never()).publish(any()); verify(kafkaMetrics, times(1)) .incrementDedupHit( - eq(KafkaTopics.USAGE_EVENTS), - eq(KafkaConsumerGroups.DABOM_PROCESSOR_USAGE_MAIN), - eq(KafkaEventTypes.DATA_USAGE)); + KafkaTopics.USAGE_EVENTS, + KafkaConsumerGroups.DABOM_PROCESSOR_USAGE_MAIN, + KafkaEventTypes.DATA_USAGE); } private void stubCommon(long familyId, long customerId, LocalDate eventMonth, String eventId) { From 42160b195e52936c87a7c6b16c9a43e5c8586eb0 Mon Sep 17 00:00:00 2001 From: ChoiSeungeon Date: Thu, 12 Mar 2026 11:31:53 +0900 Subject: [PATCH 6/6] =?UTF-8?q?DABOM-408=20refactor:=20=EC=A4=91=EB=B3=B5?= =?UTF-8?q?=20=EC=9D=B4=EB=B2=A4=ED=8A=B8=20=EC=B2=98=EB=A6=AC=20=EC=8B=9C?= =?UTF-8?q?=20=EB=B6=88=ED=95=84=EC=9A=94=ED=95=9C=20=EC=83=81=EC=84=B8=20?= =?UTF-8?q?=EB=8D=B0=EC=9D=B4=ED=84=B0=EB=A5=BC=20=EC=A0=9C=EA=B1=B0?= =?UTF-8?q?=ED=95=98=EC=97=AC=20=EB=A1=9C=EC=A7=81=20=EB=8B=A8=EC=88=9C?= =?UTF-8?q?=ED=99=94?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/resources/lua/usage_update.lua | 14 ++------------ 1 file changed, 2 insertions(+), 12 deletions(-) diff --git a/src/main/resources/lua/usage_update.lua b/src/main/resources/lua/usage_update.lua index f434a6b..b94c3f5 100644 --- a/src/main/resources/lua/usage_update.lua +++ b/src/main/resources/lua/usage_update.lua @@ -38,18 +38,8 @@ end if dedupTtlSeconds > 0 then local firstSeen = redis.call('SET', KEYS[6], '1', 'NX', 'EX', dedupTtlSeconds) if not firstSeen then - local duplicatedMonthly = tonumber(redis.call('GET', KEYS[3]) or '0') - local constraintsArrayOnDuplicate = redis.call('HGETALL', KEYS[4]) - local constraintsOnDuplicate = {} - for i = 1, #constraintsArrayOnDuplicate, 2 do - constraintsOnDuplicate[constraintsArrayOnDuplicate[i]] = - constraintsArrayOnDuplicate[i + 1] - end - - local duplicatedLimitStr = constraintsOnDuplicate['LIMIT:DATA:MONTHLY'] - if duplicatedLimitStr then monthlyLimit = tonumber(duplicatedLimitStr) end - - return getResult("DUPLICATE", duplicatedMonthly, true) + -- duplicate면 Java에서 상세 수치를 사용하지 않으므로 고정값만 반환한다. + return {0, 0, "DUPLICATE", 0, 0, -1, 1} end end