from __future__ import annotations

import json
import math
import re
import subprocess
import uuid
from dataclasses import dataclass
from datetime import UTC, datetime, timedelta
from typing import Any
from urllib.parse import urlparse

from .schema import SCHEMA_VERSION
from .store import SQLiteStore


def utcnow() -> str:
    return datetime.now(UTC).isoformat()


def make_id(prefix: str) -> str:
    """Build a fresh identifier of the form ``<prefix>_<32 hex characters>``."""
    token = uuid.uuid4().hex
    return f"{prefix}_{token}"


def normalize_text(value: str) -> str:
    """Lowercase *value* and collapse each whitespace run into one space."""
    lowered = value.lower()
    tokens = lowered.split()
    return " ".join(tokens)


def normalize_for_dedupe(value: str) -> str:
    """Reduce *value* to a canonical form used for duplicate comparison.

    The text is lowercased, then a fixed sequence of rewrites is applied in
    order: the ``[telegram]`` tag is dropped, URLs become ``URL``,
    ``task_``/``run_``/``job_``/``session_`` identifiers become ``ID``,
    date-time stamps become ``TS``, everything from ``payload: {`` or
    ``"route": {`` to the end of the text is replaced by a placeholder,
    and remaining punctuation is turned into spaces. Whitespace is finally
    collapsed to single spaces.
    """
    tail_flags = re.IGNORECASE | re.DOTALL
    # (pattern, replacement, flags) — order matters: later rules see the
    # output of earlier ones.
    rewrites = (
        (r"\[telegram\]", " ", 0),
        (r"https?://\S+", " URL ", 0),
        (r"\b(task|run|job|session)_[a-z0-9_-]+\b", " ID ", 0),
        (r"\b\d{4}-\d{2}-\d{2}[t ][^\s]+\b", " TS ", 0),
        (r"payload:\s*\{.*", " payload_json ", tail_flags),
        (r'"route"\s*:\s*\{.*', " route_json ", tail_flags),
        (r"[^\w\s]", " ", 0),
    )
    normalized = value.lower()
    for pattern, replacement, flags in rewrites:
        normalized = re.sub(pattern, replacement, normalized, flags=flags)
    return " ".join(normalized.split())


def _line_value(content: str, label: str) -> str:
    """Return the stripped text after ``<label>:`` on the first matching line.

    Only lines that *start* with ``<label>:`` match. Returns ``""`` when no
    line matches.
    """
    marker = f"{label}:"
    offset = len(marker)
    values = (row[offset:].strip() for row in content.splitlines() if row.startswith(marker))
    return next(values, "")


@dataclass(slots=True)
class SearchResult:
    memory: dict[str, Any]
    score: float
    explanation: str
    facet_score: float
    lexical_score: float
    semantic_score: float

    def as_dict(self) -> dict[str, Any]:
        return {
            "memory": self.memory,
            "score": self.score,
            "score_breakdown": {
                "facet": self.facet_score,
                "lexical": self.lexical_score,
                "semantic": self.semantic_score,
                "confidence": self.memory["confidence"],
                "freshness": self.memory["freshness"],
            },
            "matched_scope": self.memory["scope"],
            "evidence_ref": self.memory["evidence_ref"],
            "explanation": self.explanation,
        }


class MemoryService:
    def __init__(self, db_path: str) -> None:
        """Open the store at *db_path* and run the start-up sequence.

        In order: initialize the store, call ``migrate_v2``, call
        ``_ensure_default_jobs``, then run one opportunistic maintenance pass
        with ``limit=10``.
        """
        self.store = SQLiteStore(db_path)
        # Set before any maintenance call below. Presumably a re-entrancy guard
        # read by _run_opportunistic_maintenance — confirm against that helper.
        self._maintenance_check_running = False
        self.store.initialize()
        self.migrate_v2()
        self._ensure_default_jobs()
        self._run_opportunistic_maintenance(limit=10)

    def create_memory(self, memory_input: dict[str, Any]) -> dict[str, Any]:
        """Insert (or replace, by id) a memory row and return the stored memory.

        Required keys in *memory_input*: ``type``, ``scope``, ``title`` and
        ``content``; a missing one raises ``KeyError``. Everything else is
        optional and defaulted here:

        * ``id`` – generated with the ``mem`` prefix when the key is absent.
        * ``domain`` – derived from the network location of ``url`` when not
          supplied.
        * ``origin_agent`` – falls back to ``metadata["origin_agent"]``, then
          ``agent_id``.
        * ``summary`` – produced by ``_make_summary`` from the content when
          missing or empty.
        * ``created_at`` / ``observed_at`` – default to the current time;
          ``updated_at`` is always the current time.

        The FTS entry is refreshed in the same connection block, and one
        opportunistic maintenance step runs afterwards.
        """
        field = memory_input.get
        timestamp = utcnow()
        meta = dict(field("metadata") or {})
        link = field("url")
        host = field("domain")
        if not host:
            host = urlparse(link).netloc if link else None
        origin = field("origin_agent") or meta.get("origin_agent") or field("agent_id")

        # Key order doubles as the column order of the INSERT built below.
        record = {
            "id": field("id", make_id("mem")),
            "schema_version": field("schema_version", SCHEMA_VERSION),
            "type": memory_input["type"],
            "subtype": field("subtype"),
            "scope": memory_input["scope"],
            "status": field("status", "active"),
            "project_id": field("project_id"),
            "repo_id": field("repo_id"),
            "agent_id": field("agent_id"),
            "origin_agent": origin,
            "run_id": field("run_id"),
            "task_id": field("task_id"),
            "url": link,
            "domain": host,
            "source_kind": field("source_kind", "manual"),
            "title": memory_input["title"],
            "content": memory_input["content"],
            "summary": field("summary") or self._make_summary(memory_input["content"]),
            "confidence": float(field("confidence", 1.0)),
            "freshness": float(field("freshness", 1.0)),
            "created_at": field("created_at", timestamp),
            "updated_at": timestamp,
            "observed_at": field("observed_at", timestamp),
            "source_ref": field("source_ref"),
            "evidence_ref": field("evidence_ref"),
            "embedding_json": self._encode_embedding(field("embedding")),
            "metadata_json": self.store.dumps(meta),
        }
        names = list(record)
        statement = (
            f"INSERT OR REPLACE INTO memories ({', '.join(names)}) "
            f"VALUES ({', '.join(':' + name for name in names)})"
        )
        with self.store.connection() as conn:
            conn.execute(statement, record)
            self._upsert_fts(conn, record)
        self._run_opportunistic_maintenance(limit=1)
        return self.get_memory(record["id"])

    def ingest(self, memory_input: dict[str, Any]) -> dict[str, Any]:
        return self.create_memory(memory_input)

    def capture_session(self, source_ref: str, scope: str, content: str, **metadata: Any) -> dict[str, Any]:
        return self._capture_source(
            trigger_type="session_close",
            source_kind="run",
            source_ref=source_ref,
            scope=scope,
            content=content,
            metadata=metadata,
        )

    def capture_conversation(
        self, source_ref: str, scope: str, content: str, capture_mode: str = "manual_import", **metadata: Any
    ) -> dict[str, Any]:
        metadata = {"capture_mode": capture_mode, **metadata}
        return self._capture_source(
            trigger_type="conversation_capture",
            source_kind="conversation",
            source_ref=source_ref,
            scope=scope,
            content=content,
            metadata=metadata,
        )

    def search(
        self,
        query: str,
        scopes: list[str] | None = None,
        filters: dict[str, Any] | None = None,
        limit: int = 10,
        include_inbox: bool = False,
    ) -> dict[str, Any]:
        """Rank stored memories against *query* and log the retrieval.

        Rows are pre-filtered in SQL by *scopes*, by ``status = 'active'``
        (unless *include_inbox* is true) and by whatever ``_memory_filter_sql``
        adds for *filters*. Each surviving row gets a facet, lexical and
        semantic score. When the query has at least one term, rows where all
        three are zero are dropped.

        Returns ``{"retrieval_id": ..., "results": [...]}`` with at most
        *limit* serialized hits. The same payload is written to
        ``retrieval_logs`` under the returned id.
        """
        filters = filters or {}
        query_terms = normalize_text(query).split()
        has_terms = bool(query_terms)

        clauses = ["SELECT * FROM memories WHERE 1=1"]
        bindings: list[Any] = []
        if scopes:
            marks = ",".join("?" * len(scopes))
            clauses.append(f"AND scope IN ({marks})")
            bindings.extend(scopes)
        if not include_inbox:
            clauses.append("AND status = ?")
            bindings.append("active")
        # The helper appends its own clauses to `clauses` and returns their parameters.
        bindings.extend(self._memory_filter_sql(clauses, filters))

        with self.store.connection() as conn:
            memories = [self._row_to_memory(raw) for raw in conn.execute(" ".join(clauses), bindings)]

        query_embedding = self._text_embedding(query) if has_terms else []
        hits: list[SearchResult] = []
        for memory in memories:
            text = normalize_text(" ".join((memory["title"], memory["summary"], memory["content"])))
            lexical = self._lexical_score(query_terms, text)
            semantic = self._semantic_score(query_embedding, memory.get("embedding")) if has_terms else 0.0
            facet = self._facet_score(memory, filters)
            no_signal = lexical == 0 and semantic == 0 and facet == 0
            if has_terms and no_signal:
                continue
            score = (
                facet * 0.45
                + lexical * 0.3
                + semantic * 0.15
                + memory["confidence"] * 0.06
                + memory["freshness"] * 0.04
            )
            explanation = (
                f"facets={facet:.2f} lexical={lexical:.2f} semantic={semantic:.2f} "
                f"confidence={memory['confidence']:.2f} freshness={memory['freshness']:.2f}"
            )
            hits.append(SearchResult(memory, score, explanation, facet, lexical, semantic))

        def ranking(hit: SearchResult) -> tuple[Any, ...]:
            # Ordering is lexicographic over the individual signals; the
            # blended `score` is reported but is not the sort key.
            return (
                hit.facet_score,
                hit.lexical_score,
                hit.semantic_score,
                hit.memory["confidence"],
                hit.memory["freshness"],
                hit.memory["updated_at"],
            )

        hits.sort(key=ranking, reverse=True)
        payload = [hit.as_dict() for hit in hits[:limit]]

        retrieval_id = make_id("ret")
        log_entry = (
            retrieval_id,
            query,
            self.store.dumps({"scopes": scopes, "filters": filters, "include_inbox": include_inbox}),
            self.store.dumps(payload),
            utcnow(),
        )
        with self.store.connection() as conn:
            conn.execute(
                "INSERT INTO retrieval_logs (id, query_text, filters_json, results_json, created_at) VALUES (?, ?, ?, ?, ?)",
                log_entry,
            )
        return {"retrieval_id": retrieval_id, "results": payload}

    def context_for(
        self, project: str | None = None, repo: str | None = None, agent: str | None = None, task: str | None = None
    ) -> dict[str, Any]:
        profile_facts = self.list_memories(status="active", scope="global", memory_type="profile", limit=8)
        project_filters = {"project_id": project, "repo_id": repo}
        active_decisions = self.list_memories(
            status="active",
            scope="project" if project else None,
            memory_type="decision",
            project_id=project,
            repo_id=repo,
            limit=8,
        )
        project_memories = self.list_memories(
            status="active",
            project_id=project,
            repo_id=repo,
            limit=12,
        )
        project_memories = [item for item in project_memories if item["type"] in {"project", "decision", "artifact", "episode"}][:12]
        recent_episodes = self.list_memories(
            status="active",
            memory_type="episode",
            project_id=project,
            repo_id=repo,
            limit=10,
        )
        task_relevant_artifacts = self.list_memories(
            status="active",
            memory_type="artifact",
            project_id=project,
            repo_id=repo,
            task_id=task,
            limit=10,
        )
        if task and not task_relevant_artifacts:
            task_relevant_artifacts = self.search(task, filters={**project_filters, "type": "artifact"}, limit=5)["results"]
        citations = [
            item["id"]
            for item in [*profile_facts, *project_memories, *active_decisions, *recent_episodes]
            if isinstance(item, dict) and "id" in item
        ]
        return {
            "profile_facts": profile_facts,
            "project_memories": project_memories,
            "active_decisions": active_decisions,
            "recent_episodes": recent_episodes,
            "task_relevant_artifacts": task_relevant_artifacts,
            "citations": citations,
            "agent_id": agent,
        }

    def consolidate(self, inbox_items: list[str]) -> dict[str, Any]:
        """Promote or de-duplicate the memories named in *inbox_items*.

        For each id that exists: if another *active* memory has the same type,
        scope and exact content, the memory is archived and a ``related_to``
        link to that memory is recorded. Otherwise the memory's status is set
        to ``active``. Unknown ids are skipped.

        Returns ``{"promoted": [ids], "linked": [{"from", "to", "relation"}]}``.
        """
        fetch_sql = "SELECT * FROM memories WHERE id = ?"
        twin_sql = """
            SELECT id FROM memories
            WHERE id != ?
              AND status = 'active'
              AND type = ?
              AND scope = ?
              AND content = ?
            LIMIT 1
        """
        link_sql = """
            INSERT INTO memory_links (id, from_memory_id, to_memory_id, relation, created_at, metadata_json)
            VALUES (?, ?, ?, ?, ?, ?)
        """
        status_sql = "UPDATE memories SET status = ?, updated_at = ? WHERE id = ?"
        relation = "related_to"

        promoted_ids: list[str] = []
        link_log: list[dict[str, str]] = []
        with self.store.connection() as conn:
            for memory_id in inbox_items:
                row = conn.execute(fetch_sql, (memory_id,)).fetchone()
                if row is None:
                    continue
                memory = self._row_to_memory(row)
                twin = conn.execute(
                    twin_sql,
                    (memory_id, memory["type"], memory["scope"], memory["content"]),
                ).fetchone()
                if not twin:
                    # No exact duplicate among active memories: promote.
                    conn.execute(status_sql, ("active", utcnow(), memory_id))
                    promoted_ids.append(memory_id)
                    continue
                twin_id = twin["id"]
                link_params = (
                    make_id("lnk"),
                    memory_id,
                    twin_id,
                    relation,
                    utcnow(),
                    self.store.dumps({"reason": "exact_duplicate"}),
                )
                conn.execute(link_sql, link_params)
                conn.execute(status_sql, ("archived", utcnow(), memory_id))
                link_log.append({"from": memory_id, "to": twin_id, "relation": relation})
        return {"promoted": promoted_ids, "linked": link_log}

    def consolidate_candidates(self, *, config: dict[str, Any]) -> dict[str, Any]:
        """Run one consolidation pass over aged candidate memories.

        Candidates are memories that are either in status ``inbox`` or are
        active episodes, created at or before ``now - candidate_age_days``,
        taken oldest first and capped at ``max_candidates``. Each candidate is
        compared with up to 50 recently updated active memories of the same
        type and scope and then, based on the best similarity score:

        * normalized content identical to the best match -> linked and archived;
        * score >= ``dedupe_high_threshold`` -> linked and archived;
        * score >= ``dedupe_mid_threshold`` -> linked (optionally after an LLM
          summary is stored on the candidate); processing continues;
        * score >= ``conflict_threshold`` and ``_conflict_signal`` fires ->
          a conflict is recorded and a ``contradicts`` link is added;
        * inbox candidates with confidence >= ``promote_confidence`` are set
          to ``active``.

        Config keys read (with defaults): ``candidate_age_days`` (7),
        ``max_candidates`` (200), ``dedupe_high_threshold`` (0.9),
        ``dedupe_mid_threshold`` (0.75), ``promote_confidence`` (0.7),
        ``conflict_threshold`` (0.55), ``llm_enabled`` (False),
        ``llm_provider`` ("opencode"), ``llm_command``.

        Returns:
            Counters for ``candidates``, ``promoted``, ``archived``,
            ``linked``, ``conflicts`` and ``llm_summaries``.
        """
        stats = {
            "candidates": 0,
            "promoted": 0,
            "archived": 0,
            "linked": 0,
            "conflicts": 0,
            "llm_summaries": 0,
        }
        age_days = int(config.get("candidate_age_days", 7))
        max_candidates = int(config.get("max_candidates", 200))
        high_threshold = float(config.get("dedupe_high_threshold", 0.9))
        mid_threshold = float(config.get("dedupe_mid_threshold", 0.75))
        promote_threshold = float(config.get("promote_confidence", 0.7))
        conflict_threshold = float(config.get("conflict_threshold", 0.55))
        # NOTE(review): bool() of a non-empty string is True, so a config value
        # such as "false" would enable the LLM path.
        llm_enabled = bool(config.get("llm_enabled", False))
        llm_provider = config.get("llm_provider") or "opencode"
        llm_command = config.get("llm_command") or ["opencode", "-m", "MiniMax M2.5 Free"]
        # Only memories created at or before this instant are considered.
        cutoff = (datetime.now(UTC) - timedelta(days=age_days)).isoformat()

        with self.store.connection() as conn:
            rows = conn.execute(
                """
                SELECT * FROM memories
                WHERE (
                    status = 'inbox'
                    OR (type = 'episode' AND status = 'active')
                )
                  AND created_at <= ?
                ORDER BY created_at ASC
                LIMIT ?
                """,
                (cutoff, max_candidates),
            ).fetchall()

            candidates = [self._row_to_memory(row) for row in rows]
            stats["candidates"] = len(candidates)

            for candidate in candidates:
                # Re-queried per candidate, so memories archived earlier in
                # this pass no longer appear (the query requires 'active').
                related = conn.execute(
                    """
                    SELECT * FROM memories
                    WHERE status = 'active'
                      AND id != ?
                      AND type = ?
                      AND scope = ?
                    ORDER BY updated_at DESC
                    LIMIT 50
                    """,
                    (candidate["id"], candidate["type"], candidate["scope"]),
                ).fetchall()
                related_memories = [self._row_to_memory(row) for row in related]

                candidate_norm = normalize_for_dedupe(candidate["content"])
                best_match = None
                best_score = 0.0
                for other in related_memories:
                    other_norm = normalize_for_dedupe(other["content"])
                    lexical = self._lexical_score(candidate_norm.split(), other_norm)
                    semantic = self._semantic_score(candidate.get("embedding") or [], other.get("embedding"))
                    # Lexical score alone when the semantic score is exactly
                    # 0.0; otherwise a 60/40 lexical/semantic blend.
                    score = lexical if semantic == 0.0 else (lexical * 0.6 + semantic * 0.4)
                    # Strict '>' keeps the first of equally scored matches and
                    # leaves best_match as None when every score is 0.
                    if score > best_score:
                        best_score = score
                        best_match = other

                # Case 1: identical after normalization -> link and archive.
                if best_match and candidate_norm == normalize_for_dedupe(best_match["content"]):
                    self._link_memories(conn, candidate["id"], best_match["id"], "related_to", "exact_duplicate")
                    conn.execute(
                        "UPDATE memories SET status = ?, updated_at = ? WHERE id = ?",
                        ("archived", utcnow(), candidate["id"]),
                    )
                    stats["archived"] += 1
                    stats["linked"] += 1
                    continue

                # Case 2: near-duplicate above the high threshold -> link and archive.
                if best_match and best_score >= high_threshold:
                    self._link_memories(conn, candidate["id"], best_match["id"], "related_to", "high_similarity")
                    conn.execute(
                        "UPDATE memories SET status = ?, updated_at = ? WHERE id = ?",
                        ("archived", utcnow(), candidate["id"]),
                    )
                    stats["archived"] += 1
                    stats["linked"] += 1
                    continue

                # Case 3: mid similarity -> link only. No 'continue' here, so
                # the conflict check and promotion below still apply.
                if best_match and best_score >= mid_threshold:
                    if llm_enabled:
                        synthesized = self._llm_summarize_pair(
                            candidate,
                            best_match,
                            provider=llm_provider,
                            command=llm_command,
                        )
                        if synthesized:
                            # The synthesized text replaces the candidate's
                            # summary; its provenance is merged into metadata.
                            conn.execute(
                                "UPDATE memories SET summary = ?, updated_at = ?, metadata_json = ? WHERE id = ?",
                                (
                                    synthesized,
                                    utcnow(),
                                    self.store.dumps(
                                        {
                                            **candidate.get("metadata", {}),
                                            "synthesis_sources": [candidate["id"], best_match["id"]],
                                            "synthesis_provider": llm_provider,
                                            "synthesis_command": llm_command,
                                        }
                                    ),
                                    candidate["id"],
                                ),
                            )
                            stats["llm_summaries"] += 1
                    self._link_memories(conn, candidate["id"], best_match["id"], "related_to", "mid_similarity")
                    stats["linked"] += 1

                # The 'contradicts' link added here is not counted in stats["linked"].
                if best_match and best_score >= conflict_threshold and self._conflict_signal(candidate, best_match):
                    self._record_conflict(conn, candidate["id"], best_match["id"], "potential_contradiction")
                    self._link_memories(conn, candidate["id"], best_match["id"], "contradicts", "heuristic")
                    stats["conflicts"] += 1

                # Active-episode candidates are never promoted; only inbox ones.
                if candidate["status"] == "inbox" and candidate.get("confidence", 0.0) >= promote_threshold:
                    conn.execute(
                        "UPDATE memories SET status = ?, updated_at = ? WHERE id = ?",
                        ("active", utcnow(), candidate["id"]),
                    )
                    stats["promoted"] += 1

        return stats

    def upsert_profile_fact(
        self, title: str, content: str, evidence_ref: str | None = None, source_ref: str | None = None
    ) -> dict[str, Any]:
        return self.create_memory(
            {
                "type": "profile",
                "scope": "global",
                "title": title,
                "content": content,
                "summary": self._make_summary(content),
                "source_ref": source_ref,
                "evidence_ref": evidence_ref,
                "metadata": {"kind": "profile_fact"},
            }
        )

    def explain(self, identifier: str) -> dict[str, Any]:
        with self.store.connection() as conn:
            retrieval = conn.execute("SELECT * FROM retrieval_logs WHERE id = ?", (identifier,)).fetchone()
            if retrieval:
                return {
                    "kind": "retrieval",
                    "id": retrieval["id"],
                    "query_text": retrieval["query_text"],
                    "filters": self.store.loads(retrieval["filters_json"]),
                    "results": self.store.loads(retrieval["results_json"]),
                }
            memory = conn.execute("SELECT * FROM memories WHERE id = ?", (identifier,)).fetchone()
            if memory:
                mem = self._row_to_memory(memory)
                sources = conn.execute("SELECT * FROM sources WHERE source_ref = ? OR id = ?", (mem["source_ref"], mem["source_ref"])).fetchall()
                return {"kind": "memory", "memory": mem, "sources": [dict(source) for source in sources]}
        raise KeyError(f"Unknown identifier: {identifier}")

    def export(self) -> dict[str, Any]:
        tables = [
            "projects",
            "repos",
            "sources",
            "ingestion_events",
            "memories",
            "memory_links",
            "memory_conflicts",
            "retrieval_logs",
            "maintenance_jobs",
            "maintenance_runs",
            "profiles",
            "tasks",
            "task_runs",
            "artifacts",
        ]
        exported: dict[str, list[dict[str, Any]]] = {}
        with self.store.connection() as conn:
            for table in tables:
                rows = conn.execute(f"SELECT * FROM {table}").fetchall()
                exported[table] = [dict(row) for row in rows]
        return exported

    def import_data(self, payload: dict[str, Any]) -> None:
        tables = [
            "projects",
            "repos",
            "sources",
            "ingestion_events",
            "memories",
            "memory_links",
            "memory_conflicts",
            "retrieval_logs",
            "maintenance_jobs",
            "maintenance_runs",
            "profiles",
            "tasks",
            "task_runs",
            "artifacts",
        ]
        with self.store.connection() as conn:
            for table in tables:
                rows = payload.get(table, [])
                if not rows:
                    continue
                columns = list(rows[0].keys())
                placeholder = ",".join("?" for _ in columns)
                sql = f"INSERT OR REPLACE INTO {table} ({','.join(columns)}) VALUES ({placeholder})"
                conn.executemany(sql, [tuple(row.get(col) for col in columns) for row in rows])
                if table == "memories":
                    for row in rows:
                        self._upsert_fts(conn, row)
        self.migrate_v2()
        self._run_opportunistic_maintenance(limit=10)

    def get_memory(self, memory_id: str) -> dict[str, Any]:
        with self.store.connection() as conn:
            row = conn.execute("SELECT * FROM memories WHERE id = ?", (memory_id,)).fetchone()
            if row is None:
                raise KeyError(memory_id)
            return self._row_to_memory(row)

    def list_memories(
        self,
        *,
        status: str | None = "active",
        scope: str | None = None,
        memory_type: str | None = None,
        subtype: str | None = None,
        project_id: str | None = None,
        repo_id: str | None = None,
        source_ref: str | None = None,
        evidence_ref: str | None = None,
        run_id: str | None = None,
        task_id: str | None = None,
        origin_agent: str | None = None,
        url: str | None = None,
        domain: str | None = None,
        metadata: dict[str, Any] | None = None,
        limit: int = 50,
    ) -> list[dict[str, Any]]:
        sql = ["SELECT * FROM memories WHERE 1=1"]
        params: list[Any] = []
        if status is not None:
            sql.append("AND status = ?")
            params.append(status)
        if scope is not None:
            sql.append("AND scope = ?")
            params.append(scope)
        filters = {
            "type": memory_type,
            "subtype": subtype,
            "project_id": project_id,
            "repo_id": repo_id,
            "source_ref": source_ref,
            "evidence_ref": evidence_ref,
            "run_id": run_id,
            "task_id": task_id,
            "origin_agent": origin_agent,
            "url": url,
            "domain": domain,
        }
        params.extend(self._memory_filter_sql(sql, {key: value for key, value in filters.items() if value is not None}))
        for key, value in (metadata or {}).items():
            sql.append("AND json_extract(metadata_json, ?) = ?")
            params.extend((f"$.{key}", value))
        sql.append("ORDER BY updated_at DESC LIMIT ?")
        params.append(limit)
        with self.store.connection() as conn:
            rows = conn.execute(" ".join(sql), params).fetchall()
            return [self._row_to_memory(row) for row in rows]

    def list_pending_events(self) -> list[dict[str, Any]]:
        with self.store.connection() as conn:
            rows = conn.execute("SELECT * FROM ingestion_events WHERE job_state = 'pending' ORDER BY created_at ASC").fetchall()
            return [dict(row) for row in rows]

    def mark_event(self, event_id: str, state: str, outcome: str | None = None, error_message: str | None = None) -> None:
        """Record the processing result of an ingestion event.

        Args:
            event_id: Id of the ``ingestion_events`` row to update.
            state: New ``job_state``.
            outcome: Stored as ``processor_outcome`` (``None`` clears it).
            error_message: Stored as ``error_message`` (``None`` clears it).

        ``updated_at`` and ``processed_at`` are both set to the same current
        timestamp. An unknown *event_id* updates nothing and raises nothing.
        """
        # Read the clock once so both timestamp columns agree exactly.
        now = utcnow()
        with self.store.connection() as conn:
            conn.execute(
                """
                UPDATE ingestion_events
                SET job_state = ?, processor_outcome = ?, error_message = ?, updated_at = ?, processed_at = ?
                WHERE id = ?
                """,
                (state, outcome, error_message, now, now, event_id),
            )

    def create_project(self, project_id: str, name: str, description: str | None = None) -> None:
        """Insert or replace the project row for *project_id*.

        ``created_at`` and ``updated_at`` are both set to the current time,
        also when an existing row is replaced. One opportunistic maintenance
        step runs afterwards.
        """
        stamp = utcnow()
        statement = "INSERT OR REPLACE INTO projects (id, name, description, created_at, updated_at) VALUES (?, ?, ?, ?, ?)"
        values = (project_id, name, description, stamp, stamp)
        with self.store.connection() as conn:
            conn.execute(statement, values)
        self._run_opportunistic_maintenance(limit=1)

    def create_repo(self, repo_id: str, name: str, project_id: str | None = None, path: str | None = None) -> None:
        """Insert or replace the repo row for *repo_id*.

        ``created_at`` and ``updated_at`` are both set to the current time,
        also when an existing row is replaced. One opportunistic maintenance
        step runs afterwards.
        """
        stamp = utcnow()
        statement = "INSERT OR REPLACE INTO repos (id, project_id, name, path, created_at, updated_at) VALUES (?, ?, ?, ?, ?, ?)"
        values = (repo_id, project_id, name, path, stamp, stamp)
        with self.store.connection() as conn:
            conn.execute(statement, values)
        self._run_opportunistic_maintenance(limit=1)

    def create_task(
        self,
        *,
        title: str,
        intent: str,
        kind: str = "task",
        status: str = "open",
        priority: int = 3,
        project_id: str | None = None,
        repo_id: str | None = None,
        parent_task_id: str | None = None,
        origin: str | None = None,
        owner_agent: str | None = None,
        blocked_reason: str | None = None,
        requires_human_input: bool = False,
        due_at: str | None = None,
        metadata: dict[str, Any] | None = None,
        task_id: str | None = None,
        run_id: str | None = None,
    ) -> dict[str, Any]:
        """Insert a new task and return it as loaded by ``get_task``.

        The id is *task_id* when given, otherwise generated with the ``task``
        prefix. ``requires_human_input`` is stored as 0/1 and *metadata* is
        serialized (``{}`` when omitted). This is a plain ``INSERT``, so the
        outcome for an id that already exists depends on the table's
        constraints. One opportunistic maintenance step runs afterwards.
        """
        stamp = utcnow()
        # Key order doubles as the column order of the INSERT built below.
        record = {
            "id": task_id or make_id("task"),
            "schema_version": SCHEMA_VERSION,
            "run_id": run_id,
            "title": title,
            "intent": intent,
            "kind": kind,
            "status": status,
            "priority": priority,
            "project_id": project_id,
            "repo_id": repo_id,
            "parent_task_id": parent_task_id,
            "origin": origin,
            "owner_agent": owner_agent,
            "blocked_reason": blocked_reason,
            "requires_human_input": int(bool(requires_human_input)),
            "due_at": due_at,
            "created_at": stamp,
            "updated_at": stamp,
            "metadata_json": self.store.dumps(metadata or {}),
        }
        names = list(record)
        statement = (
            f"INSERT INTO tasks ({', '.join(names)}) "
            f"VALUES ({', '.join(':' + name for name in names)})"
        )
        with self.store.connection() as conn:
            conn.execute(statement, record)
        self._run_opportunistic_maintenance(limit=1)
        return self.get_task(record["id"])

    def get_task(self, task_id: str) -> dict[str, Any]:
        with self.store.connection() as conn:
            row = conn.execute("SELECT * FROM tasks WHERE id = ?", (task_id,)).fetchone()
            if row is None:
                raise KeyError(task_id)
            return self._row_to_task(row)

    def list_tasks(
        self,
        *,
        status: str | None = None,
        owner_agent: str | None = None,
        requires_human_input: bool | None = None,
        run_id: str | None = None,
        limit: int = 50,
    ) -> list[dict[str, Any]]:
        sql = ["SELECT * FROM tasks WHERE 1=1"]
        params: list[Any] = []
        if status is not None:
            sql.append("AND status = ?")
            params.append(status)
        if owner_agent is not None:
            sql.append("AND owner_agent = ?")
            params.append(owner_agent)
        if requires_human_input is not None:
            sql.append("AND requires_human_input = ?")
            params.append(1 if requires_human_input else 0)
        if run_id is not None:
            sql.append("AND run_id = ?")
            params.append(run_id)
        sql.append("ORDER BY priority ASC, created_at ASC LIMIT ?")
        params.append(limit)
        with self.store.connection() as conn:
            rows = conn.execute(" ".join(sql), params).fetchall()
            return [self._row_to_task(row) for row in rows]

    def update_task(
        self,
        task_id: str,
        *,
        status: str | None = None,
        owner_agent: str | None = None,
        blocked_reason: str | None = None,
        requires_human_input: bool | None = None,
        metadata: dict[str, Any] | None = None,
        run_id: str | None = None,
    ) -> dict[str, Any]:
        """Update selected fields of a task and return the reloaded task.

        ``status`` (when truthy), ``owner_agent``, ``requires_human_input``,
        ``metadata`` and ``run_id`` replace the stored value only when given.
        *metadata* replaces the stored metadata wholesale.

        ``blocked_reason`` is different: it is ALWAYS written, so omitting it
        clears any stored reason.

        Raises:
            KeyError: if *task_id* does not exist (raised by ``get_task``).
        """
        task = self.get_task(task_id)
        if status:
            task["status"] = status
        if owner_agent is not None:
            task["owner_agent"] = owner_agent
        # Unconditional overwrite: None resets the stored blocked reason.
        task["blocked_reason"] = blocked_reason
        if run_id is not None:
            task["run_id"] = run_id
        if requires_human_input is not None:
            task["requires_human_input"] = requires_human_input
        if metadata is not None:
            task["metadata"] = metadata

        statement = """
            UPDATE tasks
            SET status = ?, owner_agent = ?, blocked_reason = ?, requires_human_input = ?, run_id = ?, updated_at = ?, metadata_json = ?
            WHERE id = ?
        """
        values = (
            task["status"],
            task["owner_agent"],
            task["blocked_reason"],
            1 if task["requires_human_input"] else 0,
            task.get("run_id"),
            utcnow(),
            self.store.dumps(task["metadata"]),
            task_id,
        )
        with self.store.connection() as conn:
            conn.execute(statement, values)
        self._run_opportunistic_maintenance(limit=1)
        return self.get_task(task_id)

    def start_task_run(
        self,
        task_id: str,
        agent_id: str,
        *,
        input_payload: dict[str, Any] | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Insert a new ``task_runs`` row in the ``running`` state.

        The run gets a fresh ``run``-prefixed id, and a single timestamp is
        used for ``started_at``, ``created_at`` and ``updated_at``.  Missing
        or empty ``input_payload`` / ``metadata`` are stored as ``{}``.
        Returns the row as read back through ``get_task_run``.
        """
        started = utcnow()
        run_id = make_id("run")
        record = dict(
            id=run_id,
            task_id=task_id,
            agent_id=agent_id,
            status="running",
            input_payload_json=self.store.dumps(input_payload if input_payload else {}),
            result_summary=None,
            error_message=None,
            started_at=started,
            completed_at=None,
            created_at=started,
            updated_at=started,
            metadata_json=self.store.dumps(metadata if metadata else {}),
        )
        with self.store.connection() as conn:
            conn.execute(
                """
                INSERT INTO task_runs (
                    id, task_id, agent_id, status, input_payload_json, result_summary, error_message,
                    started_at, completed_at, created_at, updated_at, metadata_json
                ) VALUES (
                    :id, :task_id, :agent_id, :status, :input_payload_json, :result_summary, :error_message,
                    :started_at, :completed_at, :created_at, :updated_at, :metadata_json
                )
                """,
                record,
            )
        self._run_opportunistic_maintenance(limit=1)
        return self.get_task_run(run_id)

    def get_task_run(self, run_id: str) -> dict[str, Any]:
        """Return the ``task_runs`` row with id ``run_id``.

        The row is converted with ``_row_to_task_run``.

        Raises:
            KeyError: no row has that id (the id is the exception argument).
        """
        query = "SELECT * FROM task_runs WHERE id = ?"
        with self.store.connection() as conn:
            found = conn.execute(query, (run_id,)).fetchone()
            if found is not None:
                return self._row_to_task_run(found)
            raise KeyError(run_id)

    def finish_task_run(
        self,
        run_id: str,
        *,
        status: str,
        result_summary: str | None = None,
        error_message: str | None = None,
    ) -> dict[str, Any]:
        """Record the outcome of a task run and return the updated row.

        Writes ``status``, ``result_summary`` and ``error_message`` and stamps
        both ``updated_at`` and ``completed_at`` with the same timestamp.  One
        round of opportunistic maintenance runs before the row is re-read via
        ``get_task_run``, which raises ``KeyError`` if ``run_id`` is unknown.
        """
        finished_at = utcnow()
        values = (status, result_summary, error_message, finished_at, finished_at, run_id)
        with self.store.connection() as conn:
            conn.execute(
                """
                UPDATE task_runs
                SET status = ?, result_summary = ?, error_message = ?, updated_at = ?, completed_at = ?
                WHERE id = ?
                """,
                values,
            )
        self._run_opportunistic_maintenance(limit=1)
        return self.get_task_run(run_id)

    def list_task_runs(self, *, task_id: str | None = None, status: str | None = None, limit: int = 50) -> list[dict[str, Any]]:
        sql = ["SELECT * FROM task_runs WHERE 1=1"]
        params: list[Any] = []
        if task_id is not None:
            sql.append("AND task_id = ?")
            params.append(task_id)
        if status is not None:
            sql.append("AND status = ?")
            params.append(status)
        sql.append("ORDER BY created_at DESC LIMIT ?")
        params.append(limit)
        with self.store.connection() as conn:
            rows = conn.execute(" ".join(sql), params).fetchall()
            return [self._row_to_task_run(row) for row in rows]

    def create_artifact(
        self,
        *,
        task_id: str,
        artifact_type: str,
        title: str,
        content: str,
        fmt: str = "md",
        status: str = "active",
        source_ref: str | None = None,
        metadata: dict[str, Any] | None = None,
        artifact_id: str | None = None,
    ) -> dict[str, Any]:
        """Insert an artifact row for a task and return it as stored.

        ``artifact_id`` is used as the primary key when given; otherwise an
        ``art``-prefixed id is generated.  ``fmt`` is stored in the ``format``
        column, and missing or empty ``metadata`` is stored as ``{}``.
        ``created_at`` and ``updated_at`` share one timestamp.
        """
        stamp = utcnow()
        new_id = artifact_id if artifact_id else make_id("art")
        record = dict(
            id=new_id,
            task_id=task_id,
            artifact_type=artifact_type,
            title=title,
            content=content,
            format=fmt,
            status=status,
            source_ref=source_ref,
            created_at=stamp,
            updated_at=stamp,
            metadata_json=self.store.dumps(metadata if metadata else {}),
        )
        with self.store.connection() as conn:
            conn.execute(
                """
                INSERT INTO artifacts (
                    id, task_id, artifact_type, title, content, format, status, source_ref, created_at, updated_at, metadata_json
                ) VALUES (
                    :id, :task_id, :artifact_type, :title, :content, :format, :status, :source_ref, :created_at, :updated_at, :metadata_json
                )
                """,
                record,
            )
        self._run_opportunistic_maintenance(limit=1)
        return self.get_artifact(new_id)

    def get_artifact(self, artifact_id: str) -> dict[str, Any]:
        """Return the ``artifacts`` row with id ``artifact_id``.

        The row is converted with ``_row_to_artifact``.

        Raises:
            KeyError: no row has that id (the id is the exception argument).
        """
        query = "SELECT * FROM artifacts WHERE id = ?"
        with self.store.connection() as conn:
            found = conn.execute(query, (artifact_id,)).fetchone()
            if found is not None:
                return self._row_to_artifact(found)
            raise KeyError(artifact_id)

    def list_artifacts(self, *, task_id: str | None = None, limit: int = 50) -> list[dict[str, Any]]:
        sql = ["SELECT * FROM artifacts WHERE 1=1"]
        params: list[Any] = []
        if task_id is not None:
            sql.append("AND task_id = ?")
            params.append(task_id)
        sql.append("ORDER BY created_at DESC LIMIT ?")
        params.append(limit)
        with self.store.connection() as conn:
            rows = conn.execute(" ".join(sql), params).fetchall()
            return [self._row_to_artifact(row) for row in rows]

    def dashboard_snapshot(self, *, owner_agent: str | None = None) -> dict[str, Any]:
        active_tasks = self.list_tasks(owner_agent=owner_agent, limit=50)
        active_tasks = [task for task in active_tasks if task["status"] in {"open", "draft", "in_progress"}][:20]
        blocked_tasks = self.list_tasks(status="blocked", owner_agent=owner_agent, limit=20)
        recent_runs = self.list_task_runs(limit=20)
        recent_artifacts = self.list_artifacts(limit=20)
        return {
            "active_tasks": active_tasks,
            "blocked_tasks": blocked_tasks,
            "recent_runs": recent_runs,
            "recent_artifacts": recent_artifacts,
        }

    def task_bundle(self, task_id: str) -> dict[str, Any]:
        """Return a task together with its children, runs and artifacts.

        The result has the keys ``task`` (from ``get_task``), ``children``
        (tasks whose ``parent_task_id`` is ``task_id``, ordered by priority
        then creation time), ``runs`` and ``artifacts`` (up to 100 each).
        """
        parent = self.get_task(task_id)
        children_query = "SELECT * FROM tasks WHERE parent_task_id = ? ORDER BY priority ASC, created_at ASC"
        with self.store.connection() as conn:
            fetched = conn.execute(children_query, (task_id,)).fetchall()
        children = [self._row_to_task(item) for item in fetched]
        runs = self.list_task_runs(task_id=task_id, limit=100)
        artifacts = self.list_artifacts(task_id=task_id, limit=100)
        return {"task": parent, "children": children, "runs": runs, "artifacts": artifacts}

    def audit_v2(self) -> dict[str, Any]:
        """Report row counts relevant to the v2 schema migration.

        A memory or task row counts as legacy when its ``schema_version``
        differs from ``SCHEMA_VERSION`` or its metadata JSON still carries a
        ``legacy_kind`` or ``legacy_system`` key.  The ``approvals``,
        ``handoffs`` and ``maintenance_jobs`` counts come from
        ``_count_if_table_exists``.
        """
        version_binding = (SCHEMA_VERSION,)
        report: dict[str, Any] = {"schema_version": SCHEMA_VERSION}
        with self.store.connection() as conn:
            approvals = self._count_if_table_exists(conn, "approvals")
            handoffs = self._count_if_table_exists(conn, "handoffs")
            legacy_memories = conn.execute(
                """
                SELECT COUNT(*) AS count
                FROM memories
                WHERE schema_version != ?
                   OR json_extract(metadata_json, '$.legacy_kind') IS NOT NULL
                   OR json_extract(metadata_json, '$.legacy_system') IS NOT NULL
                """,
                version_binding,
            ).fetchone()
            legacy_tasks = conn.execute(
                """
                SELECT COUNT(*) AS count
                FROM tasks
                WHERE schema_version != ?
                   OR json_extract(metadata_json, '$.legacy_kind') IS NOT NULL
                   OR json_extract(metadata_json, '$.legacy_system') IS NOT NULL
                """,
                version_binding,
            ).fetchone()
            total_memories = conn.execute("SELECT COUNT(*) AS count FROM memories").fetchone()
            total_tasks = conn.execute("SELECT COUNT(*) AS count FROM tasks").fetchone()
            report["memories"] = total_memories["count"]
            report["tasks"] = total_tasks["count"]
            report["approvals"] = approvals
            report["handoffs"] = handoffs
            report["maintenance_jobs"] = self._count_if_table_exists(conn, "maintenance_jobs")
            report["legacy_memory_rows"] = legacy_memories["count"]
            report["legacy_task_rows"] = legacy_tasks["count"]
        return report

    def migrate_v2(self) -> dict[str, Any]:
        """Migrate stored data to the v2 layout inside one store connection.

        Steps, in order:

        1. Rewrite every memory row for which ``_migrate_memory_row`` returns
           a record (rows it returns ``None`` for are left untouched).
        2. Do the same for task rows via ``_migrate_task_row``.
        3. If an ``approvals`` table exists, copy each approval whose task
           still exists into that task's metadata under ``"approval"`` and
           into an ``approval_resolution`` artifact, then drop the table.
        4. If a ``handoffs`` table exists, turn each handoff into a
           ``delegation_note`` artifact, then drop the table.

        Returns a dict with ``status`` (always ``"ok"``), the ``audit_v2``
        report taken ``before`` and ``after``, and the ``migrated`` counters.
        """
        before = self.audit_v2()
        migrated = {"memories": 0, "tasks": 0, "approval_artifacts": 0, "handoff_artifacts": 0}
        with self.store.connection() as conn:
            # --- 1. memories -------------------------------------------------
            memory_rows = conn.execute("SELECT * FROM memories").fetchall()
            for row in memory_rows:
                record = self._migrate_memory_row(dict(row))
                if record is None:
                    # Nothing to rewrite for this row.
                    continue
                conn.execute(
                    """
                    UPDATE memories
                    SET schema_version = ?, type = ?, subtype = ?, origin_agent = ?, run_id = ?, task_id = ?,
                        url = ?, domain = ?, summary = ?, metadata_json = ?, updated_at = ?
                    WHERE id = ?
                    """,
                    (
                        record["schema_version"],
                        record["type"],
                        record["subtype"],
                        record["origin_agent"],
                        record["run_id"],
                        record["task_id"],
                        record["url"],
                        record["domain"],
                        record["summary"],
                        record["metadata_json"],
                        utcnow(),
                        record["id"],
                    ),
                )
                migrated["memories"] += 1
            # --- 2. tasks ----------------------------------------------------
            task_rows = conn.execute("SELECT * FROM tasks").fetchall()
            for row in task_rows:
                record = self._migrate_task_row(dict(row))
                if record is None:
                    continue
                conn.execute(
                    """
                    UPDATE tasks
                    SET schema_version = ?, run_id = ?, metadata_json = ?, updated_at = ?
                    WHERE id = ?
                    """,
                    (record["schema_version"], record["run_id"], record["metadata_json"], utcnow(), record["id"]),
                )
                migrated["tasks"] += 1
            # --- 3. approvals -> task metadata + artifacts --------------------
            if self._table_exists(conn, "approvals"):
                for row in conn.execute("SELECT * FROM approvals").fetchall():
                    approval = dict(row)
                    task_row = conn.execute("SELECT * FROM tasks WHERE id = ?", (approval["task_id"],)).fetchone()
                    if task_row is None:
                        # Approval points at a task that no longer exists: it is
                        # not copied anywhere and is lost when the table is dropped.
                        continue
                    task = self._row_to_task(task_row)
                    metadata = dict(task.get("metadata", {}))
                    # A later approval for the same task replaces this key.
                    metadata["approval"] = {
                        "kind": approval["kind"],
                        "risk_level": approval["risk_level"],
                        "payload": self.store.loads(approval["payload_json"]) or {},
                        "status": approval["status"],
                        "requested_at": approval["requested_at"],
                        "resolved_at": approval["resolved_at"],
                        "resolution_note": approval["resolution_note"],
                    }
                    conn.execute(
                        """
                        UPDATE tasks
                        SET metadata_json = ?, updated_at = ?
                        WHERE id = ?
                        """,
                        (self.store.dumps(metadata), utcnow(), approval["task_id"]),
                    )
                    # Keep a standalone record of the approval as an artifact;
                    # anything other than a pending approval is stored as archived.
                    conn.execute(
                        """
                        INSERT INTO artifacts (
                            id, task_id, artifact_type, title, content, format, status, source_ref, created_at, updated_at, metadata_json
                        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                        """,
                        (
                            make_id("art"),
                            approval["task_id"],
                            "approval_resolution",
                            f"Approval {approval['kind']}",
                            json.dumps(metadata["approval"], sort_keys=True),
                            "json",
                            "archived" if approval["status"] != "pending" else "active",
                            "migration:v2",
                            utcnow(),
                            utcnow(),
                            self.store.dumps({}),
                        ),
                    )
                    migrated["approval_artifacts"] += 1
                conn.execute("DROP TABLE approvals")
            # --- 4. handoffs -> artifacts -------------------------------------
            if self._table_exists(conn, "handoffs"):
                for row in conn.execute("SELECT * FROM handoffs").fetchall():
                    handoff = dict(row)
                    payload = self.store.loads(handoff["payload_json"]) or {}
                    # Unlike approvals, handoffs are copied without checking
                    # that the referenced task still exists.
                    conn.execute(
                        """
                        INSERT INTO artifacts (
                            id, task_id, artifact_type, title, content, format, status, source_ref, created_at, updated_at, metadata_json
                        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                        """,
                        (
                            make_id("art"),
                            handoff["task_id"],
                            "delegation_note",
                            f"Handoff {handoff['from_agent']} -> {handoff['to_agent']}",
                            json.dumps(
                                {
                                    "from_agent": handoff["from_agent"],
                                    "to_agent": handoff["to_agent"],
                                    "reason": handoff["reason"],
                                    "status": handoff["status"],
                                    "payload": payload,
                                    "result_summary": handoff["result_summary"],
                                    "error_message": handoff["error_message"],
                                    "completed_at": handoff["completed_at"],
                                },
                                sort_keys=True,
                            ),
                            "json",
                            "active",
                            "migration:v2",
                            utcnow(),
                            utcnow(),
                            self.store.dumps({}),
                        ),
                    )
                    migrated["handoff_artifacts"] += 1
                conn.execute("DROP TABLE handoffs")
        # Audit again after the connection context has exited.
        after = self.audit_v2()
        return {"status": "ok", "before": before, "after": after, "migrated": migrated}

    def _capture_source(
        self, trigger_type: str, source_kind: str, source_ref: str, scope: str, content: str, metadata: dict[str, Any]
    ) -> dict[str, Any]:
        """Store a raw source plus a pending ingestion event that points at it.

        The source title is ``metadata["title"]`` when that is set and
        non-empty, otherwise ``source_ref``.  The event starts in the
        ``pending`` job state with a retry count of 0.  Both rows share one
        creation timestamp.  Returns ``{"source": ..., "event": ...}`` holding
        the dicts that were inserted (not re-read from the database).
        """
        created = utcnow()
        source_id = make_id("src")
        source = dict(
            id=source_id,
            source_kind=source_kind,
            title=metadata.get("title") or source_ref,
            content=content,
            source_ref=source_ref,
            metadata_json=self.store.dumps(metadata),
            created_at=created,
        )
        event = dict(
            id=make_id("ing"),
            trigger_type=trigger_type,
            source_id=source_id,
            requested_scope=scope,
            job_state="pending",
            processor_outcome=None,
            retry_count=0,
            error_message=None,
            created_at=created,
            updated_at=created,
            processed_at=None,
        )
        with self.store.connection() as conn:
            conn.execute(
                """
                INSERT INTO sources (id, source_kind, title, content, source_ref, metadata_json, created_at)
                VALUES (:id, :source_kind, :title, :content, :source_ref, :metadata_json, :created_at)
                """,
                source,
            )
            conn.execute(
                """
                INSERT INTO ingestion_events (
                    id, trigger_type, source_id, requested_scope, job_state, processor_outcome,
                    retry_count, error_message, created_at, updated_at, processed_at
                ) VALUES (
                    :id, :trigger_type, :source_id, :requested_scope, :job_state, :processor_outcome,
                    :retry_count, :error_message, :created_at, :updated_at, :processed_at
                )
                """,
                event,
            )
        self._run_opportunistic_maintenance(limit=1)
        return dict(source=source, event=event)

    def _run_opportunistic_maintenance(self, *, limit: int) -> None:
        if self._maintenance_check_running:
            return
        due_jobs = self.list_due_maintenance_jobs(limit=1)
        if not due_jobs:
            return
        self._maintenance_check_running = True
        try:
            from .engine import MemoryEngine

            MemoryEngine(self).run_due_maintenance(limit=limit)
        except Exception:
            return
        finally:
            self._maintenance_check_running = False

    def _memory_filter_sql(self, sql: list[str], filters: dict[str, Any]) -> list[Any]:
        params: list[Any] = []
        allowed = {"project_id", "repo_id", "type", "subtype", "run_id", "task_id", "origin_agent", "url", "domain"}
        for key, value in filters.items():
            if key not in allowed or value is None:
                continue
            sql.append(f"AND {key} = ?")
            params.append(value)
        return params

    def _facet_score(self, memory: dict[str, Any], filters: dict[str, Any]) -> float:
        tracked = ["project_id", "repo_id", "type", "subtype", "run_id", "task_id", "origin_agent", "url", "domain"]
        applied = [key for key in tracked if filters.get(key) is not None]
        if not applied:
            return 0.0
        matched = sum(1 for key in applied if memory.get(key) == filters[key])
        return matched / len(applied)

    def _migrate_memory_row(self, row: dict[str, Any]) -> dict[str, Any] | None:
        """Build the v2 column values for one memory row, or ``None`` to skip it.

        A row is skipped when it is already at ``SCHEMA_VERSION`` and its
        metadata has no truthy ``legacy_kind`` / ``legacy_system`` entry.

        For every promoted field the existing column value wins, then the
        metadata value, then an inferred value.  Keys that have been promoted
        to columns (and the legacy markers) are removed from the metadata
        before it is re-serialised; ``schema`` and ``subtype`` are then set on
        it.  ``router_handoff`` rows become type ``episode`` and
        ``research_source`` rows become type ``artifact``.
        """
        legacy_meta = self.store.loads(row.get("metadata_json")) or {}
        carries_legacy_marker = legacy_meta.get("legacy_kind") or legacy_meta.get("legacy_system")
        if row.get("schema_version") == SCHEMA_VERSION and not carries_legacy_marker:
            return None

        subtype = self._memory_subtype(legacy_meta)
        origin_agent = (
            row.get("origin_agent")
            or legacy_meta.get("origin_agent")
            or legacy_meta.get("legacy_system")
            or row.get("agent_id")
        )
        run_id = (
            row.get("run_id")
            or legacy_meta.get("run_id")
            or legacy_meta.get("legacy_run_id")
            or self._infer_run_id(row, subtype)
        )
        task_id = row.get("task_id") or legacy_meta.get("task_id")
        url = (
            row.get("url")
            or legacy_meta.get("url")
            or legacy_meta.get("source_url")
            or self._infer_url(row, subtype)
        )
        domain = row.get("domain") or legacy_meta.get("domain") or (urlparse(url).netloc if url else None)

        promoted_keys = {"legacy_kind", "legacy_system", "legacy_run_id", "run_id", "task_id", "url", "domain", "source_url", "record_kind", "schema"}
        clean: dict[str, Any] = {}
        for key, value in legacy_meta.items():
            if key in promoted_keys:
                continue
            clean[key] = value

        if subtype == "research_run":

            def labelled(label: str) -> str:
                # Looked up lazily so the content is only read when needed.
                return _line_value(row["content"], label)

            clean["goal"] = clean.get("goal") or labelled("Goal") or row["title"].removeprefix("Research run: ").strip()
            clean["scope"] = clean.get("scope") or labelled("Scope")
            clean["assumptions"] = clean.get("assumptions") or labelled("Assumptions")
            clean["summary"] = clean.get("summary") or labelled("Summary") or row["summary"]
        clean["schema"] = SCHEMA_VERSION
        clean["subtype"] = subtype

        type_overrides = {"router_handoff": "episode", "research_source": "artifact"}
        record_type = type_overrides.get(subtype, row["type"])
        return {
            "id": row["id"],
            "schema_version": SCHEMA_VERSION,
            "type": record_type,
            "subtype": subtype,
            "origin_agent": origin_agent,
            "run_id": run_id,
            "task_id": task_id,
            "url": url,
            "domain": domain,
            "summary": clean.get("summary", row["summary"]),
            "metadata_json": self.store.dumps(clean),
        }

    def _migrate_task_row(self, row: dict[str, Any]) -> dict[str, Any] | None:
        """Build the v2 column values for one task row, or ``None`` to skip it.

        A row is skipped when it is already at ``SCHEMA_VERSION`` and its
        metadata has no truthy ``legacy_kind`` / ``legacy_system`` entry.
        ``run_id`` is taken from the column, then metadata ``run_id``, then
        metadata ``legacy_run_id``.  The legacy keys are dropped from the
        metadata and ``schema`` is set to ``SCHEMA_VERSION``.
        """
        legacy_meta = self.store.loads(row.get("metadata_json")) or {}
        carries_legacy_marker = legacy_meta.get("legacy_kind") or legacy_meta.get("legacy_system")
        if row.get("schema_version") == SCHEMA_VERSION and not carries_legacy_marker:
            return None

        run_id = row.get("run_id") or legacy_meta.get("run_id") or legacy_meta.get("legacy_run_id")
        legacy_keys = {"legacy_kind", "legacy_system", "legacy_run_id"}
        clean: dict[str, Any] = {}
        for key, value in legacy_meta.items():
            if key in legacy_keys:
                continue
            clean[key] = value
        clean["schema"] = SCHEMA_VERSION
        return dict(
            id=row["id"],
            schema_version=SCHEMA_VERSION,
            run_id=run_id,
            metadata_json=self.store.dumps(clean),
        )

    def _ensure_default_jobs(self) -> None:
        defaults = [
            {
                "id": "job_consolidation_daily_10",
                "job_type": "consolidation",
                "cadence": "daily",
                "interval_minutes": None,
                "window_start": "10:00",
                "window_end": None,
                "metadata": {
                    "mode": "daily_10",
                    "candidate_age_days": 0,
                    "max_candidates": 200,
                    "dedupe_high_threshold": 0.9,
                    "dedupe_mid_threshold": 0.75,
                    "promote_confidence": 0.7,
                    "conflict_threshold": 0.55,
                    "llm_enabled": False,
                    "llm_provider": "opencode",
                    "llm_command": ["opencode", "-m", "MiniMax M2.5 Free"],
                },
            },
            {
                "id": "job_consolidation_daily_15",
                "job_type": "consolidation",
                "cadence": "daily",
                "interval_minutes": None,
                "window_start": "15:00",
                "window_end": None,
                "metadata": {
                    "mode": "daily_15",
                    "candidate_age_days": 0,
                    "max_candidates": 200,
                    "dedupe_high_threshold": 0.9,
                    "dedupe_mid_threshold": 0.75,
                    "promote_confidence": 0.7,
                    "conflict_threshold": 0.55,
                    "llm_enabled": False,
                    "llm_provider": "opencode",
                    "llm_command": ["opencode", "-m", "MiniMax M2.5 Free"],
                },
            },
            {
                "id": "job_consolidation_weekly",
                "job_type": "consolidation",
                "cadence": "weekly",
                "interval_minutes": None,
                "window_start": None,
                "window_end": None,
                "metadata": {
                    "mode": "weekly",
                    "candidate_age_days": 0,
                    "max_candidates": 800,
                    "dedupe_high_threshold": 0.9,
                    "dedupe_mid_threshold": 0.75,
                    "promote_confidence": 0.7,
                    "conflict_threshold": 0.55,
                    "llm_enabled": True,
                    "llm_provider": "opencode",
                    "llm_command": ["opencode", "-m", "MiniMax M2.5 Free"],
                },
            },
        ]
        with self.store.connection() as conn:
            legacy = conn.execute("SELECT id FROM maintenance_jobs WHERE id = ?", ("job_consolidation_daily",)).fetchone()
            if legacy:
                conn.execute(
                    """
                    UPDATE maintenance_jobs
                    SET enabled = 0, last_summary = ?, updated_at = ?
                    WHERE id = ?
                    """,
                    ("deprecated: split into 10:00 and 15:00 jobs", utcnow(), "job_consolidation_daily"),
                )
            for job in defaults:
                row = conn.execute("SELECT id FROM maintenance_jobs WHERE id = ?", (job["id"],)).fetchone()
                if row:
                    continue
                now = utcnow()
                next_due = self._compute_next_due(now, job["cadence"], job["interval_minutes"], job["window_start"], job["window_end"])
                conn.execute(
                    """
                    INSERT INTO maintenance_jobs (
                        id, job_type, cadence, interval_minutes, window_start, window_end, next_due_at,
                        last_run_at, last_status, last_summary, enabled, metadata_json, created_at, updated_at
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                    """,
                    (
                        job["id"],
                        job["job_type"],
                        job["cadence"],
                        job["interval_minutes"],
                        job["window_start"],
                        job["window_end"],
                        next_due,
                        None,
                        None,
                        None,
                        1,
                        self.store.dumps(job["metadata"]),
                        now,
                        now,
                    ),
                )

    def list_maintenance_jobs(self) -> list[dict[str, Any]]:
        """Return every maintenance job, earliest ``next_due_at`` first.

        Rows are converted with ``_row_to_maintenance_job``.
        """
        query = "SELECT * FROM maintenance_jobs ORDER BY next_due_at ASC"
        with self.store.connection() as conn:
            fetched = conn.execute(query).fetchall()
            return list(map(self._row_to_maintenance_job, fetched))

    def list_due_maintenance_jobs(self, *, now: str | None = None, limit: int | None = None) -> list[dict[str, Any]]:
        now = now or utcnow()
        sql = "SELECT * FROM maintenance_jobs WHERE enabled = 1 AND next_due_at <= ? ORDER BY next_due_at ASC"
        params: list[Any] = [now]
        if limit is not None:
            sql += " LIMIT ?"
            params.append(limit)
        with self.store.connection() as conn:
            rows = conn.execute(sql, params).fetchall()
            return [self._row_to_maintenance_job(row) for row in rows]

    def get_maintenance_job(self, job_id: str) -> dict[str, Any]:
        """Return the ``maintenance_jobs`` row with id ``job_id``.

        The row is converted with ``_row_to_maintenance_job``.

        Raises:
            KeyError: no row has that id (the id is the exception argument).
        """
        query = "SELECT * FROM maintenance_jobs WHERE id = ?"
        with self.store.connection() as conn:
            found = conn.execute(query, (job_id,)).fetchone()
            if found is not None:
                return self._row_to_maintenance_job(found)
            raise KeyError(job_id)

    def update_maintenance_job(
        self,
        job_id: str,
        *,
        cadence: str | None = None,
        interval_minutes: int | None = None,
        window_start: str | None = None,
        window_end: str | None = None,
        next_due_at: str | None = None,
        last_run_at: str | None = None,
        last_status: str | None = None,
        last_summary: str | None = None,
        enabled: bool | None = None,
        metadata: dict[str, Any] | None = None,
    ) -> dict[str, Any]:
        """Apply a partial update to a maintenance job and return the stored row.

        Every keyword left as ``None`` keeps its current value, which also
        means a field cannot be reset to ``None`` through this method.
        ``updated_at`` is always refreshed.  Raises ``KeyError`` (from
        ``get_maintenance_job``) when ``job_id`` is unknown.
        """
        job = self.get_maintenance_job(job_id)
        requested = {
            "cadence": cadence,
            "interval_minutes": interval_minutes,
            "window_start": window_start,
            "window_end": window_end,
            "next_due_at": next_due_at,
            "last_run_at": last_run_at,
            "last_status": last_status,
            "last_summary": last_summary,
            "enabled": enabled,
            "metadata": metadata,
        }
        # Overlay only the fields the caller actually supplied.
        for field, value in requested.items():
            if value is not None:
                job[field] = value
        enabled_flag = 1 if job.get("enabled") else 0
        with self.store.connection() as conn:
            conn.execute(
                """
                UPDATE maintenance_jobs
                SET cadence = ?, interval_minutes = ?, window_start = ?, window_end = ?,
                    next_due_at = ?, last_run_at = ?, last_status = ?, last_summary = ?, enabled = ?, metadata_json = ?, updated_at = ?
                WHERE id = ?
                """,
                (
                    job["cadence"],
                    job.get("interval_minutes"),
                    job.get("window_start"),
                    job.get("window_end"),
                    job["next_due_at"],
                    job.get("last_run_at"),
                    job.get("last_status"),
                    job.get("last_summary"),
                    enabled_flag,
                    self.store.dumps(job.get("metadata") or {}),
                    utcnow(),
                    job_id,
                ),
            )
        return self.get_maintenance_job(job_id)

    def compute_next_due(
        self,
        *,
        now: str | None = None,
        cadence: str,
        interval_minutes: int | None,
        window_start: str | None,
        window_end: str | None,
    ) -> str:
        return self._compute_next_due(
            now or utcnow(),
            cadence,
            interval_minutes,
            window_start,
            window_end,
        )

    def create_maintenance_run(self, job_id: str) -> dict[str, Any]:
        """Insert a ``running`` maintenance run for ``job_id`` and return it.

        The run gets a fresh ``mrun``-prefixed id, the current time as
        ``started_at`` and an empty stats object; the remaining columns start
        as ``None``.
        """
        new_run_id = make_id("mrun")
        started_at = utcnow()
        values = (
            new_run_id,
            job_id,
            "running",
            started_at,
            None,  # completed_at
            None,  # summary
            self.store.dumps({}),
            None,  # error_message
        )
        with self.store.connection() as conn:
            conn.execute(
                """
                INSERT INTO maintenance_runs (id, job_id, status, started_at, completed_at, summary, stats_json, error_message)
                VALUES (?, ?, ?, ?, ?, ?, ?, ?)
                """,
                values,
            )
        return self.get_maintenance_run(new_run_id)

    def finish_maintenance_run(
        self,
        run_id: str,
        *,
        status: str,
        summary: str | None = None,
        stats: dict[str, Any] | None = None,
        error_message: str | None = None,
    ) -> dict[str, Any]:
        """Record the outcome of a maintenance run and return the updated row.

        ``completed_at`` is set to the current time and missing or empty
        ``stats`` are stored as ``{}``.  The row is re-read through
        ``get_maintenance_run``, which raises ``KeyError`` if ``run_id`` is
        unknown.
        """
        completed_at = utcnow()
        with self.store.connection() as conn:
            stats_json = self.store.dumps(stats if stats else {})
            conn.execute(
                """
                UPDATE maintenance_runs
                SET status = ?, completed_at = ?, summary = ?, stats_json = ?, error_message = ?
                WHERE id = ?
                """,
                (status, completed_at, summary, stats_json, error_message, run_id),
            )
        return self.get_maintenance_run(run_id)

    def get_maintenance_run(self, run_id: str) -> dict[str, Any]:
        """Return the ``maintenance_runs`` row with id ``run_id``.

        The row is converted with ``_row_to_maintenance_run``.

        Raises:
            KeyError: no row has that id (the id is the exception argument).
        """
        query = "SELECT * FROM maintenance_runs WHERE id = ?"
        with self.store.connection() as conn:
            found = conn.execute(query, (run_id,)).fetchone()
            if found is not None:
                return self._row_to_maintenance_run(found)
            raise KeyError(run_id)

    def list_maintenance_runs(self, *, job_id: str | None = None, limit: int = 20) -> list[dict[str, Any]]:
        sql = "SELECT * FROM maintenance_runs WHERE 1=1"
        params: list[Any] = []
        if job_id is not None:
            sql += " AND job_id = ?"
            params.append(job_id)
        sql += " ORDER BY started_at DESC LIMIT ?"
        params.append(limit)
        with self.store.connection() as conn:
            rows = conn.execute(sql, params).fetchall()
            return [self._row_to_maintenance_run(row) for row in rows]

    def _memory_subtype(self, metadata: dict[str, Any]) -> str | None:
        record_kind = metadata.get("record_kind") or metadata.get("legacy_kind") or metadata.get("subtype")
        mapping = {
            "research_run": "research_run",
            "research_step": "research_step",
            "source": "research_source",
            "research_source": "research_source",
            "claim": "research_claim",
            "research_claim": "research_claim",
            "artifact": "research_artifact",
            "research_artifact": "research_artifact",
            "leisure_item": "leisure_item",
            "router_handoff": "router_handoff",
        }
        return mapping.get(record_kind, record_kind)

    def _infer_run_id(self, row: dict[str, Any], subtype: str | None) -> str | None:
        source_ref = row.get("source_ref") or ""
        if source_ref.startswith("personal-agent:run:"):
            return source_ref.rsplit(":", 1)[-1]
        if subtype == "research_run":
            return row["id"]
        return None

    def _infer_url(self, row: dict[str, Any], subtype: str | None) -> str | None:
        if subtype in {"research_source", "research_claim", "research_artifact"}:
            evidence_ref = row.get("evidence_ref")
            if isinstance(evidence_ref, str) and evidence_ref.startswith(("http://", "https://")):
                return evidence_ref
        return None

    def _upsert_fts(self, conn: Any, record: dict[str, Any]) -> None:
        conn.execute("DELETE FROM memories_fts WHERE memory_id = ?", (record["id"],))
        conn.execute(
            "INSERT INTO memories_fts (memory_id, title, summary, content) VALUES (?, ?, ?, ?)",
            (record["id"], record["title"], record["summary"], record["content"]),
        )

    def _row_to_memory(self, row: Any) -> dict[str, Any]:
        memory = dict(row)
        memory["metadata"] = self.store.loads(memory.pop("metadata_json")) or {}
        memory["embedding"] = self._decode_embedding(memory.pop("embedding_json"))
        return memory

    def _row_to_task(self, row: Any) -> dict[str, Any]:
        task = dict(row)
        task["metadata"] = self.store.loads(task.pop("metadata_json")) or {}
        task["requires_human_input"] = bool(task["requires_human_input"])
        return task

    def _row_to_task_run(self, row: Any) -> dict[str, Any]:
        run = dict(row)
        run["input_payload"] = self.store.loads(run.pop("input_payload_json")) or {}
        run["metadata"] = self.store.loads(run.pop("metadata_json")) or {}
        return run

    def _row_to_artifact(self, row: Any) -> dict[str, Any]:
        artifact = dict(row)
        artifact["metadata"] = self.store.loads(artifact.pop("metadata_json")) or {}
        return artifact

    def _row_to_maintenance_job(self, row: Any) -> dict[str, Any]:
        job = dict(row)
        job["metadata"] = self.store.loads(job.pop("metadata_json")) or {}
        job["enabled"] = bool(job["enabled"])
        return job

    def _row_to_maintenance_run(self, row: Any) -> dict[str, Any]:
        run = dict(row)
        run["stats"] = self.store.loads(run.pop("stats_json")) or {}
        return run

    def _make_summary(self, content: str, max_len: int = 180) -> str:
        compact = " ".join(content.split())
        return compact[: max_len - 1] + "…" if len(compact) > max_len else compact

    def _encode_embedding(self, embedding: list[float] | None) -> str | None:
        if embedding is None:
            return None
        return json.dumps(embedding)

    def _decode_embedding(self, embedding: str | None) -> list[float] | None:
        if not embedding:
            return None
        return list(json.loads(embedding))

    def _lexical_score(self, query_terms: list[str], haystack: str) -> float:
        if not query_terms:
            return 0.0
        matches = sum(1 for term in query_terms if term in haystack)
        return matches / len(query_terms)

    def _text_embedding(self, text: str, dimensions: int = 8) -> list[float]:
        """Build a small deterministic vector from the bytes of *text*.

        Each whitespace-separated token of the normalised text is UTF-8
        encoded, and every byte value is added to the bucket at its position
        within the token modulo *dimensions*. The vector is then scaled to unit
        Euclidean length; an all-zero vector is returned unscaled.
        """
        buckets = [0.0 for _ in range(dimensions)]
        tokens = normalize_text(text).split()
        for token in tokens:
            for position, byte in enumerate(token.encode("utf-8")):
                buckets[position % dimensions] += float(byte)
        magnitude = math.sqrt(sum(component * component for component in buckets))
        if magnitude != 0:
            return [component / magnitude for component in buckets]
        # Nothing contributed to the vector, so there is nothing to normalise.
        return buckets

    def _semantic_score(self, query_embedding: list[float], memory_embedding: list[float] | None) -> float:
        if not query_embedding or memory_embedding is None:
            return 0.0
        numerator = sum(a * b for a, b in zip(query_embedding, memory_embedding))
        return max(0.0, min(1.0, numerator))

    def _table_exists(self, conn: Any, table: str) -> bool:
        row = conn.execute("SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?", (table,)).fetchone()
        return row is not None

    def _count_if_table_exists(self, conn: Any, table: str) -> int:
        if not self._table_exists(conn, table):
            return 0
        return conn.execute(f"SELECT COUNT(*) AS count FROM {table}").fetchone()["count"]

    def _compute_next_due(
        self,
        now: str,
        cadence: str,
        interval_minutes: int | None,
        window_start: str | None,
        window_end: str | None,
    ) -> str:
        base = datetime.fromisoformat(now)
        if cadence == "interval" and interval_minutes:
            return (base + timedelta(minutes=interval_minutes)).isoformat()
        if cadence == "weekly":
            target = base + timedelta(days=7)
        else:
            target = base + timedelta(days=1)
        if window_start:
            hour, minute = self._parse_hhmm(window_start)
            target = target.replace(hour=hour, minute=minute, second=0, microsecond=0)
        if window_end:
            end_hour, end_minute = self._parse_hhmm(window_end)
            end = target.replace(hour=end_hour, minute=end_minute, second=0, microsecond=0)
            if target > end:
                target = end
        return target.isoformat()

    def _parse_hhmm(self, value: str) -> tuple[int, int]:
        parts = value.split(":")
        if len(parts) != 2:
            return (0, 0)
        return (int(parts[0]), int(parts[1]))

    def _link_memories(self, conn: Any, from_id: str, to_id: str, relation: str, reason: str) -> None:
        """Insert a ``memory_links`` row connecting two memories.

        The row gets a fresh ``lnk``-prefixed id and the current timestamp;
        *reason* is stored under the ``reason`` key of the link's metadata.
        """
        link_sql = """
            INSERT INTO memory_links (id, from_memory_id, to_memory_id, relation, created_at, metadata_json)
            VALUES (?, ?, ?, ?, ?, ?)
            """
        link_row = (
            make_id("lnk"),
            from_id,
            to_id,
            relation,
            utcnow(),
            self.store.dumps({"reason": reason}),
        )
        conn.execute(link_sql, link_row)

    def _record_conflict(self, conn: Any, memory_id: str, conflicting_id: str, reason: str) -> None:
        """Insert a ``memory_conflicts`` row for a pair of memories.

        The row gets a fresh ``mcf``-prefixed id, the current timestamp and an
        empty metadata object.
        """
        conflict_sql = """
            INSERT INTO memory_conflicts (id, memory_id, conflicting_memory_id, reason, created_at, metadata_json)
            VALUES (?, ?, ?, ?, ?, ?)
            """
        conflict_row = (
            make_id("mcf"),
            memory_id,
            conflicting_id,
            reason,
            utcnow(),
            self.store.dumps({}),
        )
        conn.execute(conflict_sql, conflict_row)

    def _conflict_signal(self, left: dict[str, Any], right: dict[str, Any]) -> bool:
        """Heuristically flag two memories as contradicting each other.

        Both memories must have type ``profile``, ``decision`` or ``project``.
        A conflict is signalled when their normalised contents share at least
        three distinct tokens and exactly one of them contains a negation word.
        """
        conflict_types = {"profile", "decision", "project"}
        if not (left.get("type") in conflict_types and right.get("type") in conflict_types):
            return False

        negation_words = {"not", "no", "never", "nunca", "sin"}
        left_words = set(normalize_text(left["content"]).split())
        right_words = set(normalize_text(right["content"]).split())
        shared = left_words & right_words
        if len(shared) < 3:
            return False
        # Exactly one side may be free of negation words.
        return left_words.isdisjoint(negation_words) != right_words.isdisjoint(negation_words)

    def _llm_summarize_pair(
        self,
        left: dict[str, Any],
        right: dict[str, Any],
        *,
        provider: str,
        command: list[str],
    ) -> str | None:
        if provider != "opencode":
            return None
        prompt = (
            "Summarize and merge the two memory notes into one concise statement. Preserve key facts and avoid speculation.\n\n"
            f"Memory A:\n{left['content']}\n\nMemory B:\n{right['content']}\n"
        )
        try:
            result = subprocess.run(
                command,
                input=prompt,
                text=True,
                capture_output=True,
                timeout=60,
                check=False,
            )
            if result.returncode != 0:
                return None
            output = result.stdout.strip()
            return output or None
        except Exception:
            return None
