"""orchestrator_v2 — Factory 3D Escalado Formatos · CONTRATO FACTRY 2026-05-09
============================================================================

Wire actualizado al contrato cristalizado en chat Fer 2026-05-09 (msgs 4684–4701):

  1. CAJA ÚNICA MUTABLE — un solo `caja.json` por flow que viaja de estación
     en estación. Cada estación lee la caja entera, escribe SU sección. NO
     archivos sueltos `_analysis.json`, `_strategy.json`, etc.

  2. SUPERVISOR BLOQUEA + RETROLOOP — tras cada estación, supervisor (L1+L2,
     escalando a L3 en flag) audita lo escrito en la caja. Si BREAK → re-spawn
     ejecutor con feedback inline. max_retries=5 default.

  3. SHARED_STATE.JSON BUSCAR×4 — los 4 buscadores en paralelo coordinan
     antes de cada query: leen `_buscar_shared_state.json`, evitan duplicar
     queries/video_ids, escriben sus claims antes de avanzar.

  4. DRY MODE (--dry-run) — smoke test sin LLM/network/Trello reales.
     Ejecutores generan fixtures plausibles, supervisor siempre APPROVE.

Modo de uso:

    # Smoke test interno (sin red, sin LLM, sin Trello):
    python orchestrator_v2.py --dry-run --from-inbox _flow_inbox/<file>.json --once

    # Run real (cuando los stubs estén conectados a subagentes/skills):
    python orchestrator_v2.py --from-inbox _flow_inbox/<file>.json

REGLAS DURAS HARDWIRED:
  - 1 agente = 1 skill (factory_metadata.json es la fuente de verdad)
  - Supervisor BLOQUEA · max_retries=5 · sin atajos
  - Caja única mutable · append/merge por sección · NUNCA dispersar
  - Default Opus en cualquier llamada LLM (msg 4658)
"""
from __future__ import annotations
import argparse
import json
import os
import sys
import time
import subprocess
import re
from pathlib import Path
from datetime import datetime, timezone

ROOT = Path(__file__).parent
META_PATH = ROOT / 'factory_metadata.json'
INBOX_DIR = ROOT / '_flow_inbox'
RUNS_DIR = ROOT / '_runs'
STATE_PATH = ROOT / '_task_state.json'
STOP_FLAG = ROOT / '_orchestrator_stop.flag'

POLL_INTERVAL_S = 2
DEFAULT_MAX_RETRIES = 5

# Telegram SOS — ping a Fer cuando la rescue ladder agota QC + MASTER
# (REGLA Fer 2026-05-09 msg 4789: 3 hipótesis QC en serie → 3 master en serie → telegram)
TELEGRAM_TOKEN_SOS = "8345671365:AAFNAmnzIBleXEOMvMOiYFUR6xkLU_Mehf8"
TELEGRAM_CHAT_FER = "6002482241"

# ============================================================
# UTILIDADES
# ============================================================

def now_iso():
    return datetime.now(timezone.utc).isoformat()

def now_ms():
    return int(time.time() * 1000)

def slugify(s: str) -> str:
    s = re.sub(r'[^a-zA-Z0-9_-]+', '_', s).strip('_')
    return s or 'flow'

def derive_flow_id(inbox_payload: dict, inbox_path: Path | None = None) -> str:
    product = slugify(inbox_payload.get('product', 'unknown'))
    ts = inbox_payload.get('timestamp', now_iso()).replace(':', '-').replace('.', '-')
    base = f"{product}_{ts}"
    if inbox_path is not None:
        base = inbox_path.stem
    return slugify(base)

def load_meta() -> dict:
    return json.loads(META_PATH.read_text(encoding='utf-8'))

# ============================================================
# CAJA — único doc mutable que viaja
# ============================================================

class Caja:
    """Documento único mutable por flow. JSON con secciones por station_id."""

    def __init__(self, flow_id: str, run_dir: Path, initial_input: dict):
        self.flow_id = flow_id
        self.run_dir = run_dir
        self.path = run_dir / 'caja.json'
        self.run_dir.mkdir(parents=True, exist_ok=True)
        if not self.path.exists():
            initial = {
                'flow_id': flow_id,
                'created_at': now_iso(),
                'input_start': initial_input,
                'sections': {},
                'history': [],
            }
            self._save(initial)

    def _load(self) -> dict:
        return json.loads(self.path.read_text(encoding='utf-8'))

    def _save(self, doc: dict):
        self.path.write_text(json.dumps(doc, ensure_ascii=False, indent=2), encoding='utf-8')

    def read_full(self) -> dict:
        return self._load()

    def read_section(self, key: str) -> dict | None:
        doc = self._load()
        return doc.get('sections', {}).get(key)

    def write_section(self, key: str, value: dict, mode: str = 'replace', actor: str = 'unknown'):
        """mode: replace | append | merge"""
        doc = self._load()
        sections = doc.setdefault('sections', {})
        existing = sections.get(key)
        if mode == 'append':
            if existing is None:
                sections[key] = [value]
            elif isinstance(existing, list):
                existing.append(value)
            else:
                sections[key] = [existing, value]
        elif mode == 'merge':
            if isinstance(existing, dict) and isinstance(value, dict):
                existing.update(value)
            else:
                sections[key] = value
        else:  # replace
            sections[key] = value
        doc.setdefault('history', []).append({
            'ts': now_iso(),
            'actor': actor,
            'section': key,
            'mode': mode,
            'bytes_after': len(json.dumps(sections.get(key), ensure_ascii=False)),
        })
        self._save(doc)

    def size_bytes(self) -> int:
        return self.path.stat().st_size if self.path.exists() else 0


# ============================================================
# SHARED STATE — coordinación paralelos (mesa larga)
# ============================================================

class SharedState:
    """JSON compartido entre los N agentes paralelos del cluster.
    Patrón equivalente a `buscador-paralelo-fb-gethookd._buscar/shared_state.json`."""

    def __init__(self, path: Path):
        self.path = path
        if not self.path.exists():
            self.path.parent.mkdir(parents=True, exist_ok=True)
            self.path.write_text(json.dumps({
                'created_at': now_iso(),
                'queries_in_progress': [],
                'queries_done': [],
                'video_ids_already_evaluated': [],
                'candidates_found_so_far': [],
                'claims': {},
            }, ensure_ascii=False, indent=2), encoding='utf-8')

    def _load(self) -> dict:
        return json.loads(self.path.read_text(encoding='utf-8'))

    def _save(self, d: dict):
        self.path.write_text(json.dumps(d, ensure_ascii=False, indent=2), encoding='utf-8')

    def take_query(self, agent_id: str, query: str) -> bool:
        """Returns True si pudo tomar la query, False si ya estaba en progreso/done."""
        d = self._load()
        if query in d['queries_in_progress'] or query in d['queries_done']:
            return False
        d['queries_in_progress'].append(query)
        d['claims'].setdefault(agent_id, []).append({'q': query, 'ts': now_iso()})
        self._save(d)
        return True

    def finish_query(self, agent_id: str, query: str, video_ids: list[str], candidates: list[dict]):
        d = self._load()
        if query in d['queries_in_progress']:
            d['queries_in_progress'].remove(query)
        if query not in d['queries_done']:
            d['queries_done'].append(query)
        for vid in video_ids:
            if vid not in d['video_ids_already_evaluated']:
                d['video_ids_already_evaluated'].append(vid)
        d['candidates_found_so_far'].extend(candidates)
        self._save(d)


# ============================================================
# SUPERVISOR — BLOQUEA + retroloop
# ============================================================

class Supervisor:
    """Wrapper L1 (verifier determinista) + L2 (Haiku olfato) + L3 escalación Opus.

    Cada supervisor lleva su PDF sub-proceso como audit base canónica
    (`_pdfs/PDF_<STATION>.pdf` · msg 4717+4723 Fer 2026-05-09).
    El PDF define literalmente qué validar — supervisor NO inventa criterios.

    En DRY mode: L1 valida que la sección exista y no esté vacía + PDF presente → APPROVE.
    En modo real: L1 ejecuta verifier_l1 declarado en metadata; L2 invoca Haiku
    con el PDF como contexto; L3 escala a Opus si L2 levanta flag o L1 falla,
    también con el PDF como contexto.
    """

    PDF_NAME_MAP = {
        'descargar': 'PDF_DESCARGAR.pdf',
        'analizar': 'PDF_ANALIZAR.pdf',
        'interpretar': 'PDF_INTERPRETAR.pdf',
        'buscar': 'PDF_BUSCAR.pdf',
        'comparador': 'PDF_COMPARADOR.pdf',
        'trello_editor': 'PDF_TRELLO_EDITOR.pdf',
    }

    def __init__(self, station_def: dict, dry_run: bool = False):
        self.station_def = station_def
        self.dry_run = dry_run
        self.supervisor_id = station_def.get('supervisor', {}).get('id', 'sup_unknown')
        self.station_id = station_def['id']
        # PDF sub-proceso = base de auditoría
        pdf_name = self.PDF_NAME_MAP.get(self.station_id)
        self.pdf_audit_base = (ROOT / '_pdfs' / pdf_name) if pdf_name else None

    def audit(self, caja: Caja) -> tuple[str, str]:
        """Devuelve (verdict, reason).
        verdict ∈ {APPROVE, BREAK_<reason_code>}"""
        section = caja.read_section(self.station_id)
        if section is None:
            return ('BREAK_EMPTY_SECTION', f'caja.sections.{self.station_id} no existe')
        if isinstance(section, dict) and not section:
            return ('BREAK_EMPTY_SECTION', f'caja.sections.{self.station_id} es {{}}')
        if isinstance(section, list) and len(section) == 0:
            return ('BREAK_EMPTY_SECTION', f'caja.sections.{self.station_id} es []')
        # PDF sub-proceso debe existir como base canónica
        if self.pdf_audit_base and not self.pdf_audit_base.exists():
            return ('BREAK_PDF_BASE_MISSING', f'PDF base de auditoría no existe: {self.pdf_audit_base}')
        if self.dry_run:
            base = f' · base PDF: {self.pdf_audit_base.name}' if self.pdf_audit_base else ''
            return ('APPROVE', f'dry_run_pass{base}')
        # MODO REAL: invocar verifier_l1 + L2 Haiku con PDF como contexto.
        # El prompt del supervisor real (cuando se invoque vía Agent tool) DEBE
        # incluir el contenido de self.pdf_audit_base como audit base canónica.
        # Por ahora, fallback a estructura mínima.
        return ('APPROVE', 'real_pass_stub')


# ============================================================
# RESCUE LADDER — 3 QC + 3 MASTER + Telegram SOS
# ------------------------------------------------------------
# Contrato Fer 2026-05-09 (msgs 4789–4791):
#   1) Sub-agente ejecutor falla
#   2) → QC de la estación genera 3 HIPÓTESIS distintas RAÍZ y las prueba en SERIE
#   3) Si las 3 fallan → MASTER (QC central que ve el flow entero) genera OTRAS 3
#      DISTINTAS de las que probó QC, y las prueba en SERIE
#   4) Si las 3 del master también fallan → ping Telegram a Fer pidiendo ayuda
#   El supervisor de la estación valida cada hipótesis (no inventa criterios nuevos).
# ============================================================

class Hypothesis:
    """Una hipótesis raíz para resolver un fallo. Lleva una idea humana + el
    feedback que se inyecta al ejecutor en el siguiente intento."""
    def __init__(self, id_: str, level: str, idea: str, feedback: str):
        self.id = id_           # qc_<sid>_1 / master_<sid>_2 / ...
        self.level = level      # 'qc' | 'master'
        self.idea = idea        # texto humano de la idea raíz
        self.feedback = feedback  # texto que el ejecutor recibirá como retry_feedback

    def to_dict(self):
        return {'id': self.id, 'level': self.level, 'idea': self.idea, 'feedback': self.feedback}


def _invoke_opus_for_hypotheses(prompt: str, station_id: str, level: str,
                                  expected_ids_prefix: str, timeout_s: int = 120) -> list[Hypothesis] | None:
    """Llama a `claude --print --model opus --output-format json` con el prompt
    y parsea 3 hipótesis. Devuelve None si algo falla (caller hará fallback).
    Patrón: el outer JSON tiene `result` con el inner JSON string a parsear."""
    try:
        proc = subprocess.run(
            ['claude', '--print', '--model', 'opus', '--output-format', 'json', prompt],
            capture_output=True, text=True, timeout=timeout_s, encoding='utf-8',
        )
        if proc.returncode != 0:
            return None
        outer = json.loads(proc.stdout)
        if outer.get('is_error'):
            return None
        inner_text = outer.get('result', '').strip()
        # El modelo a veces envuelve el JSON en ```json ... ``` — limpiar
        if inner_text.startswith('```'):
            inner_text = re.sub(r'^```(?:json)?\s*|\s*```$', '', inner_text, flags=re.MULTILINE).strip()
        inner = json.loads(inner_text)
        hyps_data = inner.get('hypotheses') or []
        if len(hyps_data) != 3:
            return None
        out = []
        for i, h in enumerate(hyps_data, start=1):
            idea = (h.get('idea') or '').strip()
            feedback = (h.get('feedback') or '').strip()
            if not idea or not feedback:
                return None
            out.append(Hypothesis(
                f'{expected_ids_prefix}_{i}', level, idea=idea, feedback=feedback
            ))
        return out
    except Exception:
        return None


def qc_generate_3_hypotheses(station_def: dict, caja: 'Caja', initial_break: str, dry_run: bool) -> list[Hypothesis]:
    """QC de la estación → 3 hipótesis RAÍZ distintas entre sí.
    Distintas en el sentido de raíz causal, no rephrasings.
    DRY mode → 3 stubs canónicos.
    REAL mode → invoca Opus headless (claude --print) con prompt sólido; si falla
    parse o CLI, fallback a los stubs canónicos para no bloquear el flow."""
    sid = station_def['id']
    if dry_run:
        return [
            Hypothesis(
                f'qc_{sid}_1', 'qc',
                idea=f'[QC raíz 1] PARÁMETROS — {sid} con valores demasiado agresivos',
                feedback=(f'\n[RESCUE QC-H1 · {sid}] El intento estándar falló: {initial_break}.\n'
                          'Hipótesis raíz 1 (PARÁMETROS): los parámetros usados son demasiado agresivos. '
                          'Reintenta con valores conservadores y timeout x2 sobre el default. '
                          'NO cambies de herramienta, NO cambies de input — solo afloja parámetros.')),
            Hypothesis(
                f'qc_{sid}_2', 'qc',
                idea=f'[QC raíz 2] HERRAMIENTA — la herramienta usada en {sid} no aplica al caso',
                feedback=(f'\n[RESCUE QC-H2 · {sid}] El intento estándar falló: {initial_break}.\n'
                          'Hipótesis raíz 2 (HERRAMIENTA): la herramienta primaria no aplica a este caso concreto. '
                          'Cambia a la alternativa documentada en el PDF de la estación (segundo método declarado). '
                          'NO toques parámetros — solo cambia de método.')),
            Hypothesis(
                f'qc_{sid}_3', 'qc',
                idea=f'[QC raíz 3] INPUT — el input que recibe {sid} está mal formateado',
                feedback=(f'\n[RESCUE QC-H3 · {sid}] El intento estándar falló: {initial_break}.\n'
                          'Hipótesis raíz 3 (INPUT): el input que recibe la estación viene mal formateado / con caracteres raros / encoding inconsistente. '
                          'Normaliza/valida input (strip, ASCII safe, schema check) ANTES de invocar la herramienta. '
                          'Mismo método y parámetros que el intento estándar.')),
        ]
    # MODO REAL — invocar Opus headless con PDF estación + caja + break como contexto
    pdf_name = Supervisor.PDF_NAME_MAP.get(sid, '')
    pdf_path = ROOT / '_pdfs' / pdf_name if pdf_name else None
    pdf_excerpt = ''
    if pdf_path and pdf_path.exists():
        try:
            pdf_excerpt = pdf_path.read_text(encoding='utf-8', errors='ignore')[:6000]
        except Exception:
            pdf_excerpt = ''
    caja_excerpt = json.dumps(caja.read_full().get('sections', {}), ensure_ascii=False)[:4000]
    prompt = (
        f'Eres el QC de la estación {sid.upper()} de la factoría escalado_formatos.\n'
        f'El sub-agente ejecutor falló con: {initial_break}\n\n'
        f'Genera 3 HIPÓTESIS RAÍZ distintas entre sí (NO 3 rephrasings de la misma idea, '
        f'tres causas raíz CAUSALMENTE distintas — p.ej. una sobre PARÁMETROS, otra sobre '
        f'HERRAMIENTA/MÉTODO, otra sobre INPUT/DATOS), cada una con un fix concreto que el '
        f'ejecutor de la estación pueda aplicar literalmente en el siguiente intento. '
        f'No inventes pasos fuera del scope de {sid}.\n\n'
        f'PDF base de la estación (recortado):\n{pdf_excerpt}\n\n'
        f'Caja del flow (recortada):\n{caja_excerpt}\n\n'
        f'Devuelve SOLO un JSON con esta forma exacta, sin texto adicional, sin ```:\n'
        f'{{"hypotheses":[{{"idea":"<una línea humana describiendo la causa raíz 1>",'
        f'"feedback":"<instrucción literal al ejecutor con el fix concreto, máximo 4 líneas>"}},'
        f'{{"idea":"...","feedback":"..."}},{{"idea":"...","feedback":"..."}}]}}'
    )
    hyps = _invoke_opus_for_hypotheses(prompt, sid, 'qc', expected_ids_prefix=f'qc_{sid}')
    if hyps is None:
        # Fallback canónico: nunca bloqueamos el flow por fallo de Opus
        return qc_generate_3_hypotheses(station_def, caja, initial_break, dry_run=True)
    return hyps


def master_generate_3_hypotheses(station_def: dict, caja: 'Caja', initial_break: str,
                                  qc_attempts: list, dry_run: bool) -> list[Hypothesis]:
    """MASTER QC central → 3 hipótesis NUEVAS, distintas de las 3 que QC ya probó.
    El master ve el flow entero (caja completa), no solo la estación."""
    sid = station_def['id']
    if dry_run:
        return [
            Hypothesis(
                f'master_{sid}_1', 'master',
                idea=f'[MASTER raíz 1] CROSS-FLOW — el fallo no está en {sid}, viene de aguas arriba',
                feedback=(f'\n[RESCUE MASTER-H1 · {sid}] Las 3 hipótesis QC fallaron. '
                          f'Break inicial: {initial_break}.\n'
                          'Hipótesis cross-flow 1: el problema NO es de esta estación, viene de una estación previa que dejó datos malos en la caja. '
                          'Lee caja.sections completa, identifica qué sección previa parece corrupta, y rehaz '
                          f'{sid} usando un input reconstruido a partir de input_start (no de la sección previa).')),
            Hypothesis(
                f'master_{sid}_2', 'master',
                idea=f'[MASTER raíz 2] FALLBACK GLOBAL — saltar {sid} con modo degradado',
                feedback=(f'\n[RESCUE MASTER-H2 · {sid}] Las 3 hipótesis QC fallaron. '
                          f'Break inicial: {initial_break}.\n'
                          'Hipótesis cross-flow 2: usa el FALLBACK a nivel flow para esta estación. '
                          'Genera el output mínimo viable de la sección (cache previo, mock canónico, valores por defecto razonables) '
                          'marcado con flag degraded_mode=True para que estaciones siguientes lo sepan, y sigue.')),
            Hypothesis(
                f'master_{sid}_3', 'master',
                idea=f'[MASTER raíz 3] CONTRATO — el supervisor de {sid} aplica criterios incorrectos',
                feedback=(f'\n[RESCUE MASTER-H3 · {sid}] Las 3 hipótesis QC fallaron. '
                          f'Break inicial: {initial_break}.\n'
                          'Hipótesis cross-flow 3: el contrato del supervisor de la estación está roto vs la realidad del input concreto. '
                          'Re-lee el PDF base de la estación y la sección de la caja, decide si el output actual realmente cumple el espíritu del PDF, '
                          'y si sí, marca la sección con override_supervisor_reason="<por qué pasa el espíritu del PDF aunque no la letra>" para que el supervisor apruebe.')),
        ]
    # MODO REAL — Opus headless con visión cross-flow (caja entera + qc_attempts ya probados)
    qc_summary = '\n'.join(
        f'  - QC {a["hypothesis"]["id"]} (FAIL): {a["hypothesis"]["idea"]} → {a["detail"][:140]}'
        for a in qc_attempts
    )
    caja_full = json.dumps(caja.read_full(), ensure_ascii=False)[:10000]
    prompt = (
        f'Eres el MASTER QC central de la factoría escalado_formatos. Ves el flow ENTERO, '
        f'no solo la estación que falla. La estación que falla es {sid.upper()}. '
        f'Break inicial: {initial_break}\n\n'
        f'El QC de la estación ya probó estas 3 hipótesis raíz y TODAS fallaron:\n{qc_summary}\n\n'
        f'Genera OTRAS 3 hipótesis distintas entre sí Y distintas de las 3 que ya probó QC. '
        f'Tu nivel es CROSS-FLOW: piensa en (a) el problema viene de aguas arriba (otra estación '
        f'dejó datos malos en la caja), (b) usar fallback global / modo degradado, (c) el contrato '
        f'del supervisor está mal calibrado contra este input concreto, (d) re-arquitectura puntual. '
        f'Cada hipótesis con fix concreto aplicable.\n\n'
        f'Caja completa del flow (recortada):\n{caja_full}\n\n'
        f'Devuelve SOLO JSON, sin texto, sin ```:\n'
        f'{{"hypotheses":[{{"idea":"...","feedback":"..."}},'
        f'{{"idea":"...","feedback":"..."}},{{"idea":"...","feedback":"..."}}]}}'
    )
    hyps = _invoke_opus_for_hypotheses(prompt, sid, 'master', expected_ids_prefix=f'master_{sid}')
    if hyps is None:
        return master_generate_3_hypotheses(station_def, caja, initial_break, qc_attempts, dry_run=True)
    return hyps


def telegram_alert_fer(station_def: dict, caja: 'Caja', initial_break: str,
                        qc_attempts: list, master_attempts: list, dry_run: bool) -> bool:
    """Ping SOS a Fer cuando QC+MASTER agotaron las 6 hipótesis sin solución.
    DRY mode: deja un marker file _telegram_SOS_<sid>.txt en run_dir (no llama API).
    REAL mode: POST a Telegram Bot API."""
    sid = station_def['id']
    flow_id = caja.flow_id
    summary_qc = '\n'.join(
        f'  QC {a["hypothesis"]["id"]} → {a["result"]} · {a["detail"][:120]}' for a in qc_attempts
    )
    summary_master = '\n'.join(
        f'  MASTER {a["hypothesis"]["id"]} → {a["result"]} · {a["detail"][:120]}' for a in master_attempts
    )
    text = (
        f'🚨 SOS factory_v4 · estación {sid.upper()} bloqueada\n'
        f'flow_id: {flow_id}\n'
        f'break inicial: {initial_break[:200]}\n\n'
        f'QC probó 3 hipótesis raíz · todas FAIL:\n{summary_qc}\n\n'
        f'MASTER probó otras 3 hipótesis · todas FAIL:\n{summary_master}\n\n'
        f'caja: {caja.path}\n'
        f'Necesito tu ayuda para destrabar.'
    )
    marker = caja.run_dir / f'_telegram_SOS_{sid}.txt'
    try:
        marker.write_text(text, encoding='utf-8')
    except Exception:
        pass
    if dry_run:
        return True
    # REAL — POST a Telegram Bot API
    try:
        import urllib.request as _ur
        import urllib.parse as _up
        body = _up.urlencode({
            'chat_id': TELEGRAM_CHAT_FER,
            'text': text,
            'disable_web_page_preview': 'true',
        }).encode('utf-8')
        req = _ur.Request(
            f'https://api.telegram.org/bot{TELEGRAM_TOKEN_SOS}/sendMessage',
            data=body,
            headers={'Content-Type': 'application/x-www-form-urlencoded'},
        )
        with _ur.urlopen(req, timeout=10) as r:
            return 200 <= r.status < 300
    except Exception as e:
        # Si Telegram falla, dejamos el marker file como prueba escrita.
        try:
            (caja.run_dir / f'_telegram_SOS_{sid}.error.txt').write_text(str(e), encoding='utf-8')
        except Exception:
            pass
        return False


# ============================================================
# EJECUTORES POR ESTACIÓN — DRY mode + stub real
# ============================================================

def _retry_feedback_text(supervisor_break: str | None) -> str:
    if not supervisor_break:
        return ''
    return f'\n[RETRY] Supervisor previo rechazó: {supervisor_break}. Corrige específicamente eso.'

# BRIDGE singleton accesible desde executors para escupir logs en vivo
# (Fer msg 4814: "las animaciones son el estado de cada segundo del código interno
# de claude code como va trabajando y se representa ahí" → cada log del subagente
# real va al bridge en TIEMPO REAL, no por timer artificial).
_GLOBAL_BRIDGE: 'TaskStateBridge | None' = None

def _set_global_bridge(b):
    global _GLOBAL_BRIDGE
    _GLOBAL_BRIDGE = b

def _bridge():
    return _GLOBAL_BRIDGE

def _emit(level: str, text: str):
    """Escupe un log al HTML en vivo si hay bridge montado, sin bloquear si no."""
    b = _bridge()
    if b is not None:
        try:
            b.append_log(level, text)
            return
        except Exception:
            pass
    # Sin bridge → stdout (orchestrator standalone)
    print(f'[{level.upper()}] {text}')


def _spawn_canonical_agent(canonical_name: str, prompt: str, station_id: str,
                            timeout_s: int = 600) -> tuple[bool, dict | None, str]:
    """Spawnea el subagente canónico vía `claude --print --model opus` con stream-json.
    Cada evento del subagente (tool_use, text) se escupe al bridge en vivo.
    Devuelve (ok, parsed_result_dict, error_msg). El subagente debe terminar
    devolviendo en su mensaje final un JSON que parsearemos."""
    _emit('info', f'{station_id} · spawneando subagente canónico {canonical_name}')
    full_prompt = (
        f'Use the {canonical_name} agent to perform this task.\n\n{prompt}\n\n'
        f'CRITICAL: cuando el subagente termine, devuelve en tu mensaje final UN ÚNICO JSON '
        f'sin texto adicional, sin ``` blocks, en una sola línea, con todos los campos pedidos.'
    )
    try:
        # CRITICAL: --output-format stream-json EXIGE --verbose o el CLI sale exit=1
        # antes de cargar nada (lección feedback_claude_print_stream_json_requiere_verbose.md).
        proc = subprocess.Popen(
            ['claude', '--print', '--model', 'opus',
             '--output-format', 'stream-json',
             '--verbose',
             '--include-partial-messages',
             full_prompt],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
            text=True, encoding='utf-8', errors='replace', bufsize=1,
        )
        last_text = ''
        result_obj = None
        for raw in proc.stdout:
            line = raw.strip()
            if not line:
                continue
            try:
                ev = json.loads(line)
            except Exception:
                continue
            t = ev.get('type', '')
            if t == 'assistant':
                # mensaje parcial o final del modelo principal — emitir TODO sin cortar
                # (Fer msg 4854: "necesito sus consolas reales como yo veo en mi terminal").
                msg = ev.get('message', {})
                for c in msg.get('content', []):
                    if c.get('type') == 'tool_use':
                        tname = c.get('name', '')
                        tinput = json.dumps(c.get('input', {}), ensure_ascii=False)
                        # Si el input es muy largo, partirlo en chunks de 200 para que se vean enteros
                        _emit('info', f'{canonical_name} · 🔧 {tname}')
                        for i in range(0, len(tinput), 200):
                            _emit('info', f'{canonical_name} ·    {tinput[i:i+200]}')
                    elif c.get('type') == 'text':
                        text = c.get('text', '').strip()
                        if text:
                            last_text = text
                            for ln in text.split('\n'):
                                ln = ln.strip()
                                if not ln:
                                    continue
                                # Partir líneas largas en chunks
                                if len(ln) <= 200:
                                    _emit('info', f'{canonical_name} · {ln}')
                                else:
                                    for i in range(0, len(ln), 200):
                                        _emit('info', f'{canonical_name} · {ln[i:i+200]}')
            elif t == 'user':
                # tool_result (output del tool) — emitir entero, multi-línea
                msg = ev.get('message', {})
                for c in msg.get('content', []):
                    if c.get('type') == 'tool_result':
                        out = c.get('content', '')
                        if isinstance(out, list):
                            out = '\n'.join(x.get('text', '') for x in out if isinstance(x, dict))
                        out_s = str(out)
                        for ln in out_s.split('\n'):
                            ln = ln.strip()
                            if not ln:
                                continue
                            if len(ln) <= 200:
                                _emit('info', f'{canonical_name} · ↩ {ln}')
                            else:
                                for i in range(0, len(ln), 200):
                                    _emit('info', f'{canonical_name} · ↩ {ln[i:i+200]}')
            elif t == 'result':
                # evento final con result string
                inner = ev.get('result', '').strip()
                if inner.startswith('```'):
                    inner = re.sub(r'^```(?:json)?\s*|\s*```$', '', inner, flags=re.MULTILINE).strip()
                try:
                    result_obj = json.loads(inner)
                except Exception:
                    # tal vez el JSON está dentro de un texto más largo; buscar último {...}
                    m = re.search(r'\{[\s\S]*\}\s*$', inner)
                    if m:
                        try:
                            result_obj = json.loads(m.group(0))
                        except Exception:
                            pass
        rc = proc.wait()
        if result_obj is None and last_text:
            # Fallback parse del último text
            m = re.search(r'\{[\s\S]*\}\s*$', last_text)
            if m:
                try:
                    result_obj = json.loads(m.group(0))
                except Exception:
                    pass
        if rc != 0:
            return False, None, f'claude exit={rc}'
        if result_obj is None:
            return False, None, 'subagente no devolvió JSON parseable'
        return True, result_obj, ''
    except Exception as e:
        return False, None, f'spawn exception: {str(e)[:160]}'


def run_descargar(caja: Caja, station_def: dict, dry_run: bool, retry_feedback: str = ''):
    """Estación DESCARGAR: spawnea el subagente canónico descargador-video-escalado.
    El subagente decide qué herramienta usar (yt-dlp / Playwright MCP / servicio web FB
    downloader / inspect DOM) según funcione. Si recibe retry_feedback de la rescue ladder,
    el subagente lo aplica cambiando de método."""
    inp = caja.read_full().get('input_start', {})
    url = inp.get('video_url') or inp.get('url')
    product = inp.get('product', 'unknown')
    out_dir = RUNS_DIR / caja.flow_id
    out_dir.mkdir(parents=True, exist_ok=True)
    out_path = out_dir / 'video.mp4'

    if not url:
        _emit('error', 'descargador-video-escalado · no hay video_url en input_start')
        return False, 'no_video_url'

    # Cache-hit corto-circuito
    if out_path.exists() and out_path.stat().st_size > 100_000:
        size_kb = out_path.stat().st_size // 1024
        _emit('success', f'descargador-video-escalado · cache hit · {size_kb} KB')
        caja.write_section('descargar', {
            'cache_hit': True, 'video_path': str(out_path), 'size_kb': size_kb, 'url': url,
        }, mode='replace', actor='descargador-video-escalado')
        return True, f'cache_hit ({size_kb}KB)'

    if dry_run:
        _emit('info', f'descargador-video-escalado · [DRY] spawnearía subagente canónico para bajar {url}')
        caja.write_section('descargar', {
            'dry_run': True, 'url': url, 'video_path': str(out_path) + '.FAKE', 'size_kb': 1234,
            'retry_feedback_applied': retry_feedback or None,
        }, mode='replace', actor='descargador-video-escalado')
        return True, 'DRY OK'

    # REAL — spawn subagente canónico
    feedback_note = f'\n\nRETRY FEEDBACK (aplica este cambio si la herramienta anterior falló):\n{retry_feedback}' if retry_feedback else ''
    prompt = (
        f'Tarea: descargar el video Facebook de esta URL al disco local.\n'
        f'URL: {url}\n'
        f'Producto: {product}\n'
        f'Ruta de salida exacta: {out_path}\n\n'
        f'Métodos disponibles en cascada (usar el primero que funcione, fallback al siguiente si falla):\n'
        f'  1) yt-dlp con cookies-from-browser chrome\n'
        f'  2) MCP playwright: navegar a un servicio web FB downloader (ej. fdown.net, getfvid.com), '
        f'pegar la URL, descargar el mp4 resultante con browser_evaluate o request directo\n'
        f'  3) MCP playwright: navegar a la URL FB original, inspeccionar DOM/network requests para extraer media_url firmado\n'
        f'Cuando termines, devuelve JSON: {{"video_path":"<ruta absoluta>","size_kb":N,"method_used":"<yt-dlp|playwright_web|playwright_inspect>","duration_s":N}}'
        f'{feedback_note}'
    )
    ok, result, err = _spawn_canonical_agent('descargador-video-escalado', prompt, 'descargar', timeout_s=600)
    if not ok:
        return False, err
    video_path = result.get('video_path')
    if not video_path or not Path(video_path).exists():
        return False, f'subagente devolvió video_path no existe: {video_path}'
    size_kb = result.get('size_kb') or (Path(video_path).stat().st_size // 1024)
    _emit('success', f'descargador-video-escalado · mp4 listo · {size_kb} KB · método: {result.get("method_used")}')
    caja.write_section('descargar', {
        'cache_hit': False, 'video_path': str(video_path), 'size_kb': size_kb,
        'url': url, 'method_used': result.get('method_used'),
        'retry_feedback_applied': retry_feedback or None,
    }, mode='replace', actor='descargador-video-escalado')
    return True, f'downloaded {size_kb}KB via {result.get("method_used")}'

def run_analizar(caja: Caja, station_def: dict, dry_run: bool, retry_feedback: str = ''):
    """Estación ANALIZAR: spawnea analizador-combo-completo. El subagente decide
    cache lookup, faster-whisper, frame extraction y síntesis combo."""
    desc = caja.read_section('descargar') or {}
    video_path = desc.get('video_path')
    if not video_path:
        return False, 'no_video_path en caja.descargar'
    inp = caja.read_full().get('input_start', {})
    product = inp.get('product', 'unknown').replace(' ', '_')[:30]
    cache_path = ROOT / '_productos' / product / '_analysis.json'

    if dry_run:
        _emit('info', 'analizador-combo-completo · [DRY] spawnearía subagente para análisis combo')
        caja.write_section('analizar', {
            'dry_run': True, 'cache_hit': False, 'transcript': 'DRY · sin trabajo real',
            'word_count': 42, 'duration_seconds': 30.0, 'language_detected': 'es',
            'retry_feedback_applied': retry_feedback or None,
        }, mode='replace', actor='analizador-combo-completo')
        return True, 'DRY OK'

    feedback_note = f'\n\nRETRY FEEDBACK:\n{retry_feedback}' if retry_feedback else ''
    prompt = (
        f'Tarea: análisis combo completo del video.\n'
        f'video_path: {video_path}\n'
        f'producto: {product}\n'
        f'país: {inp.get("country","ES")}\n'
        f'cache_path: {cache_path}\n\n'
        f'Sigue el procedimiento del subagente: PASO 1 cache lookup → si miss, PASO 2 faster-whisper '
        f'transcripción + ffmpeg frames + síntesis combo (awareness, sof, jergas, deseo, avatar, etc).\n'
        f'Cuando termines, devuelve JSON: {{"cache_hit":bool,"transcript":"...","duration_seconds":N,'
        f'"language_detected":"es","combo":{{...}}}}'
        f'{feedback_note}'
    )
    ok, result, err = _spawn_canonical_agent('analizador-combo-completo', prompt, 'analizar', timeout_s=900)
    if not ok:
        return False, err
    _emit('success', f'analizador-combo-completo · análisis OK · cache_hit={result.get("cache_hit")}')
    caja.write_section('analizar', {**result, 'video_path': video_path,
                                      'retry_feedback_applied': retry_feedback or None},
                        mode='replace', actor='analizador-combo-completo')
    return True, f'analizado · {result.get("duration_seconds","?")}s'

def run_interpretar(caja: Caja, station_def: dict, dry_run: bool, retry_feedback: str = ''):
    """Estación INTERPRETAR: spawnea interpretador-estrategia-busqueda. El subagente
    arma la estrategia de búsqueda a partir del combo del análisis."""
    analizar = caja.read_section('analizar') or {}
    inp = caja.read_full().get('input_start', {})

    if dry_run:
        _emit('info', 'interpretador-estrategia-busqueda · [DRY] spawnearía subagente')
        caja.write_section('interpretar', {
            'dry_run': True, 'awareness': 'product_aware', 'sofisticacion': 4,
            'jergas': ['jerga_a'], 'deseo_principal': 'dormir mejor',
            'avatar': {'edad': '40-55'}, 'need_complex_videos': inp.get('saturated', False),
            'retry_feedback_applied': retry_feedback or None,
        }, mode='replace', actor='interpretador-estrategia-busqueda')
        return True, 'DRY OK'

    feedback_note = f'\n\nRETRY FEEDBACK:\n{retry_feedback}' if retry_feedback else ''
    analisis_summary = json.dumps(analizar, ensure_ascii=False)[:6000]
    prompt = (
        f'Tarea: interpretar el análisis del video y producir la estrategia de búsqueda.\n'
        f'producto: {inp.get("product","")}\n'
        f'país: {inp.get("country","ES")}\n'
        f'saturated: {inp.get("saturated", False)}\n'
        f'análisis previo:\n{analisis_summary}\n\n'
        f'Devuelve JSON con: awareness, sofisticacion, jergas[], deseo_principal, avatar{{edad,genero,'
        f'contexto,profesion}}, temporalidad, fuerza_cambio, antagonistas[], formato, '
        f'need_complex_videos, match_criteria{{}}, queries[]'
        f'{feedback_note}'
    )
    ok, result, err = _spawn_canonical_agent('interpretador-estrategia-busqueda', prompt, 'interpretar', timeout_s=300)
    if not ok:
        return False, err
    _emit('success', f'interpretador-estrategia-busqueda · awareness={result.get("awareness")} sof={result.get("sofisticacion")}')
    caja.write_section('interpretar', {**result, 'retry_feedback_applied': retry_feedback or None},
                        mode='replace', actor='interpretador-estrategia-busqueda')
    return True, f'awareness={result.get("awareness")}'

def run_buscar(caja: Caja, station_def: dict, dry_run: bool, retry_feedback: str = ''):
    """Cluster paralelo de 4 buscadores · coordinados vía shared_state.json."""
    shared_state_path = RUNS_DIR / caja.flow_id / '_buscar_shared_state.json'
    ss = SharedState(shared_state_path)

    agents = ['buscador-fb-1', 'buscador-fb-2', 'buscador-gethookd-1', 'buscador-gethookd-2']

    if dry_run:
        _emit('info', 'buscar · arrancando 4 sub-agentes en paralelo (factory_metadata declara 4)')
        all_candidates = []
        for agent_id in agents:
            q = f'fake_query_{agent_id}_v1'
            took = ss.take_query(agent_id, q)
            if not took:
                q = f'fake_query_{agent_id}_v2'
                ss.take_query(agent_id, q)
            _emit('info', f'{agent_id} · query "{q}" tomada del shared_state · ejecutando')
            video_ids = [f'vid_{agent_id}_{j}' for j in range(2)]
            candidates = [{'agent_id': agent_id, 'video_id': vid, 'match_score': 0.85} for vid in video_ids]
            ss.finish_query(agent_id, q, video_ids, candidates)
            _emit('success', f'{agent_id} · {len(candidates)} candidatos encontrados (DRY)')
            all_candidates.extend(candidates)
        caja.write_section('buscar', {
            'dry_run': True,
            'shared_state_path': str(shared_state_path),
            'agents_run': agents,
            'candidates_total': len(all_candidates),
            'candidates_sample': all_candidates[:4],
            'retry_feedback_applied': retry_feedback or None,
        }, mode='replace', actor='buscadores_x4')
        return True, f'DRY OK · {len(all_candidates)} candidatos · shared_state coordinado'

    # REAL — 4 subprocesses claude --print PARALELOS, cada uno spawnea el subagente canónico
    # buscador-paralelo-fb-gethookd con su id de buscador (fb_1/fb_2/gh_1/gh_2).
    interp = caja.read_section('interpretar') or {}
    strategy_summary = json.dumps(interp, ensure_ascii=False)[:3000]
    inp = caja.read_full().get('input_start', {})
    feedback_note = f'\n\nRETRY FEEDBACK:\n{retry_feedback}' if retry_feedback else ''
    procs = {}
    for agent_id, search_id in [('buscador-fb-1','fb_1'), ('buscador-fb-2','fb_2'),
                                  ('buscador-gethookd-1','gh_1'), ('buscador-gethookd-2','gh_2')]:
        prompt = (
            f'Use the buscador-paralelo-fb-gethookd agent with id={search_id}.\n'
            f'Tarea: buscar candidatos similares al ganador en FB Ads Library / GetHookd.\n'
            f'Strategy:\n{strategy_summary}\n'
            f'producto: {inp.get("product","")} · país: {inp.get("country","ES")}\n'
            f'shared_state_path: {shared_state_path} (coordina con los otros 3 paralelos para no duplicar queries)\n'
            f'Cuando termine, devuelve JSON {{"agent_id":"{agent_id}","candidates":[{{"video_id":"...","url":"...","match_score":N}}]}}'
            f'{feedback_note}'
        )
        full_prompt = (
            f'{prompt}\n\nCRITICAL: termina devolviendo UN ÚNICO JSON sin texto adicional ni ```'
        )
        _emit('info', f'{agent_id} · spawneando buscador-paralelo-fb-gethookd ({search_id})')
        procs[agent_id] = subprocess.Popen(
            ['claude', '--print', '--model', 'opus', '--output-format', 'json', full_prompt],
            stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
            text=True, encoding='utf-8', errors='replace',
        )
    all_candidates = []
    for agent_id, p in procs.items():
        try:
            out, _ = p.communicate(timeout=900)
            outer = json.loads(out) if out else {}
            inner = outer.get('result', '').strip()
            if inner.startswith('```'):
                inner = re.sub(r'^```(?:json)?\s*|\s*```$', '', inner, flags=re.MULTILINE).strip()
            m = re.search(r'\{[\s\S]*\}\s*$', inner)
            data = json.loads(m.group(0)) if m else {}
            cands = data.get('candidates', [])
            _emit('success', f'{agent_id} · devolvió {len(cands)} candidatos')
            all_candidates.extend(cands)
        except Exception as e:
            _emit('warn', f'{agent_id} · falló: {str(e)[:120]}')
    caja.write_section('buscar', {
        'agents_run': agents,
        'candidates_total': len(all_candidates),
        'candidates': all_candidates,
        'shared_state_path': str(shared_state_path),
        'retry_feedback_applied': retry_feedback or None,
    }, mode='replace', actor='buscadores_x4')
    return True, f'buscar · {len(all_candidates)} candidatos totales (4 paralelos)'

def run_comparador(caja: Caja, station_def: dict, dry_run: bool, retry_feedback: str = ''):
    """Estación COMPARADOR: 5 similares + 1 arriesgado a partir de los candidatos buscar."""
    buscar = caja.read_section('buscar') or {}
    candidates = buscar.get('candidates') or buscar.get('candidates_sample') or []
    interp = caja.read_section('interpretar') or {}

    if dry_run:
        _emit('info', f'comparador-finalistas · [DRY] scoring sobre {len(candidates)} candidatos')
        _emit('info', 'comparador-finalistas · [DRY] elegiría 5 similares + 1 arriesgado')
        caja.write_section('comparador', {
            'dry_run': True,
            'finalists': [{'video_id': f'vid_{i}', 'rol': 'similar' if i < 5 else 'arriesgado'} for i in range(6)],
            'count_similares': 5, 'count_arriesgados': 1,
            'retry_feedback_applied': retry_feedback or None,
        }, mode='replace', actor='comparador-finalistas')
        return True, 'DRY OK · 5+1 finalistas'

    if not candidates:
        return False, 'no hay candidatos en buscar para comparar'

    feedback_note = f'\n\nRETRY FEEDBACK:\n{retry_feedback}' if retry_feedback else ''
    prompt = (
        f'Tarea: scoring sobre {len(candidates)} candidatos y selección de 5 similares + 1 arriesgado.\n'
        f'COMBO ganador: {json.dumps(interp, ensure_ascii=False)[:3000]}\n'
        f'CANDIDATOS: {json.dumps(candidates, ensure_ascii=False)[:8000]}\n\n'
        f'Devuelve JSON: {{"finalists":[{{"video_id":"...","url":"...","rol":"similar|arriesgado","score":0-1,"reason":"..."}}]}}'
        f'{feedback_note}'
    )
    ok, result, err = _spawn_canonical_agent('comparador-finalistas', prompt, 'comparador', timeout_s=600)
    if not ok:
        return False, err
    finalists = result.get('finalists', [])
    sim = sum(1 for f in finalists if f.get('rol') == 'similar')
    arr = sum(1 for f in finalists if f.get('rol') == 'arriesgado')
    _emit('success', f'comparador-finalistas · {sim}+{arr} finalistas seleccionados')
    caja.write_section('comparador', {
        'finalists': finalists, 'count_similares': sim, 'count_arriesgados': arr,
        'retry_feedback_applied': retry_feedback or None,
    }, mode='replace', actor='comparador-finalistas')
    return True, f'{sim}+{arr} finalistas'

def run_trello_editor(caja: Caja, station_def: dict, dry_run: bool, retry_feedback: str = ''):
    """Estación TRELLO_EDITOR: crea tarjeta real en board rH2kazG2 (editor de videos)
    con los 6 finalistas como adjuntos + brief PDF + cover."""
    comparador = caja.read_section('comparador') or {}
    finalists = comparador.get('finalists', [])
    inp = caja.read_full().get('input_start', {})

    if dry_run:
        _emit('info', f'creador-trello-editor-videos · [DRY] crearía card en board rH2kazG2 con {len(finalists)} finalistas')
        caja.write_section('trello_editor', {
            'dry_run': True,
            'card_id': 'FAKE_CARD_123',
            'short_url': 'https://trello.com/c/FAKE_CARD_123',
            'attachments_count': len(finalists),
            'retry_feedback_applied': retry_feedback or None,
        }, mode='replace', actor='creador-trello-editor-videos')
        return True, 'DRY OK · tarjeta fake creada'

    # REAL — spawn subagente canónico. CRITICAL: una sola card, en pos="bottom" (Fer msg 4868-4870)
    feedback_note = f'\n\nRETRY FEEDBACK:\n{retry_feedback}' if retry_feedback else ''
    prompt = (
        f'Tarea: crear UNA SOLA card en el board Trello editor de videos (rH2kazG2, lista "Input"/'
        f'"Nueva tarea") para los {len(finalists)} finalistas seleccionados.\n\n'
        f'REGLAS DURAS:\n'
        f'  R1) UNA card · NO crear duplicadas · si ya creaste una y quieres cambiar algo, '
        f'     PUT sobre la misma card_id, NO POST otra.\n'
        f'  R2) Crear con pos="bottom" (abajo del todo de la lista Input · Fer msg 4868-4870).\n'
        f'  R3) Nombre prefix obligatorio: "[ESCALADO FORMATOS]" antes del producto.\n\n'
        f'La card lleva:\n'
        f'  - cover: foto del producto (input)\n'
        f'  - {len(finalists)} mp4 attachments (descarga los videos finalistas si no están en disco)\n'
        f'  - brief PDF por cada finalista (estructura completa con awareness/sof/jergas/deseo/'
        f'avatar/temporalidad/fuerza_cambio/antagonistas + script/hooks específicos por video, '
        f'minimo 60KB por brief, no light)\n'
        f'  - URLs originales de los finalistas en la descripción\n'
        f'  - análisis del ganador como contexto (descripción de la card)\n\n'
        f'producto: {inp.get("product","")}\n'
        f'país: {inp.get("country","ES")}\n'
        f'flow_id: {caja.flow_id}\n'
        f'finalistas: {json.dumps(finalists, ensure_ascii=False)[:4000]}\n\n'
        f'Cuando termine, devuelve JSON: {{"card_id":"...","short_url":"...","attachments_count":N}}'
        f'{feedback_note}'
    )
    ok, result, err = _spawn_canonical_agent('creador-trello-editor-videos', prompt, 'trello_editor', timeout_s=900)
    if not ok:
        return False, err
    _emit('success', f'creador-trello-editor-videos · card creada · {result.get("short_url")}')
    caja.write_section('trello_editor', {
        **result, 'retry_feedback_applied': retry_feedback or None,
    }, mode='replace', actor='creador-trello-editor-videos')
    return True, f'card creada · {result.get("short_url")}'

EXECUTORS = {
    'descargar': run_descargar,
    'analizar': run_analizar,
    'interpretar': run_interpretar,
    'buscar': run_buscar,
    'comparador': run_comparador,
    'trello_editor': run_trello_editor,
}


# ============================================================
# WIRE — wrapper supervisor + retroloop
# ============================================================

def run_with_rescue_ladder(caja: Caja, station_def: dict, dry_run: bool,
                            bridge: 'TaskStateBridge | None' = None) -> tuple[bool, str]:
    """Pipeline canónico de una estación con rescue ladder de Fer (msg 4789):

      1) Intento estándar (executor + supervisor.audit)
      2) Si falla → QC genera 3 hipótesis raíz distintas y se prueban en SERIE
      3) Si las 3 fallan → MASTER genera otras 3 hipótesis distintas y se prueban en SERIE
      4) Si las 3 del master fallan → ping Telegram a Fer pidiendo ayuda

    Devuelve (ok, msg) y deja en la caja la sección `_rescue_<sid>` con
    el dossier completo de qué se probó (para el master/forense)."""
    sid = station_def['id']
    executor = EXECUTORS.get(sid)
    if not executor:
        return False, f'no executor para {sid}'

    supervisor = Supervisor(station_def, dry_run=dry_run)

    sup_id = station_def.get('supervisor', {}).get('id', f'supervisor-{sid}')
    agent_names = [a.get('name', a.get('id','agent')) for a in station_def.get('agents', [])]
    agent_label = '/'.join(agent_names) or 'agente'

    def _attempt(retry_feedback: str) -> tuple[bool, str]:
        ok, exec_msg = executor(caja, station_def, dry_run, retry_feedback)
        if not ok:
            # Agente reporta fallo a su jefe (Fer msg 4880: "hola jefe ya termine, esta bien?")
            _emit('warn', f'{agent_label} → {sup_id}: «Jefe, fallé. {exec_msg[:120]}»')
            _emit('warn', f'{sup_id} → {agent_label}: «Recibido. Pasamos a hipótesis alternativas.»')
            return False, f'EXEC_FAIL · {exec_msg}'
        # El agente termina y avisa al jefe
        _emit('info', f'{agent_label} → {sup_id}: «Hola jefe, ya terminé {sid}. ¿Está bien? Mira el log.»')
        verdict, reason = supervisor.audit(caja)
        if verdict == 'APPROVE':
            _emit('success', f'{sup_id} → {agent_label}: «Revisado. Todo OK ({reason}). Aprobado, pasa al siguiente.»')
            return True, f'EXEC_OK · SUP_OK · {reason}'
        _emit('warn', f'{sup_id} → {agent_label}: «No me cuadra: {verdict} · {reason}. Tienes que corregir.»')
        return False, f'SUP_BREAK · {verdict} · {reason}'

    # 1) INTENTO ESTÁNDAR
    ok, msg = _attempt('')
    if ok:
        return True, f'std · {msg}'
    initial_break = msg
    if bridge:
        bridge.append_log('warn', f'⚠ {sid} estándar FAIL · {initial_break} · escalando a QC (3 hipótesis raíz)')

    # 2) QC × 3 (en SERIE)
    qc_hyps = qc_generate_3_hypotheses(station_def, caja, initial_break, dry_run)
    qc_attempts = []
    for h in qc_hyps:
        if bridge:
            bridge.append_log('info', f'🧪 QC {h.id} · {h.idea}')
        ok, msg = _attempt(h.feedback)
        qc_attempts.append({
            'hypothesis': h.to_dict(),
            'result': 'PASS' if ok else 'FAIL',
            'detail': msg,
        })
        if ok:
            caja.write_section(f'_rescue_{sid}', {
                'level_resolved': 'qc',
                'winning_hypothesis': h.to_dict(),
                'initial_break': initial_break,
                'qc_attempts': qc_attempts,
                'master_attempts': [],
                'fer_pinged': False,
                'ts': now_iso(),
            }, mode='replace', actor='rescue-ladder')
            if bridge:
                bridge.append_log('success', f'✓ QC {h.id} PASS · {msg}')
            return True, f'qc_{h.id} · {msg}'
        if bridge:
            bridge.append_log('warn', f'✗ QC {h.id} FAIL · {msg}')

    # 3) MASTER × 3 (en SERIE)
    if bridge:
        bridge.append_log('warn', f'⚠ {sid} las 3 QC fallaron · escalando a MASTER (QC central)')
    master_hyps = master_generate_3_hypotheses(station_def, caja, initial_break, qc_attempts, dry_run)
    master_attempts = []
    for h in master_hyps:
        if bridge:
            bridge.append_log('info', f'🧪 MASTER {h.id} · {h.idea}')
        ok, msg = _attempt(h.feedback)
        master_attempts.append({
            'hypothesis': h.to_dict(),
            'result': 'PASS' if ok else 'FAIL',
            'detail': msg,
        })
        if ok:
            caja.write_section(f'_rescue_{sid}', {
                'level_resolved': 'master',
                'winning_hypothesis': h.to_dict(),
                'initial_break': initial_break,
                'qc_attempts': qc_attempts,
                'master_attempts': master_attempts,
                'fer_pinged': False,
                'ts': now_iso(),
            }, mode='replace', actor='rescue-ladder')
            if bridge:
                bridge.append_log('success', f'✓ MASTER {h.id} PASS · {msg}')
            return True, f'master_{h.id} · {msg}'
        if bridge:
            bridge.append_log('warn', f'✗ MASTER {h.id} FAIL · {msg}')

    # 4) TELEGRAM SOS A FER
    if bridge:
        bridge.append_log('error', f'❌ {sid} 3+3 hipótesis FAIL · pidiendo ayuda a Fer por Telegram')
    pinged = telegram_alert_fer(station_def, caja, initial_break, qc_attempts, master_attempts, dry_run)
    caja.write_section(f'_rescue_{sid}', {
        'level_resolved': None,
        'winning_hypothesis': None,
        'initial_break': initial_break,
        'qc_attempts': qc_attempts,
        'master_attempts': master_attempts,
        'fer_pinged': bool(pinged),
        'ts': now_iso(),
    }, mode='replace', actor='rescue-ladder')
    return False, f'rescue_ladder_exhausted · fer_pinged={pinged}'


# Alias retro-compatible: orchestrator antiguo llamaba run_with_supervisor.
def run_with_supervisor(caja: Caja, station_def: dict, dry_run: bool,
                        max_retries: int = DEFAULT_MAX_RETRIES,
                        bridge: 'TaskStateBridge | None' = None) -> tuple[bool, str]:
    return run_with_rescue_ladder(caja, station_def, dry_run, bridge=bridge)


# ============================================================
# RUN A FLOW (start to finish)
# ============================================================

# ============================================================
# TASK STATE BRIDGE — espejo a _task_state.json para que factory_v4.html
# vea progreso real (REGLA #151 + #153). El HTML lee /api/task-state cada
# 2-3s y muestra los logs en la consola 3D + cross-browser sync.
# ============================================================

class TaskStateBridge:
    """Mantiene _task_state.json sincronizado con el progreso real del flow."""

    def __init__(self, flow_id: str, payload: dict, station_ids: list[str]):
        self.path = STATE_PATH
        self.flow_id = flow_id
        self.station_ids = station_ids
        self._init_state(payload)

    def _init_state(self, payload: dict):
        # Schema compatible con factory_v4.html restoreTaskState() y polling
        state = {
            'flow_id': self.flow_id,
            'input': payload,  # {flow, product, video_url, country, saturated, ...}
            'route': self.station_ids,
            'routeIdx': 0,
            'state': 'processing',
            'history': [],
            'transition_t': 0,
            'success': None,
            'demo_logs_completed': [],
            'logs': [],
            'started_at_flow_s': 0,
            'started_at_station_s': 0,
            'elapsed_at_flow_s': 0,
            'elapsed_at_station_s': 0,
            'saved_at_wall': int(time.time() * 1000),
        }
        self._save(state)

    def _load(self) -> dict:
        if self.path.exists():
            try:
                return json.loads(self.path.read_text(encoding='utf-8'))
            except Exception:
                pass
        return {}

    def _save(self, state: dict):
        state['saved_at_wall'] = int(time.time() * 1000)
        try:
            self.path.write_text(json.dumps(state, ensure_ascii=False, indent=2), encoding='utf-8')
        except Exception as e:
            print(f'[bridge] WARN: no se pudo escribir _task_state.json: {e}', file=sys.stderr)

    def append_log(self, level: str, text: str):
        s = self._load()
        if not s:
            return
        s.setdefault('logs', []).append({
            'ts': now_iso(),
            'level': level,
            'text': text,
        })
        # Cap a 200 entries (HTML solo muestra los últimos)
        # FIX 2026-05-09 (Fer msg 4854 "necesito sus consolas reales como mi terminal"):
        # cap subido de 200 a 1000. Antes se truncaban logs intermedios y la consola del
        # HTML mostraba "huecos" entre estaciones que parecían cosméticos.
        s['logs'] = s['logs'][-1000:]
        self._save(s)
        # Print stdout también para watcher logs
        try:
            print(f'[{level.upper()}] {text}')
        except Exception:
            pass

    def advance_station(self, route_idx: int):
        s = self._load()
        if not s:
            return
        s['routeIdx'] = route_idx
        s['state'] = 'processing'
        s['transition_t'] = 0
        s['started_at_station_s'] = 0
        s['saved_at_wall'] = int(time.time() * 1000)
        self._save(s)

    def mark_done(self, success: bool, summary: str = ''):
        if summary:
            self.append_log('success' if success else 'error', summary)
        s = self._load()
        if not s:
            return
        s['state'] = 'done'
        s['success'] = success
        self._save(s)

    def clear(self):
        if self.path.exists():
            try:
                self.path.unlink()
            except Exception:
                pass


def run_flow(inbox_path: Path, dry_run: bool = False, bridge_enabled: bool = True) -> dict:
    payload = json.loads(inbox_path.read_text(encoding='utf-8'))
    flow_id = derive_flow_id(payload, inbox_path)
    run_dir = RUNS_DIR / flow_id
    run_dir.mkdir(parents=True, exist_ok=True)
    caja = Caja(flow_id, run_dir, payload)

    meta = load_meta()
    stations = meta['stations']

    bridge = TaskStateBridge(flow_id, payload, [s['id'] for s in stations]) if bridge_enabled else None
    _set_global_bridge(bridge)  # accesible desde dentro de los executors vía _emit()

    log = []
    log.append(f'[start] flow_id={flow_id} · dry_run={dry_run} · stations={len(stations)}')
    if bridge:
        bridge.append_log('info', f'Flow {flow_id} arrancado · dry_run={dry_run} · 6 estaciones')

    for idx, station in enumerate(stations):
        sid = station['id']
        agent_names = [a["name"] for a in station.get("agents", [])]
        log.append(f'[station] {sid} · agentes={agent_names}')
        if bridge:
            bridge.advance_station(idx)
            bridge.append_log('info', f'▶ Entrando estación {sid.upper()} · agentes: {", ".join(agent_names)}')
            sup_id = station.get('supervisor', {}).get('id', 'sup_unknown')
            bridge.append_log('info', f'👁 Supervisor {sup_id} en posición · base PDF: PDF_{sid.upper()}.pdf')

        ok, msg = run_with_rescue_ladder(caja, station, dry_run, bridge=bridge)

        if not ok:
            log.append(f'[ABORT] {sid} · {msg}')
            if bridge:
                bridge.append_log('error', f'❌ {sid.upper()} FAIL · {msg}')
                bridge.mark_done(False, f'Flow abortado en {sid}: {msg}')
            return {'status': 'FAIL', 'flow_id': flow_id, 'station': sid, 'reason': msg, 'log': log, 'caja_path': str(caja.path)}
        log.append(f'[ok] {sid} · {msg}')
        if bridge:
            bridge.append_log('success', f'✓ {sid.upper()} OK · {msg}')

    log.append(f'[done] flow {flow_id} cerrado · caja={caja.path} · size={caja.size_bytes()}b')
    if bridge:
        bridge.append_log('success', f'🎯 Flow completado · 6 estaciones · caja {caja.size_bytes()}b')
        bridge.mark_done(True, f'Flow {flow_id} OK · caja {caja.size_bytes()}b')
    return {'status': 'OK', 'flow_id': flow_id, 'log': log, 'caja_path': str(caja.path)}


# ============================================================
# CLI
# ============================================================

def main():
    p = argparse.ArgumentParser()
    p.add_argument('--dry-run', action='store_true', help='Smoke test sin red/LLM/Trello reales')
    p.add_argument('--from-inbox', type=str, help='Ruta a inbox JSON (sino, watcher infinito)')
    p.add_argument('--once', action='store_true', help='Ejecuta un solo flow y sale')
    args = p.parse_args()

    if args.from_inbox:
        inbox_path = Path(args.from_inbox)
        if not inbox_path.is_absolute():
            inbox_path = ROOT / inbox_path
        if not inbox_path.exists():
            print(f'ERROR: inbox file no existe: {inbox_path}', file=sys.stderr)
            sys.exit(2)
        result = run_flow(inbox_path, dry_run=args.dry_run)
        print(json.dumps(result, ensure_ascii=False, indent=2))
        sys.exit(0 if result['status'] == 'OK' else 1)

    # watcher infinito
    print(f'[watcher] poll {INBOX_DIR} cada {POLL_INTERVAL_S}s · dry_run={args.dry_run}')
    while True:
        if STOP_FLAG.exists():
            print('[watcher] stop flag detectado · exit')
            return
        for f in sorted(INBOX_DIR.glob('flow_*.json')):
            done_marker = f.with_suffix('.done')
            if done_marker.exists():
                continue
            print(f'[watcher] procesando {f.name}')
            result = run_flow(f, dry_run=args.dry_run)
            done_marker.write_text(json.dumps(result, ensure_ascii=False, indent=2), encoding='utf-8')
            print(f'[watcher] {f.name} → {result["status"]}')
            if args.once:
                return
        time.sleep(POLL_INTERVAL_S)


if __name__ == '__main__':
    sys.stdout.reconfigure(encoding='utf-8')
    main()
