"""skills/buscador_paralelo.py — BUSCAR (estación 4) · 4 paralelos coordinados

CONFIRMO_LECTURA_AGENTE_CANONICOS — leídas memorias:
  - feedback_brief_editor_3_secciones.md (estructura PDF brief)
  - feedback_sweet_spot_30_100d_uk_usa_au_cascada_producto_msg4309.md (REGLA #141)
  - REGLA #141 REFINADA msg 644: país INPUT primero → luego UK/USA/AU/CA

Skill canónica: buscador-paralelo-fb-gethookd.
Stage 3: implementación REAL via claude CLI subprocess (ThreadPoolExecutor 4 workers).

Cada buscador (fb_1, fb_2, gh_1, gh_2):
  1. Lee shared_state.json — evita queries y video_ids ya tomados
  2. Reserva su query
  3. Invoca `claude --print` con instrucción de usar la skill apropiada:
       fb_*  → apify-fb-ads skill (FB Ads Library)
       gh_*  → video-referencia-hunter subagent (GetHookd)
  4. Cada subprocess Claude descarga candidatos, los transcribe y devuelve JSON
  5. Verifica match con _strategy.match_criteria
  6. Si <2 candidatos válidos → reintenta con query alternativa (max 5)
  7. Escribe candidatos validados en su sección + actualiza shared_state

Coordinación entre los 4: shared_state.json (lock implícito vía atomic write).

REGLAS aplicadas:
  - Sweet spot scaled_days 30-100 ESTRICTO (REGLA #141)
  - Cascada países: input PRIMERO → UK → USA → AU → CA (REGLA #141 REFINADA msg 644)
  - Cascada productos: exacto → similar → nicho → macro (REGLA #46)
  - Si saturated: rechazar UGC talking-head básico problem→solution

Inputs (de caja.interpretar):
  queries_fb       = [{buscador, query, filter_country}, ...]
  queries_gethookd = [{buscador, filters}, ...]
  match_criteria   = {awareness, sof, deseo_keywords, avatar_age_range, ...}
  cascada_paises   = [country_input, UK, USA, AU, CA]
"""
from __future__ import annotations
import json
import threading
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
from skills._lib import SkillContext, SkillResult


def run(inputs: dict, context: SkillContext) -> SkillResult:
    caja = context.read_caja()
    interp = caja.get('sections', {}).get('interpretar') or {}
    if not interp:
        return SkillResult(ok=False, errors=['caja.interpretar vacío — INTERPRETAR debió correr antes'])

    queries_fb = interp.get('queries_fb') or []
    queries_gh = interp.get('queries_gethookd') or []
    match_criteria = interp.get('match_criteria') or {}
    cascada = interp.get('cascada_paises') or ['ES', 'UK', 'USA', 'AU', 'CA']

    if len(queries_fb) < 2 or len(queries_gh) < 2:
        return SkillResult(ok=False, errors=[f'Esperaba ≥2 queries FB y ≥2 GH · recibí FB={len(queries_fb)} GH={len(queries_gh)}'])

    # Asegurar shared_state path
    if not context.shared_state_path:
        ssp = context.run_dir / '_buscar_shared_state.json'
    else:
        ssp = context.shared_state_path
    ssp.parent.mkdir(parents=True, exist_ok=True)
    if not ssp.exists():
        ssp.write_text(json.dumps({
            'queries_in_progress': [],
            'queries_done': [],
            'video_ids_already_evaluated': [],
            'candidates_found_so_far': [],
            'claims': {},
        }, ensure_ascii=False, indent=2), encoding='utf-8')

    state_lock = threading.Lock()

    tasks = []
    for q in queries_fb[:2]:
        tasks.append(('fb', q.get('buscador', 'fb_x'), q))
    for q in queries_gh[:2]:
        tasks.append(('gh', q.get('buscador', 'gh_x'), q))

    all_validated = []
    all_rejected_count = 0
    per_agent_summary = {}

    with ThreadPoolExecutor(max_workers=4) as ex:
        futures = {
            ex.submit(_run_one_buscador, kind, agent_id, q, match_criteria, cascada, ssp, state_lock, context): agent_id
            for kind, agent_id, q in tasks
        }
        for fut in as_completed(futures):
            agent_id = futures[fut]
            try:
                res = fut.result()
                per_agent_summary[agent_id] = res.get('summary', {})
                all_validated.extend(res.get('validated', []))
                all_rejected_count += int(res.get('rejected_count', 0))
            except Exception as e:
                per_agent_summary[agent_id] = {'error': str(e)}

    if len(all_validated) < 6:
        return SkillResult(
            ok=False,
            errors=[f'Total candidatos validados ({len(all_validated)}) < 6 · COMPARADOR no podrá elegir 5+1'],
            metadata={'per_agent': per_agent_summary},
        )

    context.write_section('buscar', {
        'shared_state_path': str(ssp),
        'cascada_paises': cascada,
        'agents_run': [t[1] for t in tasks],
        'candidates_total': len(all_validated),
        'candidates_validated': all_validated,
        'rejected_count': all_rejected_count,
        'per_agent_summary': per_agent_summary,
    }, mode='replace', actor='buscadores_x4')
    return SkillResult(ok=True, output={'candidates_total': len(all_validated)})


def _run_one_buscador(kind: str, agent_id: str, query_def: dict, match_criteria: dict,
                     cascada: list, shared_state_path: Path, lock: threading.Lock,
                     context: SkillContext, max_retries: int = 5) -> dict:
    validated: list = []
    rejected_count = 0
    queries_used = []
    last_attempt = 0

    for attempt in range(max_retries):
        last_attempt = attempt
        query_payload = _next_query(query_def, attempt)
        with lock:
            ss = json.loads(shared_state_path.read_text(encoding='utf-8'))
            qkey = json.dumps(query_payload, sort_keys=True)
            if qkey in ss.get('queries_in_progress', []) or qkey in ss.get('queries_done', []):
                continue
            ss.setdefault('queries_in_progress', []).append(qkey)
            ss.setdefault('claims', {}).setdefault(agent_id, []).append({'q': query_payload, 'ts': time.time()})
            shared_state_path.write_text(json.dumps(ss, ensure_ascii=False, indent=2), encoding='utf-8')
        queries_used.append(query_payload)

        if kind == 'fb':
            prompt = _build_fb_prompt(query_payload, match_criteria, cascada, agent_id)
        else:
            prompt = _build_gh_prompt(query_payload, match_criteria, cascada, agent_id)

        res = context.claude_cli(prompt, model='claude-opus-4-7', timeout_s=600)
        candidates = _parse_candidates_from_claude(res.get('stdout', ''))

        with lock:
            ss = json.loads(shared_state_path.read_text(encoding='utf-8'))
            taken_ids = set(ss.get('video_ids_already_evaluated', []))
            new_validated = []
            for c in candidates:
                vid = c.get('video_id') or c.get('video_url') or ''
                if not vid or vid in taken_ids:
                    continue
                taken_ids.add(vid)
                if c.get('match', True):
                    c['agent_id'] = agent_id
                    validated.append(c)
                    new_validated.append(c)
                else:
                    rejected_count += 1
            ss['video_ids_already_evaluated'] = sorted(taken_ids)
            ss.setdefault('candidates_found_so_far', []).extend(new_validated)
            qkey_now = json.dumps(query_payload, sort_keys=True)
            if qkey_now in ss['queries_in_progress']:
                ss['queries_in_progress'].remove(qkey_now)
            ss.setdefault('queries_done', []).append(qkey_now)
            shared_state_path.write_text(json.dumps(ss, ensure_ascii=False, indent=2), encoding='utf-8')

        if len(validated) >= 2:
            break

    return {
        'validated': validated[:5],
        'rejected_count': rejected_count,
        'summary': {
            'agent_id': agent_id,
            'kind': kind,
            'queries_used': queries_used,
            'validated_count': len(validated),
            'attempts': last_attempt + 1,
        }
    }


def _next_query(query_def: dict, attempt: int) -> dict:
    if attempt == 0:
        return query_def
    alt = dict(query_def)
    alt['_retry_attempt'] = attempt
    if 'query' in alt:
        alt['query'] = f'{alt["query"]} (alt {attempt})'
    return alt


def _build_fb_prompt(q: dict, match: dict, cascada: list, agent_id: str) -> str:
    return f"""Eres el agente {agent_id} del flow Factory v4 · BUSCAR sub-proceso · Facebook Ads Library.

CONFIRMO_LECTURA_AGENTE_CANONICOS

Usa la skill apify-fb-ads (skill canónica de Fer para minar Facebook Ads Library) para buscar videos que matcheen:

QUERY: {q.get('query', '?')}
COUNTRY filter (cascada con país INPUT primero): {cascada}
MATCH CRITERIA del ganador:
{json.dumps(match, ensure_ascii=False, indent=2)}

REGLAS DURAS:
- Sweet spot scaled_days 30-100 ESTRICTO (REGLA #141 msg 4309).
- País INPUT PRIMERO en filter (REGLA #141 REFINADA msg 644 Fer 2026-05-07) → luego UK/USA/AU/CA fallback.
- Cascada productos: exacto → similar → nicho → macro (REGLA #46).
- Verificar match en awareness, sof, jergas, deseo, avatar, fuerza_cambio, formato.
- Si saturated: rechazar UGC talking-head básico problem→solution.
- Min 2 candidatos validados, max 5.

OUTPUT — devuelve SOLO JSON (sin texto antes ni después):
{{
  "candidates": [
    {{
      "video_id": "<id único>",
      "video_url": "<url>",
      "match": true,
      "match_score": 0.85,
      "match_fields_passed": ["awareness", "deseo", "avatar"],
      "country": "ES",
      "scaled_days": 45,
      "transcription_summary": "<1-2 frases>"
    }}
  ]
}}"""


def _build_gh_prompt(q: dict, match: dict, cascada: list, agent_id: str) -> str:
    return f"""Eres el agente {agent_id} del flow Factory v4 · BUSCAR sub-proceso · GetHookd.

CONFIRMO_LECTURA_AGENTE_CANONICOS

Usa el subagente video-referencia-hunter para buscar videos en GetHookd que matcheen:

FILTERS: {json.dumps(q.get('filters', {}), ensure_ascii=False)}
COUNTRY cascada (país INPUT primero, luego UK/USA/AU/CA): {cascada}
MATCH CRITERIA del ganador:
{json.dumps(match, ensure_ascii=False, indent=2)}

REGLAS DURAS:
- Sweet spot scaled_days 30-100 ESTRICTO (REGLA #141 msg 4309).
- País INPUT PRIMERO (REGLA #141 REFINADA msg 644 Fer 2026-05-07).
- Cascada productos: exacto → similar → nicho → macro (REGLA #46).
- Si saturated: rechazar UGC talking-head básico problem→solution.
- Min 2 candidatos validados, max 5.

OUTPUT — devuelve SOLO JSON (sin texto antes ni después):
{{
  "candidates": [
    {{
      "video_id": "<id GetHookd>",
      "video_url": "<url>",
      "match": true,
      "match_score": 0.85,
      "match_fields_passed": ["awareness", "deseo", "avatar"],
      "country": "UK",
      "scaled_days": 60,
      "transcription_summary": "<1-2 frases>"
    }}
  ]
}}"""


def _parse_candidates_from_claude(stdout: str) -> list:
    s = stdout.strip()
    if s.startswith('```'):
        parts = s.split('```', 2)
        if len(parts) >= 2:
            s = parts[1]
            if s.startswith('json'):
                s = s[4:].strip()
    try:
        first = s.index('{')
        last = s.rindex('}')
        parsed = json.loads(s[first:last+1])
        return parsed.get('candidates') or []
    except Exception:
        return []
