From 888ba8306db4fabb8bff702f768cd69a060d6bf3 Mon Sep 17 00:00:00 2001 From: Ashish-dwi99 Date: Tue, 10 Feb 2026 22:20:48 +0530 Subject: [PATCH] feat: cross-agent handoff bus, updated positioning MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add HandoffSessionBus with session digests, auto-resume, lane checkpointing - Add handoff MCP tools: save_session_digest, get_last_session, list_sessions - Update README to lead with agent-switching pain point and 4-pillar positioning - Add early-stage warning to README — not recommended for production use - Add workspace continuity rules (CLAUDE.md, CURSOR.md, AGENTS.md) - Extend CLI installer, API routes, DB schema for handoff support Co-Authored-By: Claude Opus 4.6 --- AGENTS.md | 29 + CLAUDE.md | 28 + CURSOR.md | 28 + README.md | 87 +- engram/api/app.py | 92 +++ engram/api/schemas.py | 39 + engram/cli.py | 150 +++- engram/configs/base.py | 29 +- engram/core/handoff.py | 193 +++++ engram/core/handoff_bus.py | 938 ++++++++++++++++++++++ engram/core/kernel.py | 290 +++++++ engram/core/policy.py | 1 + engram/db/sqlite.py | 716 +++++++++++++++++ engram/integrations/claude_code_plugin.py | 33 +- engram/mcp_server.py | 420 +++++++++- engram/memory/client.py | 103 +++ engram/memory/main.py | 207 +++++ engram/utils/repo_identity.py | 72 ++ 18 files changed, 3423 insertions(+), 32 deletions(-) create mode 100644 CLAUDE.md create mode 100644 CURSOR.md create mode 100644 engram/core/handoff.py create mode 100644 engram/core/handoff_bus.py create mode 100644 engram/utils/repo_identity.py diff --git a/AGENTS.md b/AGENTS.md index e2bcc78..94d913e 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -30,3 +30,32 @@ ## Configuration & Secrets - Gemini requires `GEMINI_API_KEY` (or `GOOGLE_API_KEY`). OpenAI uses `OPENAI_API_KEY` via the SDK. - Never commit secrets; document new environment variables in README or this file when introduced. + + +## Engram Continuity (Auto-Generated) + +Follow these rules for cross-agent continuity on every new task/thread. + +1) Before answering substantive repo/task questions, call `get_last_session`: +- `user_id`: `"default"` unless provided +- `requester_agent_id`: `"codex"` +- `repo`: absolute workspace path +- Include `agent_id` only when the user explicitly asks to continue from a specific source agent. + +2) If no handoff session exists, continue normally and use memory tools as needed. + +3) On major milestones and before pausing/ending, call `save_session_digest` with: +- `task_summary` +- `repo` +- `status` (`"active"`, `"paused"`, or `"completed"`) +- `decisions_made`, `files_touched`, `todos_remaining` +- `blockers`, `key_commands`, `test_results` when available +- `agent_id`: `"codex"`, `requester_agent_id`: `"codex"` + +4) Prefer Engram MCP handoff tools over shell/SQLite inspection for continuity. + +Target agent profile: `Codex/agent-runner`. + + + + diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..2610b23 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,28 @@ + +## Engram Continuity (Auto-Generated) + +Follow these rules for cross-agent continuity on every new task/thread. + +1) Before answering substantive repo/task questions, call `get_last_session`: +- `user_id`: `"default"` unless provided +- `requester_agent_id`: `"claude-code"` +- `repo`: absolute workspace path +- Include `agent_id` only when the user explicitly asks to continue from a specific source agent. + +2) If no handoff session exists, continue normally and use memory tools as needed. + +3) On major milestones and before pausing/ending, call `save_session_digest` with: +- `task_summary` +- `repo` +- `status` (`"active"`, `"paused"`, or `"completed"`) +- `decisions_made`, `files_touched`, `todos_remaining` +- `blockers`, `key_commands`, `test_results` when available +- `agent_id`: `"claude-code"`, `requester_agent_id`: `"claude-code"` + +4) Prefer Engram MCP handoff tools over shell/SQLite inspection for continuity. + +Target agent profile: `Claude Code`. + + + + diff --git a/CURSOR.md b/CURSOR.md new file mode 100644 index 0000000..c36523a --- /dev/null +++ b/CURSOR.md @@ -0,0 +1,28 @@ + +## Engram Continuity (Auto-Generated) + +Follow these rules for cross-agent continuity on every new task/thread. + +1) Before answering substantive repo/task questions, call `get_last_session`: +- `user_id`: `"default"` unless provided +- `requester_agent_id`: `"cursor"` +- `repo`: absolute workspace path +- Include `agent_id` only when the user explicitly asks to continue from a specific source agent. + +2) If no handoff session exists, continue normally and use memory tools as needed. + +3) On major milestones and before pausing/ending, call `save_session_digest` with: +- `task_summary` +- `repo` +- `status` (`"active"`, `"paused"`, or `"completed"`) +- `decisions_made`, `files_touched`, `todos_remaining` +- `blockers`, `key_commands`, `test_results` when available +- `agent_id`: `"cursor"`, `requester_agent_id`: `"cursor"` + +4) Prefer Engram MCP handoff tools over shell/SQLite inspection for continuity. + +Target agent profile: `Cursor`. + + + + diff --git a/README.md b/README.md index 47ab43e..220d455 100644 --- a/README.md +++ b/README.md @@ -11,8 +11,12 @@

- A user-owned memory store that any agent can plug into to become instantly personalized.
- Agents read via scoped retrieval. Writes land in staging until you approve. + Hit a rate limit in Claude Code? Open Codex — it already knows what you were doing.
+ One memory kernel. Shared across every agent. Bio-inspired forgetting. Staged writes. Episodic recall. +

+ +

+ ⚠ Early-stage software — not recommended for production use. APIs may change. Use at your own risk.

@@ -36,23 +40,24 @@ ## Why Engram -Every AI agent you use starts with amnesia. Your coding assistant forgets your preferences between sessions. Your planning agent has no idea what your research agent discovered yesterday. You end up re-explaining context that should already be known. +Every AI agent you use starts with amnesia. But the real pain isn't just forgetting — it's what happens when you **switch agents**. -**Engram fixes this.** It's a Personal Memory Kernel (PMK) — a single memory store that sits between you and all your agents. Any agent can plug in via MCP or REST to become instantly personalized, without you having to repeat yourself. +You're 40 minutes into a refactor with Claude Code. You've touched six files, picked a migration strategy, mapped out the remaining TODOs. Then you hit a rate limit. Or your terminal crashes. Or you just need Codex for the next part. So you switch — and the new agent has **zero context**. You re-paste file paths, re-explain decisions, re-describe the plan. Half the time the new agent contradicts something you'd already decided. -But unlike "store everything forever" approaches, Engram treats agents as **untrusted writers**. Writes land in staging. You control what sticks. And memories that stop being useful fade away naturally — just like biological memory. +**Engram fixes this.** It's a Personal Memory Kernel (PMK) — one memory store shared across all your agents. When Claude Code pauses, it saves a session digest. When Codex picks up, it loads that digest and continues where you left off. No re-explanation. No cold starts. -| Capability | Other Memory Layers | **Engram** | -|:-----------|:--------------------|:-----------| -| Bio-inspired forgetting | No | **Ebbinghaus decay curve** | -| Untrusted agent writes | Store directly | **Staging + verification + conflict stash** | -| Episodic narrative memory | No | **CAST scenes (time/place/topic)** | -| Multi-modal encoding | Rare | **5 retrieval paths (EchoMem)** | -| Cross-agent memory sharing | Per-agent silos | **Scoped retrieval with masking** | -| Knowledge graph | Sometimes | **Entity extraction + linking** | +But Engram isn't just a handoff bus. It solves four fundamental problems with how AI memory works today: + +| Problem | Other Memory Layers | **Engram** | +|:--------|:--------------------|:-----------| +| **Switching agents = cold start** | Manual copy/paste context | **Handoff bus — session digests, auto-resume** | +| **Nobody forgets** | Store everything forever | **Ebbinghaus decay curve, ~45% less storage** | +| **Agents write with no oversight** | Store directly | **Staging + verification + trust scoring** | +| **No episodic memory** | Vector search only | **CAST scenes (time/place/topic)** | +| Multi-modal encoding | Single embedding | **5 retrieval paths (EchoMem)** | +| Cross-agent memory sharing | Per-agent silos | **Scoped retrieval with all-but-mask privacy** | | Reference-aware decay | No | **If other agents use it, don't delete it** | -| Hybrid search | Vector only | **Semantic + keyword + episodic** | -| Storage efficiency | Store everything | **~45% less** | +| Knowledge graph | Sometimes | **Entity extraction + linking** | | MCP + REST | One or the other | **Both, plug-and-play** | | Local-first | Cloud-required | **127.0.0.1:8100 by default** | @@ -127,9 +132,10 @@ docker compose up -d # API at http://localhost:8100 Engram is a **Personal Memory Kernel** — not just a vector store with an API. It has opinions about how memory should work: -1. **Agents are untrusted writers.** Every write is a proposal that lands in staging. Trusted agents can auto-merge; untrusted ones wait for approval. +1. **Switching agents shouldn't mean starting over.** When an agent pauses — rate limit, crash, tool switch — it saves a session digest. The next agent loads it and continues. Zero re-explanation. 2. **Memory has a lifecycle.** New memories start in short-term (SML), get promoted to long-term (LML) through repeated access, and fade away through Ebbinghaus decay if unused. -3. **Scoping is mandatory.** Every memory is scoped by user. Agents see only what they're allowed to — everything else gets the "all but mask" treatment (structure visible, details redacted). +3. **Agents are untrusted writers.** Every write is a proposal that lands in staging. Trusted agents can auto-merge; untrusted ones wait for approval. +4. **Scoping is mandatory.** Every memory is scoped by user. Agents see only what they're allowed to — everything else gets the "all but mask" treatment (structure visible, details redacted). ``` ┌─────────────────────────────────────────────────────────────────┐ @@ -192,7 +198,7 @@ Engram is a **Personal Memory Kernel** — not just a vector store with an API. ### The Memory Stack -Engram combines four bio-inspired memory systems, each handling a different aspect of how humans actually remember: +Engram combines five systems, each handling a different aspect of how memory should work: #### FadeMem — Decay & Consolidation @@ -243,6 +249,21 @@ Scene: "Engram v2 architecture session" Memories: [mem_1, mem_2] ← semantic facts extracted ``` +#### Handoff Bus — Cross-Agent Continuity + +When an agent pauses work — rate limit, crash, you switch tools — it saves a session digest: task summary, decisions made, files touched, remaining TODOs, blockers. The next agent calls `get_last_session` and gets the full context. No re-explanation needed. + +``` +Claude Code (rate limited) + → save_session_digest(task, decisions, files, todos, blockers) + → Session stored in handoff bus + +Codex (picks up) + → get_last_session(repo="/my-project") + → Gets full context: task, decisions, files, todos + → Continues where Claude Code stopped +``` + --- ### Key Flows @@ -300,6 +321,16 @@ Engram is plug-and-play. Run `engram install` and it auto-configures everything: engram install # Writes MCP config to ~/.claude.json ``` +`engram install` also bootstraps workspace continuity rules (in your current +project directory) so agents call handoff tools automatically: + +- `AGENTS.md` +- `CLAUDE.md` +- `CURSOR.md` +- `.cursor/rules/engram-continuity.mdc` + +Set `ENGRAM_INSTALL_SKIP_WORKSPACE_RULES=1` to disable this behavior. + **MCP tools** give Claude reactive memory — it stores and retrieves when you ask. The optional **Claude Code plugin** makes memory **proactive** — relevant context is injected automatically before Claude sees your message: @@ -340,10 +371,12 @@ Claude: Based on your preferences, I'd recommend TypeScript... ### Cursor `engram install` writes MCP config to `~/.cursor/mcp.json`. Restart Cursor to load. +It also sets `ENGRAM_MCP_AGENT_ID=cursor` for deterministic handoff identity. ### OpenAI Codex `engram install` writes MCP config to `~/.codex/config.toml`. Restart Codex to load. +It also sets `ENGRAM_MCP_AGENT_ID=codex` for deterministic handoff identity. ### OpenClaw @@ -379,6 +412,9 @@ Once configured, your agent has access to these tools: | `list_pending_commits` | Inspect staged write queue | | `resolve_conflict` | Resolve invariant conflicts (accept proposed or keep existing) | | `search_scenes` / `get_scene` | Episodic CAST scene retrieval with masking policy | +| `save_session_digest` | Save handoff context when pausing or switching agents | +| `get_last_session` | Load session context from the last active agent | +| `list_sessions` | Browse handoff history across agents | --- @@ -432,6 +468,19 @@ curl "http://localhost:8100/v1/trust?user_id=u123&agent_id=planner" # 7. Sleep-cycle maintenance curl -X POST http://localhost:8100/v1/sleep/run \ -d '{"user_id": "u123", "apply_decay": true, "cleanup_stale_refs": true}' + +# 8. Zero-intervention handoff (session bus) +curl -X POST http://localhost:8100/v1/handoff/resume \ + -H "Authorization: Bearer " \ + -H "Content-Type: application/json" \ + -d '{"user_id":"u123","agent_id":"frontend","repo_path":"/repo","objective":"continue latest task"}' + +curl -X POST http://localhost:8100/v1/handoff/checkpoint \ + -H "Authorization: Bearer " \ + -H "Content-Type: application/json" \ + -d '{"user_id":"u123","agent_id":"frontend","repo_path":"/repo","task_summary":"implemented card layout"}' + +curl "http://localhost:8100/v1/handoff/lanes?user_id=u123" ``` ### Python SDK @@ -796,7 +845,7 @@ MIT License — see [LICENSE](LICENSE) for details. ---

- Your agents forget everything between sessions. Engram fixes that. + Switch agents without losing context. Stop re-explaining yourself.

GitHub · Issues · diff --git a/engram/api/app.py b/engram/api/app.py index fa44ffb..86df251 100644 --- a/engram/api/app.py +++ b/engram/api/app.py @@ -28,6 +28,8 @@ CommitResolutionRequest, ConflictResolutionRequest, DailyDigestResponse, + HandoffCheckpointRequest, + HandoffResumeRequest, NamespaceDeclareRequest, NamespacePermissionRequest, SceneSearchRequest, @@ -178,6 +180,96 @@ async def create_session(request: SessionCreateRequest, http_request: Request): raise HTTPException(status_code=403, detail=str(exc)) +@app.post("/v1/handoff/resume") +@app.post("/v1/handoff/resume/") +async def handoff_resume(request: HandoffResumeRequest, http_request: Request): + token = get_token_from_request(http_request) + kernel = get_kernel() + try: + return kernel.auto_resume_context( + user_id=request.user_id, + agent_id=request.agent_id, + repo_path=request.repo_path, + branch=request.branch, + lane_type=request.lane_type, + objective=request.objective, + agent_role=request.agent_role, + namespace=request.namespace, + statuses=request.statuses, + auto_create=request.auto_create, + token=token, + requester_agent_id=request.requester_agent_id, + ) + except PermissionError as exc: + raise require_session_error(exc) + + +@app.post("/v1/handoff/checkpoint") +@app.post("/v1/handoff/checkpoint/") +async def handoff_checkpoint(request: HandoffCheckpointRequest, http_request: Request): + token = get_token_from_request(http_request) + kernel = get_kernel() + payload = { + "status": request.status, + "task_summary": request.task_summary, + "decisions_made": request.decisions_made, + "files_touched": request.files_touched, + "todos_remaining": request.todos_remaining, + "blockers": request.blockers, + "key_commands": request.key_commands, + "test_results": request.test_results, + "context_snapshot": request.context_snapshot, + } + try: + return kernel.auto_checkpoint( + user_id=request.user_id, + agent_id=request.agent_id, + payload=payload, + event_type=request.event_type, + repo_path=request.repo_path, + branch=request.branch, + lane_id=request.lane_id, + lane_type=request.lane_type, + objective=request.objective, + agent_role=request.agent_role, + namespace=request.namespace, + confidentiality_scope=request.confidentiality_scope, + expected_version=request.expected_version, + token=token, + requester_agent_id=request.requester_agent_id, + ) + except PermissionError as exc: + raise require_session_error(exc) + + +@app.get("/v1/handoff/lanes") +@app.get("/v1/handoff/lanes/") +async def list_handoff_lanes( + http_request: Request, + user_id: str = Query(default="default"), + repo_path: Optional[str] = Query(default=None), + status: Optional[str] = Query(default=None), + statuses: Optional[List[str]] = Query(default=None), + limit: int = Query(default=20, ge=1, le=200), + requester_agent_id: Optional[str] = Query(default=None), +): + token = get_token_from_request(http_request) + kernel = get_kernel() + try: + lanes = kernel.list_handoff_lanes( + user_id=user_id, + repo_path=repo_path, + status=status, + statuses=statuses, + limit=limit, + token=token, + requester_agent_id=requester_agent_id, + ) + return {"lanes": lanes, "count": len(lanes)} + except PermissionError as exc: + raise require_session_error(exc) + + @app.post("/v1/search", response_model=SearchResultResponse) @app.post("/v1/search/", response_model=SearchResultResponse) @app.post("/v1/memories/search", response_model=SearchResultResponse) diff --git a/engram/api/schemas.py b/engram/api/schemas.py index 2e6534f..a9944ce 100644 --- a/engram/api/schemas.py +++ b/engram/api/schemas.py @@ -27,6 +27,45 @@ class SessionCreateResponse(BaseModel): namespaces: List[str] +class HandoffResumeRequest(BaseModel): + user_id: str = Field(default="default") + agent_id: Optional[str] = Field(default=None) + requester_agent_id: Optional[str] = Field(default=None) + repo_path: Optional[str] = Field(default=None) + branch: Optional[str] = Field(default=None) + lane_type: str = Field(default="general") + objective: Optional[str] = Field(default=None) + agent_role: Optional[str] = Field(default=None) + namespace: str = Field(default="default") + statuses: Optional[List[str]] = Field(default=None) + auto_create: bool = Field(default=True) + + +class HandoffCheckpointRequest(BaseModel): + user_id: str = Field(default="default") + agent_id: str + requester_agent_id: Optional[str] = Field(default=None) + repo_path: Optional[str] = Field(default=None) + branch: Optional[str] = Field(default=None) + lane_id: Optional[str] = Field(default=None) + lane_type: str = Field(default="general") + objective: Optional[str] = Field(default=None) + agent_role: Optional[str] = Field(default=None) + namespace: str = Field(default="default") + confidentiality_scope: str = Field(default="work") + event_type: str = Field(default="tool_complete") + status: str = Field(default="active") + task_summary: Optional[str] = Field(default=None) + decisions_made: List[str] = Field(default_factory=list) + files_touched: List[str] = Field(default_factory=list) + todos_remaining: List[str] = Field(default_factory=list) + blockers: List[str] = Field(default_factory=list) + key_commands: List[str] = Field(default_factory=list) + test_results: List[str] = Field(default_factory=list) + context_snapshot: Optional[str] = Field(default=None) + expected_version: Optional[int] = Field(default=None) + + class SearchRequestV2(BaseModel): query: str user_id: str = Field(default="default") diff --git a/engram/cli.py b/engram/cli.py index 9902c84..19b1c9d 100644 --- a/engram/cli.py +++ b/engram/cli.py @@ -9,7 +9,7 @@ import sys import shutil from pathlib import Path -from typing import Dict, Any, Optional +from typing import Dict, Any, Optional, List # TOML support (built-in since Python 3.11, fallback for earlier versions) try: @@ -24,6 +24,114 @@ except ImportError: HAS_TOMLI_W = False +_CONTINUITY_START = "" +_CONTINUITY_END = "" + + +def _handoff_block(agent_id: str, agent_label: str) -> str: + return f"""\ +{_CONTINUITY_START} +## Engram Continuity (Auto-Generated) + +Follow these rules for cross-agent continuity on every new task/thread. + +1) Before answering substantive repo/task questions, call `get_last_session`: +- `user_id`: `"default"` unless provided +- `requester_agent_id`: `"{agent_id}"` +- `repo`: absolute workspace path +- Include `agent_id` only when the user explicitly asks to continue from a specific source agent. + +2) If no handoff session exists, continue normally and use memory tools as needed. + +3) On major milestones and before pausing/ending, call `save_session_digest` with: +- `task_summary` +- `repo` +- `status` (`"active"`, `"paused"`, or `"completed"`) +- `decisions_made`, `files_touched`, `todos_remaining` +- `blockers`, `key_commands`, `test_results` when available +- `agent_id`: `"{agent_id}"`, `requester_agent_id`: `"{agent_id}"` + +4) Prefer Engram MCP handoff tools over shell/SQLite inspection for continuity. + +Target agent profile: `{agent_label}`. +{_CONTINUITY_END} +""" + + +def _cursor_rule_content() -> str: + return """\ +--- +description: "Use Engram handoff tools automatically for continuity" +alwaysApply: true +--- + +When an Engram MCP server is available: + +1) At the start of a new task/thread, call `get_last_session` first. + Do not pass `agent_id` unless the user explicitly asks for a specific source agent. +2) Use the returned handoff context to continue work naturally. +3) Before pausing or ending, call `save_session_digest`. +4) Do not use shell/SQLite probing for continuity when MCP handoff tools exist. +""" + + +def _upsert_block_file(path: Path, block: str) -> bool: + existing = path.read_text(encoding="utf-8") if path.exists() else "" + start_idx = existing.find(_CONTINUITY_START) + end_idx = existing.find(_CONTINUITY_END) + + if start_idx != -1 and end_idx != -1 and end_idx >= start_idx: + end_idx += len(_CONTINUITY_END) + updated = existing[:start_idx] + block + existing[end_idx:] + elif existing.strip(): + updated = existing.rstrip() + "\n\n" + block + "\n" + else: + updated = block + "\n" + + if updated == existing: + return False + + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(updated, encoding="utf-8") + return True + + +def _write_file_if_changed(path: Path, content: str) -> bool: + existing = path.read_text(encoding="utf-8") if path.exists() else "" + normalized = content if content.endswith("\n") else content + "\n" + if existing == normalized: + return False + path.parent.mkdir(parents=True, exist_ok=True) + path.write_text(normalized, encoding="utf-8") + return True + + +def _install_workspace_continuity_rules(workspace: Path) -> List[str]: + updated: List[str] = [] + + targets = [ + (workspace / "AGENTS.md", _handoff_block("codex", "Codex/agent-runner")), + (workspace / "CLAUDE.md", _handoff_block("claude-code", "Claude Code")), + (workspace / "CURSOR.md", _handoff_block("cursor", "Cursor")), + ] + for path, block in targets: + if _upsert_block_file(path, block): + updated.append(str(path)) + + cursor_rule_path = workspace / ".cursor" / "rules" / "engram-continuity.mdc" + if _write_file_if_changed(cursor_rule_path, _cursor_rule_content()): + updated.append(str(cursor_rule_path)) + + return updated + + +def _config_with_agent_identity(server_config: Dict[str, Any], agent_id: str) -> Dict[str, Any]: + updated = dict(server_config) + env = dict(updated.get("env", {})) + env["ENGRAM_MCP_AGENT_ID"] = agent_id + updated["env"] = env + return updated + def _read_toml(path: Path) -> Dict[str, Any]: """Read a TOML file.""" @@ -148,15 +256,18 @@ def install(): json_targets = [ { "name": "Claude Code (CLI)", - "path": Path.home() / ".claude.json" + "path": Path.home() / ".claude.json", + "agent_id": "claude-code", }, { "name": "Claude Desktop (macOS)", - "path": Path.home() / "Library/Application Support/Claude/claude_desktop_config.json" + "path": Path.home() / "Library/Application Support/Claude/claude_desktop_config.json", + "agent_id": "claude-code", }, { "name": "Cursor", - "path": Path.home() / ".cursor" / "mcp.json" + "path": Path.home() / ".cursor" / "mcp.json", + "agent_id": "cursor", }, ] @@ -164,7 +275,8 @@ def install(): toml_targets = [ { "name": "Codex CLI", - "path": Path.home() / ".codex" / "config.toml" + "path": Path.home() / ".codex" / "config.toml", + "agent_id": "codex", }, ] @@ -172,12 +284,14 @@ def install(): # Install to JSON configs for target in json_targets: - if _update_config(target["name"], target["path"], "engram-memory", mcp_config): + target_config = _config_with_agent_identity(mcp_config, target["agent_id"]) + if _update_config(target["name"], target["path"], "engram-memory", target_config): installed_count += 1 # Install to TOML configs (Codex) for target in toml_targets: - if _update_codex_config(target["name"], target["path"], "engram-memory", mcp_config): + target_config = _config_with_agent_identity(mcp_config, target["agent_id"]) + if _update_codex_config(target["name"], target["path"], "engram-memory", target_config): installed_count += 1 # Install OpenClaw skill @@ -192,6 +306,28 @@ def install(): if _deploy_cc_plugin(): installed_count += 1 + # Install workspace-level continuity rules for agents (idempotent block updates) + skip_workspace_rules = os.environ.get("ENGRAM_INSTALL_SKIP_WORKSPACE_RULES", "").strip().lower() in { + "1", + "true", + "yes", + "on", + } + if not skip_workspace_rules: + cwd = Path.cwd() + if (cwd / ".git").exists() or (cwd / "pyproject.toml").exists(): + updated_paths = _install_workspace_continuity_rules(cwd) + if updated_paths: + print("\nInstalled workspace continuity rules:") + for entry in updated_paths: + print(f" • {entry}") + else: + print("\nWorkspace continuity rules already up to date.") + else: + print("\nℹ️ Skipping workspace continuity rules (current directory is not a project root).") + else: + print("\nℹ️ Skipping workspace continuity rules (ENGRAM_INSTALL_SKIP_WORKSPACE_RULES is set).") + if installed_count > 0: print("\n✨ Installation successful!") print("Engram is now configured for:") diff --git a/engram/configs/base.py b/engram/configs/base.py index 3ed3978..24d08a1 100644 --- a/engram/configs/base.py +++ b/engram/configs/base.py @@ -1,5 +1,5 @@ import os -from typing import Any, Dict, Optional +from typing import Any, Dict, List, Optional from pydantic import BaseModel, Field @@ -114,6 +114,32 @@ class ProfileConfig(BaseModel): max_facts_per_profile: int = 100 +class HandoffConfig(BaseModel): + """Configuration for cross-agent session handoff.""" + enable_handoff: bool = True + auto_enrich: bool = True # LLM-enrich digests with linked memories + max_sessions_per_user: int = 100 # retain last N sessions + auto_session_bus: bool = True + auto_checkpoint_events: List[str] = Field( + default_factory=lambda: ["tool_complete", "agent_pause", "agent_end"] + ) + lane_inactivity_minutes: int = 240 + max_lanes_per_user: int = 50 + max_checkpoints_per_lane: int = 200 + resume_statuses: List[str] = Field(default_factory=lambda: ["active", "paused"]) + auto_trusted_agents: List[str] = Field( + default_factory=lambda: [ + "pm", + "design", + "frontend", + "backend", + "claude-code", + "codex", + "chatgpt", + ] + ) + + class ScopeConfig(BaseModel): """Configuration for scope-aware sharing weights.""" agent_weight: float = 1.0 @@ -160,3 +186,4 @@ class MemoryConfig(BaseModel): graph: KnowledgeGraphConfig = Field(default_factory=lambda: KnowledgeGraphConfig()) scene: SceneConfig = Field(default_factory=SceneConfig) profile: ProfileConfig = Field(default_factory=ProfileConfig) + handoff: HandoffConfig = Field(default_factory=HandoffConfig) diff --git a/engram/core/handoff.py b/engram/core/handoff.py new file mode 100644 index 0000000..9790199 --- /dev/null +++ b/engram/core/handoff.py @@ -0,0 +1,193 @@ +"""Compatibility layer for cross-agent handoff powered by HandoffSessionBus.""" + +from __future__ import annotations + +from typing import Any, Dict, List, Optional + +from engram.core.handoff_bus import HandoffSessionBus + + +class HandoffProcessor: + """Backwards-compatible handoff facade with session bus internals.""" + + def __init__( + self, + db, + memory=None, + embedder=None, + llm=None, # retained for compatibility + config: Optional[Dict[str, Any]] = None, + ): + self.db = db + self.memory = memory + self.embedder = embedder + cfg = config or {} + self.auto_enrich = bool(cfg.get("auto_enrich", True)) + self.max_sessions = int(cfg.get("max_sessions", 100)) + self.session_bus = HandoffSessionBus( + db=db, + memory=memory, + embedder=embedder, + config=cfg, + ) + + # ------------------------------------------------------------------ + # Legacy session digest API + # ------------------------------------------------------------------ + + def save_digest(self, user_id: str, agent_id: str, digest: Dict[str, Any]) -> Dict[str, Any]: + return self.session_bus.save_session_digest(user_id=user_id, agent_id=agent_id, digest=digest) + + def get_handoff_context(self, session_id: str) -> Dict[str, Any]: + return self.session_bus.get_handoff_context(session_id) + + def get_last_session( + self, + *, + user_id: str, + agent_id: Optional[str] = None, + repo: Optional[str] = None, + statuses: Optional[List[str]] = None, + ) -> Optional[Dict[str, Any]]: + return self.session_bus.get_last_session( + user_id=user_id, + agent_id=agent_id, + repo=repo, + statuses=statuses, + ) + + def list_sessions( + self, + *, + user_id: str, + agent_id: Optional[str] = None, + repo: Optional[str] = None, + status: Optional[str] = None, + statuses: Optional[List[str]] = None, + limit: int = 20, + ) -> List[Dict[str, Any]]: + return self.session_bus.list_sessions( + user_id=user_id, + agent_id=agent_id, + repo=repo, + status=status, + statuses=statuses, + limit=limit, + ) + + # ------------------------------------------------------------------ + # Session bus API + # ------------------------------------------------------------------ + + def auto_resume_context( + self, + *, + user_id: str, + agent_id: Optional[str], + repo_path: Optional[str] = None, + branch: Optional[str] = None, + lane_type: str = "general", + objective: Optional[str] = None, + agent_role: Optional[str] = None, + namespace: str = "default", + statuses: Optional[List[str]] = None, + auto_create: bool = True, + ) -> Dict[str, Any]: + return self.session_bus.auto_resume_context( + user_id=user_id, + agent_id=agent_id, + repo_path=repo_path, + branch=branch, + lane_type=lane_type, + objective=objective, + agent_role=agent_role, + namespace=namespace, + statuses=statuses, + auto_create=auto_create, + ) + + def auto_checkpoint( + self, + *, + user_id: str, + agent_id: str, + payload: Dict[str, Any], + event_type: str = "tool_complete", + repo_path: Optional[str] = None, + branch: Optional[str] = None, + lane_id: Optional[str] = None, + lane_type: str = "general", + objective: Optional[str] = None, + agent_role: Optional[str] = None, + namespace: str = "default", + confidentiality_scope: str = "work", + expected_version: Optional[int] = None, + ) -> Dict[str, Any]: + return self.session_bus.auto_checkpoint( + user_id=user_id, + agent_id=agent_id, + payload=payload, + event_type=event_type, + repo_path=repo_path, + branch=branch, + lane_id=lane_id, + lane_type=lane_type, + objective=objective, + agent_role=agent_role, + namespace=namespace, + confidentiality_scope=confidentiality_scope, + expected_version=expected_version, + ) + + def finalize_lane( + self, + *, + user_id: str, + agent_id: str, + lane_id: str, + status: str = "paused", + payload: Optional[Dict[str, Any]] = None, + repo_path: Optional[str] = None, + branch: Optional[str] = None, + agent_role: Optional[str] = None, + namespace: str = "default", + ) -> Dict[str, Any]: + return self.session_bus.finalize_lane( + user_id=user_id, + agent_id=agent_id, + lane_id=lane_id, + status=status, + payload=payload, + repo_path=repo_path, + branch=branch, + agent_role=agent_role, + namespace=namespace, + ) + + def list_lanes( + self, + *, + user_id: str, + repo_path: Optional[str] = None, + status: Optional[str] = None, + statuses: Optional[List[str]] = None, + limit: int = 20, + ) -> List[Dict[str, Any]]: + return self.session_bus.list_lanes( + user_id=user_id, + repo_path=repo_path, + status=status, + statuses=statuses, + limit=limit, + ) + + # Legacy method kept for callers that still invoke explicit enrichment. + def enrich(self, session_id: str, user_id: str) -> Dict[str, Any]: + session = self.db.get_handoff_session(session_id) + if not session: + return {} + return { + "linked_memories": len(session.get("linked_memory_ids", [])), + "linked_scenes": len(session.get("linked_scene_ids", [])), + } + diff --git a/engram/core/handoff_bus.py b/engram/core/handoff_bus.py new file mode 100644 index 0000000..7e2da78 --- /dev/null +++ b/engram/core/handoff_bus.py @@ -0,0 +1,938 @@ +"""Automatic cross-agent session bus for multi-lane handoff continuity.""" + +from __future__ import annotations + +import logging +from datetime import datetime +from typing import Any, Dict, List, Optional, Tuple + +from engram.core.policy import ALL_CONFIDENTIALITY_SCOPES, DEFAULT_CAPABILITIES, HANDOFF_CAPABILITIES +from engram.utils.repo_identity import canonicalize_repo_identity + +logger = logging.getLogger(__name__) + + +def _utc_now_iso() -> str: + return datetime.utcnow().isoformat() + + +def _safe_dt(value: Optional[str]) -> Optional[datetime]: + if not value: + return None + try: + return datetime.fromisoformat(str(value)) + except Exception: + return None + + +def _tokenize(text: Optional[str]) -> set[str]: + if not text: + return set() + return {token.strip().lower() for token in str(text).replace("/", " ").replace("_", " ").split() if token.strip()} + + +def _merge_list_values(existing: Any, incoming: Any) -> List[str]: + merged: List[str] = [] + for value in list(existing or []) + list(incoming or []): + item = str(value).strip() + if item and item not in merged: + merged.append(item) + return merged + + +class HandoffSessionBus: + """Server-side session bus with lane routing and automatic checkpointing.""" + + def __init__( + self, + *, + db, + memory=None, + embedder=None, + config: Optional[Dict[str, Any]] = None, + ): + self.db = db + self.memory = memory + self.embedder = embedder + cfg = config or {} + self.auto_enrich = bool(cfg.get("auto_enrich", True)) + self.max_sessions_per_user = int(cfg.get("max_sessions", 100)) + self.max_lanes_per_user = int(cfg.get("max_lanes_per_user", 50)) + self.max_checkpoints_per_lane = int(cfg.get("max_checkpoints_per_lane", 200)) + self.resume_statuses = [str(v).strip() for v in cfg.get("resume_statuses", ["active", "paused"]) if str(v).strip()] + self.lane_inactivity_minutes = int(cfg.get("lane_inactivity_minutes", 240)) + self.auto_trusted_agents = { + str(agent).strip().lower() + for agent in cfg.get( + "auto_trusted_agents", + ["pm", "design", "frontend", "backend", "claude-code", "codex", "chatgpt"], + ) + if str(agent).strip() + } + + # ------------------------------------------------------------------ + # Public API: automatic lane + checkpoints + # ------------------------------------------------------------------ + + def auto_resume_context( + self, + *, + user_id: str, + agent_id: Optional[str], + repo_path: Optional[str] = None, + branch: Optional[str] = None, + lane_type: str = "general", + objective: Optional[str] = None, + agent_role: Optional[str] = None, + namespace: str = "default", + statuses: Optional[List[str]] = None, + auto_create: bool = True, + ) -> Dict[str, Any]: + self._bootstrap_auto_trusted_policy(user_id=user_id, agent_id=agent_id, namespace=namespace) + repo_identity = canonicalize_repo_identity(repo_path, branch=branch) + allowed_statuses = statuses or list(self.resume_statuses) + + lane, created = self._select_or_create_lane( + user_id=user_id, + repo_identity=repo_identity, + lane_type=lane_type, + objective=objective, + namespace=namespace, + statuses=allowed_statuses, + auto_create=auto_create, + ) + if not lane: + return {"error": "No matching lane found"} + + checkpoint = self.db.get_latest_handoff_checkpoint(lane["id"]) + packet = self._build_resume_packet( + lane=lane, + checkpoint=checkpoint, + from_agent=agent_id, + agent_role=agent_role, + ) + packet["created_new_lane"] = bool(created) + if created: + packet["warm_context"] = self._warm_context( + user_id=user_id, + repo_identity=repo_identity, + objective=objective, + ) + return packet + + def auto_checkpoint( + self, + *, + user_id: str, + agent_id: str, + payload: Dict[str, Any], + event_type: str = "tool_complete", + repo_path: Optional[str] = None, + branch: Optional[str] = None, + lane_id: Optional[str] = None, + lane_type: str = "general", + objective: Optional[str] = None, + agent_role: Optional[str] = None, + namespace: str = "default", + confidentiality_scope: str = "work", + expected_version: Optional[int] = None, + ) -> Dict[str, Any]: + self._bootstrap_auto_trusted_policy(user_id=user_id, agent_id=agent_id, namespace=namespace) + repo_identity = canonicalize_repo_identity(repo_path, branch=branch) + + lane = self.db.get_handoff_lane(lane_id) if lane_id else None + if lane and lane.get("user_id") != user_id: + lane = None + + if not lane: + lane, _ = self._select_or_create_lane( + user_id=user_id, + repo_identity=repo_identity, + lane_type=lane_type, + objective=objective, + namespace=namespace, + statuses=list(self.resume_statuses), + auto_create=True, + ) + if not lane: + return {"error": "Unable to resolve or create handoff lane"} + + now = _utc_now_iso() + normalized_payload = self._normalize_checkpoint_payload(payload) + if objective and not normalized_payload.get("task_summary"): + normalized_payload["task_summary"] = objective + + previous_state = dict(lane.get("current_state") or {}) + merged_state, conflicts = self._merge_state(previous_state, normalized_payload) + + checkpoint_data = { + "lane_id": lane["id"], + "user_id": user_id, + "agent_id": agent_id, + "agent_role": agent_role, + "event_type": event_type, + "task_summary": normalized_payload.get("task_summary"), + "decisions_made": merged_state.get("decisions_made", []), + "files_touched": merged_state.get("files_touched", []), + "todos_remaining": merged_state.get("todos_remaining", []), + "blockers": merged_state.get("blockers", []), + "key_commands": merged_state.get("key_commands", []), + "test_results": merged_state.get("test_results", []), + "merge_conflicts": conflicts, + "context_snapshot": merged_state.get("context_snapshot"), + "created_at": now, + } + checkpoint_id = self.db.add_handoff_checkpoint(checkpoint_data) + + enrichment = {"linked_memories": 0, "linked_scenes": 0} + if self.auto_enrich: + enrichment = self._enrich_checkpoint( + checkpoint_id=checkpoint_id, + user_id=user_id, + repo_identity=repo_identity, + task_summary=merged_state.get("task_summary"), + created_at=now, + ) + + target_version = int(lane.get("version", 0)) + 1 + lane_status = str(normalized_payload.get("status") or lane.get("status") or "active") + lane_updates = { + "status": lane_status, + "objective": merged_state.get("task_summary") or lane.get("objective"), + "current_state": merged_state, + "last_checkpoint_at": now, + "version": target_version, + "updated_at": now, + "namespace": namespace or lane.get("namespace", "default"), + "confidentiality_scope": confidentiality_scope or lane.get("confidentiality_scope", "work"), + "repo_id": repo_identity.get("repo_id"), + "repo_path": repo_identity.get("repo_path"), + "branch": repo_identity.get("branch") or lane.get("branch"), + } + updated = self.db.update_handoff_lane( + lane["id"], + lane_updates, + expected_version=expected_version, + ) + if not updated: + # Optimistic conflict fallback: refresh lane and force merge. + fresh_lane = self.db.get_handoff_lane(lane["id"]) or lane + fresh_state = dict(fresh_lane.get("current_state") or {}) + resolved_state, merge_conflicts = self._merge_state(fresh_state, normalized_payload) + all_conflicts = list(conflicts) + list(merge_conflicts) + self.db.update_handoff_lane( + lane["id"], + { + "current_state": resolved_state, + "version": int(fresh_lane.get("version", 0)) + 1, + "last_checkpoint_at": now, + "updated_at": now, + "status": lane_status, + }, + ) + conflicts = all_conflicts + merged_state = resolved_state + + if conflicts: + self.db.add_handoff_lane_conflict( + { + "lane_id": lane["id"], + "checkpoint_id": checkpoint_id, + "user_id": user_id, + "conflict_fields": [item.get("field") for item in conflicts if item.get("field")], + "previous_state": previous_state, + "incoming_state": normalized_payload, + "resolved_state": merged_state, + "created_at": now, + } + ) + + self.db.prune_handoff_checkpoints(lane_id=lane["id"], max_checkpoints=self.max_checkpoints_per_lane) + self.db.prune_handoff_lanes(user_id=user_id, max_lanes=self.max_lanes_per_user) + + return { + "lane_id": lane["id"], + "checkpoint_id": checkpoint_id, + "status": lane_status, + "version": target_version, + "conflicts": conflicts, + "enrichment": enrichment, + } + + def finalize_lane( + self, + *, + user_id: str, + agent_id: str, + lane_id: str, + status: str = "paused", + payload: Optional[Dict[str, Any]] = None, + repo_path: Optional[str] = None, + branch: Optional[str] = None, + agent_role: Optional[str] = None, + namespace: str = "default", + ) -> Dict[str, Any]: + result = self.auto_checkpoint( + user_id=user_id, + agent_id=agent_id, + lane_id=lane_id, + payload=payload or {}, + event_type="agent_end", + repo_path=repo_path, + branch=branch, + agent_role=agent_role, + namespace=namespace, + ) + lane = self.db.get_handoff_lane(lane_id) + if lane: + self.db.update_handoff_lane(lane_id, {"status": status}) + result["lane_status"] = status + return result + + def list_lanes( + self, + *, + user_id: str, + repo_path: Optional[str] = None, + status: Optional[str] = None, + statuses: Optional[List[str]] = None, + limit: int = 20, + ) -> List[Dict[str, Any]]: + repo_identity = canonicalize_repo_identity(repo_path, branch=None) if repo_path else {"repo_id": None} + return self.db.list_handoff_lanes( + user_id=user_id, + repo_id=repo_identity.get("repo_id"), + status=status, + statuses=statuses, + limit=limit, + ) + + # ------------------------------------------------------------------ + # Legacy compatibility (session digests) + # ------------------------------------------------------------------ + + def save_session_digest(self, user_id: str, agent_id: str, digest: Dict[str, Any]) -> Dict[str, Any]: + repo_path = digest.get("repo") + repo_identity = canonicalize_repo_identity(repo_path, branch=digest.get("branch")) + status = str(digest.get("status") or "paused") + checkpoint_payload = { + "status": status, + "task_summary": digest.get("task_summary"), + "decisions_made": digest.get("decisions_made", []), + "files_touched": digest.get("files_touched", []), + "todos_remaining": digest.get("todos_remaining", []), + "blockers": digest.get("blockers", []), + "key_commands": digest.get("key_commands", []), + "test_results": digest.get("test_results", []), + "context_snapshot": digest.get("context_snapshot"), + } + checkpoint = self.auto_checkpoint( + user_id=user_id, + agent_id=agent_id, + payload=checkpoint_payload, + event_type="agent_pause" if status in {"paused", "active"} else "agent_end", + repo_path=repo_path, + branch=digest.get("branch"), + lane_id=digest.get("lane_id"), + lane_type=digest.get("lane_type", "general"), + objective=digest.get("task_summary"), + agent_role=digest.get("agent_role"), + namespace=digest.get("namespace", "default"), + confidentiality_scope=digest.get("confidentiality_scope", "work"), + ) + lane_id = checkpoint.get("lane_id") + checkpoint_id = checkpoint.get("checkpoint_id") + checkpoint_memories = self.db.get_handoff_checkpoint_memories(checkpoint_id) if checkpoint_id else [] + checkpoint_scenes = self.db.get_handoff_checkpoint_scenes(checkpoint_id) if checkpoint_id else [] + + now = _utc_now_iso() + session_data = { + "user_id": user_id, + "agent_id": agent_id, + "repo": repo_identity.get("repo_path"), + "repo_id": repo_identity.get("repo_id"), + "status": status, + "task_summary": digest.get("task_summary", ""), + "decisions_made": digest.get("decisions_made", []), + "files_touched": digest.get("files_touched", []), + "todos_remaining": digest.get("todos_remaining", []), + "blockers": digest.get("blockers", []), + "key_commands": digest.get("key_commands", []), + "test_results": digest.get("test_results", []), + "context_snapshot": digest.get("context_snapshot"), + "linked_memory_ids": [item.get("id") for item in checkpoint_memories if item.get("id")], + "linked_scene_ids": [item.get("id") for item in checkpoint_scenes if item.get("id")], + "lane_id": lane_id, + "started_at": digest.get("started_at", now), + "ended_at": digest.get("ended_at"), + "last_checkpoint_at": now, + "namespace": digest.get("namespace", "default"), + "confidentiality_scope": digest.get("confidentiality_scope", "work"), + } + session_id = self.db.add_handoff_session(session_data) + self.db.prune_handoff_sessions(user_id=user_id, max_sessions=self.max_sessions_per_user) + return self.db.get_handoff_session(session_id) or {"id": session_id, **session_data} + + def get_handoff_context(self, session_id: str) -> Dict[str, Any]: + session = self.db.get_handoff_session(session_id) + if not session: + return {"error": "Session not found"} + lane = self.db.get_handoff_lane(session.get("lane_id")) if session.get("lane_id") else None + checkpoint = self.db.get_latest_handoff_checkpoint(session.get("lane_id")) if session.get("lane_id") else None + + related_memories = self.db.get_handoff_session_memories(session_id) + if not related_memories and checkpoint: + related_memories = self.db.get_handoff_checkpoint_memories(checkpoint["id"]) + related_scenes = self.db.get_handoff_checkpoint_scenes(checkpoint["id"]) if checkpoint else [] + + return { + "session_id": session["id"], + "lane_id": session.get("lane_id"), + "status": session.get("status", "paused"), + "repo": session.get("repo"), + "repo_id": session.get("repo_id"), + "from_agent": session.get("agent_id"), + "task_summary": session.get("task_summary", ""), + "decisions_made": session.get("decisions_made", []), + "files_touched": session.get("files_touched", []), + "todos_remaining": session.get("todos_remaining", []), + "blockers": session.get("blockers", []), + "key_commands": session.get("key_commands", []), + "test_results": session.get("test_results", []), + "context_snapshot": session.get("context_snapshot"), + "started_at": session.get("started_at"), + "ended_at": session.get("ended_at"), + "last_checkpoint_at": session.get("last_checkpoint_at"), + "lane_status": lane.get("status") if lane else None, + "lane_version": lane.get("version") if lane else None, + "related_memories": [ + {"id": item.get("id"), "memory": item.get("memory", "")} + for item in related_memories + ], + "related_scenes": [ + { + "id": scene.get("id"), + "summary": scene.get("summary"), + "topic": scene.get("topic"), + "start_time": scene.get("start_time"), + } + for scene in related_scenes + ], + } + + def get_last_session( + self, + *, + user_id: str, + agent_id: Optional[str] = None, + repo: Optional[str] = None, + statuses: Optional[List[str]] = None, + ) -> Optional[Dict[str, Any]]: + repo_identity = canonicalize_repo_identity(repo, branch=None) if repo else {"repo_id": None} + preferred_statuses = statuses or list(self.resume_statuses) + repo_candidates: List[Optional[str]] = [repo_identity.get("repo_id")] + if repo_candidates[0] is not None: + repo_candidates.append(None) + + for repo_id in repo_candidates: + session = self.db.get_last_handoff_session( + user_id=user_id, + agent_id=agent_id, + repo=repo if repo_id is not None else None, + repo_id=repo_id, + statuses=preferred_statuses, + ) + if session: + return self.get_handoff_context(session["id"]) + + for repo_id in repo_candidates: + session = self.db.get_last_handoff_session( + user_id=user_id, + agent_id=agent_id, + repo=repo if repo_id is not None else None, + repo_id=repo_id, + statuses=None, + ) + if session: + return self.get_handoff_context(session["id"]) + + # Compatibility fallback: if legacy sessions are absent, derive context + # from the latest lane/checkpoint state so resume still works. + for repo_id in repo_candidates: + lane_packet = self._latest_lane_resume_packet( + user_id=user_id, + agent_id=agent_id, + repo_id=repo_id, + statuses=preferred_statuses, + ) + if lane_packet: + return lane_packet + for repo_id in repo_candidates: + lane_packet = self._latest_lane_resume_packet( + user_id=user_id, + agent_id=agent_id, + repo_id=repo_id, + statuses=None, + ) + if lane_packet: + return lane_packet + return None + + def list_sessions( + self, + *, + user_id: str, + agent_id: Optional[str] = None, + repo: Optional[str] = None, + status: Optional[str] = None, + statuses: Optional[List[str]] = None, + limit: int = 20, + ) -> List[Dict[str, Any]]: + repo_identity = canonicalize_repo_identity(repo, branch=None) if repo else {"repo_id": None} + sessions = self.db.list_handoff_sessions( + user_id=user_id, + agent_id=agent_id, + repo=repo, + repo_id=repo_identity.get("repo_id"), + status=status, + statuses=statuses, + limit=limit, + ) + if sessions: + return sessions + + lane_sessions = self._lane_sessions_fallback( + user_id=user_id, + agent_id=agent_id, + repo_id=repo_identity.get("repo_id"), + status=status, + statuses=statuses, + limit=limit, + ) + if lane_sessions or repo_identity.get("repo_id") is None: + return lane_sessions + return self._lane_sessions_fallback( + user_id=user_id, + agent_id=agent_id, + repo_id=None, + status=status, + statuses=statuses, + limit=limit, + ) + + # ------------------------------------------------------------------ + # Internal helpers + # ------------------------------------------------------------------ + + def _bootstrap_auto_trusted_policy(self, *, user_id: str, agent_id: Optional[str], namespace: str) -> None: + if not user_id or not agent_id: + return + normalized_agent = str(agent_id).strip().lower() + if normalized_agent not in self.auto_trusted_agents: + return + existing = self.db.get_agent_policy( + user_id=user_id, + agent_id=agent_id, + include_wildcard=False, + ) + if existing: + return + capabilities = sorted(set(list(DEFAULT_CAPABILITIES) + list(HANDOFF_CAPABILITIES))) + namespaces = ["default"] + ns_value = str(namespace or "").strip() + if ns_value and ns_value not in namespaces: + namespaces.append(ns_value) + self.db.upsert_agent_policy( + user_id=user_id, + agent_id=agent_id, + allowed_confidentiality_scopes=list(ALL_CONFIDENTIALITY_SCOPES), + allowed_capabilities=capabilities, + allowed_namespaces=namespaces, + ) + + def _latest_lane_resume_packet( + self, + *, + user_id: str, + agent_id: Optional[str], + repo_id: Optional[str], + statuses: Optional[List[str]], + ) -> Optional[Dict[str, Any]]: + lanes = self.db.list_handoff_lanes( + user_id=user_id, + repo_id=repo_id, + statuses=statuses, + limit=50, + ) + for lane in lanes: + checkpoint = self.db.get_latest_handoff_checkpoint(lane["id"]) + source_agent = checkpoint.get("agent_id") if checkpoint else None + if agent_id and source_agent != agent_id: + continue + return self._build_resume_packet( + lane=lane, + checkpoint=checkpoint, + from_agent=source_agent, + agent_role=checkpoint.get("agent_role") if checkpoint else None, + ) + return None + + def _lane_sessions_fallback( + self, + *, + user_id: str, + agent_id: Optional[str], + repo_id: Optional[str], + status: Optional[str], + statuses: Optional[List[str]], + limit: int, + ) -> List[Dict[str, Any]]: + lanes = self.db.list_handoff_lanes( + user_id=user_id, + repo_id=repo_id, + status=status, + statuses=statuses, + limit=max(limit, 1), + ) + results: List[Dict[str, Any]] = [] + for lane in lanes: + checkpoint = self.db.get_latest_handoff_checkpoint(lane["id"]) + source_agent = checkpoint.get("agent_id") if checkpoint else None + if agent_id and source_agent != agent_id: + continue + source = checkpoint or lane.get("current_state") or {} + results.append( + { + "id": lane.get("id"), + "agent_id": source_agent, + "repo": lane.get("repo_path"), + "repo_id": lane.get("repo_id"), + "status": lane.get("status"), + "task_summary": source.get("task_summary") or lane.get("objective", ""), + "decisions_made": source.get("decisions_made", []), + "files_touched": source.get("files_touched", []), + "todos_remaining": source.get("todos_remaining", []), + "blockers": source.get("blockers", []), + "key_commands": source.get("key_commands", []), + "test_results": source.get("test_results", []), + "context_snapshot": source.get("context_snapshot"), + "lane_id": lane.get("id"), + "last_checkpoint_at": lane.get("last_checkpoint_at"), + "updated_at": lane.get("updated_at"), + "source": "lane_checkpoint", + } + ) + if len(results) >= limit: + break + return results + + def _select_or_create_lane( + self, + *, + user_id: str, + repo_identity: Dict[str, Optional[str]], + lane_type: str, + objective: Optional[str], + namespace: str, + statuses: List[str], + auto_create: bool, + ) -> Tuple[Optional[Dict[str, Any]], bool]: + candidates = self.db.list_handoff_lanes( + user_id=user_id, + repo_id=repo_identity.get("repo_id"), + statuses=statuses, + limit=50, + ) + if not candidates: + candidates = self.db.list_handoff_lanes( + user_id=user_id, + repo_id=None, + statuses=statuses, + limit=50, + ) + + best_lane: Optional[Dict[str, Any]] = None + best_score = -1.0 + objective_terms = _tokenize(objective) + for lane in candidates: + score = self._score_lane( + lane=lane, + repo_id=repo_identity.get("repo_id"), + branch=repo_identity.get("branch"), + objective_terms=objective_terms, + ) + if score > best_score: + best_score = score + best_lane = lane + + if best_lane and best_score >= 0.45: + return best_lane, False + if not auto_create: + return None, False + + now = _utc_now_iso() + lane_id = self.db.add_handoff_lane( + { + "user_id": user_id, + "repo_id": repo_identity.get("repo_id"), + "repo_path": repo_identity.get("repo_path"), + "branch": repo_identity.get("branch"), + "lane_type": lane_type or "general", + "status": "active", + "objective": objective, + "current_state": { + "task_summary": objective or "", + "decisions_made": [], + "files_touched": [], + "todos_remaining": [], + "blockers": [], + "key_commands": [], + "test_results": [], + "context_snapshot": None, + }, + "namespace": namespace or "default", + "confidentiality_scope": "work", + "last_checkpoint_at": now, + "version": 0, + "created_at": now, + "updated_at": now, + } + ) + lane = self.db.get_handoff_lane(lane_id) + return lane, True + + def _score_lane( + self, + *, + lane: Dict[str, Any], + repo_id: Optional[str], + branch: Optional[str], + objective_terms: set[str], + ) -> float: + score = 0.0 + if repo_id and lane.get("repo_id") == repo_id: + score += 0.55 + if branch and lane.get("branch") == branch: + score += 0.15 + + lane_terms = _tokenize(lane.get("objective")) + if objective_terms and lane_terms: + overlap = len(objective_terms & lane_terms) / max(1, len(objective_terms | lane_terms)) + score += overlap * 0.2 + + last_checkpoint = _safe_dt(lane.get("last_checkpoint_at") or lane.get("updated_at") or lane.get("created_at")) + if last_checkpoint: + age_minutes = max(0.0, (datetime.utcnow() - last_checkpoint).total_seconds() / 60.0) + score += max(0.0, 0.1 - min(age_minutes, 24 * 60) / (24 * 60 * 10)) + if age_minutes > self.lane_inactivity_minutes and lane.get("status") == "active": + score -= 0.2 + return score + + def _normalize_checkpoint_payload(self, payload: Dict[str, Any]) -> Dict[str, Any]: + payload = dict(payload or {}) + normalized = { + "status": str(payload.get("status") or "active"), + "task_summary": str(payload.get("task_summary") or "").strip(), + "decisions_made": _merge_list_values([], payload.get("decisions_made", [])), + "files_touched": _merge_list_values([], payload.get("files_touched", [])), + "todos_remaining": _merge_list_values([], payload.get("todos_remaining", [])), + "blockers": _merge_list_values([], payload.get("blockers", [])), + "key_commands": _merge_list_values([], payload.get("key_commands", [])), + "test_results": _merge_list_values([], payload.get("test_results", [])), + "context_snapshot": payload.get("context_snapshot"), + } + return normalized + + def _merge_state(self, current: Dict[str, Any], incoming: Dict[str, Any]) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: + merged = dict(current or {}) + conflicts: List[Dict[str, Any]] = [] + list_fields = { + "decisions_made", + "files_touched", + "todos_remaining", + "blockers", + "key_commands", + "test_results", + } + scalar_fields = {"task_summary", "context_snapshot", "status"} + + for key in list_fields: + merged[key] = _merge_list_values(merged.get(key, []), incoming.get(key, [])) + + for key in scalar_fields: + old_value = merged.get(key) + new_value = incoming.get(key) + if new_value in (None, "", []): + continue + if old_value not in (None, "", []) and old_value != new_value: + conflicts.append({"field": key, "previous": old_value, "incoming": new_value}) + merged[key] = new_value + return merged, conflicts + + def _warm_context( + self, + *, + user_id: str, + repo_identity: Dict[str, Optional[str]], + objective: Optional[str], + ) -> Dict[str, Any]: + memories: List[Dict[str, Any]] = [] + try: + if self.memory and objective: + search_payload = self.memory.search( + query=objective, + user_id=user_id, + limit=6, + boost_on_access=False, + ) + memories = list(search_payload.get("results", [])) + if not memories: + all_payload = self.memory.get_all(user_id=user_id, limit=6) if self.memory else {"results": []} + memories = list(all_payload.get("results", [])) + except Exception: + logger.warning("Warm context memory lookup failed", exc_info=True) + memories = [] + + scenes = self.db.get_scenes(user_id=user_id, limit=5) + repo_path = (repo_identity.get("repo_path") or "").lower() + if repo_path: + scoped = [scene for scene in scenes if repo_path in str(scene.get("location") or "").lower()] + if scoped: + scenes = scoped + + return { + "related_memories": [ + {"id": memory.get("id"), "memory": memory.get("memory", "")} + for memory in memories[:6] + ], + "related_scenes": [ + { + "id": scene.get("id"), + "summary": scene.get("summary"), + "topic": scene.get("topic"), + "start_time": scene.get("start_time"), + } + for scene in scenes[:5] + ], + } + + def _enrich_checkpoint( + self, + *, + checkpoint_id: str, + user_id: str, + repo_identity: Dict[str, Optional[str]], + task_summary: Optional[str], + created_at: str, + ) -> Dict[str, int]: + linked_memory_ids: List[str] = [] + linked_scene_ids: List[str] = [] + query = (task_summary or "").strip() + + if query and self.memory and self.memory.embedder and self.memory.vector_store: + try: + embedding = self.memory.embedder.embed(query, memory_action="search") + results = self.memory.vector_store.search( + query=query, + vectors=embedding, + limit=12, + filters={"user_id": user_id}, + ) + for item in results: + memory_id = getattr(item, "id", None) + if memory_id is None and isinstance(item, dict): + memory_id = item.get("id") + if memory_id and memory_id not in linked_memory_ids: + linked_memory_ids.append(str(memory_id)) + except Exception: + logger.warning("Handoff vector enrichment failed", exc_info=True) + + if not linked_memory_ids and query: + query_terms = _tokenize(query) + all_memories = self.db.get_all_memories(user_id=user_id, include_tombstoned=False) + scored: List[Tuple[int, str]] = [] + for memory in all_memories: + memory_text = str(memory.get("memory", "")).lower() + overlap = sum(1 for token in query_terms if token in memory_text) + if overlap > 0: + scored.append((overlap, str(memory["id"]))) + scored.sort(key=lambda item: item[0], reverse=True) + linked_memory_ids = [memory_id for _, memory_id in scored[:10]] + + scenes = self.db.get_scenes( + user_id=user_id, + start_before=created_at, + limit=10, + ) + repo_path = (repo_identity.get("repo_path") or "").lower() + if repo_path: + scoped_scenes = [scene for scene in scenes if repo_path in str(scene.get("location") or "").lower()] + if scoped_scenes: + scenes = scoped_scenes + linked_scene_ids = [str(scene["id"]) for scene in scenes[:6] if scene.get("id")] + + for index, memory_id in enumerate(linked_memory_ids[:10]): + self.db.add_handoff_checkpoint_memory( + checkpoint_id=checkpoint_id, + memory_id=memory_id, + relevance_score=max(0.1, 1.0 - (index * 0.05)), + ) + for index, scene_id in enumerate(linked_scene_ids[:6]): + self.db.add_handoff_checkpoint_scene( + checkpoint_id=checkpoint_id, + scene_id=scene_id, + relevance_score=max(0.1, 1.0 - (index * 0.05)), + ) + + return { + "linked_memories": min(10, len(linked_memory_ids)), + "linked_scenes": min(6, len(linked_scene_ids)), + } + + def _build_resume_packet( + self, + *, + lane: Dict[str, Any], + checkpoint: Optional[Dict[str, Any]], + from_agent: Optional[str], + agent_role: Optional[str], + ) -> Dict[str, Any]: + state = dict(lane.get("current_state") or {}) + source = checkpoint or state + memories = self.db.get_handoff_checkpoint_memories(checkpoint["id"]) if checkpoint else [] + scenes = self.db.get_handoff_checkpoint_scenes(checkpoint["id"]) if checkpoint else [] + return { + "lane_id": lane.get("id"), + "repo_id": lane.get("repo_id"), + "repo_path": lane.get("repo_path"), + "branch": lane.get("branch"), + "lane_type": lane.get("lane_type"), + "status": lane.get("status"), + "objective": lane.get("objective"), + "lane_version": lane.get("version", 0), + "from_agent": checkpoint.get("agent_id") if checkpoint else from_agent, + "agent_role": checkpoint.get("agent_role") if checkpoint else agent_role, + "task_summary": source.get("task_summary", lane.get("objective", "")), + "decisions_made": source.get("decisions_made", []), + "files_touched": source.get("files_touched", []), + "todos_remaining": source.get("todos_remaining", []), + "blockers": source.get("blockers", []), + "key_commands": source.get("key_commands", []), + "test_results": source.get("test_results", []), + "context_snapshot": source.get("context_snapshot"), + "last_checkpoint_at": lane.get("last_checkpoint_at"), + "next_actions": source.get("todos_remaining", []), + "related_memories": [ + {"id": item.get("id"), "memory": item.get("memory", "")} + for item in memories + ], + "related_scenes": [ + { + "id": scene.get("id"), + "summary": scene.get("summary"), + "topic": scene.get("topic"), + "start_time": scene.get("start_time"), + } + for scene in scenes + ], + } diff --git a/engram/core/kernel.py b/engram/core/kernel.py index ab5c2a9..700d5c5 100644 --- a/engram/core/kernel.py +++ b/engram/core/kernel.py @@ -13,6 +13,7 @@ from engram.core.policy import ( CONFIDENTIALITY_SCOPES, DEFAULT_CAPABILITIES, + HANDOFF_CAPABILITIES, default_allowed_scopes, detect_confidentiality_scope, enforce_scope_on_results, @@ -95,6 +96,20 @@ def create_session( if require_policy and not policy: raise PermissionError(f"No agent policy configured for user={user_id} agent={agent_id}") + requested_caps = set(normalized_capabilities) + handoff_caps = set(HANDOFF_CAPABILITIES) + if requested_caps & handoff_caps: + if agent_id and not policy: + policy = self._bootstrap_handoff_policy_if_trusted( + user_id=user_id, + agent_id=agent_id, + namespaces=normalized_namespaces, + ) + if not policy: + raise PermissionError( + f"Handoff capabilities require explicit agent policy for user={user_id} agent={agent_id}" + ) + if policy: normalized_scopes = self._clamp_scopes_with_policy( requested_scopes=normalized_scopes, @@ -211,6 +226,42 @@ def _resolve_session_namespaces( resolved = ["default"] return resolved + def _bootstrap_handoff_policy_if_trusted( + self, + *, + user_id: str, + agent_id: str, + namespaces: Optional[List[str]], + ) -> Optional[Dict[str, Any]]: + handoff_cfg = getattr(self.memory, "handoff_config", None) + trusted_agents = { + str(value).strip().lower() + for value in getattr(handoff_cfg, "auto_trusted_agents", []) + if str(value).strip() + } + if str(agent_id).strip().lower() not in trusted_agents: + return None + + allowed_namespaces = ["default"] + for namespace in namespaces or []: + ns_value = self._normalize_namespace(namespace) + if ns_value not in allowed_namespaces: + allowed_namespaces.append(ns_value) + + allowed_capabilities = sorted(set(list(DEFAULT_CAPABILITIES) + list(HANDOFF_CAPABILITIES))) + self.db.upsert_agent_policy( + user_id=user_id, + agent_id=agent_id, + allowed_confidentiality_scopes=list(CONFIDENTIALITY_SCOPES), + allowed_capabilities=allowed_capabilities, + allowed_namespaces=allowed_namespaces, + ) + return self.db.get_agent_policy( + user_id=user_id, + agent_id=agent_id, + include_wildcard=True, + ) + @staticmethod def _normalize_policy_namespaces(namespaces: Optional[List[str]]) -> List[str]: values = sorted({str(namespace).strip() for namespace in (namespaces or []) if str(namespace).strip()}) @@ -1344,3 +1395,242 @@ def delete_agent_policy( ) deleted = self.db.delete_agent_policy(user_id=user_id, agent_id=agent_id) return {"deleted": bool(deleted), "user_id": user_id, "agent_id": agent_id} + + # ------------------------------------------------------------------ + # Handoff session bus methods + # ------------------------------------------------------------------ + + def _require_handoff_processor(self): + processor = getattr(self.memory, "handoff_processor", None) + if processor is None: + raise RuntimeError("Handoff is disabled") + return processor + + def save_session_digest( + self, + *, + user_id: str, + agent_id: str, + digest: Dict[str, Any], + token: Optional[str] = None, + requester_agent_id: Optional[str] = None, + ) -> Dict[str, Any]: + caller_agent = requester_agent_id or agent_id + if token or caller_agent: + self.authenticate_session( + token=token, + user_id=user_id, + agent_id=caller_agent, + require_for_agent=bool(caller_agent), + required_capabilities=["write_handoff"], + ) + processor = self._require_handoff_processor() + return processor.save_digest(user_id=user_id, agent_id=agent_id, digest=digest) + + def get_last_session( + self, + *, + user_id: str, + agent_id: Optional[str] = None, + repo: Optional[str] = None, + statuses: Optional[List[str]] = None, + token: Optional[str] = None, + requester_agent_id: Optional[str] = None, + ) -> Optional[Dict[str, Any]]: + caller_agent = requester_agent_id or agent_id + if token or caller_agent: + self.authenticate_session( + token=token, + user_id=user_id, + agent_id=caller_agent, + require_for_agent=bool(caller_agent), + required_capabilities=["read_handoff"], + ) + processor = self._require_handoff_processor() + return processor.get_last_session( + user_id=user_id, + agent_id=agent_id, + repo=repo, + statuses=statuses, + ) + + def list_sessions( + self, + *, + user_id: str, + agent_id: Optional[str] = None, + repo: Optional[str] = None, + status: Optional[str] = None, + statuses: Optional[List[str]] = None, + limit: int = 20, + token: Optional[str] = None, + requester_agent_id: Optional[str] = None, + ) -> List[Dict[str, Any]]: + caller_agent = requester_agent_id or agent_id + if token or caller_agent: + self.authenticate_session( + token=token, + user_id=user_id, + agent_id=caller_agent, + require_for_agent=bool(caller_agent), + required_capabilities=["read_handoff"], + ) + processor = self._require_handoff_processor() + return processor.list_sessions( + user_id=user_id, + agent_id=agent_id, + repo=repo, + status=status, + statuses=statuses, + limit=limit, + ) + + def auto_resume_context( + self, + *, + user_id: str, + agent_id: Optional[str], + repo_path: Optional[str] = None, + branch: Optional[str] = None, + lane_type: str = "general", + objective: Optional[str] = None, + agent_role: Optional[str] = None, + namespace: str = "default", + statuses: Optional[List[str]] = None, + auto_create: bool = True, + token: Optional[str] = None, + requester_agent_id: Optional[str] = None, + ) -> Dict[str, Any]: + caller_agent = requester_agent_id or agent_id + if token or caller_agent: + self.authenticate_session( + token=token, + user_id=user_id, + agent_id=caller_agent, + require_for_agent=bool(caller_agent), + required_capabilities=["read_handoff"], + ) + processor = self._require_handoff_processor() + return processor.auto_resume_context( + user_id=user_id, + agent_id=agent_id, + repo_path=repo_path, + branch=branch, + lane_type=lane_type, + objective=objective, + agent_role=agent_role, + namespace=namespace, + statuses=statuses, + auto_create=auto_create, + ) + + def auto_checkpoint( + self, + *, + user_id: str, + agent_id: str, + payload: Dict[str, Any], + event_type: str = "tool_complete", + repo_path: Optional[str] = None, + branch: Optional[str] = None, + lane_id: Optional[str] = None, + lane_type: str = "general", + objective: Optional[str] = None, + agent_role: Optional[str] = None, + namespace: str = "default", + confidentiality_scope: str = "work", + expected_version: Optional[int] = None, + token: Optional[str] = None, + requester_agent_id: Optional[str] = None, + ) -> Dict[str, Any]: + caller_agent = requester_agent_id or agent_id + if token or caller_agent: + self.authenticate_session( + token=token, + user_id=user_id, + agent_id=caller_agent, + require_for_agent=bool(caller_agent), + required_capabilities=["write_handoff"], + ) + processor = self._require_handoff_processor() + return processor.auto_checkpoint( + user_id=user_id, + agent_id=agent_id, + payload=payload, + event_type=event_type, + repo_path=repo_path, + branch=branch, + lane_id=lane_id, + lane_type=lane_type, + objective=objective, + agent_role=agent_role, + namespace=namespace, + confidentiality_scope=confidentiality_scope, + expected_version=expected_version, + ) + + def finalize_lane( + self, + *, + user_id: str, + agent_id: str, + lane_id: str, + status: str = "paused", + payload: Optional[Dict[str, Any]] = None, + repo_path: Optional[str] = None, + branch: Optional[str] = None, + agent_role: Optional[str] = None, + namespace: str = "default", + token: Optional[str] = None, + requester_agent_id: Optional[str] = None, + ) -> Dict[str, Any]: + caller_agent = requester_agent_id or agent_id + if token or caller_agent: + self.authenticate_session( + token=token, + user_id=user_id, + agent_id=caller_agent, + require_for_agent=bool(caller_agent), + required_capabilities=["write_handoff"], + ) + processor = self._require_handoff_processor() + return processor.finalize_lane( + user_id=user_id, + agent_id=agent_id, + lane_id=lane_id, + status=status, + payload=payload, + repo_path=repo_path, + branch=branch, + agent_role=agent_role, + namespace=namespace, + ) + + def list_handoff_lanes( + self, + *, + user_id: str, + repo_path: Optional[str] = None, + status: Optional[str] = None, + statuses: Optional[List[str]] = None, + limit: int = 20, + token: Optional[str] = None, + requester_agent_id: Optional[str] = None, + ) -> List[Dict[str, Any]]: + caller_agent = requester_agent_id + if token or caller_agent: + self.authenticate_session( + token=token, + user_id=user_id, + agent_id=caller_agent, + require_for_agent=bool(caller_agent), + required_capabilities=["read_handoff"], + ) + processor = self._require_handoff_processor() + return processor.list_lanes( + user_id=user_id, + repo_path=repo_path, + status=status, + statuses=statuses, + limit=limit, + ) diff --git a/engram/core/policy.py b/engram/core/policy.py index dc9553e..af023bc 100644 --- a/engram/core/policy.py +++ b/engram/core/policy.py @@ -20,6 +20,7 @@ "manage_namespaces", "run_sleep_cycle", ] +HANDOFF_CAPABILITIES = ["read_handoff", "write_handoff"] SENSITIVE_HINTS = { "finance": {"finance", "bank", "salary", "invoice", "tax", "payment", "credit"}, "health": {"health", "medical", "doctor", "diagnosis", "therapy", "medication"}, diff --git a/engram/db/sqlite.py b/engram/db/sqlite.py index 3f73f2c..dbb1da8 100644 --- a/engram/db/sqlite.py +++ b/engram/db/sqlite.py @@ -353,6 +353,42 @@ def _ensure_v2_schema(self, conn: sqlite3.Connection) -> None: CREATE INDEX IF NOT EXISTS idx_ns_permissions_agent ON namespace_permissions(user_id, agent_id); CREATE INDEX IF NOT EXISTS idx_ns_permissions_namespace ON namespace_permissions(namespace_id); """, + "v2_011": """ + CREATE TABLE IF NOT EXISTS handoff_sessions ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + agent_id TEXT NOT NULL, + repo TEXT, + status TEXT NOT NULL DEFAULT 'paused' + CHECK (status IN ('active', 'paused', 'completed', 'abandoned')), + task_summary TEXT NOT NULL, + decisions_made TEXT DEFAULT '[]', + files_touched TEXT DEFAULT '[]', + todos_remaining TEXT DEFAULT '[]', + context_snapshot TEXT, + linked_memory_ids TEXT DEFAULT '[]', + linked_scene_ids TEXT DEFAULT '[]', + started_at TEXT DEFAULT CURRENT_TIMESTAMP, + ended_at TEXT, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT DEFAULT CURRENT_TIMESTAMP + ); + CREATE INDEX IF NOT EXISTS idx_handoff_user ON handoff_sessions(user_id); + CREATE INDEX IF NOT EXISTS idx_handoff_agent ON handoff_sessions(agent_id); + CREATE INDEX IF NOT EXISTS idx_handoff_repo ON handoff_sessions(repo); + CREATE INDEX IF NOT EXISTS idx_handoff_status ON handoff_sessions(status); + CREATE INDEX IF NOT EXISTS idx_handoff_updated ON handoff_sessions(updated_at DESC); + + CREATE TABLE IF NOT EXISTS handoff_session_memories ( + session_id TEXT NOT NULL, + memory_id TEXT NOT NULL, + relevance_score REAL DEFAULT 1.0, + PRIMARY KEY (session_id, memory_id), + FOREIGN KEY (session_id) REFERENCES handoff_sessions(id), + FOREIGN KEY (memory_id) REFERENCES memories(id) + ); + CREATE INDEX IF NOT EXISTS idx_hsm_session ON handoff_session_memories(session_id); + """, "v2_010": """ CREATE TABLE IF NOT EXISTS agent_policies ( id TEXT PRIMARY KEY, @@ -368,6 +404,87 @@ def _ensure_v2_schema(self, conn: sqlite3.Connection) -> None: CREATE INDEX IF NOT EXISTS idx_agent_policies_user ON agent_policies(user_id); CREATE INDEX IF NOT EXISTS idx_agent_policies_agent ON agent_policies(agent_id); """, + "v2_012": """ + CREATE TABLE IF NOT EXISTS handoff_lanes ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + repo_id TEXT, + repo_path TEXT, + branch TEXT, + lane_type TEXT DEFAULT 'general', + status TEXT NOT NULL DEFAULT 'active' + CHECK (status IN ('active', 'paused', 'completed', 'abandoned')), + objective TEXT, + current_state TEXT DEFAULT '{}', + namespace TEXT DEFAULT 'default', + confidentiality_scope TEXT DEFAULT 'work', + last_checkpoint_at TEXT DEFAULT CURRENT_TIMESTAMP, + version INTEGER DEFAULT 0, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + updated_at TEXT DEFAULT CURRENT_TIMESTAMP + ); + CREATE INDEX IF NOT EXISTS idx_handoff_lanes_user ON handoff_lanes(user_id); + CREATE INDEX IF NOT EXISTS idx_handoff_lanes_repo ON handoff_lanes(repo_id); + CREATE INDEX IF NOT EXISTS idx_handoff_lanes_status ON handoff_lanes(status); + CREATE INDEX IF NOT EXISTS idx_handoff_lanes_recent ON handoff_lanes(last_checkpoint_at DESC, created_at DESC); + + CREATE TABLE IF NOT EXISTS handoff_checkpoints ( + id TEXT PRIMARY KEY, + lane_id TEXT NOT NULL, + user_id TEXT NOT NULL, + agent_id TEXT NOT NULL, + agent_role TEXT, + event_type TEXT DEFAULT 'tool_complete', + task_summary TEXT, + decisions_made TEXT DEFAULT '[]', + files_touched TEXT DEFAULT '[]', + todos_remaining TEXT DEFAULT '[]', + blockers TEXT DEFAULT '[]', + key_commands TEXT DEFAULT '[]', + test_results TEXT DEFAULT '[]', + merge_conflicts TEXT DEFAULT '[]', + context_snapshot TEXT, + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (lane_id) REFERENCES handoff_lanes(id) + ); + CREATE INDEX IF NOT EXISTS idx_handoff_cp_lane ON handoff_checkpoints(lane_id, created_at DESC); + CREATE INDEX IF NOT EXISTS idx_handoff_cp_user ON handoff_checkpoints(user_id, created_at DESC); + + CREATE TABLE IF NOT EXISTS handoff_checkpoint_memories ( + checkpoint_id TEXT NOT NULL, + memory_id TEXT NOT NULL, + relevance_score REAL DEFAULT 1.0, + PRIMARY KEY (checkpoint_id, memory_id), + FOREIGN KEY (checkpoint_id) REFERENCES handoff_checkpoints(id), + FOREIGN KEY (memory_id) REFERENCES memories(id) + ); + CREATE INDEX IF NOT EXISTS idx_hcm_checkpoint ON handoff_checkpoint_memories(checkpoint_id); + + CREATE TABLE IF NOT EXISTS handoff_checkpoint_scenes ( + checkpoint_id TEXT NOT NULL, + scene_id TEXT NOT NULL, + relevance_score REAL DEFAULT 1.0, + PRIMARY KEY (checkpoint_id, scene_id), + FOREIGN KEY (checkpoint_id) REFERENCES handoff_checkpoints(id), + FOREIGN KEY (scene_id) REFERENCES scenes(id) + ); + CREATE INDEX IF NOT EXISTS idx_hcs_checkpoint ON handoff_checkpoint_scenes(checkpoint_id); + + CREATE TABLE IF NOT EXISTS handoff_lane_conflicts ( + id TEXT PRIMARY KEY, + lane_id TEXT NOT NULL, + checkpoint_id TEXT, + user_id TEXT NOT NULL, + conflict_fields TEXT DEFAULT '[]', + previous_state TEXT DEFAULT '{}', + incoming_state TEXT DEFAULT '{}', + resolved_state TEXT DEFAULT '{}', + created_at TEXT DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (lane_id) REFERENCES handoff_lanes(id), + FOREIGN KEY (checkpoint_id) REFERENCES handoff_checkpoints(id) + ); + CREATE INDEX IF NOT EXISTS idx_handoff_conflicts_lane ON handoff_lane_conflicts(lane_id, created_at DESC); + """, } for version, ddl in migrations.items(): @@ -399,6 +516,14 @@ def _ensure_v2_schema(self, conn: sqlite3.Connection) -> None: self._migrate_add_column_conn(conn, "sessions", "namespaces", "TEXT DEFAULT '[]'") self._migrate_add_column_conn(conn, "memory_subscribers", "last_seen_at", "TEXT") self._migrate_add_column_conn(conn, "memory_subscribers", "expires_at", "TEXT") + self._migrate_add_column_conn(conn, "handoff_sessions", "repo_id", "TEXT") + self._migrate_add_column_conn(conn, "handoff_sessions", "blockers", "TEXT DEFAULT '[]'") + self._migrate_add_column_conn(conn, "handoff_sessions", "key_commands", "TEXT DEFAULT '[]'") + self._migrate_add_column_conn(conn, "handoff_sessions", "test_results", "TEXT DEFAULT '[]'") + self._migrate_add_column_conn(conn, "handoff_sessions", "lane_id", "TEXT") + self._migrate_add_column_conn(conn, "handoff_sessions", "last_checkpoint_at", "TEXT") + self._migrate_add_column_conn(conn, "handoff_sessions", "namespace", "TEXT DEFAULT 'default'") + self._migrate_add_column_conn(conn, "handoff_sessions", "confidentiality_scope", "TEXT DEFAULT 'work'") conn.execute("CREATE INDEX IF NOT EXISTS idx_memory_subscribers_expires ON memory_subscribers(expires_at)") @@ -438,6 +563,18 @@ def _ensure_v2_schema(self, conn: sqlite3.Connection) -> None: WHERE namespaces IS NULL OR namespaces = '' """ ) + conn.execute( + """ + UPDATE handoff_sessions + SET blockers = COALESCE(NULLIF(blockers, ''), '[]'), + key_commands = COALESCE(NULLIF(key_commands, ''), '[]'), + test_results = COALESCE(NULLIF(test_results, ''), '[]'), + namespace = COALESCE(NULLIF(namespace, ''), 'default'), + confidentiality_scope = COALESCE(NULLIF(confidentiality_scope, ''), 'work'), + repo_id = COALESCE(NULLIF(repo_id, ''), repo), + last_checkpoint_at = COALESCE(last_checkpoint_at, updated_at, created_at) + """ + ) conn.execute( """ UPDATE memories @@ -2226,6 +2363,10 @@ def list_user_ids(self) -> List[str]: SELECT user_id FROM sessions UNION ALL SELECT user_id FROM proposal_commits + UNION ALL + SELECT user_id FROM handoff_sessions + UNION ALL + SELECT user_id FROM handoff_lanes ) WHERE user_id IS NOT NULL AND user_id != '' ORDER BY user_id @@ -2373,6 +2514,581 @@ def get_decay_log_entries(self, limit: int = 20) -> List[Dict[str, Any]]: ).fetchall() return [dict(row) for row in rows] + # ========================================================================= + # Handoff session methods (legacy compatibility) + # ========================================================================= + + def add_handoff_session(self, data: Dict[str, Any]) -> str: + session_id = data.get("id", str(uuid.uuid4())) + now = datetime.utcnow().isoformat() + with self._get_connection() as conn: + conn.execute( + """ + INSERT INTO handoff_sessions ( + id, user_id, agent_id, repo, repo_id, status, task_summary, + decisions_made, files_touched, todos_remaining, blockers, key_commands, test_results, + context_snapshot, linked_memory_ids, linked_scene_ids, lane_id, + started_at, ended_at, last_checkpoint_at, + namespace, confidentiality_scope, + created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + session_id, + data.get("user_id", "default"), + data.get("agent_id", "unknown"), + data.get("repo"), + data.get("repo_id"), + data.get("status", "paused"), + data.get("task_summary", ""), + json.dumps(data.get("decisions_made", [])), + json.dumps(data.get("files_touched", [])), + json.dumps(data.get("todos_remaining", [])), + json.dumps(data.get("blockers", [])), + json.dumps(data.get("key_commands", [])), + json.dumps(data.get("test_results", [])), + data.get("context_snapshot"), + json.dumps(data.get("linked_memory_ids", [])), + json.dumps(data.get("linked_scene_ids", [])), + data.get("lane_id"), + data.get("started_at", now), + data.get("ended_at"), + data.get("last_checkpoint_at", data.get("updated_at", now)), + data.get("namespace", "default"), + data.get("confidentiality_scope", "work"), + data.get("created_at", now), + data.get("updated_at", now), + ), + ) + return session_id + + def get_handoff_session(self, session_id: str) -> Optional[Dict[str, Any]]: + with self._get_connection() as conn: + row = conn.execute( + "SELECT * FROM handoff_sessions WHERE id = ?", + (session_id,), + ).fetchone() + if row: + return self._handoff_row_to_dict(row) + return None + + def get_last_handoff_session( + self, + user_id: str, + agent_id: Optional[str] = None, + repo: Optional[str] = None, + repo_id: Optional[str] = None, + statuses: Optional[List[str]] = None, + ) -> Optional[Dict[str, Any]]: + query = "SELECT * FROM handoff_sessions WHERE user_id = ?" + params: List[Any] = [user_id] + if agent_id: + query += " AND agent_id = ?" + params.append(agent_id) + if repo_id: + query += " AND repo_id = ?" + params.append(repo_id) + elif repo: + query += " AND repo = ?" + params.append(repo) + if statuses: + placeholders = ", ".join("?" for _ in statuses) + query += f" AND status IN ({placeholders})" + params.extend(statuses) + query += " ORDER BY COALESCE(last_checkpoint_at, updated_at, created_at) DESC, created_at DESC LIMIT 1" + with self._get_connection() as conn: + row = conn.execute(query, params).fetchone() + if row: + return self._handoff_row_to_dict(row) + return None + + def list_handoff_sessions( + self, + user_id: str, + agent_id: Optional[str] = None, + repo: Optional[str] = None, + repo_id: Optional[str] = None, + status: Optional[str] = None, + statuses: Optional[List[str]] = None, + limit: int = 20, + ) -> List[Dict[str, Any]]: + query = "SELECT * FROM handoff_sessions WHERE user_id = ?" + params: List[Any] = [user_id] + if agent_id: + query += " AND agent_id = ?" + params.append(agent_id) + if repo_id: + query += " AND repo_id = ?" + params.append(repo_id) + elif repo: + query += " AND repo = ?" + params.append(repo) + if status: + query += " AND status = ?" + params.append(status) + elif statuses: + placeholders = ", ".join("?" for _ in statuses) + query += f" AND status IN ({placeholders})" + params.extend(statuses) + query += " ORDER BY COALESCE(last_checkpoint_at, updated_at, created_at) DESC, created_at DESC LIMIT ?" + params.append(limit) + with self._get_connection() as conn: + rows = conn.execute(query, params).fetchall() + return [self._handoff_row_to_dict(row) for row in rows] + + def update_handoff_session(self, session_id: str, updates: Dict[str, Any]) -> bool: + set_clauses = [] + params: List[Any] = [] + json_fields = { + "decisions_made", + "files_touched", + "todos_remaining", + "blockers", + "key_commands", + "test_results", + "linked_memory_ids", + "linked_scene_ids", + } + for key, value in updates.items(): + if key in json_fields: + value = json.dumps(value) + set_clauses.append(f"{key} = ?") + params.append(value) + if not set_clauses: + return False + set_clauses.append("updated_at = ?") + params.append(datetime.utcnow().isoformat()) + params.append(session_id) + with self._get_connection() as conn: + cursor = conn.execute( + f"UPDATE handoff_sessions SET {', '.join(set_clauses)} WHERE id = ?", + params, + ) + return cursor.rowcount > 0 + + def delete_handoff_sessions(self, session_ids: List[str]) -> int: + ids = [str(value) for value in (session_ids or []) if str(value).strip()] + if not ids: + return 0 + placeholders = ", ".join("?" for _ in ids) + with self._get_connection() as conn: + conn.execute( + f"DELETE FROM handoff_session_memories WHERE session_id IN ({placeholders})", + ids, + ) + cursor = conn.execute( + f"DELETE FROM handoff_sessions WHERE id IN ({placeholders})", + ids, + ) + return cursor.rowcount + + def prune_handoff_sessions(self, user_id: str, max_sessions: int) -> int: + limit_value = max(0, int(max_sessions)) + with self._get_connection() as conn: + rows = conn.execute( + """ + SELECT id FROM handoff_sessions + WHERE user_id = ? + ORDER BY COALESCE(last_checkpoint_at, updated_at, created_at) DESC, created_at DESC + """, + (user_id,), + ).fetchall() + session_ids = [str(row["id"]) for row in rows] + stale_ids = session_ids[limit_value:] + return self.delete_handoff_sessions(stale_ids) + + def add_handoff_session_memory(self, session_id: str, memory_id: str, relevance_score: float = 1.0) -> None: + with self._get_connection() as conn: + conn.execute( + "INSERT OR IGNORE INTO handoff_session_memories (session_id, memory_id, relevance_score) VALUES (?, ?, ?)", + (session_id, memory_id, relevance_score), + ) + + def get_handoff_session_memories(self, session_id: str) -> List[Dict[str, Any]]: + with self._get_connection() as conn: + rows = conn.execute( + """ + SELECT m.*, hsm.relevance_score FROM memories m + JOIN handoff_session_memories hsm ON m.id = hsm.memory_id + WHERE hsm.session_id = ? AND m.tombstone = 0 + ORDER BY hsm.relevance_score DESC + """, + (session_id,), + ).fetchall() + return [self._row_to_dict(row) for row in rows] + + # ========================================================================= + # Handoff lane + checkpoint methods (session bus) + # ========================================================================= + + def add_handoff_lane(self, data: Dict[str, Any]) -> str: + lane_id = data.get("id", str(uuid.uuid4())) + now = datetime.utcnow().isoformat() + with self._get_connection() as conn: + conn.execute( + """ + INSERT INTO handoff_lanes ( + id, user_id, repo_id, repo_path, branch, lane_type, status, + objective, current_state, namespace, confidentiality_scope, + last_checkpoint_at, version, created_at, updated_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + lane_id, + data.get("user_id", "default"), + data.get("repo_id"), + data.get("repo_path"), + data.get("branch"), + data.get("lane_type", "general"), + data.get("status", "active"), + data.get("objective"), + json.dumps(data.get("current_state", {})), + data.get("namespace", "default"), + data.get("confidentiality_scope", "work"), + data.get("last_checkpoint_at", now), + int(data.get("version", 0)), + data.get("created_at", now), + data.get("updated_at", now), + ), + ) + return lane_id + + def get_handoff_lane(self, lane_id: str) -> Optional[Dict[str, Any]]: + with self._get_connection() as conn: + row = conn.execute( + "SELECT * FROM handoff_lanes WHERE id = ?", + (lane_id,), + ).fetchone() + if not row: + return None + return self._handoff_lane_row_to_dict(row) + + def list_handoff_lanes( + self, + user_id: str, + *, + repo_id: Optional[str] = None, + status: Optional[str] = None, + statuses: Optional[List[str]] = None, + limit: int = 20, + ) -> List[Dict[str, Any]]: + query = "SELECT * FROM handoff_lanes WHERE user_id = ?" + params: List[Any] = [user_id] + if repo_id: + query += " AND repo_id = ?" + params.append(repo_id) + if status: + query += " AND status = ?" + params.append(status) + elif statuses: + placeholders = ", ".join("?" for _ in statuses) + query += f" AND status IN ({placeholders})" + params.extend(statuses) + query += " ORDER BY COALESCE(last_checkpoint_at, created_at) DESC, created_at DESC LIMIT ?" + params.append(limit) + with self._get_connection() as conn: + rows = conn.execute(query, params).fetchall() + return [self._handoff_lane_row_to_dict(row) for row in rows] + + def update_handoff_lane( + self, + lane_id: str, + updates: Dict[str, Any], + *, + expected_version: Optional[int] = None, + ) -> bool: + set_clauses = [] + params: List[Any] = [] + for key, value in updates.items(): + if key == "current_state" and not isinstance(value, str): + value = json.dumps(value) + set_clauses.append(f"{key} = ?") + params.append(value) + if not set_clauses: + return False + set_clauses.append("updated_at = ?") + params.append(datetime.utcnow().isoformat()) + query = f"UPDATE handoff_lanes SET {', '.join(set_clauses)} WHERE id = ?" + params.append(lane_id) + if expected_version is not None: + query += " AND version = ?" + params.append(int(expected_version)) + with self._get_connection() as conn: + cursor = conn.execute(query, params) + return cursor.rowcount > 0 + + def delete_handoff_lanes(self, lane_ids: List[str]) -> int: + ids = [str(value) for value in (lane_ids or []) if str(value).strip()] + if not ids: + return 0 + placeholders = ", ".join("?" for _ in ids) + with self._get_connection() as conn: + checkpoint_rows = conn.execute( + f"SELECT id FROM handoff_checkpoints WHERE lane_id IN ({placeholders})", + ids, + ).fetchall() + checkpoint_ids = [str(row["id"]) for row in checkpoint_rows] + if checkpoint_ids: + self.delete_handoff_checkpoints(checkpoint_ids) + with self._get_connection() as conn: + conn.execute( + f"DELETE FROM handoff_lane_conflicts WHERE lane_id IN ({placeholders})", + ids, + ) + cursor = conn.execute( + f"DELETE FROM handoff_lanes WHERE id IN ({placeholders})", + ids, + ) + return cursor.rowcount + + def prune_handoff_lanes(self, user_id: str, max_lanes: int) -> int: + limit_value = max(0, int(max_lanes)) + with self._get_connection() as conn: + rows = conn.execute( + """ + SELECT id FROM handoff_lanes + WHERE user_id = ? + ORDER BY COALESCE(last_checkpoint_at, created_at) DESC, created_at DESC + """, + (user_id,), + ).fetchall() + lane_ids = [str(row["id"]) for row in rows] + stale_ids = lane_ids[limit_value:] + return self.delete_handoff_lanes(stale_ids) + + def add_handoff_checkpoint(self, data: Dict[str, Any]) -> str: + checkpoint_id = data.get("id", str(uuid.uuid4())) + now = datetime.utcnow().isoformat() + with self._get_connection() as conn: + conn.execute( + """ + INSERT INTO handoff_checkpoints ( + id, lane_id, user_id, agent_id, agent_role, event_type, task_summary, + decisions_made, files_touched, todos_remaining, blockers, key_commands, + test_results, merge_conflicts, context_snapshot, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + checkpoint_id, + data.get("lane_id"), + data.get("user_id", "default"), + data.get("agent_id", "unknown"), + data.get("agent_role"), + data.get("event_type", "tool_complete"), + data.get("task_summary"), + json.dumps(data.get("decisions_made", [])), + json.dumps(data.get("files_touched", [])), + json.dumps(data.get("todos_remaining", [])), + json.dumps(data.get("blockers", [])), + json.dumps(data.get("key_commands", [])), + json.dumps(data.get("test_results", [])), + json.dumps(data.get("merge_conflicts", [])), + data.get("context_snapshot"), + data.get("created_at", now), + ), + ) + return checkpoint_id + + def get_handoff_checkpoint(self, checkpoint_id: str) -> Optional[Dict[str, Any]]: + with self._get_connection() as conn: + row = conn.execute( + "SELECT * FROM handoff_checkpoints WHERE id = ?", + (checkpoint_id,), + ).fetchone() + if not row: + return None + return self._handoff_checkpoint_row_to_dict(row) + + def list_handoff_checkpoints(self, lane_id: str, limit: int = 50) -> List[Dict[str, Any]]: + with self._get_connection() as conn: + rows = conn.execute( + """ + SELECT * FROM handoff_checkpoints + WHERE lane_id = ? + ORDER BY created_at DESC + LIMIT ? + """, + (lane_id, max(1, int(limit))), + ).fetchall() + return [self._handoff_checkpoint_row_to_dict(row) for row in rows] + + def get_latest_handoff_checkpoint(self, lane_id: str) -> Optional[Dict[str, Any]]: + checkpoints = self.list_handoff_checkpoints(lane_id=lane_id, limit=1) + return checkpoints[0] if checkpoints else None + + def delete_handoff_checkpoints(self, checkpoint_ids: List[str]) -> int: + ids = [str(value) for value in (checkpoint_ids or []) if str(value).strip()] + if not ids: + return 0 + placeholders = ", ".join("?" for _ in ids) + with self._get_connection() as conn: + conn.execute( + f"DELETE FROM handoff_checkpoint_memories WHERE checkpoint_id IN ({placeholders})", + ids, + ) + conn.execute( + f"DELETE FROM handoff_checkpoint_scenes WHERE checkpoint_id IN ({placeholders})", + ids, + ) + conn.execute( + f"DELETE FROM handoff_lane_conflicts WHERE checkpoint_id IN ({placeholders})", + ids, + ) + cursor = conn.execute( + f"DELETE FROM handoff_checkpoints WHERE id IN ({placeholders})", + ids, + ) + return cursor.rowcount + + def prune_handoff_checkpoints(self, lane_id: str, max_checkpoints: int) -> int: + limit_value = max(0, int(max_checkpoints)) + checkpoints = self.list_handoff_checkpoints(lane_id=lane_id, limit=100000) + stale_ids = [checkpoint["id"] for checkpoint in checkpoints[limit_value:]] + return self.delete_handoff_checkpoints(stale_ids) + + def add_handoff_checkpoint_memory( + self, + checkpoint_id: str, + memory_id: str, + relevance_score: float = 1.0, + ) -> None: + with self._get_connection() as conn: + conn.execute( + """ + INSERT OR IGNORE INTO handoff_checkpoint_memories (checkpoint_id, memory_id, relevance_score) + VALUES (?, ?, ?) + """, + (checkpoint_id, memory_id, relevance_score), + ) + + def add_handoff_checkpoint_scene( + self, + checkpoint_id: str, + scene_id: str, + relevance_score: float = 1.0, + ) -> None: + with self._get_connection() as conn: + conn.execute( + """ + INSERT OR IGNORE INTO handoff_checkpoint_scenes (checkpoint_id, scene_id, relevance_score) + VALUES (?, ?, ?) + """, + (checkpoint_id, scene_id, relevance_score), + ) + + def get_handoff_checkpoint_memories(self, checkpoint_id: str) -> List[Dict[str, Any]]: + with self._get_connection() as conn: + rows = conn.execute( + """ + SELECT m.*, hcm.relevance_score FROM memories m + JOIN handoff_checkpoint_memories hcm ON m.id = hcm.memory_id + WHERE hcm.checkpoint_id = ? AND m.tombstone = 0 + ORDER BY hcm.relevance_score DESC + """, + (checkpoint_id,), + ).fetchall() + return [self._row_to_dict(row) for row in rows] + + def get_handoff_checkpoint_scenes(self, checkpoint_id: str) -> List[Dict[str, Any]]: + with self._get_connection() as conn: + rows = conn.execute( + """ + SELECT s.*, hcs.relevance_score FROM scenes s + JOIN handoff_checkpoint_scenes hcs ON s.id = hcs.scene_id + WHERE hcs.checkpoint_id = ? AND s.tombstone = 0 + ORDER BY hcs.relevance_score DESC + """, + (checkpoint_id,), + ).fetchall() + return [self._scene_row_to_dict(row) for row in rows] + + def add_handoff_lane_conflict(self, data: Dict[str, Any]) -> str: + conflict_id = data.get("id", str(uuid.uuid4())) + now = datetime.utcnow().isoformat() + with self._get_connection() as conn: + conn.execute( + """ + INSERT INTO handoff_lane_conflicts ( + id, lane_id, checkpoint_id, user_id, + conflict_fields, previous_state, incoming_state, resolved_state, created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + ( + conflict_id, + data.get("lane_id"), + data.get("checkpoint_id"), + data.get("user_id", "default"), + json.dumps(data.get("conflict_fields", [])), + json.dumps(data.get("previous_state", {})), + json.dumps(data.get("incoming_state", {})), + json.dumps(data.get("resolved_state", {})), + data.get("created_at", now), + ), + ) + return conflict_id + + def list_handoff_lane_conflicts(self, lane_id: str, limit: int = 50) -> List[Dict[str, Any]]: + with self._get_connection() as conn: + rows = conn.execute( + """ + SELECT * FROM handoff_lane_conflicts + WHERE lane_id = ? + ORDER BY created_at DESC + LIMIT ? + """, + (lane_id, max(1, int(limit))), + ).fetchall() + return [self._handoff_conflict_row_to_dict(row) for row in rows] + + def _handoff_row_to_dict(self, row: sqlite3.Row) -> Dict[str, Any]: + data = dict(row) + for key in { + "decisions_made", + "files_touched", + "todos_remaining", + "blockers", + "key_commands", + "test_results", + "linked_memory_ids", + "linked_scene_ids", + }: + data[key] = self._parse_json_value(data.get(key), []) + data["repo_id"] = data.get("repo_id") or data.get("repo") + data["namespace"] = data.get("namespace") or "default" + data["confidentiality_scope"] = data.get("confidentiality_scope") or "work" + data["last_checkpoint_at"] = data.get("last_checkpoint_at") or data.get("updated_at") or data.get("created_at") + return data + + def _handoff_lane_row_to_dict(self, row: sqlite3.Row) -> Dict[str, Any]: + data = dict(row) + data["current_state"] = self._parse_json_value(data.get("current_state"), {}) + data["namespace"] = data.get("namespace") or "default" + data["confidentiality_scope"] = data.get("confidentiality_scope") or "work" + data["version"] = int(data.get("version", 0) or 0) + return data + + def _handoff_checkpoint_row_to_dict(self, row: sqlite3.Row) -> Dict[str, Any]: + data = dict(row) + for key in { + "decisions_made", + "files_touched", + "todos_remaining", + "blockers", + "key_commands", + "test_results", + "merge_conflicts", + }: + data[key] = self._parse_json_value(data.get(key), []) + return data + + def _handoff_conflict_row_to_dict(self, row: sqlite3.Row) -> Dict[str, Any]: + data = dict(row) + data["conflict_fields"] = self._parse_json_value(data.get("conflict_fields"), []) + data["previous_state"] = self._parse_json_value(data.get("previous_state"), {}) + data["incoming_state"] = self._parse_json_value(data.get("incoming_state"), {}) + data["resolved_state"] = self._parse_json_value(data.get("resolved_state"), {}) + return data + # ========================================================================= # Utilities # ========================================================================= diff --git a/engram/integrations/claude_code_plugin.py b/engram/integrations/claude_code_plugin.py index 282ef5a..8b85033 100644 --- a/engram/integrations/claude_code_plugin.py +++ b/engram/integrations/claude_code_plugin.py @@ -360,7 +360,19 @@ def main() -> None: Do **not** paste or quote the raw injected block to the user. Weave the information naturally into your response. -## Rule 2 — Proactively save when the user signals intent +## Rule 2 — Always do handoff bootstrap on new task threads + +At the beginning of a new repo/task thread, call `get_last_session` before +deep implementation guidance: +* `user_id`: `"default"` unless user provides another +* `requester_agent_id`: `"claude-code"` +* `repo`: absolute workspace path when available +* Include `agent_id` only if the user explicitly asks to resume from a specific source agent. + +If a handoff session exists, continue from it naturally without exposing raw +tool payloads. + +## Rule 3 — Proactively save when the user signals intent Call `remember` when you see any of: * An explicit "remember this" or "don't forget" instruction @@ -369,21 +381,34 @@ def main() -> None: Do **not** spam the store — one clean save per signal is enough. -## Rule 3 — Search on explicit recall requests +## Rule 4 — Search on explicit recall requests When the user says something like "what did we …", "recall …", or "from last time …", call `search_memory` with a short query derived from their words. Present results naturally; do not show raw JSON. -## Rule 4 — Stay quiet about the plumbing +## Rule 5 — Save handoff digest before pausing/ending + +When pausing, hitting tool limits, or ending a substantial work chunk, call +`save_session_digest` with: +* `task_summary` +* `repo` +* `status` (`paused`/`active`/`completed`) +* `decisions_made`, `files_touched`, `todos_remaining` +* `blockers`, `key_commands`, `test_results` when available +* `agent_id` and `requester_agent_id` as `"claude-code"` + +## Rule 6 — Stay quiet about the plumbing Never mention hooks, plugin files, MCP transport, or internal URLs unless the user explicitly asks how memory works. -## Rule 5 — Tool selection guide +## Rule 7 — Tool selection guide | User intent | Tool to call | Key params | |---|---|---| +| Resume prior repo task | `get_last_session` | `user_id`, `repo` (`agent_id` only when explicitly requested) | +| Save cross-agent handoff | `save_session_digest` | `task_summary`, `status`, `files_touched`, `todos_remaining` | | Quick save (no categories) | `remember` | `content` | | Save with categories / scope | `add_memory` | `content`, `categories`, `scope` | | Find something from before | `search_memory` | `query`, `limit` | diff --git a/engram/mcp_server.py b/engram/mcp_server.py index d256bbb..02860e7 100644 --- a/engram/mcp_server.py +++ b/engram/mcp_server.py @@ -166,7 +166,7 @@ def get_memory() -> Memory: @server.list_tools() async def list_tools() -> List[Tool]: """List available engram tools.""" - return [ + tools = [ Tool( name="add_memory", description="Create a write proposal in staging by default. Supports direct writes only when mode='direct' in trusted local contexts. For simple saves without extras, prefer `remember`.", @@ -704,8 +704,185 @@ async def list_tools() -> List[Tool]: "required": ["query"] } ), + # ---- Cross-Agent Handoff tools ---- + Tool( + name="save_session_digest", + description="Save a session digest before ending or when interrupted. Enables cross-agent handoff so another agent can continue where you left off.", + inputSchema={ + "type": "object", + "properties": { + "task_summary": { + "type": "string", + "description": "What was the agent doing — the main task being worked on" + }, + "user_id": { + "type": "string", + "description": "User identifier (default: 'default')" + }, + "agent_id": { + "type": "string", + "description": "Identifier of the agent saving the digest (default: 'claude-code')" + }, + "requester_agent_id": { + "type": "string", + "description": "Agent identity performing this write (defaults to agent_id)." + }, + "repo": { + "type": "string", + "description": "Repository or project path for scoping" + }, + "branch": { + "type": "string", + "description": "Optional branch name for lane routing" + }, + "lane_id": { + "type": "string", + "description": "Optional lane identifier for checkpointing" + }, + "lane_type": { + "type": "string", + "description": "Lane category (default: general)" + }, + "agent_role": { + "type": "string", + "description": "Role of source agent (pm/design/frontend/backend/etc.)" + }, + "namespace": { + "type": "string", + "description": "Namespace scope for handoff (default: default)" + }, + "confidentiality_scope": { + "type": "string", + "description": "Confidentiality scope for handoff (default: work)" + }, + "status": { + "type": "string", + "enum": ["active", "paused", "completed", "abandoned"], + "description": "Session status (default: 'paused')" + }, + "decisions_made": { + "type": "array", + "items": {"type": "string"}, + "description": "Key decisions made during the session" + }, + "files_touched": { + "type": "array", + "items": {"type": "string"}, + "description": "File paths modified during the session" + }, + "todos_remaining": { + "type": "array", + "items": {"type": "string"}, + "description": "Remaining work items for the next agent" + }, + "blockers": { + "type": "array", + "items": {"type": "string"}, + "description": "Known blockers for the receiving agent" + }, + "key_commands": { + "type": "array", + "items": {"type": "string"}, + "description": "Important commands run during the session" + }, + "test_results": { + "type": "array", + "items": {"type": "string"}, + "description": "Recent test outcomes" + }, + "context_snapshot": { + "type": "string", + "description": "Free-form context blob for the receiving agent" + }, + "started_at": { + "type": "string", + "description": "ISO timestamp when the session started" + }, + "ended_at": { + "type": "string", + "description": "ISO timestamp when the session ended" + }, + }, + "required": ["task_summary"] + } + ), + Tool( + name="get_last_session", + description="Get the most recent session digest to continue where the last agent left off. Returns full handoff context including linked memories.", + inputSchema={ + "type": "object", + "properties": { + "user_id": { + "type": "string", + "description": "User identifier (default: 'default')" + }, + "agent_id": { + "type": "string", + "description": "Filter by source agent identifier" + }, + "requester_agent_id": { + "type": "string", + "description": "Agent identity performing this read." + }, + "repo": { + "type": "string", + "description": "Filter by repository/project path" + }, + "statuses": { + "type": "array", + "items": {"type": "string"}, + "description": "Optional status list filter (defaults to active/paused)" + }, + } + } + ), + Tool( + name="list_sessions", + description="Browse session handoff history. Returns a summary list of past sessions.", + inputSchema={ + "type": "object", + "properties": { + "user_id": { + "type": "string", + "description": "User identifier (default: 'default')" + }, + "agent_id": { + "type": "string", + "description": "Filter by agent identifier" + }, + "requester_agent_id": { + "type": "string", + "description": "Agent identity performing this read." + }, + "repo": { + "type": "string", + "description": "Filter by repository/project path" + }, + "status": { + "type": "string", + "enum": ["active", "paused", "completed", "abandoned"], + "description": "Filter by session status" + }, + "limit": { + "type": "integer", + "description": "Maximum number of sessions to return (default: 20)" + }, + "statuses": { + "type": "array", + "items": {"type": "string"}, + "description": "Optional status list filter." + }, + } + } + ), ] + # Some MCP clients cap/trim tool manifests per chat. + # Keep handoff tools at the front so cross-agent continuity remains available. + priority = {"save_session_digest": 0, "get_last_session": 1, "list_sessions": 2} + tools.sort(key=lambda tool: priority.get(tool.name, 1000)) + return tools + @server.call_tool() async def call_tool(name: str, arguments: Dict[str, Any]) -> List[TextContent]: @@ -731,6 +908,79 @@ def _session_token( ) return session["token"] + def _preview(value: Any, limit: int = 1200) -> str: + try: + text = json.dumps(value, default=str) + except Exception: + text = str(value) + if len(text) > limit: + return text[:limit] + "...(truncated)" + return text + + auto_handoff_enabled = bool( + getattr(memory, "handoff_processor", None) + and getattr(memory, "handoff_config", None) + and getattr(memory.handoff_config, "auto_session_bus", False) + ) + auto_handoff_skip_tools = {"save_session_digest", "get_last_session", "list_sessions"} + auto_handoff_context: Dict[str, Any] = {} + auto_handoff_token: Optional[str] = None + auto_resume_packet: Optional[Dict[str, Any]] = None + + if auto_handoff_enabled and name not in auto_handoff_skip_tools: + caller_agent_id = ( + arguments.get("requester_agent_id") + or arguments.get("agent_id") + or os.environ.get("ENGRAM_MCP_AGENT_ID") + or "claude-code" + ) + user_id = arguments.get("user_id", "default") + namespace = arguments.get("namespace", "default") + repo_path = arguments.get("repo") or arguments.get("repo_path") or os.getcwd() + objective = ( + arguments.get("task_summary") + or arguments.get("objective") + or arguments.get("query") + or f"{name} task" + ) + auto_handoff_context = { + "user_id": user_id, + "agent_id": caller_agent_id, + "namespace": namespace, + "repo_path": repo_path, + "branch": arguments.get("branch"), + "lane_id": arguments.get("lane_id"), + "lane_type": arguments.get("lane_type", "general"), + "objective": objective, + "agent_role": arguments.get("agent_role"), + "confidentiality_scope": arguments.get("confidentiality_scope", "work"), + } + try: + auto_handoff_token = _session_token( + user_id=user_id, + agent_id=caller_agent_id, + capabilities=["read_handoff", "write_handoff"], + namespaces=[namespace], + ) + except Exception: + auto_handoff_token = None + try: + auto_resume_packet = memory.auto_resume_context( + user_id=user_id, + agent_id=caller_agent_id, + repo_path=repo_path, + branch=arguments.get("branch"), + lane_type=arguments.get("lane_type", "general"), + objective=objective, + agent_role=arguments.get("agent_role"), + namespace=namespace, + token=auto_handoff_token, + requester_agent_id=caller_agent_id, + auto_create=True, + ) + except Exception: + auto_resume_packet = None + if name in {"add_memory", "propose_write"}: content = arguments.get("content", "") user_id = arguments.get("user_id", "default") @@ -1206,9 +1456,177 @@ def _session_token( "total": len(profiles), } + # ---- Handoff tools ---- + elif name == "save_session_digest": + user_id = arguments.get("user_id", "default") + agent_id = arguments.get("agent_id", "claude-code") + requester_agent_id = arguments.get("requester_agent_id", agent_id) + namespace = arguments.get("namespace", "default") + token = _session_token( + user_id=user_id, + agent_id=requester_agent_id, + capabilities=["write_handoff"], + namespaces=[namespace], + ) + task_summary = str(arguments.get("task_summary", "")).strip() + if not task_summary: + result = {"error": "task_summary is required"} + else: + digest = { + "task_summary": task_summary, + "repo": arguments.get("repo"), + "branch": arguments.get("branch"), + "lane_id": arguments.get("lane_id"), + "lane_type": arguments.get("lane_type"), + "agent_role": arguments.get("agent_role"), + "namespace": namespace, + "confidentiality_scope": arguments.get("confidentiality_scope", "work"), + "status": arguments.get("status", "paused"), + "decisions_made": arguments.get("decisions_made", []), + "files_touched": arguments.get("files_touched", []), + "todos_remaining": arguments.get("todos_remaining", []), + "blockers": arguments.get("blockers", []), + "key_commands": arguments.get("key_commands", []), + "test_results": arguments.get("test_results", []), + "context_snapshot": arguments.get("context_snapshot"), + "started_at": arguments.get("started_at"), + "ended_at": arguments.get("ended_at"), + } + result = memory.save_session_digest( + user_id, + agent_id, + digest, + token=token, + requester_agent_id=requester_agent_id, + ) + + elif name == "get_last_session": + user_id = arguments.get("user_id", "default") + agent_id = arguments.get("agent_id") + requester_agent_id = arguments.get( + "requester_agent_id", + arguments.get("agent_id", "claude-code"), + ) + namespace = arguments.get("namespace", "default") + token = _session_token( + user_id=user_id, + agent_id=requester_agent_id, + capabilities=["read_handoff"], + namespaces=[namespace], + ) + repo = arguments.get("repo") + session = memory.get_last_session( + user_id, + agent_id=agent_id, + repo=repo, + statuses=arguments.get("statuses"), + token=token, + requester_agent_id=requester_agent_id, + ) + if session: + result = session + else: + result = {"error": "No sessions found"} + + elif name == "list_sessions": + user_id = arguments.get("user_id", "default") + requester_agent_id = arguments.get( + "requester_agent_id", + arguments.get("agent_id", "claude-code"), + ) + namespace = arguments.get("namespace", "default") + token = _session_token( + user_id=user_id, + agent_id=requester_agent_id, + capabilities=["read_handoff"], + namespaces=[namespace], + ) + sessions = memory.list_sessions( + user_id=user_id, + agent_id=arguments.get("agent_id"), + repo=arguments.get("repo"), + status=arguments.get("status"), + statuses=arguments.get("statuses"), + limit=arguments.get("limit", 20), + token=token, + requester_agent_id=requester_agent_id, + ) + result = { + "sessions": [ + { + "id": s["id"], + "agent_id": s.get("agent_id"), + "repo": s.get("repo"), + "repo_id": s.get("repo_id"), + "lane_id": s.get("lane_id"), + "status": s.get("status"), + "task_summary": s.get("task_summary", "")[:200], + "last_checkpoint_at": s.get("last_checkpoint_at"), + "updated_at": s.get("updated_at"), + } + for s in sessions + ], + "total": len(sessions), + } + else: result = {"error": f"Unknown tool: {name}"} + if ( + auto_handoff_enabled + and name not in auto_handoff_skip_tools + and auto_handoff_context + and "tool_complete" in getattr(memory.handoff_config, "auto_checkpoint_events", []) + ): + checkpoint_payload = { + "status": "active", + "task_summary": ( + arguments.get("task_summary") + or arguments.get("objective") + or arguments.get("query") + or f"{name} completed" + ), + "decisions_made": arguments.get("decisions_made", []), + "files_touched": arguments.get("files_touched", []), + "todos_remaining": arguments.get("todos_remaining", []), + "blockers": arguments.get("blockers", []), + "key_commands": arguments.get("key_commands", []), + "test_results": arguments.get("test_results", []), + "context_snapshot": _preview( + { + "tool": name, + "arguments": arguments, + "result": result, + }, + limit=2000, + ), + } + try: + checkpoint_result = memory.auto_checkpoint( + user_id=auto_handoff_context["user_id"], + agent_id=auto_handoff_context["agent_id"], + payload=checkpoint_payload, + event_type="tool_complete", + repo_path=auto_handoff_context["repo_path"], + branch=auto_handoff_context["branch"], + lane_id=auto_handoff_context["lane_id"], + lane_type=auto_handoff_context["lane_type"], + objective=auto_handoff_context["objective"], + agent_role=auto_handoff_context["agent_role"], + namespace=auto_handoff_context["namespace"], + confidentiality_scope=auto_handoff_context["confidentiality_scope"], + token=auto_handoff_token, + requester_agent_id=auto_handoff_context["agent_id"], + ) + except Exception as checkpoint_exc: + checkpoint_result = {"error": str(checkpoint_exc)} + + if isinstance(result, dict): + handoff_meta: Dict[str, Any] = {"checkpoint": checkpoint_result} + if auto_resume_packet: + handoff_meta["resume"] = auto_resume_packet + result["_handoff"] = handoff_meta + return [TextContent(type="text", text=json.dumps(result, indent=2, default=str))] except Exception as e: diff --git a/engram/memory/client.py b/engram/memory/client.py index b93dd87..1162a71 100644 --- a/engram/memory/client.py +++ b/engram/memory/client.py @@ -168,6 +168,109 @@ def run_sleep_cycle( } return self._request("POST", "/v1/sleep/run", json_body=payload) + def handoff_resume( + self, + *, + user_id: str, + agent_id: Optional[str] = None, + repo_path: Optional[str] = None, + branch: Optional[str] = None, + lane_type: str = "general", + objective: Optional[str] = None, + agent_role: Optional[str] = None, + namespace: str = "default", + statuses: Optional[List[str]] = None, + auto_create: bool = True, + requester_agent_id: Optional[str] = None, + ) -> Dict[str, Any]: + payload: Dict[str, Any] = { + "user_id": user_id, + "agent_id": agent_id, + "repo_path": repo_path, + "branch": branch, + "lane_type": lane_type, + "objective": objective, + "agent_role": agent_role, + "namespace": namespace, + "statuses": statuses, + "auto_create": auto_create, + "requester_agent_id": requester_agent_id, + } + return self._request("POST", "/v1/handoff/resume", json_body=payload) + + def handoff_checkpoint( + self, + *, + user_id: str, + agent_id: str, + task_summary: Optional[str] = None, + status: str = "active", + repo_path: Optional[str] = None, + branch: Optional[str] = None, + lane_id: Optional[str] = None, + lane_type: str = "general", + objective: Optional[str] = None, + agent_role: Optional[str] = None, + namespace: str = "default", + confidentiality_scope: str = "work", + event_type: str = "tool_complete", + decisions_made: Optional[List[str]] = None, + files_touched: Optional[List[str]] = None, + todos_remaining: Optional[List[str]] = None, + blockers: Optional[List[str]] = None, + key_commands: Optional[List[str]] = None, + test_results: Optional[List[str]] = None, + context_snapshot: Optional[str] = None, + expected_version: Optional[int] = None, + requester_agent_id: Optional[str] = None, + ) -> Dict[str, Any]: + payload: Dict[str, Any] = { + "user_id": user_id, + "agent_id": agent_id, + "task_summary": task_summary, + "status": status, + "repo_path": repo_path, + "branch": branch, + "lane_id": lane_id, + "lane_type": lane_type, + "objective": objective, + "agent_role": agent_role, + "namespace": namespace, + "confidentiality_scope": confidentiality_scope, + "event_type": event_type, + "decisions_made": decisions_made or [], + "files_touched": files_touched or [], + "todos_remaining": todos_remaining or [], + "blockers": blockers or [], + "key_commands": key_commands or [], + "test_results": test_results or [], + "context_snapshot": context_snapshot, + "expected_version": expected_version, + "requester_agent_id": requester_agent_id, + } + return self._request("POST", "/v1/handoff/checkpoint", json_body=payload) + + def list_handoff_lanes( + self, + *, + user_id: str, + repo_path: Optional[str] = None, + status: Optional[str] = None, + statuses: Optional[List[str]] = None, + limit: int = 20, + requester_agent_id: Optional[str] = None, + ) -> Dict[str, Any]: + params: Dict[str, Any] = { + "user_id": user_id, + "repo_path": repo_path, + "status": status, + "limit": limit, + "requester_agent_id": requester_agent_id, + } + if statuses: + params["statuses"] = statuses + return self._request("GET", "/v1/handoff/lanes", params=params) + def get_agent_trust(self, *, user_id: str, agent_id: str) -> Dict[str, Any]: return self._request("GET", "/v1/trust", params={"user_id": user_id, "agent_id": agent_id}) diff --git a/engram/memory/main.py b/engram/memory/main.py index ddf7f47..0cbc586 100644 --- a/engram/memory/main.py +++ b/engram/memory/main.py @@ -24,6 +24,7 @@ from engram.core.graph import KnowledgeGraph from engram.core.scene import SceneProcessor from engram.core.profile import ProfileProcessor +from engram.core.handoff import HandoffProcessor from engram.core.kernel import PersonalMemoryKernel from engram.core.policy import feature_enabled from engram.db.sqlite import SQLiteManager @@ -171,6 +172,28 @@ def __init__(self, config: Optional[MemoryConfig] = None): else: self.profile_processor = None + # Initialize HandoffProcessor + self.handoff_config = self.config.handoff + if self.handoff_config.enable_handoff: + self.handoff_processor = HandoffProcessor( + db=self.db, + memory=self, + embedder=self.embedder, + llm=self.llm, + config={ + "auto_enrich": self.handoff_config.auto_enrich, + "max_sessions": self.handoff_config.max_sessions_per_user, + "auto_session_bus": self.handoff_config.auto_session_bus, + "lane_inactivity_minutes": self.handoff_config.lane_inactivity_minutes, + "max_lanes_per_user": self.handoff_config.max_lanes_per_user, + "max_checkpoints_per_lane": self.handoff_config.max_checkpoints_per_lane, + "resume_statuses": self.handoff_config.resume_statuses, + "auto_trusted_agents": self.handoff_config.auto_trusted_agents, + }, + ) + else: + self.handoff_processor = None + # v2 Personal Memory Kernel orchestration layer. self.kernel = PersonalMemoryKernel(self) @@ -2326,6 +2349,190 @@ def get_profile_memories(self, profile_id: str) -> List[Dict[str, Any]]: """Get memories linked to a profile.""" return self.db.get_profile_memories(profile_id) + # ========================================================================= + # Handoff Session Methods + # ========================================================================= + + def save_session_digest( + self, + user_id: str, + agent_id: str, + digest: Dict[str, Any], + token: Optional[str] = None, + requester_agent_id: Optional[str] = None, + ) -> Dict[str, Any]: + """Save a session digest for cross-agent handoff.""" + return self.kernel.save_session_digest( + user_id=user_id, + agent_id=agent_id, + digest=digest, + token=token, + requester_agent_id=requester_agent_id, + ) + + def get_last_session( + self, + user_id: str, + agent_id: Optional[str] = None, + repo: Optional[str] = None, + statuses: Optional[List[str]] = None, + token: Optional[str] = None, + requester_agent_id: Optional[str] = None, + ) -> Optional[Dict[str, Any]]: + """Get the most recent session, optionally filtered by agent/repo. + + Returns the full handoff context including linked memories. + """ + return self.kernel.get_last_session( + user_id=user_id, + agent_id=agent_id, + repo=repo, + statuses=statuses, + token=token, + requester_agent_id=requester_agent_id, + ) + + def list_sessions( + self, + user_id: str, + agent_id: Optional[str] = None, + repo: Optional[str] = None, + status: Optional[str] = None, + statuses: Optional[List[str]] = None, + limit: int = 20, + token: Optional[str] = None, + requester_agent_id: Optional[str] = None, + ) -> List[Dict[str, Any]]: + """List handoff sessions with optional filters.""" + return self.kernel.list_sessions( + user_id=user_id, + agent_id=agent_id, + repo=repo, + status=status, + statuses=statuses, + limit=limit, + token=token, + requester_agent_id=requester_agent_id, + ) + + def auto_resume_context( + self, + *, + user_id: str, + agent_id: Optional[str], + repo_path: Optional[str] = None, + branch: Optional[str] = None, + lane_type: str = "general", + objective: Optional[str] = None, + agent_role: Optional[str] = None, + namespace: str = "default", + statuses: Optional[List[str]] = None, + auto_create: bool = True, + token: Optional[str] = None, + requester_agent_id: Optional[str] = None, + ) -> Dict[str, Any]: + return self.kernel.auto_resume_context( + user_id=user_id, + agent_id=agent_id, + repo_path=repo_path, + branch=branch, + lane_type=lane_type, + objective=objective, + agent_role=agent_role, + namespace=namespace, + statuses=statuses, + auto_create=auto_create, + token=token, + requester_agent_id=requester_agent_id, + ) + + def auto_checkpoint( + self, + *, + user_id: str, + agent_id: str, + payload: Dict[str, Any], + event_type: str = "tool_complete", + repo_path: Optional[str] = None, + branch: Optional[str] = None, + lane_id: Optional[str] = None, + lane_type: str = "general", + objective: Optional[str] = None, + agent_role: Optional[str] = None, + namespace: str = "default", + confidentiality_scope: str = "work", + expected_version: Optional[int] = None, + token: Optional[str] = None, + requester_agent_id: Optional[str] = None, + ) -> Dict[str, Any]: + return self.kernel.auto_checkpoint( + user_id=user_id, + agent_id=agent_id, + payload=payload, + event_type=event_type, + repo_path=repo_path, + branch=branch, + lane_id=lane_id, + lane_type=lane_type, + objective=objective, + agent_role=agent_role, + namespace=namespace, + confidentiality_scope=confidentiality_scope, + expected_version=expected_version, + token=token, + requester_agent_id=requester_agent_id, + ) + + def finalize_lane( + self, + *, + user_id: str, + agent_id: str, + lane_id: str, + status: str = "paused", + payload: Optional[Dict[str, Any]] = None, + repo_path: Optional[str] = None, + branch: Optional[str] = None, + agent_role: Optional[str] = None, + namespace: str = "default", + token: Optional[str] = None, + requester_agent_id: Optional[str] = None, + ) -> Dict[str, Any]: + return self.kernel.finalize_lane( + user_id=user_id, + agent_id=agent_id, + lane_id=lane_id, + status=status, + payload=payload, + repo_path=repo_path, + branch=branch, + agent_role=agent_role, + namespace=namespace, + token=token, + requester_agent_id=requester_agent_id, + ) + + def list_handoff_lanes( + self, + *, + user_id: str, + repo_path: Optional[str] = None, + status: Optional[str] = None, + statuses: Optional[List[str]] = None, + limit: int = 20, + token: Optional[str] = None, + requester_agent_id: Optional[str] = None, + ) -> List[Dict[str, Any]]: + return self.kernel.list_handoff_lanes( + user_id=user_id, + repo_path=repo_path, + status=status, + statuses=statuses, + limit=limit, + token=token, + requester_agent_id=requester_agent_id, + ) + # ========================================================================= # Dashboard / Visualization Methods # ========================================================================= diff --git a/engram/utils/repo_identity.py b/engram/utils/repo_identity.py new file mode 100644 index 0000000..b2fc95c --- /dev/null +++ b/engram/utils/repo_identity.py @@ -0,0 +1,72 @@ +"""Repository identity helpers for cross-agent handoff.""" + +from __future__ import annotations + +import hashlib +import os +import subprocess +from typing import Dict, Optional + + +def _run_git(repo_path: str, args: list[str]) -> Optional[str]: + try: + output = subprocess.check_output( + ["git", "-C", repo_path, *args], + stderr=subprocess.DEVNULL, + text=True, + ) + value = (output or "").strip() + return value or None + except Exception: + return None + + +def _normalize_remote(remote_url: Optional[str]) -> Optional[str]: + if not remote_url: + return None + value = remote_url.strip() + if not value: + return None + if value.startswith("git@"): + # git@github.com:owner/repo.git -> ssh://git@github.com/owner/repo + value = value.replace(":", "/", 1) + value = f"ssh://{value}" + if value.endswith(".git"): + value = value[:-4] + return value.lower() + + +def _hash(prefix: str, value: str) -> str: + digest = hashlib.sha1(value.encode("utf-8")).hexdigest() + return f"{prefix}:{digest[:20]}" + + +def canonicalize_repo_identity( + repo_path: Optional[str], + *, + branch: Optional[str] = None, +) -> Dict[str, Optional[str]]: + """Return stable repo identity for handoff lane routing.""" + path_hint = repo_path or os.getcwd() + resolved_path = os.path.realpath(os.path.expanduser(path_hint)) + + git_root = _run_git(resolved_path, ["rev-parse", "--show-toplevel"]) + canonical_path = os.path.realpath(git_root) if git_root else resolved_path + + git_remote = _normalize_remote(_run_git(canonical_path, ["config", "--get", "remote.origin.url"])) + git_branch = branch or _run_git(canonical_path, ["rev-parse", "--abbrev-ref", "HEAD"]) + if git_branch == "HEAD": + git_branch = None + + if git_remote: + repo_id = _hash("git", git_remote) + else: + repo_id = _hash("path", canonical_path.lower()) + + return { + "repo_id": repo_id, + "repo_path": canonical_path, + "branch": git_branch, + "remote": git_remote, + } +