"""orchestrator.py — Engine generic para Factory v4 digital twin.

Lee `flows/<flow>.json` declarativo + dispatcha a `skills/<name>.py` registry.
Independiente del flow concreto · 1 binario para N flows.

Patrón canónico (Temporal/Prefect/Dagster adaptado):
  - flows/<flow>.json = workflow declarativo (stations, skills, supervisors, parallel)
  - skills/<name>.py = plug-in con interfaz uniforme run(inputs, context)→SkillResult
  - _runs/<flow_id>/caja.json = doc único mutable que viaja
  - _runs/<flow_id>/events.jsonl = event log append-only (Stage 3)
  - _task_state.json = state para visualización HTML
  - Supervisor BLOQUEA con retroloop max_retries (configurable per station)

Uso:
    python orchestrator.py --flow escalado_formatos --from-inbox _flow_inbox/<file>.json --once
    python orchestrator.py --flow escalado_formatos              # watcher infinito

Cambia de flow con --flow <flow_name>. Mismo orquestador para crear_producto, escalar_pagina, etc.
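
Illustrative flow definition — a minimal sketch, not a shipped flow; keys are
inferred from how this module reads flow_def, and the skill names and
shared_state_path value are hypothetical:

    {
      "flow_name": "escalado_formatos",
      "stations": [
        {"id": "copy", "label": "Copy", "skill": "generar_copy",
         "supervisor_skill": "supervisor_esceptico", "max_retries": 2},
        {"id": "render", "label": "Render", "skill": "render_formatos",
         "parallel_cluster": {"shared_state_path": "_runs/{flow_id}/shared_state.json"}}
      ]
    }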
"""
from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
import time
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path

ROOT = Path(__file__).resolve().parent
sys.path.insert(0, str(ROOT))

from skills._lib import SkillRegistry, SkillContext, SkillResult  # noqa: E402

FLOWS_DIR = ROOT / 'flows'
INBOX_DIR = ROOT / '_flow_inbox'
RUNS_DIR = ROOT / '_runs'
STATE_PATH = ROOT / '_task_state.json'
STOP_FLAG = ROOT / '_orchestrator_stop.flag'

POLL_INTERVAL_S = 2

# ============================================================
# UTILITIES
# ============================================================

def now_iso() -> str:
    return datetime.now(timezone.utc).isoformat()


def slugify(s: str) -> str:
    s = re.sub(r'[^a-zA-Z0-9_-]+', '_', s).strip('_')
    return s or 'flow'


def derive_flow_id(payload: dict, inbox_path: Path | None = None) -> str:
    if inbox_path is not None:
        return slugify(inbox_path.stem)
    product = slugify(payload.get('product', 'unknown'))
    ts = payload.get('timestamp', now_iso()).replace(':', '-').replace('.', '-')
    return slugify(f'{product}_{ts}')

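# e.g. (illustrative payload, no inbox path):
#   derive_flow_id({'product': 'Gel X', 'timestamp': '2024-01-01T00:00:00'})
#   -> 'Gel_X_2024-01-01T00-00-00'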

# ============================================================
# CAJA — the single mutable document per flow
# ============================================================

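# Shape of caja.json as created and mutated below (section names are illustrative):
#   {"flow_id": ..., "created_at": ..., "input_start": {...}, "sections": {...},
#    "history": [one entry per write: ts, actor, section, mode, bytes_after]}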
class Caja:
    def __init__(self, flow_id: str, run_dir: Path, initial_input: dict):
        self.flow_id = flow_id
        self.run_dir = run_dir
        self.path = run_dir / 'caja.json'
        run_dir.mkdir(parents=True, exist_ok=True)
        if not self.path.exists():
            initial = {
                'flow_id': flow_id,
                'created_at': now_iso(),
                'input_start': initial_input,
                'sections': {},
                'history': [],
            }
            self._save(initial)

    def _load(self) -> dict:
        return json.loads(self.path.read_text(encoding='utf-8'))

    def _save(self, doc: dict):
        self.path.write_text(json.dumps(doc, ensure_ascii=False, indent=2), encoding='utf-8')

    def read_full(self) -> dict:
        return self._load()

    def read_section(self, key: str):
        return self._load().get('sections', {}).get(key)

    def write_section(self, key: str, value, mode: str = 'replace', actor: str = 'unknown'):
        doc = self._load()
        sections = doc.setdefault('sections', {})
        existing = sections.get(key)
        if mode == 'append':
            if existing is None:
                sections[key] = [value]
            elif isinstance(existing, list):
                existing.append(value)
            else:
                sections[key] = [existing, value]
        elif mode == 'merge':
            if isinstance(existing, dict) and isinstance(value, dict):
                existing.update(value)
            else:
                sections[key] = value
        else:
            sections[key] = value
        doc.setdefault('history', []).append({
            'ts': now_iso(),
            'actor': actor,
            'section': key,
            'mode': mode,
            'bytes_after': len(json.dumps(sections.get(key), ensure_ascii=False)),
        })
        self._save(doc)

    def size_bytes(self) -> int:
        return self.path.stat().st_size if self.path.exists() else 0

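# Illustrative use of the three write modes (hypothetical section names):
#   caja.write_section('copy', {'headline': 'A'}, mode='replace', actor='writer')
#   caja.write_section('copy', {'cta': 'B'}, mode='merge', actor='writer')    # -> {'headline': 'A', 'cta': 'B'}
#   caja.write_section('audits', 'pass', mode='append', actor='supervisor')   # -> ['pass']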

# ============================================================
# EVENT LOG (Stage 3)
# ============================================================

class EventLog:
    """Append-only event log por flow. JSONL."""

    def __init__(self, run_dir: Path):
        self.path = run_dir / 'events.jsonl'

    def emit(self, event_type: str, **kwargs):
        evt = {'ts': now_iso(), 'type': event_type, **kwargs}
        with self.path.open('a', encoding='utf-8') as f:
            f.write(json.dumps(evt, ensure_ascii=False) + '\n')

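# One event per line, e.g. (illustrative values; keys as emitted in run_with_supervisor):
#   {"ts": "2024-01-01T00:00:00+00:00", "type": "station_start",
#    "station": "copy", "skill": "generar_copy", "attempt": 0}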

# ============================================================
# TASK STATE BRIDGE (mirror to _task_state.json for the HTML view)
# ============================================================

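# NOTE: STATE_PATH is a single global file, so the bridge reflects one flow at a
# time; concurrently running flows would overwrite each other's state here.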
class TaskStateBridge:
    def __init__(self, flow_id: str, payload: dict, station_ids: list[str]):
        self.path = STATE_PATH
        self.flow_id = flow_id
        self._init(flow_id, payload, station_ids)

    def _init(self, flow_id, payload, station_ids):
        state = {
            'flow_id': flow_id,
            'input': payload,
            'route': station_ids,
            'routeIdx': 0,
            'state': 'processing',
            'history': [],
            'transition_t': 0,
            'success': None,
            'demo_logs_completed': [],
            'logs': [],
            'started_at_flow_s': 0,
            'started_at_station_s': 0,
            'elapsed_at_flow_s': 0,
            'elapsed_at_station_s': 0,
            'saved_at_wall': int(time.time() * 1000),
        }
        self._save(state)

    def _load(self) -> dict:
        if self.path.exists():
            try:
                return json.loads(self.path.read_text(encoding='utf-8'))
            except Exception:
                pass
        return {}

    def _save(self, s: dict):
        s['saved_at_wall'] = int(time.time() * 1000)
        try:
            self.path.write_text(json.dumps(s, ensure_ascii=False, indent=2), encoding='utf-8')
        except Exception as e:
            print(f'[bridge] WARN _task_state.json: {e}', file=sys.stderr)

    def append_log(self, level: str, text: str):
        s = self._load()
        if not s:
            return
        s.setdefault('logs', []).append({'ts': now_iso(), 'level': level, 'text': text})
        s['logs'] = s['logs'][-200:]
        self._save(s)
        try:
            print(f'[{level.upper()}] {text}')
        except Exception:
            pass

    def advance_station(self, idx: int):
        s = self._load()
        if not s:
            return
        s['routeIdx'] = idx
        s['state'] = 'processing'
        s['transition_t'] = 0
        s['started_at_station_s'] = 0
        self._save(s)

    def mark_done(self, success: bool, summary: str = ''):
        if summary:
            self.append_log('success' if success else 'error', summary)
        s = self._load()
        if not s:
            return
        s['state'] = 'done'
        s['success'] = success
        self._save(s)


# ============================================================
# SUPERVISOR WRAPPER + RETRY LOOP
# ============================================================

@dataclass
class StationResult:
    ok: bool
    attempts: int
    last_break: str = ''
    output_summary: str = ''


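# Each station runs executor -> supervisor. The supervisor's verdict gates progress:
# on APPROVE the station returns ok; on BREAK its errors are fed back to the executor
# as 'retry_feedback' on the next attempt, up to max_retries per station.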
def run_with_supervisor(reg: SkillRegistry, station: dict, caja: Caja, run_dir: Path,
                        events: EventLog, bridge: TaskStateBridge | None) -> StationResult:
    station_id = station['id']
    skill_name = station['skill']
    supervisor_skill = station.get('supervisor_skill', 'supervisor_esceptico')
    max_retries = int(station.get('max_retries', 3))

    last_break = ''
    for attempt in range(max_retries + 1):
        # Build the context with shared_state_path when the station defines a parallel_cluster.
        ssp = None
        if 'parallel_cluster' in station:
            ssp_template = station['parallel_cluster'].get('shared_state_path', '')
            if ssp_template:
                # Templates like '_runs/{flow_id}/...' are resolved relative to ROOT.
                ssp = ROOT / ssp_template.replace('{flow_id}', caja.flow_id)

        ctx = SkillContext(
            flow_id=caja.flow_id,
            run_dir=run_dir,
            station_id=station_id,
            caja=caja,
            shared_state_path=ssp,
        )

        # Inject retry feedback as an input for the next attempt
        skill_inputs = {}
        if attempt > 0 and last_break:
            skill_inputs['retry_feedback'] = last_break
            if bridge:
                bridge.append_log('warn', f'↻ Retry {attempt}/{max_retries} at {station_id} · reason: {last_break[:140]}')

        events.emit('station_start', station=station_id, skill=skill_name, attempt=attempt)
        if bridge and attempt == 0:
            bridge.append_log('info', f'▶ Station {station_id.upper()} · skill: {skill_name}')

        # 1. Run the skill
        res: SkillResult = reg.invoke(skill_name, skill_inputs, ctx)
        events.emit('station_executor_done', station=station_id, ok=res.ok,
                    errors=res.errors[:3], output_keys=list(res.output.keys()))

        if not res.ok:
            last_break = f'EXECUTOR_FAIL · {"; ".join(res.errors)[:200]}'
            if bridge:
                bridge.append_log('error', f'✗ {station_id} executor FAIL · {last_break[:200]}')
            continue

        # 2. Audit with the supervisor (BLOCKING)
        sup_inputs = {'audit_station_id': station_id}
        sup_res: SkillResult = reg.invoke(supervisor_skill, sup_inputs, ctx)
        events.emit('station_supervisor_done', station=station_id, supervisor=supervisor_skill,
                    ok=sup_res.ok, errors=sup_res.errors[:3])

        if sup_res.ok:
            if bridge:
                bridge.append_log('success', f'✓ {station_id} OK · supervisor APPROVE (attempt={attempt})')
            return StationResult(ok=True, attempts=attempt + 1, output_summary=str(res.output)[:200])
        else:
            last_break = '; '.join(sup_res.errors)[:200]
            if bridge:
                bridge.append_log('warn', f'⚠ {station_id} supervisor BREAK · {last_break[:200]}')

    return StationResult(ok=False, attempts=max_retries + 1, last_break=last_break)


# ============================================================
# RUN A FLOW
# ============================================================

def run_flow(flow_def: dict, inbox_path: Path) -> dict:
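    """Execute each station of flow_def, in order, against the payload in inbox_path.

    Returns a summary dict {'status': 'OK'|'FAIL', 'flow_id', 'log', 'caja_path'},
    plus 'station' and 'reason' when a station exhausts its retries.
    """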
    payload = json.loads(inbox_path.read_text(encoding='utf-8'))
    flow_id = derive_flow_id(payload, inbox_path)
    run_dir = RUNS_DIR / flow_id
    run_dir.mkdir(parents=True, exist_ok=True)

    caja = Caja(flow_id, run_dir, payload)
    events = EventLog(run_dir)
    bridge = TaskStateBridge(flow_id, payload, [s['id'] for s in flow_def['stations']])
    reg = SkillRegistry()

    bridge.append_log('info', f'Flow "{flow_def["flow_name"]}" arrancado · flow_id={flow_id}')
    events.emit('flow_start', flow_id=flow_id, flow_name=flow_def['flow_name'], input=payload)

    log = [f'[start] flow_id={flow_id} · stations={len(flow_def["stations"])}']

    for idx, station in enumerate(flow_def['stations']):
        sid = station['id']
        bridge.advance_station(idx)
        bridge.append_log('info', f'➤ Entering station {station["label"]} · canonical skill: {station["skill"]}')
        log.append(f'[station] {sid} · skill={station["skill"]}')

        result = run_with_supervisor(reg, station, caja, run_dir, events, bridge)

        if not result.ok:
            log.append(f'[ABORT] {sid} · retries exhausted ({result.attempts}) · last_break={result.last_break[:200]}')
            events.emit('flow_abort', station=sid, reason=result.last_break)
            bridge.mark_done(False, f'Flow aborted at {sid}: {result.last_break[:200]}')
            return {'status': 'FAIL', 'flow_id': flow_id, 'station': sid, 'reason': result.last_break,
                    'log': log, 'caja_path': str(caja.path)}
        log.append(f'[ok] {sid} · attempts={result.attempts}')

    log.append(f'[done] flow {flow_id} · caja={caja.path} · size={caja.size_bytes()}b')
    events.emit('flow_done', flow_id=flow_id, caja_size=caja.size_bytes())
    bridge.mark_done(True, f'Flow {flow_id} OK · caja {caja.size_bytes()}b')
    return {'status': 'OK', 'flow_id': flow_id, 'log': log, 'caja_path': str(caja.path)}


# ============================================================
# CLI
# ============================================================

def load_flow_def(flow_name: str) -> dict:
    p = FLOWS_DIR / f'{flow_name}.json'
    if not p.exists():
        raise FileNotFoundError(f'flow not found: {p}')
    return json.loads(p.read_text(encoding='utf-8'))


def main():
    p = argparse.ArgumentParser()
    p.add_argument('--flow', type=str, default='escalado_formatos', help='flow name (flows/<name>.json)')
    p.add_argument('--from-inbox', type=str, help='path to an inbox JSON file · if omitted, run the infinite watcher')
    p.add_argument('--once', action='store_true', help='process a single file and exit')
    args = p.parse_args()

    try:
        flow_def = load_flow_def(args.flow)
    except FileNotFoundError as e:
        print(f'ERROR: {e}', file=sys.stderr)
        sys.exit(2)

    if args.from_inbox:
        inbox_path = Path(args.from_inbox)
        if not inbox_path.is_absolute():
            inbox_path = ROOT / inbox_path
        if not inbox_path.exists():
            print(f'ERROR: inbox file does not exist: {inbox_path}', file=sys.stderr)
            sys.exit(2)
        result = run_flow(flow_def, inbox_path)
        print(json.dumps(result, ensure_ascii=False, indent=2))
        sys.exit(0 if result['status'] == 'OK' else 1)

    print(f'[watcher] flow={args.flow} · polling {INBOX_DIR} every {POLL_INTERVAL_S}s')
    while True:
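        # Touching _orchestrator_stop.flag shuts the watcher down cleanly.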
        if STOP_FLAG.exists():
            print('[watcher] stop flag · exit')
            return
        for f in sorted(INBOX_DIR.glob('flow_*.json')):
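            # A sibling <name>.done marker means this inbox file was already processed.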
            done = f.with_suffix('.done')
            if done.exists():
                continue
            print(f'[watcher] processing {f.name}')
            res = run_flow(flow_def, f)
            done.write_text(json.dumps(res, ensure_ascii=False, indent=2), encoding='utf-8')
            print(f'[watcher] {f.name} → {res["status"]}')
            if args.once:
                return
        time.sleep(POLL_INTERVAL_S)


if __name__ == '__main__':
    sys.stdout.reconfigure(encoding='utf-8')
    main()
