diff --git a/api/db.py b/api/db.py
index 6bbd68f8..a1e5afbe 100644
--- a/api/db.py
+++ b/api/db.py
@@ -11,7 +11,9 @@
 from fastapi_pagination.ext.motor import paginate
 from motor import motor_asyncio
 from redis import asyncio as aioredis
-from kernelci.api.models import EventHistory, Hierarchy, Node, parse_node_obj
+from kernelci.api.models import (
+    EventHistory, Hierarchy, Node, TelemetryEvent, parse_node_obj
+)
 from .models import User, UserGroup
@@ -28,6 +30,7 @@ class Database:
         Node: 'node',
         UserGroup: 'usergroup',
         EventHistory: 'eventhistory',
+        TelemetryEvent: 'telemetry',
     }
 
     OPERATOR_MAP = {
@@ -242,6 +245,12 @@ async def create(self, obj):
             obj.id = res.inserted_id
         return obj
 
+    async def insert_many(self, model, documents):
+        """Create multiple documents in a collection."""
+        col = self._get_collection(model)
+        result = await col.insert_many(documents)
+        return result.inserted_ids
+
     async def _create_recursively(self, hierarchy: Hierarchy, parent: Node,
                                   cls, col):
         obj = parse_node_obj(hierarchy.node)
@@ -294,6 +303,12 @@ async def update(self, obj):
             raise ValueError(f"No object found with id: {obj.id}")
         return obj.__class__(**await col.find_one(ObjectId(obj.id)))
 
+    async def aggregate(self, model, pipeline):
+        """Run an aggregation pipeline on a model's collection"""
+        col = self._get_collection(model)
+        cursor = col.aggregate(pipeline)
+        return await cursor.to_list(length=None)
+
     async def delete_by_id(self, model, obj_id):
         """Delete one object matching a given id"""
         col = self._get_collection(model)
diff --git a/api/main.py b/api/main.py
index 6d23f6b7..b766c30c 100644
--- a/api/main.py
+++ b/api/main.py
@@ -53,6 +53,7 @@
     parse_node_obj,
     KernelVersion,
     EventHistory,
+    TelemetryEvent,
 )
 from .auth import Authentication
 from .db import Database
@@ -953,6 +954,324 @@ async def get_events(request: Request):
     return JSONResponse(content=json_comp)
 
 
+# -----------------------------------------------------------------------------
+# Telemetry of pipeline execution and other events (not node objects).
+# This is kept in a separate collection from EventHistory since it may
+# have a much higher volume and different query patterns, which lets us
+# optimize indexes and storage separately.
+
+@app.post('/telemetry', response_model=dict, tags=["telemetry"])
+async def post_telemetry(
+        events: List[dict],
+        current_user: User = Depends(get_current_user),
+):
+    """Bulk insert telemetry events.
+
+    Accepts a list of telemetry event dicts. Each event must have at
+    least 'kind' and 'runtime' fields. Events are validated against
+    the TelemetryEvent model before insertion.
+    """
+    metrics.add('http_requests_total', 1)
+    if not events:
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail="Events list cannot be empty",
+        )
+    docs = []
+    for event in events:
+        try:
+            obj = TelemetryEvent(**event)
+        except Exception as exc:
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST,
+                detail=f"Invalid telemetry event: {exc}",
+            ) from exc
+        doc = obj.model_dump(by_alias=True)
+        doc.pop('_id', None)
+        docs.append(doc)
+    inserted_ids = await db.insert_many(TelemetryEvent, docs)
+    return {"inserted": len(inserted_ids)}
+
+
+@app.get('/telemetry', response_model=PageModel, tags=["telemetry"])
+async def get_telemetry(request: Request):
+    """Query telemetry events with optional filters.
+
+    Supports filtering by any TelemetryEvent field, plus time range
+    via 'since' and 'until' parameters (ISO 8601 format).
+    Results are paginated (default limit=50).
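+
+    Example (query values are illustrative; the runtime name is a
+    hypothetical placeholder):
+
+        GET /telemetry?kind=job_result&runtime=lava-collabora
+            &since=2025-01-01T00:00:00&is_infra_error=true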
+    """
+    metrics.add('http_requests_total', 1)
+    query_params = dict(request.query_params)
+
+    for pg_key in ['limit', 'offset']:
+        query_params.pop(pg_key, None)
+
+    since = query_params.pop('since', None)
+    until = query_params.pop('until', None)
+    if since or until:
+        ts_filter = {}
+        if since:
+            ts_filter['$gte'] = datetime.fromisoformat(since)
+        if until:
+            ts_filter['$lte'] = datetime.fromisoformat(until)
+        query_params['ts'] = ts_filter
+
+    # Convert string 'true'/'false' for boolean fields
+    if 'is_infra_error' in query_params:
+        val = query_params['is_infra_error'].lower()
+        if val not in ['true', 'false']:
+            raise HTTPException(
+                status_code=status.HTTP_400_BAD_REQUEST,
+                detail="Bad is_infra_error value, use 'true' or 'false'",
+            )
+        query_params['is_infra_error'] = (val == 'true')
+
+    paginated_resp = await db.find_by_attributes(
+        TelemetryEvent, query_params
+    )
+    paginated_resp.items = serialize_paginated_data(
+        TelemetryEvent, paginated_resp.items
+    )
+    return paginated_resp
+
+
+TELEMETRY_STATS_GROUP_FIELDS = {
+    'runtime', 'device_type', 'job_name', 'tree', 'branch',
+    'arch', 'kind', 'error_type',
+}
+
+
+@app.get('/telemetry/stats', tags=["telemetry"])
+async def get_telemetry_stats(request: Request):
+    """Get aggregated telemetry statistics.
+
+    Together with /telemetry/anomalies, this provides rule-based
+    anomaly detection using thresholded empirical rates computed
+    over a rolling time window. It is not a full anomaly detection
+    system with baselines or machine learning, but it is at least
+    something to start with.
+
+    Query parameters:
+    - group_by: Comma-separated fields to group by
+      (runtime, device_type, job_name, tree, branch, arch,
+      kind, error_type)
+    - kind: Filter by event kind before aggregating
+    - runtime: Filter by runtime name
+    - since/until: Time range (ISO 8601)
+
+    Returns grouped counts with pass/fail/incomplete/infra_error
+    breakdowns for result-bearing events.
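+
+    Example (group fields and filter values are illustrative):
+
+        GET /telemetry/stats?group_by=runtime,device_type
+            &kind=job_result&since=2025-01-01T00:00:00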
+    """
+    metrics.add('http_requests_total', 1)
+    query_params = dict(request.query_params)
+
+    group_by_str = query_params.pop('group_by', None)
+    if not group_by_str:
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail="'group_by' parameter is required",
+        )
+    group_by = [f.strip() for f in group_by_str.split(',')]
+    invalid = set(group_by) - TELEMETRY_STATS_GROUP_FIELDS
+    if invalid:
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail=f"Invalid group_by fields: {invalid}",
+        )
+
+    match_stage = {
+        key: query_params.pop(key)
+        for key in ('kind', 'runtime', 'device_type', 'job_name',
+                    'tree', 'branch', 'arch')
+        if query_params.get(key)
+    }
+
+    since = query_params.pop('since', None)
+    until = query_params.pop('until', None)
+    if since or until:
+        match_stage['ts'] = {
+            **({'$gte': datetime.fromisoformat(since)} if since else {}),
+            **({'$lte': datetime.fromisoformat(until)} if until else {}),
+        }
+
+    pipeline = [{'$match': match_stage}] if match_stage else []
+    pipeline.append({
+        '$group': {
+            '_id': {f: f'${f}' for f in group_by},
+            'total': {'$sum': 1},
+            'pass': {'$sum': {
+                '$cond': [{'$eq': ['$result', 'pass']}, 1, 0]
+            }},
+            'fail': {'$sum': {
+                '$cond': [{'$eq': ['$result', 'fail']}, 1, 0]
+            }},
+            'incomplete': {'$sum': {
+                '$cond': [{'$eq': ['$result', 'incomplete']}, 1, 0]
+            }},
+            'skip': {'$sum': {
+                '$cond': [{'$eq': ['$result', 'skip']}, 1, 0]
+            }},
+            'infra_error': {'$sum': {
+                '$cond': ['$is_infra_error', 1, 0]
+            }},
+        }
+    })
+    pipeline.append({'$sort': {'total': -1}})
+
+    results = await db.aggregate(TelemetryEvent, pipeline)
+    return JSONResponse(content=jsonable_encoder([
+        {
+            **doc['_id'],
+            'total': doc['total'],
+            'pass': doc['pass'],
+            'fail': doc['fail'],
+            'incomplete': doc['incomplete'],
+            'skip': doc['skip'],
+            'infra_error': doc['infra_error'],
+        }
+        for doc in results
+    ]))
+
+
+# These are trial values; adjust based on expected query patterns and volumes.
+ANOMALY_WINDOW_MAP = {
+    '1h': 1, '3h': 3, '6h': 6, '12h': 12, '24h': 24, '48h': 48,
+}
+
+
+@app.get('/telemetry/anomalies', tags=["telemetry"])
+async def get_telemetry_anomalies(
+        window: str = Query(
+            '6h', description='Time window: 1h, 3h, 6h, 12h, 24h, 48h'
+        ),
+        threshold: float = Query(
+            0.5, ge=0.0, le=1.0,
+            description='Min failure/infra error rate to flag (0.0-1.0)'
+        ),
+        min_total: int = Query(
+            3, ge=1,
+            description='Min events in window to consider (avoids noise)'
+        ),
+):
+    """Detect anomalies in telemetry data.
+
+    Finds runtime+device_type combinations where the infra error
+    rate or failure rate exceeds the threshold within the given
+    time window. Also detects runtimes with submission errors.
+
+    Returns a list sorted by severity (highest error rate first).
+    """
+    metrics.add('http_requests_total', 1)
+
+    hours = ANOMALY_WINDOW_MAP.get(window)
+    if not hours:
+        raise HTTPException(
+            status_code=status.HTTP_400_BAD_REQUEST,
+            detail=f"Invalid window '{window}'. "
" + f"Use: {', '.join(ANOMALY_WINDOW_MAP.keys())}", + ) + since = datetime.utcnow() - timedelta(hours=hours) + + # Anomaly 1: High infra error / failure rate per runtime+device_type + result_pipeline = [ + {'$match': { + 'kind': {'$in': ['job_result', 'test_result']}, + 'ts': {'$gte': since}, + }}, + {'$group': { + '_id': { + 'runtime': '$runtime', + 'device_type': '$device_type', + }, + 'total': {'$sum': 1}, + 'fail': {'$sum': { + '$cond': [{'$eq': ['$result', 'fail']}, 1, 0] + }}, + 'incomplete': {'$sum': { + '$cond': [{'$eq': ['$result', 'incomplete']}, 1, 0] + }}, + 'infra_error': {'$sum': { + '$cond': ['$is_infra_error', 1, 0] + }}, + }}, + {'$match': {'total': {'$gte': min_total}}}, + {'$addFields': { + 'infra_rate': { + '$divide': ['$infra_error', '$total'] + }, + 'fail_rate': { + '$divide': [ + {'$add': ['$fail', '$incomplete']}, '$total' + ] + }, + }}, + {'$match': { + '$or': [ + {'infra_rate': {'$gte': threshold}}, + {'fail_rate': {'$gte': threshold}}, + ] + }}, + {'$sort': {'infra_rate': -1, 'fail_rate': -1}}, + ] + + # Anomaly 2: Submission/connectivity errors per runtime + error_pipeline = [ + {'$match': { + 'kind': {'$in': ['runtime_error', 'job_skip']}, + 'ts': {'$gte': since}, + }}, + {'$group': { + '_id': { + 'runtime': '$runtime', + 'error_type': '$error_type', + }, + 'count': {'$sum': 1}, + }}, + {'$match': {'count': {'$gte': min_total}}}, + {'$sort': {'count': -1}}, + ] + + result_anomalies = await db.aggregate( + TelemetryEvent, result_pipeline + ) + error_anomalies = await db.aggregate( + TelemetryEvent, error_pipeline + ) + + output = { + 'window': window, + 'threshold': threshold, + 'min_total': min_total, + 'since': since.isoformat(), + 'result_anomalies': [], + 'error_anomalies': [], + } + + for doc in result_anomalies: + row = doc['_id'].copy() + row['total'] = doc['total'] + row['fail'] = doc['fail'] + row['incomplete'] = doc['incomplete'] + row['infra_error'] = doc['infra_error'] + row['infra_rate'] = round(doc['infra_rate'], 3) + row['fail_rate'] = round(doc['fail_rate'], 3) + output['result_anomalies'].append(row) + + for doc in error_anomalies: + row = doc['_id'].copy() + row['count'] = doc['count'] + output['error_anomalies'].append(row) + + return JSONResponse(content=jsonable_encoder(output)) + + # ----------------------------------------------------------------------------- # Nodes def _get_node_event_data(operation, node, is_hierarchy=False):