# /var/www/html/bot/app/crawler/me_org_pipeline.py
import os, re, time, uuid, hashlib, logging
from typing import List, Dict, Tuple
from datetime import datetime, timezone

import requests
from bs4 import BeautifulSoup

from app.services.supabase_service import get_client


# Log verbosity is read from the LOG_LEVEL environment variable (default "INFO").
# NOTE(review): basicConfig runs at import time, so merely importing this module
# configures logging for the importing process — confirm that is intended.
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO").upper()
logging.basicConfig(level=LOG_LEVEL, format="[%(levelname)s] %(message)s")
logger = logging.getLogger("me_org_pipeline")

# Site root and the org-chart listing page the crawl starts from.
BASE = "https://me.go.kr"
LIST_URL = BASE + "/home/web/index.do?menuId=10428"
# Headers sent with every request issued through _retry_get.
HEADERS = {"User-Agent": "GovBot/1.0 (+https://work.jjickjjicks.com)"}
# Dash-separated phone number: 2-4 digits, 3-4 digits, then 4 digits.
PHONE_RE = re.compile(r"\d{2,4}-\d{3,4}-\d{4}")


def _clean(s: str | None) -> str:
    return " ".join((s or "").replace("\u00A0", " ").replace("\u200b", "").split())


def _norm(s: str) -> str:
    """Normalize a field before hashing; currently identical to ``_clean``."""
    normalized = _clean(s)
    return normalized


def _key_hash(name: str, position: str, department: str) -> str:
    """Return the SHA-256 hex digest that identifies a name/position/department triple.

    Each field is normalized with ``_norm`` and the results are joined with
    ``|`` before hashing.
    """
    parts = [_norm(field) for field in (name, position, department)]
    digest = hashlib.sha256("|".join(parts).encode())
    return digest.hexdigest()


def _row_hash(name: str, position: str, department: str, task: str, phone: str) -> str:
    """Return the SHA-256 hex digest of a full row's content.

    Covers the three identity fields plus ``task`` and ``phone``; each field
    is normalized with ``_norm`` and the results are joined with ``|`` before
    hashing.
    """
    parts = [_norm(field) for field in (name, position, department, task, phone)]
    digest = hashlib.sha256("|".join(parts).encode())
    return digest.hexdigest()


def strip_position_from_name(name_raw: str, position_raw: str) -> str:
    """Remove a position title that was appended to a person's name.

    Both inputs are cleaned with ``_clean`` first. Two candidate suffixes are
    then tried, in a fixed order: the full position text, followed by its
    leading token (the text before the first space, tab, ``(``, ``[``, ``/``,
    ``·`` or ``-``). For each candidate the name loses a trailing occurrence
    of the candidate, a trailing ``(candidate)``, or a trailing
    ``candidate(...)``, together with any dangling separator characters.

    Args:
        name_raw: Name cell text, possibly with the position attached.
        position_raw: Position cell text for the same row.

    Returns:
        The cleaned name without the position suffix. If either input is
        empty after cleaning, the cleaned name is returned untouched; if
        stripping would leave nothing, the cleaned original name is returned.
    """
    n = _clean(name_raw)
    p = _clean(position_raw)
    if not n or not p:
        return n
    original = n
    trailing_seps = " -·/–—"
    base = re.split(r"[ \t(\[/·\-]", p)[0].strip()
    # Use an ordered list, not a set: set order depends on string-hash
    # randomization, and the result depends on the order when the full
    # position ends with its own leading token (e.g. "팀장/부팀장"). Trying
    # the full position first strips the longest match.
    candidates = [p]
    if base and base != p:
        candidates.append(base)
    for cand in candidates:
        for suf in (cand, " " + cand):
            if n.endswith(suf):
                n = _clean(n[: -len(suf)]).rstrip(trailing_seps)
        # "name (position)" form.
        n = re.sub(rf"\s*\(\s*{re.escape(cand)}\s*\)\s*$", "", n).rstrip(trailing_seps)
        # "name position(anything)" form.
        n = re.sub(rf"\s*{re.escape(cand)}\s*\([^)]+\)\s*$", "", n).rstrip(trailing_seps)
        if n.endswith(cand):
            n = _clean(n[: -len(cand)]).rstrip(trailing_seps)
    n = _clean(n)
    # Never return an empty name just because it equals the position text.
    return n if n else original


def _retry_get(sess: requests.Session, url: str, timeout=30, tries=3, backoff=0.7) -> requests.Response:
    """GET ``url`` with the module's ``HEADERS``, retrying with exponential backoff.

    A response counts as a failure when the request raises or when
    ``raise_for_status`` rejects its status code. Between attempts the
    function sleeps ``backoff * 2**i`` seconds (``i`` = 0 for the first
    retry). No sleep happens after the final attempt.

    Args:
        sess: Session used to issue the request.
        url: Absolute URL to fetch.
        timeout: Per-request timeout in seconds, passed to ``sess.get``.
        tries: Total number of attempts; must be at least 1.
        backoff: Base delay in seconds for the exponential backoff.

    Returns:
        The first successful response.

    Raises:
        ValueError: If ``tries`` is less than 1.
        requests.RequestException: The error from the last attempt.
    """
    if tries < 1:
        raise ValueError(f"tries must be >= 1, got {tries}")
    for attempt in range(1, tries + 1):
        try:
            r = sess.get(url, headers=HEADERS, timeout=timeout)
            r.raise_for_status()
            return r
        except requests.RequestException:
            if attempt == tries:
                # Out of attempts: propagate now instead of sleeping for nothing.
                raise
            sleep = backoff * (2 ** (attempt - 1))
            logger.warning("GET retry %d/%d in %.1fs: %s", attempt, tries, sleep, url)
            time.sleep(sleep)
    # The loop always returns or raises; this only guards against future edits.
    raise RuntimeError("retry loop exited without a result")


def fetch_org_links(sess: requests.Session) -> List[Dict]:
    """Collect candidate org-unit links from the org chart listing page.

    Fetches ``LIST_URL`` and keeps every ``<a>`` element whose ``href``:

    - is non-empty and is not a pure fragment (``#...``);
    - contains, case-insensitively, one of ``menuid=10428``, ``org``,
      ``dept`` or ``team``;
    - resolves to an ``http``/``https`` URL, which drops ``javascript:``,
      ``mailto:``, ``tel:`` and similar pseudo-links regardless of case.

    Relative hrefs are resolved against ``LIST_URL`` with standard URL
    resolution, so forms such as ``index.do?x=1``, ``?x=1`` and
    ``//host/path`` become correct absolute URLs.

    Args:
        sess: Session used to fetch the listing page.

    Returns:
        One ``{"link": <absolute url>, "title": <text>}`` dict per distinct
        link, in document order. ``title`` is the anchor's ``title``
        attribute when present, otherwise its text, cleaned with ``_clean``.

    Raises:
        Whatever ``_retry_get`` raises when the listing page cannot be fetched.
    """
    # Imported locally so this function does not depend on the module's import block.
    from urllib.parse import urljoin, urlsplit

    r = _retry_get(sess, LIST_URL)
    soup = BeautifulSoup(r.text, "html.parser")

    keywords = ("menuid=10428", "org", "dept", "team")
    seen: set[str] = set()
    out: List[Dict] = []
    for a in soup.find_all("a"):
        href = (a.get("href") or "").strip()
        if not href or href.startswith("#"):
            continue

        # Only keep plausible org detail links.
        h = href.lower()
        if not any(k in h for k in keywords):
            continue

        # Normalize to an absolute URL and reject non-web schemes.
        link = urljoin(LIST_URL, href)
        if urlsplit(link).scheme not in ("http", "https"):
            continue

        if link in seen:
            continue
        seen.add(link)
        out.append({"link": link, "title": _clean(a.get("title") or a.get_text())})
    return out


def _guess_department_from_page(soup: BeautifulSoup) -> str:
    """Guess the department name shown on a detail page.

    Heading-like selectors are probed first and accepted only when their
    cleaned text is non-empty and at most 30 characters long. If none
    qualifies, the last breadcrumb item is used, with no length limit.
    Returns ``""`` when nothing matches.
    """
    # (selectors, maximum accepted length or None for "any length"), in priority order.
    probes = (
        (("h1", "h2", "h3", "strong.tit", "div.title h2", "div.title h3"), 30),
        ((".breadcrumb li:last-child", ".location li:last-child", ".path li:last-child"), None),
    )
    for selectors, max_len in probes:
        for css in selectors:
            node = soup.select_one(css)
            if not node:
                continue
            text = _clean(node.get_text())
            if not text:
                continue
            if max_len is None or len(text) <= max_len:
                return text
    return ""


def parse_org_detail(sess: requests.Session, link: str) -> Tuple[List[Dict], str, str]:
    """Fetch one detail page and extract staff rows from its tables.

    Every ``<table>`` on the page is scanned. Columns are mapped by header
    text when the table's ``<th>`` count equals the row's ``<td>`` count;
    otherwise a positional guess is used.

    Args:
        sess: Session used to fetch the page.
        link: Absolute URL of the detail page.

    Returns:
        A tuple ``(rows, link, html)``: ``rows`` is a list of dicts with the
        keys ``department``, ``name``, ``position``, ``phone`` and ``task``;
        ``link`` is the URL passed in; ``html`` is the raw page text.

    Raises:
        Whatever ``_retry_get`` raises when the page cannot be fetched.
    """
    r = _retry_get(sess, link)
    html = r.text
    soup = BeautifulSoup(html, "html.parser")

    # Identify department context from page
    page_department = _guess_department_from_page(soup)

    rows: List[Dict] = []

    # Prefer tables with thead having human-readable headers
    candidate_tables = soup.select("table")
    for table in candidate_tables:
        # Lower-cased text of every <th> anywhere in the table (not only in thead).
        headers = [
            _clean(th.get_text(" ", strip=True)).lower()
            for th in table.find_all("th")
        ]
        # Skip tables that have header cells but none mentioning name, position,
        # contact/phone or person-in-charge. Tables without any <th> are still processed.
        if headers and not any(h for h in headers if any(k in h for k in ["성명", "이름", "직위", "직급", "연락", "전화", "담당"])):
            continue

        for tr in table.find_all("tr"):
            # Any row containing a <th> is treated as a header row and skipped.
            if tr.find("th"):
                continue
            tds = tr.find_all("td")
            # Rows with fewer than three data cells are ignored.
            if len(tds) < 3:
                continue

            texts = [
                _clean(td.get_text(" ", strip=True)) for td in tds
            ]

            # Try to map columns by common patterns
            department, name, position, phone, task = "", "", "", "", ""

            # If table header exists, map by header names
            # (only when the header count equals this row's cell count).
            if headers and len(headers) == len(tds):
                for h, td in zip(headers, tds):
                    t = _clean(td.get_text(" ", strip=True))
                    # The first matching branch wins, so a header that contains
                    # several keywords is classified by the earliest test.
                    if any(k in h for k in ["부서", "부서명", "소속"]):
                        department = t
                    elif any(k in h for k in ["성명", "이름"]):
                        name = t
                    elif any(k in h for k in ["직위", "직급", "직책"]):
                        position = t
                    elif any(k in h for k in ["전화", "연락", "연락처", "번호"]):
                        # Prefer a tel: link's target over the visible cell text.
                        tel_a = td.select_one('a[href^="tel:"]')
                        phone = (
                            tel_a.get("href").replace("tel:", "").strip()
                            if tel_a and tel_a.get("href")
                            else _clean(td.get_text(" ", strip=True))
                        )
                        # Keep only the phone-shaped part when one is present.
                        m = PHONE_RE.search(phone)
                        phone = m.group(0) if m else phone
                    elif "담당" in h or "업무" in h:
                        task = t
                # Fall back to the page-level department when the table has no such column.
                if not department:
                    department = page_department
            else:
                # Best-effort positional mapping for 4~5 columns
                # common: [department, name, position, phone, task] or [name, position, phone, task]
                # NOTE(review): empty cells are dropped before positions are assigned,
                # so a blank cell shifts every later column one place to the left —
                # confirm this is acceptable for the pages being crawled.
                cols = [c for c in texts if c]
                if len(cols) >= 4:
                    if len(tds) >= 5:
                        department, name, position, phone, task = cols[0], cols[1], cols[2], cols[3], " ".join(cols[4:])
                    else:
                        name, position, phone = cols[0], cols[1], cols[2]
                        task = " ".join(cols[3:]) if len(cols) > 3 else ""
                        department = page_department

                    # refine phone
                    m = PHONE_RE.search(phone)
                    if m:
                        phone = m.group(0)
                else:
                    # Fewer than four non-empty cells: not enough to guess a layout.
                    continue

            # Remove a position title that was appended to the name cell.
            name = strip_position_from_name(name, position)
            # Drop rows in which every mapped field is empty.
            if not (name or position or department or task or phone):
                continue
            rows.append(
                {
                    "department": department,
                    "name": name,
                    "position": position,
                    "phone": phone,
                    "task": task,
                }
            )

    return rows, link, html


def _count_cur(sb) -> int:
    try:
        res = sb.table("me_org_cur").select("key_hash", count="exact").execute()
        return res.count or 0
    except Exception:
        return 0


def _validate(stg_count: int, cur_count: int, *, min_abs: int, min_ratio: float) -> Tuple[bool, str]:
    if stg_count < min_abs:
        return False, f"too_few_rows: {stg_count} < {min_abs}"
    if cur_count > 0 and stg_count < int(cur_count * min_ratio):
        return False, f"ratio_drop: {stg_count} < {int(cur_count * min_ratio)} (cur={cur_count})"
    return True, "ok"


def _load_current_map(sb) -> Dict[str, Dict]:
    rows = (sb.table("me_org_cur").select("*").limit(50000).execute().data) or []
    return {r["key_hash"]: r for r in rows}


def _load_stg_map(sb, run_id: str) -> Dict[str, Dict]:
    rows = (sb.table("me_org_stg").select("*").eq("run_id", run_id).limit(50000).execute().data) or []
    return {r["key_hash"]: r for r in rows}


def _batched_list(items: list, size: int):
    """Yield consecutive slices of ``items`` holding at most ``size`` elements."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def _apply_scd2(sb, run_id: str):
    """Apply one staged run to ``me_org_hist`` as slowly-changing-dimension type 2.

    Compares the live rows (``_load_current_map``) with the staged rows of
    ``run_id`` (``_load_stg_map``):

    - keys that disappeared have their current history row closed;
    - keys whose ``row_hash`` changed have their current history row closed
      and a new current version inserted;
    - new keys get a new current version inserted;
    - unchanged keys are left alone.

    Closing sets ``valid_to`` to the run timestamp and ``is_current`` to
    ``False``. Closes are issued before inserts. The two steps are separate
    requests and are not atomic.

    Args:
        sb: Supabase client.
        run_id: Identifier of the staged run to apply.
    """
    cur_map = _load_current_map(sb)
    stg_map = _load_stg_map(sb, run_id)
    now_iso = datetime.now(timezone.utc).isoformat()

    # Keys that are live but were not seen in this run.
    to_close = [k for k in cur_map if k not in stg_map]
    to_add = []

    for k, v in stg_map.items():
        live = cur_map.get(k)
        if live is not None and live["row_hash"] == v["row_hash"]:
            continue
        if live is not None:
            # Content changed: the old version must be closed as well, otherwise
            # two rows for the same key would both be flagged current.
            to_close.append(k)
        to_add.append(
            {
                "key_hash": k,
                "department": v["department"],
                "name": v["name"],
                "position": v["position"],
                "task": v.get("task"),
                "phone": v.get("phone"),
                "row_hash": v["row_hash"],
                "valid_from": now_iso,
                "valid_to": None,
                "is_current": True,
            }
        )

    # The in_() filter travels in the request URL, so keep each list short.
    for keys in _batched_list(to_close, 100):
        sb.table("me_org_hist").update({"valid_to": now_iso, "is_current": False}).in_("key_hash", keys).eq(
            "is_current", True
        ).execute()
    # Insert in moderate batches to keep request bodies bounded.
    for batch in _batched_list(to_add, 500):
        sb.table("me_org_hist").insert(batch).execute()


def _check_schema(sb) -> None:
    """Raise ``RuntimeError`` if any table/view the pipeline needs cannot be queried."""
    for t in ("me_org_raw", "me_org_stg", "me_org_hist", "me_org_snapshot", "me_org_cur"):
        try:
            sb.table(t).select("*").limit(1).execute()
        except Exception as exc:
            # Chain the cause: the probe can also fail for reasons other than a
            # missing object (network, credentials), and that detail must survive.
            raise RuntimeError(
                f"Supabase object '{t}' is missing. Run sql/me_org_pipeline.sql in your Supabase project before running this crawler."
            ) from exc


def run_once(sleep_sec: float = 0.2, min_abs: int = 500, min_ratio: float = 0.6) -> Dict:
    """Run one full crawl: fetch, stage, validate and apply to history.

    Steps:
        1. Check that the required Supabase tables/views are reachable.
        2. Register the run in ``crawler_run`` with status ``running``.
        3. Store the listing page and every detail page in ``me_org_raw`` and
           the parsed rows (with key/row hashes) in ``me_org_stg``.
        4. Record a snapshot and validate the row count with ``_validate``.
        5. On success apply the staged rows with ``_apply_scd2``.

    Args:
        sleep_sec: Pause between detail-page requests, in seconds.
        min_abs: Minimum number of collected rows for the run to pass.
        min_ratio: Minimum fraction of the current live row count that the
            collected rows must reach.

    Returns:
        A dict with ``run_id`` and ``status`` (``passed``, ``failed`` or
        ``aborted``), plus ``rows``/``pages`` on success, ``reason`` on
        validation failure, or ``error`` on abort.

    Raises:
        Exception: If the initial ``crawler_run`` insert fails. Errors after
            that point are reported through the returned dict instead.
    """
    sb = get_client()
    # Preflight: ensure required tables/views exist so we fail fast with a clear message
    try:
        _check_schema(sb)
    except RuntimeError as preflight_err:
        logger.error("[me_org] schema check failed: %s", preflight_err, exc_info=True)
        run_id = str(uuid.uuid4())
        # Best-effort log to crawler_run; the run is reported as aborted either way.
        try:
            sb.table("crawler_run").insert({
                "id": run_id,
                "target": "me_org",
                "status": "aborted",
                "fail_reason": str(preflight_err),
            }).execute()
        except Exception as log_err:
            logger.debug("[me_org] could not record aborted run in crawler_run: %s", log_err)
        return {"run_id": run_id, "status": "aborted", "error": str(preflight_err)}
    run_id = str(uuid.uuid4())
    sb.table("crawler_run").insert({"id": run_id, "target": "me_org", "status": "running"}).execute()

    pages = 0
    collected = 0
    try:
        with requests.Session() as sess:
            # Save list page HTML
            # NOTE(review): fetch_org_links downloads LIST_URL again below.
            list_r = _retry_get(sess, LIST_URL)
            sb.table("me_org_raw").upsert({
                "run_id": run_id,
                "kind": "list",
                "page": 0,
                "url": LIST_URL,
                "html": list_r.text,
            }).execute()

            links = fetch_org_links(sess)
            for i, item in enumerate(links, 1):
                rows, url, html = parse_org_detail(sess, item["link"])
                sb.table("me_org_raw").upsert({
                    "run_id": run_id,
                    "kind": "detail",
                    "page": i,
                    "url": url,
                    "html": html,
                }).execute()

                if rows:
                    payload = []
                    for r in rows:
                        k = _key_hash(r["name"], r["position"], r["department"])
                        h = _row_hash(
                            r["name"], r["position"], r["department"], r.get("task", ""), r.get("phone", "")
                        )
                        payload.append({"run_id": run_id, **r, "key_hash": k, "row_hash": h})
                    sb.table("me_org_stg").upsert(payload).execute()

                collected += len(rows)
                pages += 1
                # Be polite to the target site between detail requests.
                time.sleep(sleep_sec)

        # snapshot and validate
        sb.table("me_org_snapshot").insert(
            {"run_id": run_id, "rows": collected, "status": "collected", "note": f"pages={pages}"}
        ).execute()

        cur_count = _count_cur(sb)
        ok, reason = _validate(collected, cur_count, min_abs=min_abs, min_ratio=min_ratio)
        if not ok:
            sb.table("crawler_run").update(
                {
                    "status": "failed",
                    "finished_at": datetime.now(timezone.utc).isoformat(),
                    "pages": pages,
                    "rows": collected,
                    "fail_reason": reason,
                }
            ).eq("id", run_id).execute()
            sb.table("me_org_snapshot").update({"status": "failed", "note": reason}).eq("run_id", run_id).execute()
            logger.error("[me_org] validation failed: %s", reason)
            return {"run_id": run_id, "status": "failed", "reason": reason}

        _apply_scd2(sb, run_id)
        sb.table("me_org_snapshot").update({"status": "passed"}).eq("run_id", run_id).execute()
        sb.table("crawler_run").update(
            {
                "status": "passed",
                "finished_at": datetime.now(timezone.utc).isoformat(),
                "pages": pages,
                "rows": collected,
            }
        ).eq("id", run_id).execute()
        logger.info("[me_org] run passed: run_id=%s, rows=%d, pages=%d", run_id, collected, pages)
        return {"run_id": run_id, "status": "passed", "rows": collected, "pages": pages}

    except Exception as e:
        # Log first so the root cause is never lost if the bookkeeping write fails too.
        logger.exception("[me_org] run aborted: %s", e)
        try:
            sb.table("crawler_run").update(
                {
                    "status": "aborted",
                    "finished_at": datetime.now(timezone.utc).isoformat(),
                    "pages": pages,
                    "rows": collected,
                    "fail_reason": str(e),
                }
            ).eq("id", run_id).execute()
        except Exception as update_err:
            logger.error("[me_org] could not mark run %s as aborted: %s", run_id, update_err)
        return {"run_id": run_id, "status": "aborted", "error": str(e)}


if __name__ == "__main__":
    # Script entry point: run one crawl with the default thresholds and print its summary.
    outcome = run_once()
    print(outcome)
