fix: event and replay schemas moved to schemas folder, all other domain obj - converted to python dataclasses #179
📝 Walkthrough

Migrates many Pydantic models to Python dataclasses and updates call sites to use dataclasses.asdict() / constructor unpacking instead of model_validate() / model_dump(); reorganizes several Pydantic schemas into new modules (notably event and replay schemas) and updates imports across routes, repositories, services, and tests.
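For readers skimming the walkthrough, here is a minimal sketch of the call-site pattern it describes (the class and field names are illustrative, not taken from the PR diff):

```python
import dataclasses
from dataclasses import dataclass


@dataclass
class DomainWidget:  # hypothetical domain object
    widget_id: str
    name: str


doc = {"widget_id": "w1", "name": "demo"}

# Before (Pydantic): DomainWidget.model_validate(doc) and widget.model_dump()
# After (dataclass): constructor unpacking and dataclasses.asdict()
widget = DomainWidget(**doc)
payload = dataclasses.asdict(widget)
```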
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~55 minutes
🚥 Pre-merge checks: ✅ 3 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (3 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
5 issues found across 47 files
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="backend/app/schemas_pydantic/admin_settings.py">
<violation number="1" location="backend/app/schemas_pydantic/admin_settings.py:6">
P2: The CPU validation regex only allows millicore values (e.g., `500m`) and rejects valid Kubernetes quantities like `1` or `0.5`, which the docs allow. This will make legitimate CPU limits fail validation.</violation>
</file>
<file name="backend/app/schemas_pydantic/dlq.py">
<violation number="1" location="backend/app/schemas_pydantic/dlq.py:84">
P2: This new `DLQTopicSummary` duplicates `DLQTopicSummaryResponse` exactly. Keeping two identical models increases maintenance risk and ambiguity—prefer reusing the existing response model or aliasing it instead of redefining it.</violation>
</file>
<file name="backend/app/domain/user/user_models.py">
<violation number="1" location="backend/app/domain/user/user_models.py:108">
P2: After migrating from Pydantic to dataclass, `is_valid()` can crash with `TypeError` if `email` is `None`. Unlike Pydantic, dataclasses don't enforce runtime types, so `email=None` is now possible. Since `all([...])` eagerly evaluates all list elements, `EMAIL_PATTERN.match(self.email)` runs even though `self.email` is falsy. Consider short-circuiting or guarding the email pattern match.</violation>
</file>
<file name="backend/app/api/routes/saved_scripts.py">
<violation number="1" location="backend/app/api/routes/saved_scripts.py:71">
P2: `model_dump()` includes optional fields with `None` defaults, so the update payload will always carry `None` for omitted fields and a new `updated_at`. That can overwrite existing values unintentionally. Use `model_dump(exclude_unset=True)` when building the dataclass so only client-provided fields are updated.</violation>
</file>
<file name="backend/app/services/user_settings_service.py">
<violation number="1" location="backend/app/services/user_settings_service.py:197">
P1: `_apply_event` passes event metadata fields (event_id, event_type, timestamp, changed_fields, reason) into `DomainUserSettings(**...)`, which will raise a `TypeError` because those keys aren’t valid `DomainUserSettings` parameters. Filter the event dict to only settings fields before constructing the settings model.</violation>
</file>
```diff
-            {**settings.model_dump(), **event.model_dump(exclude_none=True), "updated_at": event.timestamp}
-        )
+        """Apply a settings update event via dict merge."""
+        return DomainUserSettings(**{
```
P1: _apply_event passes event metadata fields (event_id, event_type, timestamp, changed_fields, reason) into DomainUserSettings(**...), which will raise a TypeError because those keys aren’t valid DomainUserSettings parameters. Filter the event dict to only settings fields before constructing the settings model.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/services/user_settings_service.py, line 197:
<comment>`_apply_event` passes event metadata fields (event_id, event_type, timestamp, changed_fields, reason) into `DomainUserSettings(**...)`, which will raise a `TypeError` because those keys aren’t valid `DomainUserSettings` parameters. Filter the event dict to only settings fields before constructing the settings model.</comment>
<file context>
@@ -190,10 +193,12 @@ async def restore_settings_to_point(self, user_id: str, timestamp: datetime) ->
- {**settings.model_dump(), **event.model_dump(exclude_none=True), "updated_at": event.timestamp}
- )
+ """Apply a settings update event via dict merge."""
+ return DomainUserSettings(**{
+ **dataclasses.asdict(settings),
+ **{k: v for k, v in dataclasses.asdict(event).items() if v is not None},
</file context>
```python
from app.domain.admin.settings_models import LogLevel


K8S_MEMORY_PATTERN = r"^[1-9]\d*(Ki|Mi|Gi)$"
K8S_CPU_PATTERN = r"^[1-9]\d*m$"
```
P2: The CPU validation regex only allows millicore values (e.g., 500m) and rejects valid Kubernetes quantities like 1 or 0.5, which the docs allow. This will make legitimate CPU limits fail validation.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/schemas_pydantic/admin_settings.py, line 6:
<comment>The CPU validation regex only allows millicore values (e.g., `500m`) and rejects valid Kubernetes quantities like `1` or `0.5`, which the docs allow. This will make legitimate CPU limits fail validation.</comment>
<file context>
@@ -1,4 +1,30 @@
+from app.domain.admin.settings_models import LogLevel
+
+K8S_MEMORY_PATTERN = r"^[1-9]\d*(Ki|Mi|Gi)$"
+K8S_CPU_PATTERN = r"^[1-9]\d*m$"
+
+
</file context>
Suggested change:
```diff
-K8S_CPU_PATTERN = r"^[1-9]\d*m$"
+K8S_CPU_PATTERN = r"^(?:[1-9]\d*(?:\.\d+)?|0\.\d+|[1-9]\d*m)$"
```
```python
class DLQTopicSummary(BaseModel):
    """Summary of a topic in DLQ."""

    model_config = ConfigDict(from_attributes=True)

    topic: str
    total_messages: int
    status_breakdown: dict[DLQMessageStatus, int]
    oldest_message: datetime
    newest_message: datetime
    avg_retry_count: float
    max_retry_count: int
```
P2: This new DLQTopicSummary duplicates DLQTopicSummaryResponse exactly. Keeping two identical models increases maintenance risk and ambiguity—prefer reusing the existing response model or aliasing it instead of redefining it.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/schemas_pydantic/dlq.py, line 84:
<comment>This new `DLQTopicSummary` duplicates `DLQTopicSummaryResponse` exactly. Keeping two identical models increases maintenance risk and ambiguity—prefer reusing the existing response model or aliasing it instead of redefining it.</comment>
<file context>
@@ -81,6 +81,20 @@ class DLQTopicSummaryResponse(BaseModel):
max_retry_count: int
+class DLQTopicSummary(BaseModel):
+ """Summary of a topic in DLQ."""
+
</file context>
Suggested change:
```diff
-class DLQTopicSummary(BaseModel):
-    """Summary of a topic in DLQ."""
-
-    model_config = ConfigDict(from_attributes=True)
-
-    topic: str
-    total_messages: int
-    status_breakdown: dict[DLQMessageStatus, int]
-    oldest_message: datetime
-    newest_message: datetime
-    avg_retry_count: float
-    max_retry_count: int
+DLQTopicSummary = DLQTopicSummaryResponse
```
```diff
             self.email,
             self.password and len(self.password) >= 8,
-            EMAIL_PATTERN.match(self.email) is not None,  # Proper email validation
+            EMAIL_PATTERN.match(self.email) is not None,
```
P2: After migrating from Pydantic to dataclass, is_valid() can crash with TypeError if email is None. Unlike Pydantic, dataclasses don't enforce runtime types, so email=None is now possible. Since all([...]) eagerly evaluates all list elements, EMAIL_PATTERN.match(self.email) runs even though self.email is falsy. Consider short-circuiting or guarding the email pattern match.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/domain/user/user_models.py, line 108:
<comment>After migrating from Pydantic to dataclass, `is_valid()` can crash with `TypeError` if `email` is `None`. Unlike Pydantic, dataclasses don't enforce runtime types, so `email=None` is now possible. Since `all([...])` eagerly evaluates all list elements, `EMAIL_PATTERN.match(self.email)` runs even though `self.email` is falsy. Consider short-circuiting or guarding the email pattern match.</comment>
<file context>
@@ -114,16 +105,15 @@ def is_valid(self) -> bool:
self.email,
self.password and len(self.password) >= 8,
- EMAIL_PATTERN.match(self.email) is not None, # Proper email validation
+ EMAIL_PATTERN.match(self.email) is not None,
]
)
</file context>
Suggested change:
```diff
-EMAIL_PATTERN.match(self.email) is not None,
+self.email and EMAIL_PATTERN.match(self.email) is not None,
```
```diff
 ) -> SavedScriptResponse:
     """Update an existing saved script."""
-    update_data = DomainSavedScriptUpdate.model_validate(script_update)
+    update_data = DomainSavedScriptUpdate(**script_update.model_dump())
```
P2: model_dump() includes optional fields with None defaults, so the update payload will always carry None for omitted fields and a new updated_at. That can overwrite existing values unintentionally. Use model_dump(exclude_unset=True) when building the dataclass so only client-provided fields are updated.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/api/routes/saved_scripts.py, line 71:
<comment>`model_dump()` includes optional fields with `None` defaults, so the update payload will always carry `None` for omitted fields and a new `updated_at`. That can overwrite existing values unintentionally. Use `model_dump(exclude_unset=True)` when building the dataclass so only client-provided fields are updated.</comment>
<file context>
@@ -68,7 +68,7 @@ async def update_saved_script(
) -> SavedScriptResponse:
"""Update an existing saved script."""
- update_data = DomainSavedScriptUpdate.model_validate(script_update)
+ update_data = DomainSavedScriptUpdate(**script_update.model_dump())
domain = await saved_script_service.update_saved_script(script_id, user.user_id, update_data)
return SavedScriptResponse.model_validate(domain)
</file context>
Suggested change:
```diff
-update_data = DomainSavedScriptUpdate(**script_update.model_dump())
+update_data = DomainSavedScriptUpdate(**script_update.model_dump(exclude_unset=True))
```
Actionable comments posted: 4
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
backend/app/domain/replay/models.py (1)
40-52: ⚠️ Potential issue | 🟡 Minor
`is_empty()` omits `exclude_event_types` — a filter with only exclusions would appear "empty".
`exclude_event_types` is not checked in `is_empty()`. If a `ReplayFilter` has only `exclude_event_types` set (all other fields `None`), `is_empty()` returns `True`, yet `to_mongo_query()` would produce a non-empty query with `$nin`. This could lead to unintended behavior if `is_empty()` is used to guard against running unfiltered replays.
🐛 Proposed fix
```diff
 def is_empty(self) -> bool:
     return not any(
         [
             self.event_ids,
             self.execution_id,
             self.aggregate_id,
             self.event_types,
+            self.exclude_event_types,
             self.start_time,
             self.end_time,
             self.user_id,
             self.service_name,
         ]
     )
```
🤖 Fix all issues with AI agents
In `@backend/app/db/repositories/sse_repository.py`:
- Line 23: The returned dict from
doc.model_dump(include=set(DomainExecution.__dataclass_fields__)) can contain a
nested Pydantic ResourceUsage instance that won't match the DomainExecution
dataclass type; after creating the data dict from doc.model_dump inside the
function that returns DomainExecution, check if data.get("resource_usage") is
truthy and convert it to the dataclass-compatible type via
ResourceUsageDomain.model_validate(...) (or otherwise reconstruct the nested
field explicitly) before calling DomainExecution(**data), referencing
DomainExecution, ResourceUsageDomain and doc.model_dump to locate where to apply
the fix.
In `@backend/app/domain/saga/models.py`:
- Line 21: The user_id field in SagaContextData currently defaults to a random
UUID which can mask missing propagation; change its default to an empty string
(consistent with other string defaults) so missing user IDs are detectable —
locate the user_id declaration (user_id: str = field(...)) in the
SagaContextData model and replace the default_factory lambda:str(uuid4()) with a
plain empty-string default (or default_factory that returns "") and update any
tests/constructors that assumed an auto-generated UUID.
In `@backend/app/schemas_pydantic/event_schemas.py`:
- Around line 1-11: EventStatistics is duplicated across EventSchemas and
Events; delete the duplicate class definition (EventStatistics) from
event_schemas.py and update any consumers to import the single canonical
EventStatistics from the existing events.py definition (ensure references to
EventStatistics resolve to the class in events.py, e.g., where
admin_user_overview imports it). Also remove the unused type aliases
MongoQueryValue and MongoQuery from event_schemas.py (or if you intend to keep
them, move them into a shared types module and update imports accordingly);
ensure tests/consumers compile after the removal/move.
In `@backend/app/services/user_settings_service.py`:
- Around line 195-201: The _apply_event function is merging
dataclasses.asdict(event) into DomainUserSettings which passes event-only keys
(e.g., event_id, event_type, changed_fields) and triggers a TypeError; fix it by
filtering the event dict to only keys that exist on DomainUserSettings before
merging: compute the allowed keys from dataclasses.asdict(settings) (or
dataclasses.fields(DomainUserSettings)), then build the merge using only {k: v
for k, v in dataclasses.asdict(event).items() if k in allowed_keys and v is not
None}, keep "updated_at": event.timestamp, and construct
DomainUserSettings(**merged) as before.
🧹 Nitpick comments (14)
backend/app/schemas_pydantic/dlq.py (1)
84-96: `DLQTopicSummary` and `DLQTopicSummaryResponse` are identical — consolidate them.
Both classes (lines 70–81 and 84–95) have the exact same fields and config. Currently, the repository layer returns `DLQTopicSummary` (dlq_repository.py:54) and the API route converts it to `DLQTopicSummaryResponse` (api/routes/dlq.py:99). Eliminate the duplication by using a single class or creating an alias.
Example: Use DLQTopicSummary in the API and optionally alias for backward compatibility
```diff
-class DLQTopicSummaryResponse(BaseModel):
-    """Response model for topic summary."""
-
-    model_config = ConfigDict(from_attributes=True)
-
-    topic: str
-    total_messages: int
-    status_breakdown: dict[DLQMessageStatus, int]
-    oldest_message: datetime
-    newest_message: datetime
-    avg_retry_count: float
-    max_retry_count: int
-
-
 class DLQTopicSummary(BaseModel):
     """Summary of a topic in DLQ."""

     model_config = ConfigDict(from_attributes=True)

     topic: str
     total_messages: int
     status_breakdown: dict[DLQMessageStatus, int]
     oldest_message: datetime
     newest_message: datetime
     avg_retry_count: float
     max_retry_count: int
+
+
+# Alias for backward compatibility with API layer
+DLQTopicSummaryResponse = DLQTopicSummary
```
Then update api/routes/dlq.py:95–96 and :99 to use `DLQTopicSummary` directly, removing the unnecessary `model_validate` call.
backend/app/schemas_pydantic/event_schemas.py (1)
130-140: `EventReplayInfo` — `event_count` could drift from `len(events)`.
`event_count` is declared independently from `events: list[DomainEvent]`. If callers populate both, they can become inconsistent (e.g., `event_count=5` with 3 events in the list). Consider whether `event_count` should be a computed property or validated against `len(events)`; a sketch of the validation option follows.
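One possible shape for the validation option, as a hedged sketch (the `events` type is simplified to plain dicts here; the real model uses `DomainEvent`):

```python
from pydantic import BaseModel, model_validator


class EventReplayInfo(BaseModel):
    events: list[dict]
    event_count: int

    @model_validator(mode="after")
    def _count_matches_events(self) -> "EventReplayInfo":
        # Reject payloads where the declared count drifts from the list length.
        if self.event_count != len(self.events):
            raise ValueError("event_count does not match len(events)")
        return self
```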
backend/app/domain/events/__init__.py (1)
82-97: Domain layer now depends on the schemas layer — layer inversion concern.
The domain package (`app.domain.events`) now imports from the presentation/schema layer (`app.schemas_pydantic.event_schemas`). In clean/hexagonal architecture, the domain layer should be the innermost layer with no outward dependencies. This creates a dependency cycle where:

- `app.schemas_pydantic.event_schemas` imports `DomainEvent`, `EventMetadata` from `app.domain.events.typed`
- `app.domain.events.__init__` imports schemas back from `app.schemas_pydantic.event_schemas`

While Python resolves this without a circular import error (since `__init__.py` imports from `.typed`, not from itself), it muddies the architectural boundary. Consumers in the domain layer that do `from app.domain.events import EventFilter` are now silently pulling in a Pydantic schema — which is the opposite direction from the stated goal of separating schemas from domain objects.
Consider either:
- Keeping these re-exports for backward compatibility but marking them as deprecated with a migration path, or
- Having consumers import directly from `app.schemas_pydantic.event_schemas`.

```bash
#!/bin/bash
# Check which files import these event schema types from the domain package
# to assess the scope of updating imports
echo "=== Imports of event schema types from domain.events ==="
rg -n 'from app\.domain\.events import' --type py | rg -i 'EventFilter|EventListResult|EventBrowseResult|EventDetail|EventSummary|EventStatistics|EventProjection|EventReplayInfo|EventExportRow|ExecutionEventsResult|EventTypeCount|HourlyEventCount|ServiceEventCount|UserEventCount'

echo ""
echo "=== Imports from app.domain.events (wildcard or multi) that might pull schemas ==="
rg -n 'from app\.domain\.events import' --type py -C 2 | head -80
```
backend/app/domain/saga/models.py (1)
24-45: `Saga` and `SagaInstance` share near-identical fields and `__post_init__` logic.
Both classes define the same fields (`saga_name`, `execution_id`, `state`, `current_step`, `completed_steps`, `compensated_steps`, `context_data`, `error_message`, `created_at`, `updated_at`, `completed_at`, `retry_count`) and the same `__post_init__` dict-to-`SagaContextData` coercion. Consider extracting a common base or mixin to reduce duplication; a sketch follows.
Also applies to: 114-135
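A rough sketch of the base-class option, assuming Python 3.10+ and with the field list heavily trimmed (the exact shape here is illustrative, not code from the repo):

```python
from dataclasses import dataclass, field


@dataclass
class SagaContextData:  # stand-in for the repo's context dataclass
    user_id: str = ""


@dataclass
class SagaBase:
    saga_name: str = ""
    execution_id: str = ""
    context_data: SagaContextData | dict = field(default_factory=SagaContextData)

    def __post_init__(self) -> None:
        # The dict -> SagaContextData coercion both classes currently repeat.
        if isinstance(self.context_data, dict):
            self.context_data = SagaContextData(**self.context_data)


@dataclass
class Saga(SagaBase):
    retry_count: int = 0


@dataclass
class SagaInstance(SagaBase):
    retry_count: int = 0
```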
backend/app/db/repositories/saga_repository.py (1)
68-68: Consider extracting the repeated `set(Saga.__dataclass_fields__)` to a module-level constant.
The expression `set(Saga.__dataclass_fields__)` appears 6 times across this file. Extracting it to a constant (e.g., `_SAGA_FIELDS = set(Saga.__dataclass_fields__)`) at module level improves readability and reduces repetition.
Suggested refactor
Add at module level (after imports):
```python
_SAGA_FIELDS = set(Saga.__dataclass_fields__)
```
Then replace each occurrence, e.g.:
```diff
-        return Saga(**doc.model_dump(include=set(Saga.__dataclass_fields__))) if doc else None
+        return Saga(**doc.model_dump(include=_SAGA_FIELDS)) if doc else None
```
Also applies to: 75-75, 79-79, 92-92, 104-104, 129-129
backend/app/services/result_processor/processor.py (1)
66-68: Consider extracting the repeated field-filtering pattern into a helper.
The `_fields = ExecutionResultDomain.__dataclass_fields__` / filter / construct pattern is duplicated across all three handlers. A small private method would reduce repetition (see the sketch below).
Also applies to: 91-93, 117-121
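A hedged sketch of what such a helper could look like; the function name and placement are assumptions, not code from the PR:

```python
import dataclasses
from typing import Any, TypeVar

T = TypeVar("T")


def _build_from_fields(cls: type[T], data: dict[str, Any]) -> T:
    """Construct a dataclass instance, dropping keys it does not declare."""
    allowed = set(getattr(cls, "__dataclass_fields__", {}))
    return cls(**{k: v for k, v in data.items() if k in allowed})

# Hypothetical call site: result = _build_from_fields(ExecutionResultDomain, payload)
```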
backend/app/schemas_pydantic/admin_settings.py (1)
5-6: CPU pattern excludes whole-core notation (e.g., `"2"` or `"0.5"`).
`K8S_CPU_PATTERN` only accepts millicore format (`"500m"`, `"2000m"`). Kubernetes also supports decimal notation (`"0.5"`, `"2"`). If this restriction is intentional for consistency, consider adding a brief comment to document the design choice.
backend/app/db/repositories/saved_script_repository.py (2)
9-11: Consider extracting the repeated doc-to-domain conversion into a helper.
The pattern `DomainSavedScript(**doc.model_dump(include=set(DomainSavedScript.__dataclass_fields__)))` is repeated at lines 11, 18, 35, and 49. A small helper (e.g., a `@staticmethod` or module-level function) would reduce duplication and make the field-set computation a one-time cost.
♻️ Suggested helper
```diff
+_SAVED_SCRIPT_FIELDS = set(DomainSavedScript.__dataclass_fields__)
+
+def _to_domain(doc: SavedScriptDocument) -> DomainSavedScript:
+    return DomainSavedScript(**doc.model_dump(include=_SAVED_SCRIPT_FIELDS))
+
+
 class SavedScriptRepository:
     async def create_saved_script(self, create_data: DomainSavedScriptCreate, user_id: str) -> DomainSavedScript:
         doc = SavedScriptDocument(**dataclasses.asdict(create_data), user_id=user_id)
         await doc.insert()
-        return DomainSavedScript(**doc.model_dump(include=set(DomainSavedScript.__dataclass_fields__)))
+        return _to_domain(doc)
```
33-33: Filtering `None` values also excludes intentionally nullable fields — acceptable here but worth noting.
`{k: v for k, v in dataclasses.asdict(update_data).items() if v is not None}` treats `None` as "not set." This is fine for `DomainSavedScriptUpdate` where all nullable fields use `None` to mean "no change," but it means you can never explicitly clear a field to `None` (e.g., setting `description` to `None`). If that's a deliberate contract, consider adding a brief docstring or comment on `DomainSavedScriptUpdate` clarifying this convention; the sentinel sketch below shows one alternative.
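If explicit clearing ever becomes a requirement, a sentinel-based sketch like the following could distinguish "not provided" from "set to None" (all names here are hypothetical):

```python
from dataclasses import dataclass, fields
from typing import Any


class _Unset:
    """Sentinel type: field was not supplied at all."""


UNSET: Any = _Unset()


@dataclass
class ScriptUpdate:  # hypothetical stand-in for DomainSavedScriptUpdate
    name: Any = UNSET
    description: Any = UNSET


def to_update_dict(update: ScriptUpdate) -> dict[str, Any]:
    # Keep explicit None (clear the field), drop only UNSET entries.
    return {
        f.name: getattr(update, f.name)
        for f in fields(update)
        if getattr(update, f.name) is not UNSET
    }
```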
backend/app/db/repositories/notification_repository.py (1)
24-26: Same doc-to-domain helper opportunity as in `SavedScriptRepository`.
The `DomainNotification(**doc.model_dump(include=set(DomainNotification.__dataclass_fields__)))` pattern appears at lines 26, 95, 137, and the subscription variant at lines 168, 184, 194, 206. Consider small helpers to reduce repetition.
backend/app/db/repositories/execution_repository.py (1)
25-25: Repeated `set(DomainExecution.__dataclass_fields__)` — consider extracting to a module-level constant.
This expression is evaluated on every call. While cheap, extracting it to a module-level constant (e.g., `_DOMAIN_EXECUTION_FIELDS = set(DomainExecution.__dataclass_fields__)`) improves readability and avoids repeated set construction, especially in `get_executions` where it runs per-document in a list comprehension.
♻️ Proposed refactor
```diff
 from app.domain.execution import (
     DomainExecution,
     DomainExecutionCreate,
     ExecutionResultDomain,
 )
+
+_DOMAIN_EXECUTION_FIELDS = set(DomainExecution.__dataclass_fields__)
```
Then use `_DOMAIN_EXECUTION_FIELDS` wherever `set(DomainExecution.__dataclass_fields__)` appears:
```diff
-        return DomainExecution(**doc.model_dump(include=set(DomainExecution.__dataclass_fields__)))
+        return DomainExecution(**doc.model_dump(include=_DOMAIN_EXECUTION_FIELDS))
```
Also applies to: 35-35, 67-67
backend/app/domain/user/settings_models.py (1)
41-47: Repeated `__post_init__` coercion boilerplate for `notifications`/`editor` across three classes.
The dict→dataclass coercion for `notifications` and `editor` is duplicated in `DomainUserSettings`, `DomainUserSettingsUpdate`, and `DomainUserSettingsChangedEvent`. Consider extracting a small helper to reduce repetition, e.g.:
♻️ Example helper extraction
```python
def _coerce_nested(instance: Any, field_name: str, cls: type) -> None:
    raw = getattr(instance, field_name)
    if isinstance(raw, dict):
        setattr(instance, field_name, cls(**raw))
```
Then in each `__post_init__`:
```python
def __post_init__(self) -> None:
    _coerce_nested(self, "notifications", DomainNotificationSettings)
    _coerce_nested(self, "editor", DomainEditorSettings)
```
Also applies to: 60-66, 87-93
backend/app/db/repositories/user_settings_repository.py (1)
56-59: Long line reduces readability.
Consider extracting the field set to a local variable for clarity, consistent with how `_detail_fields` is used in `admin_events_service.py` (line 196).
♻️ Suggested refactor
```diff
-        return [
-            DomainUserSettingsChangedEvent(**e.model_dump(include=set(DomainUserSettingsChangedEvent.__dataclass_fields__)))
-            for e in docs
-        ]
+        _fields = set(DomainUserSettingsChangedEvent.__dataclass_fields__)
+        return [
+            DomainUserSettingsChangedEvent(**e.model_dump(include=_fields))
+            for e in docs
+        ]
```
backend/app/services/user_settings_service.py (1)
82-95: `dataclasses.asdict(updates)` is computed twice.
Lines 82 and 94 both call `dataclasses.asdict(updates)` with the same None filter. `asdict` recursively copies the entire object graph each time. Consider computing it once.
♻️ Suggested refactor
```diff
-        changes = {k: v for k, v in dataclasses.asdict(updates).items() if v is not None}
+        updates_dict = dataclasses.asdict(updates)
+        changes = {k: v for k, v in updates_dict.items() if v is not None}
         if not changes:
             return current
         ...
-        await self._publish_settings_event(
-            user_id, {k: v for k, v in dataclasses.asdict(updates).items() if v is not None}, reason
-        )
+        await self._publish_settings_event(user_id, changes, reason)
```
```diff
         if not doc:
             return None
-        return DomainExecution.model_validate(doc)
+        return DomainExecution(**doc.model_dump(include=set(DomainExecution.__dataclass_fields__)))
```
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Check ResourceUsageDomain definition
rg -n 'class ResourceUsageDomain' --type=py -A 15
# Check ResourceUsage definition in docs
rg -n 'class ResourceUsage' --type=py -A 10
# Check if DomainExecution has __post_init__ for resource_usage coercion
rg -n 'class DomainExecution' --type=py -A 25 | head -50
Repository: HardMax71/Integr8sCode
Length of output: 6700
🏁 Script executed:
#!/bin/bash
# Check the full DomainExecution definition with context before line 20
rg -n 'class DomainExecution' --type=py -B 5 -A 30 backend/app/domain/execution/models.py
# Check for __post_init__ in DomainExecution or nearby classes
rg -n '__post_init__' --type=py -B 2 -A 5 backend/app/domain/execution/models.py
Repository: HardMax71/Integr8sCode
Length of output: 1981
🏁 Script executed:
#!/bin/bash
# View the sse_repository.py file around line 23
sed -n '1,35p' backend/app/db/repositories/sse_repository.py
# Also check the imports to understand the types
head -30 backend/app/db/repositories/sse_repository.py
Repository: HardMax71/Integr8sCode
Length of output: 1931
🏁 Script executed:
#!/bin/bash
# Check ExecutionDocument definition to see resource_usage type
sed -n '13,40p' backend/app/db/docs/execution.py
# Also check if there's any conversion logic elsewhere
rg -n 'ResourceUsageDomain' --type=py -B 2 -A 2 backend/app/db/repositories/sse_repository.py
Repository: HardMax71/Integr8sCode
Length of output: 1140
Fix type mismatch: Convert nested resource_usage Pydantic model to ResourceUsageDomain.
When model_dump(include=...) serializes nested models with the default mode="python", it returns resource_usage as a ResourceUsage instance—not a dict. Unpacking this directly into the DomainExecution dataclass (which expects ResourceUsageDomain | None) will fail at runtime when resource_usage is not None, since dataclasses do not coerce types.
Use ResourceUsageDomain.model_validate(doc.resource_usage) to convert, or reconstruct the nested field explicitly after unpacking:
data = doc.model_dump(include=set(DomainExecution.__dataclass_fields__))
if data.get("resource_usage"):
data["resource_usage"] = ResourceUsageDomain.model_validate(data["resource_usage"])
return DomainExecution(**data)🤖 Prompt for AI Agents
In `@backend/app/db/repositories/sse_repository.py` at line 23, The returned dict
from doc.model_dump(include=set(DomainExecution.__dataclass_fields__)) can
contain a nested Pydantic ResourceUsage instance that won't match the
DomainExecution dataclass type; after creating the data dict from doc.model_dump
inside the function that returns DomainExecution, check if
data.get("resource_usage") is truthy and convert it to the dataclass-compatible
type via ResourceUsageDomain.model_validate(...) (or otherwise reconstruct the
nested field explicitly) before calling DomainExecution(**data), referencing
DomainExecution, ResourceUsageDomain and doc.model_dump to locate where to apply
the fix.
```diff
     def _apply_event(self, settings: DomainUserSettings, event: DomainUserSettingsChangedEvent) -> DomainUserSettings:
-        """Apply a settings update event via dict merge + model_validate."""
-        return DomainUserSettings.model_validate(
-            {**settings.model_dump(), **event.model_dump(exclude_none=True), "updated_at": event.timestamp}
-        )
+        """Apply a settings update event via dict merge."""
+        return DomainUserSettings(**{
+            **dataclasses.asdict(settings),
+            **{k: v for k, v in dataclasses.asdict(event).items() if v is not None},
+            "updated_at": event.timestamp,
+        })
```
Bug: _apply_event passes event-only fields to DomainUserSettings, causing TypeError.
dataclasses.asdict(event) includes keys like event_id, event_type, timestamp, changed_fields, and reason that do not exist on DomainUserSettings. The v is not None filter keeps required/non-None event fields (e.g., event_id, event_type, changed_fields). The DomainUserSettings(**merged) constructor will reject these unknown keyword arguments with a TypeError.
The previous Pydantic model_validate silently ignored extra fields, but dataclass constructors are strict.
🐛 Proposed fix — restrict merge to settings-applicable fields
def _apply_event(self, settings: DomainUserSettings, event: DomainUserSettingsChangedEvent) -> DomainUserSettings:
"""Apply a settings update event via dict merge."""
+ settings_fields = set(DomainUserSettings.__dataclass_fields__)
+ event_updates = {
+ k: v for k, v in dataclasses.asdict(event).items()
+ if v is not None and k in settings_fields
+ }
return DomainUserSettings(**{
**dataclasses.asdict(settings),
- **{k: v for k, v in dataclasses.asdict(event).items() if v is not None},
+ **event_updates,
"updated_at": event.timestamp,
    })
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
Suggested change:
```python
def _apply_event(self, settings: DomainUserSettings, event: DomainUserSettingsChangedEvent) -> DomainUserSettings:
    """Apply a settings update event via dict merge."""
    settings_fields = set(DomainUserSettings.__dataclass_fields__)
    event_updates = {
        k: v for k, v in dataclasses.asdict(event).items()
        if v is not None and k in settings_fields
    }
    return DomainUserSettings(**{
        **dataclasses.asdict(settings),
        **event_updates,
        "updated_at": event.timestamp,
    })
```
🤖 Prompt for AI Agents
In `@backend/app/services/user_settings_service.py` around lines 195 - 201, The
_apply_event function is merging dataclasses.asdict(event) into
DomainUserSettings which passes event-only keys (e.g., event_id, event_type,
changed_fields) and triggers a TypeError; fix it by filtering the event dict to
only keys that exist on DomainUserSettings before merging: compute the allowed
keys from dataclasses.asdict(settings) (or
dataclasses.fields(DomainUserSettings)), then build the merge using only {k: v
for k, v in dataclasses.asdict(event).items() if k in allowed_keys and v is not
None}, keep "updated_at": event.timestamp, and construct
DomainUserSettings(**merged) as before.
Actionable comments posted: 2
🤖 Fix all issues with AI agents
In `@backend/app/schemas_pydantic/event_schemas.py`:
- Around line 21-33: There are two conflicting EventFilter class definitions
(one in EventFilter and a local one in admin_events) causing masking via
model_validate and an unused status field; fix by consolidating: either remove
the local EventFilter class in admin_events and import the canonical EventFilter
(update imports/usages and remove any model_validate conversions), or if they
serve different purposes rename the local class to a distinct name (e.g.,
EventFilterRequest) and remove the unused status field from whichever class
doesn’t need it; ensure all routes/controllers reference the single chosen
symbol (EventFilter or the new name) and update imports accordingly.
In `@backend/app/schemas_pydantic/replay_schemas.py`:
- Around line 86-88: The field errors: list[ReplayError] uses ReplayError which
is currently a stdlib dataclass and causes Pydantic serialization failures
(e.g., when calling ReplaySession.model_dump() or FastAPI response
serialization); replace the stdlib dataclass with a Pydantic dataclass by
importing and decorating ReplayError with pydantic.dataclasses.dataclass
(maintain the existing datetime-typed field), so that ReplayError becomes
Pydantic-aware and the errors list serializes correctly.
🧹 Nitpick comments (4)
backend/app/schemas_pydantic/event_schemas.py (1)
36-56: `EventListResult` and `EventBrowseResult` are near-identical — consider unifying.
Both models wrap `list[DomainEvent]` with `total`, `skip`, and `limit`. The only difference is `EventListResult` adds `has_more: bool`. Consider making one model with an optional `has_more` field, or having `EventListResult` extend `EventBrowseResult`.
♻️ Example consolidation
```diff
-class EventListResult(BaseModel):
-    """Result of event list query."""
-    model_config = ConfigDict(from_attributes=True)
-    events: list[DomainEvent]
-    total: int
-    skip: int
-    limit: int
-    has_more: bool
-
-
-class EventBrowseResult(BaseModel):
-    """Result for event browsing."""
-    model_config = ConfigDict(from_attributes=True)
-    events: list[DomainEvent]
-    total: int
-    skip: int
-    limit: int
+class EventBrowseResult(BaseModel):
+    """Result for event browsing."""
+    model_config = ConfigDict(from_attributes=True)
+    events: list[DomainEvent]
+    total: int
+    skip: int
+    limit: int
+
+
+class EventListResult(EventBrowseResult):
+    """Result of event list query (adds pagination flag)."""
+    has_more: bool
```
backend/app/domain/admin/settings_models.py (1)
7-19: Validation rules are duplicated between domain and schema layers.
`K8S_MEMORY_PATTERN`, `K8S_CPU_PATTERN`, and the range bounds in `_RANGE_RULES` are defined here and again (as raw strings and `ge`/`le` constraints) in backend/app/schemas_pydantic/admin_settings.py. If a bound changes in one place but not the other, requests could pass API validation but fail domain construction (or vice versa).
Consider having the schema import and reference the domain-level constants (e.g., `_RANGE_RULES` and the compiled patterns) as the single source of truth, or extract them into a shared constants module; a sketch follows.
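A minimal sketch of the single-source-of-truth option, assuming the domain constants are importable from `settings_models` (the schema class and field names below are illustrative):

```python
from pydantic import BaseModel, Field

from app.domain.admin.settings_models import K8S_CPU_PATTERN, K8S_MEMORY_PATTERN


class ResourceLimitsSchema(BaseModel):  # hypothetical request schema
    cpu_limit: str = Field(pattern=K8S_CPU_PATTERN)
    memory_limit: str = Field(pattern=K8S_MEMORY_PATTERN)
```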
backend/app/schemas_pydantic/replay_schemas.py (1)
91-107: `ReplayRequest` embeds the domain `ReplayFilter` directly — consider using `ReplayFilterSchema` instead.
`ReplayFilter` is a domain dataclass with methods like `to_mongo_query()` and `is_empty()`. Using it directly in a Pydantic request schema couples the API surface to domain internals. If domain fields are added to `ReplayFilter` (e.g., `event_ids`, `aggregate_id` are already present on the domain model but absent from `ReplayFilterSchema`), they'd become silently accepted API input.
`ReplayFilterSchema` already exists and defines the intended API surface — consider using it here instead.
Proposed fix
```diff
 class ReplayRequest(BaseModel):
     """Request schema for creating replay sessions"""

     replay_type: ReplayType
     target: ReplayTarget = ReplayTarget.KAFKA
-    filter: ReplayFilter = Field(default_factory=ReplayFilter)
+    filter: ReplayFilterSchema = Field(default_factory=ReplayFilterSchema)
```
backend/app/domain/admin/__init__.py (1)
1-2: Layer dependency: domain package imports from schemas_pydantic.
`ExecutionResultSummary` is now sourced from `app.schemas_pydantic.replay_schemas` and re-exported from the domain admin package. This introduces a dependency from the domain layer to the schema/presentation layer, which inverts the typical dependency direction (schemas should depend on domain, not vice versa). The public API surface is preserved, so this isn't a breaking change, but it may be worth noting for future architectural cleanups.
```python
class EventFilter(BaseModel):
    """Filter criteria for querying events."""

    model_config = ConfigDict(from_attributes=True)

    event_types: list[EventType] | None = None
    aggregate_id: str | None = None
    user_id: str | None = None
    service_name: str | None = None
    start_time: datetime | None = None
    end_time: datetime | None = None
    search_text: str | None = None
    status: str | None = None
```
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find all definitions of EventFilter
echo "=== EventFilter class definitions ==="
rg -n 'class EventFilter' --type py -A 2
echo ""
echo "=== EventFilter imports and usage ==="
rg -n 'EventFilter' --type py -B 1 -A 1 | head -100
Repository: HardMax71/Integr8sCode
Length of output: 6631
🏁 Script executed:
#!/bin/bash
# Check what EventFilter is exported from domain/events
echo "=== backend/app/domain/events/__init__.py content (around EventFilter) ==="
cat -n backend/app/domain/events/__init__.py | grep -A 30 "from.*import"
echo ""
echo "=== EventFilter fields in event_schemas.py ==="
sed -n '21,33p' backend/app/schemas_pydantic/event_schemas.py
echo ""
echo "=== EventFilter fields in admin_events.py ==="
sed -n '12,25p' backend/app/schemas_pydantic/admin_events.py
Repository: HardMax71/Integr8sCode
Length of output: 2999
🏁 Script executed:
#!/bin/bash
# Check where admin_events_service.py imports EventFilter from
echo "=== admin_events_service.py imports ==="
head -30 backend/app/services/admin/admin_events_service.py | grep -A 10 "^from\|^import"
echo ""
echo "=== admin_events.py full import/export context ==="
head -50 backend/app/schemas_pydantic/admin_events.py
echo ""
echo "=== Check where EventBrowseRequest is used ==="
rg -n 'EventBrowseRequest' --type py -B 2 -A 2
Repository: HardMax71/Integr8sCode
Length of output: 6367
🏁 Script executed:
#!/bin/bash
# Check what happens in the conversion - look at the exact line in the route
echo "=== API route conversion logic ==="
sed -n '30,45p' backend/app/api/routes/admin/events.py
echo ""
echo "=== Check if repository/service actually need status field ==="
rg -n 'status' backend/app/db/repositories/admin/admin_events_repository.py -B 2 -A 2
Repository: HardMax71/Integr8sCode
Length of output: 942
🏁 Script executed:
#!/bin/bash
# Check if status field in EventFilter is actually used in filtering logic
echo "=== Repository filter conditions method ==="
sed -n '33,46p' backend/app/db/repositories/admin/admin_events_repository.py
echo ""
echo "=== Check if status is referenced in filter building ==="
rg -n '_event_filter_conditions|f\.status' backend/app/db/repositories/admin/admin_events_repository.py -A 15
echo ""
echo "=== Check if status field is used in any export/list methods ==="
rg -n 'event_filter' backend/app/db/repositories/admin/admin_events_repository.py -B 2 -A 10 | head -80
Repository: HardMax71/Integr8sCode
Length of output: 5567
🏁 Script executed:
#!/bin/bash
# Final check: is the status field used ANYWHERE in the codebase?
echo "=== Search for status field usage in EventFilter context ==="
rg -n '\.status' backend/app/schemas_pydantic/event_schemas.py backend/app/services/admin/ backend/app/db/repositories/admin/ --type py
echo ""
echo "=== Verify which EventFilter is the canonical one ==="
echo "Imports of EventFilter in test file:"
grep -n "from.*EventFilter\|import.*EventFilter" backend/tests/e2e/test_admin_events_routes.py
echo ""
echo "Imports of EventFilter in API routes:"
grep -n "from.*EventFilter\|import.*EventFilter" backend/app/api/routes/admin/events.pyRepository: HardMax71/Integr8sCode
Length of output: 519
Remove duplicate EventFilter class or consolidate into a single definition.
Two EventFilter classes exist in the same package with conflicting field definitions:
- `event_schemas.py` (exported via `domain.events`): includes unused `status` field
- `admin_events.py`: omits `status` field
The API route converts between them with model_validate(), masking a design inconsistency. The status field is never used in filtering logic. Choose one:
- Keep only the `event_schemas.py` version and remove the local definition from `admin_events.py`, or
- Give them distinct names if they genuinely serve different purposes (e.g., `EventFilterRequest`, `EventFilterQuery`)

(A minimal re-export sketch for the first option follows the AI-agent prompt below.)
🤖 Prompt for AI Agents
In `@backend/app/schemas_pydantic/event_schemas.py` around lines 21 - 33, There
are two conflicting EventFilter class definitions (one in EventFilter and a
local one in admin_events) causing masking via model_validate and an unused
status field; fix by consolidating: either remove the local EventFilter class in
admin_events and import the canonical EventFilter (update imports/usages and
remove any model_validate conversions), or if they serve different purposes
rename the local class to a distinct name (e.g., EventFilterRequest) and remove
the unused status field from whichever class doesn’t need it; ensure all
routes/controllers reference the single chosen symbol (EventFilter or the new
name) and update imports accordingly.
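As noted above, a minimal sketch of the first option: `admin_events.py` drops its local class and re-exports the canonical filter (whether the extra request-flavoured alias is wanted is a judgment call, and the module contents here are assumed, not taken from the PR):

```python
# backend/app/schemas_pydantic/admin_events.py (sketch)
from app.schemas_pydantic.event_schemas import EventFilter

# Optional alias if existing callers expect a request-specific name.
EventFilterRequest = EventFilter

__all__ = ["EventFilter", "EventFilterRequest"]
```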
```python
    last_event_at: datetime | None = None

    errors: list[ReplayError] = Field(default_factory=list)
```
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find ReplayError definition
ast-grep --pattern $'@dataclass
class ReplayError:
$$$'
Repository: HardMax71/Integr8sCode
Length of output: 1330
🏁 Script executed:
# Also search for ReplayError in the file to understand context
rg -n "ReplayError" backend/app/schemas_pydantic/replay_schemas.py -B 2 -A 5Repository: HardMax71/Integr8sCode
Length of output: 526
🏁 Script executed:
# Check ReplaySession configuration
rg -n "class ReplaySession" backend/app/schemas_pydantic/replay_schemas.py -A 30Repository: HardMax71/Integr8sCode
Length of output: 1213
🏁 Script executed:
# Search for ReplaySession usage in endpoints/routes
fd -e py | xargs rg -l "ReplaySession" | grep -E "(route|endpoint|api|handler)" | head -10
Repository: HardMax71/Integr8sCode
Length of output: 138
🏁 Script executed:
# Search for ReplaySession usage more broadly
rg "ReplaySession" -A 3 | grep -E "(return|response|model_dump|json)" | head -20Repository: HardMax71/Integr8sCode
Length of output: 1545
🏁 Script executed:
# Check for tests related to ReplaySession serialization
fd -e py -path "*/test*" | xargs rg -l "ReplaySession" 2>/dev/null
Repository: HardMax71/Integr8sCode
Length of output: 1280
🏁 Script executed:
# Check if there are any JSON schema or serialization related code
rg "model_dump|model_validate|from_attributes" backend/app/schemas_pydantic/replay_schemas.py -B 2 -A 2Repository: HardMax71/Integr8sCode
Length of output: 1136
🏁 Script executed:
# Check the API routes that return ReplaySession
rg -n "ReplaySession" backend/app/api/routes/replay.py -B 3 -A 10Repository: HardMax71/Integr8sCode
Length of output: 1426
🏁 Script executed:
# Check the e2e test to see what it's testing
cat -n backend/tests/e2e/test_replay_routes.py | head -80
Repository: HardMax71/Integr8sCode
Length of output: 3227
🏁 Script executed:
# Search for any serialization issues or workarounds in the codebase
rg "ReplayError" backend/app -A 3 -B 1 | grep -E "(serializ|dump|json|validation)" | head -20Repository: HardMax71/Integr8sCode
Length of output: 48
🏁 Script executed:
# Check the pydantic-dataclasses documentation referenced in the findings
cat -n docs/architecture/pydantic-dataclasses.md
Repository: HardMax71/Integr8sCode
Length of output: 8993
🏁 Script executed:
# Check the ReplayError import and full definition in domain/replay/models.py
cat -n backend/app/domain/replay/models.py
Repository: HardMax71/Integr8sCode
Length of output: 6296
🏁 Script executed:
# Check if ReplaySession is properly handling ReplayError in JSON serialization by looking at actual responses
rg "model_dump|model_dump_json|json()" backend/app/api/routes/replay.py -B 2 -A 2Repository: HardMax71/Integr8sCode
Length of output: 360
ReplayError must be a Pydantic dataclass to support Pydantic serialization.
ReplayError is defined as a stdlib dataclass with a datetime field. When ReplaySession.model_dump() or FastAPI serializes the response, Pydantic cannot handle stdlib dataclasses with complex types like datetime. Convert ReplayError to a Pydantic dataclass (consistent with the codebase pattern in pydantic-dataclasses.md):
from pydantic.dataclasses import dataclass
🤖 Prompt for AI Agents
In `@backend/app/schemas_pydantic/replay_schemas.py` around lines 86 - 88, The
field errors: list[ReplayError] uses ReplayError which is currently a stdlib
dataclass and causes Pydantic serialization failures (e.g., when calling
ReplaySession.model_dump() or FastAPI response serialization); replace the
stdlib dataclass with a Pydantic dataclass by importing and decorating
ReplayError with pydantic.dataclasses.dataclass (maintain the existing
datetime-typed field), so that ReplayError becomes Pydantic-aware and the errors
list serializes correctly.
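A hedged sketch of the suggested conversion; the field names are illustrative and the real `ReplayError` lives in the domain module:

```python
from datetime import datetime, timezone

from pydantic.dataclasses import dataclass


@dataclass
class ReplayError:
    event_id: str
    error: str
    timestamp: datetime


# Pydantic-aware, so a parent BaseModel (e.g. ReplaySession) can serialize it.
err = ReplayError(event_id="evt-1", error="boom", timestamp=datetime.now(timezone.utc))
```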
1 issue found across 21 files (changes from recent commits).
Prompt for AI agents (all issues)
Check if these issues are valid — if so, understand the root cause of each and fix them. If appropriate, use sub-agents to investigate and fix each issue separately.
<file name="backend/app/schemas_pydantic/replay_schemas.py">
<violation number="1" location="backend/app/schemas_pydantic/replay_schemas.py:40">
P2: ReplayFilterSchema omits event_ids and aggregate_id, so replay session responses drop those filter values even though the domain ReplayFilter accepts them. This changes the API payload and can hide filters that clients sent.</violation>
</file>
```python
class ReplayFilterSchema(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    execution_id: str | None = None
```
P2: ReplayFilterSchema omits event_ids and aggregate_id, so replay session responses drop those filter values even though the domain ReplayFilter accepts them. This changes the API payload and can hide filters that clients sent.
Prompt for AI agents
Check if this issue is valid — if so, understand the root cause and fix it. At backend/app/schemas_pydantic/replay_schemas.py, line 40:
<comment>ReplayFilterSchema omits event_ids and aggregate_id, so replay session responses drop those filter values even though the domain ReplayFilter accepts them. This changes the API payload and can hide filters that clients sent.</comment>
<file context>
@@ -22,3 +32,128 @@ class ExecutionResultSummary(BaseModel):
+class ReplayFilterSchema(BaseModel):
+ model_config = ConfigDict(from_attributes=True)
+
+ execution_id: str | None = None
+ event_types: list[EventType] | None = None
+ start_time: datetime | None = None
</file context>
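If the issue is confirmed, the fix would roughly look like the sketch below: expose the two missing fields on the schema (field types assumed from the domain model; the remaining fields are elided):

```python
from datetime import datetime

from pydantic import BaseModel, ConfigDict


class ReplayFilterSchema(BaseModel):
    model_config = ConfigDict(from_attributes=True)

    event_ids: list[str] | None = None   # added
    aggregate_id: str | None = None      # added
    execution_id: str | None = None
    start_time: datetime | None = None
    end_time: datetime | None = None
```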
Summary by cubic
Moved event and replay API schemas into dedicated modules and completed the switch of domain models to Python dataclasses. This tightens the layer split, simplifies serialization, and keeps API behavior stable.
Refactors
Migration
Written for commit 58756c7. Summary will update on new commits.
Summary by CodeRabbit
Refactor
Tests