# /var/www/html/bot/app/crawler/motie_org_pipeline.py
# -*- coding: utf-8 -*-
"""
MOTIE 조직도 크롤러(운영) — /view + empSearch 병행 수집
- 메인(조직도)에서 jsSearchOrgan(...) 4개 인자 추출 → POST /view 로 직원표 파싱
- empSearch 전체 페이지 크롤링 병행 → /view 미노출 인원 보강(파견 등)
- 스테이징에서 dedupe → finalize_motie_run 로 SCD2 반영
"""

import re
import time
import uuid
import hashlib
import logging
from typing import List, Dict, Tuple, Optional
from datetime import datetime, timedelta, timezone

import requests
from bs4 import BeautifulSoup

from app.services.supabase_service import get_client

logging.basicConfig(level=logging.INFO, format="[%(levelname)s] %(message)s")
logger = logging.getLogger("motie_org_pipeline")

# -----------------------
# Constants / patterns
# -----------------------
BASE = "https://www.motie.go.kr"
HEADQUARTERS_URL = f"{BASE}/kor/26/headquarters"
INSTITUTION_URL  = f"{BASE}/kor/28/institution"

EMP_BASE = f"{BASE}/kor/25/empSearch"

BASE_HEADERS = {
    "User-Agent": "GovBot/3.0 (+https://work.jjickjjicks.com)",
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "ko-KR,ko;q=0.9,en-US;q=0.8,en;q=0.7",
}
EMP_HEADERS = {"User-Agent": BASE_HEADERS["User-Agent"]}

PHONE_RE = re.compile(r"\d{2,4}-\d{3,4}-\d{4}")
# onclick="headquarters.jsSearchOrgan('deptCdV','depth1Id','depth2Id','depth3Id')"
JS_ORGAN_RE = re.compile(r"jsSearchOrgan\('([^']*)','([^']*)','([^']*)','([^']*)'\)")
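# e.g. (hypothetical codes):
#   JS_ORGAN_RE.search("headquarters.jsSearchOrgan('1234','d1','d2','d3')").groups()
#   → ('1234', 'd1', 'd2', 'd3')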

# -----------------------
# String / key / hash utilities
# -----------------------
def _clean(s: Optional[str]) -> str:
    return " ".join((s or "").replace("\u00A0", " ").replace("\u200b", "").split())

def _norm(s: str) -> str:
    # Currently an alias of _clean; kept as a separate hook for hash normalization.
    return _clean(s)

def _key_hash(name: str, position: str, department: str) -> str:
    base = f"{_norm(name)}|{_norm(position)}|{_norm(department)}"
    return hashlib.sha256(base.encode()).hexdigest()

def _row_hash(name: str, position: str, department: str, task: str, phone: str) -> str:
    base = f"{_norm(name)}|{_norm(position)}|{_norm(department)}|{_norm(task)}|{_norm(phone)}"
    return hashlib.sha256(base.encode()).hexdigest()
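
# key_hash identifies a person within a department across runs; row_hash additionally
# covers task/phone, so a changed row_hash under an unchanged key_hash signals an
# attribute update (presumably what finalize_motie_run uses for SCD2 change detection).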

# -----------------------
# HTTP helpers
# -----------------------
def _request(sess: requests.Session, method: str, url: str, *, headers=None, data=None, max_try=3, sleep=0.6, **kw):
    last = None
    h = dict(BASE_HEADERS); h.update(headers or {})
    for i in range(1, max_try + 1):
        try:
            if method.upper() == "POST":
                r = sess.post(url, headers=h, data=data, timeout=30, **kw)
            else:
                r = sess.get(url, headers=h, timeout=30, **kw)
            r.raise_for_status()
            return r
        except Exception as e:
            last = e
            logger.warning("%s failed (%d/%d): %s (%s)", method.upper(), i, max_try, url, e)
            time.sleep(sleep * i)
    raise last  # type: ignore

def _get(sess, url, **kw):  return _request(sess, "GET", url, **kw)
def _post(sess, url, data=None, **kw): return _request(sess, "POST", url, data=data, **kw)

# -----------------------
# Org target extraction (headquarters / affiliated institutions)
# -----------------------
def _is_org_name(txt: str) -> bool:
    t = _clean(txt)
    return bool(t) and t.endswith(("과", "관", "국", "실", "팀"))

def _infer_path(a) -> List[str]:
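    """Best effort: walk up the DOM from anchor `a`, collecting ancestor org names
    (과/관/국/실/팀 suffixes) into a breadcrumb-like path, outermost first."""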
    path, el, hops = [], a.parent, 0
    while el is not None and hops < 8:
        for sel in ("strong", "span", "p", "a"):
            for x in el.find_all(sel, recursive=False):
                tx = _clean(x.get_text())
                if _is_org_name(tx) and tx not in path:
                    path.insert(0, tx)
        el = el.parent; hops += 1
    out, seen = [], set()
    for p in path:
        if p not in seen:
            seen.add(p); out.append(p)
    return out

def _parse_org_tree(sess: requests.Session, url: str, scope: str) -> List[Dict]:
    html = _get(sess, url, headers={"Referer": url}).text
    soup = BeautifulSoup(html, "html.parser")
    targets: Dict[Tuple[str, str, str, str, str], Dict] = {}

    for a in soup.select('a[onclick*="jsSearchOrgan"]'):
        name = _clean(a.get_text())
        if not _is_org_name(name):
            continue
        oc = a.get("onclick") or ""
        m = JS_ORGAN_RE.search(oc)
        if not m:
            continue
        deptCdV, depth1Id, depth2Id, depth3Id = (m.group(i).strip() for i in range(1, 5))
        key = (name, deptCdV, depth1Id, depth2Id, depth3Id)
        if key not in targets:
            targets[key] = {
                "name": name,
                "deptCdV": deptCdV,
                "depth1Id": depth1Id,
                "depth2Id": depth2Id,
                "depth3Id": depth3Id,
                "path": _infer_path(a),
                "scope": scope,  # headquarters | institution
            }
    out = list(targets.values())
    logger.info("[%s] targets: %d", scope, len(out))
    return out

# -----------------------
# Detail (staff table) parser (shared)
# -----------------------
def _parse_detail_table_from_html(html: str) -> List[Dict]:
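    """Parse the staff table out of /view HTML. Assumed markup (a sketch; the
    column order is an assumption from observed pages, numbers hypothetical):

        <h4 class="tit-type02">…</h4>
        <table><tbody><tr>
          <td>name</td><td>position</td><td>task</td>
          <td><a href="tel:02-1234-5678">02-1234-5678</a></td>
        </tr></tbody></table>
    """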
    soup = BeautifulSoup(html, "html.parser")
    anchor = soup.select_one("h4.tit-type02")
    # Prefer the table immediately after the section title; otherwise fall back to known containers.
    if anchor:
        table = anchor.find_next("table")
    else:
        table = soup.select_one(".data-tbl table") or soup.select_one("table")
    if not table:
        return []
    rows: List[Dict] = []
    for tr in table.select("tbody tr"):
        tds = tr.find_all("td")
        if len(tds) < 4:
            continue
        name = _clean(tds[0].get_text())
        position = _clean(tds[1].get_text())
        task = _clean(tds[2].get_text(" ", strip=True))
        phone_cell = tds[3]
        tel_a = phone_cell.select_one('a[href^="tel:"]')
        if tel_a and tel_a.get("href"):
            phone = tel_a.get("href").replace("tel:", "").strip()
        else:
            last_text = _clean(phone_cell.get_text(" ", strip=True))
            m = PHONE_RE.search(last_text)
            phone = m.group(0) if m else last_text
        if not any([name, position, task, phone]):
            continue
        rows.append({"name": name, "position": position, "task": task, "phone": phone})
    return rows

# -----------------------
# /view POST (org-chart path)
# -----------------------
def _post_view(sess: requests.Session, scope: str, *, deptCdV: str, depth1Id: str, depth2Id: str, depth3Id: str) -> Optional[str]:
    if scope == "headquarters":
        view_url = f"{BASE}/kor/26/headquarters/view"
        referer  = HEADQUARTERS_URL
    else:
        view_url = f"{BASE}/kor/28/institution/view"
        referer  = INSTITUTION_URL

    form = {
        "deptCdV": deptCdV,
        "depth1Id": depth1Id,
        "depth2Id": depth2Id,
        "depth3Id": depth3Id,
    }
    headers = {
        "Referer": referer,
        "Content-Type": "application/x-www-form-urlencoded; charset=UTF-8",
    }
    logger.info("[try:POST view] %s %s", view_url, form)

    try:
        html = _post(sess, view_url, data=form, headers=headers).text
        if _parse_detail_table_from_html(html):
            return html
    except Exception:
        pass
    return None

def _fetch_dept_detail(sess: requests.Session, dept: Dict) -> Tuple[List[Dict], Optional[str], str]:
    """dept: {"name","scope","deptCdV","depth1Id","depth2Id","depth3Id"}"""
    tried = f"{dept['scope']}/view POST deptCdV={dept['deptCdV']}, depth1Id={dept['depth1Id']}, depth2Id={dept['depth2Id']}, depth3Id={dept['depth3Id']}"
    html = _post_view(
        sess,
        dept["scope"],
        deptCdV=dept["deptCdV"],
        depth1Id=dept["depth1Id"],
        depth2Id=dept["depth2Id"],
        depth3Id=dept["depth3Id"],
    )
    if not html:
        return [], None, tried
    rows = _parse_detail_table_from_html(html)
    if not rows:
        return [], None, tried
    return rows, html, tried

# -----------------------
# empSearch page collection
# -----------------------
def _emp_fetch_last_page(sess: requests.Session) -> int:
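    """Read the pager's 'last' link to find the page count; default to 1 when absent.

    e.g. an anchor like <a class="direction last" onclick="empSearch.list(42)">
    yields 42 (page number hypothetical).
    """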
    r = _get(sess, EMP_BASE, headers=EMP_HEADERS)
    soup = BeautifulSoup(r.text, "html.parser")
    last = soup.select_one("a.direction.last")
    if not last:
        return 1
    m = re.search(r"empSearch\.list\((\d+)\)", last.get("onclick", ""))
    return int(m.group(1)) if m else 1

def _emp_parse_page(sess: requests.Session, page: int) -> Tuple[List[Dict], str]:
    url = f"{EMP_BASE}?pageIndex={page}"
    r = _get(sess, url, headers=EMP_HEADERS)
    html = r.text
    soup = BeautifulSoup(html, "html.parser")

    rows: List[Dict] = []
    for tr in soup.select("table tbody tr"):
        tds = tr.find_all("td")
        if len(tds) < 4:
            continue

        name = _clean(tds[0].get_text(strip=True))
        position = _clean(tds[1].get_text(strip=True))
        # department cell: a '부서소개' (department intro) link often sits alongside the name
        # and gets mis-parsed, so clean it up
        dep_td = tds[2]
        department = ""
        # 1) Among <a> tags, skip guide links such as '부서소개'/'조직도' and take the first meaningful text
        bad_labels = {"부서소개", "조직도"}
        a_list = dep_td.find_all("a")
        for a in a_list:
            at = _clean(a.get_text(strip=True))
            if at and at not in bad_labels:
                department = at
                break
        # 2) If no anchor qualified, strip the unwanted tokens from the full cell text
        if not department:
            dep_text = _clean(dep_td.get_text(" ", strip=True))
            if dep_text:
                parts = [p for p in dep_text.split() if p and p not in bad_labels]
                department = " ".join(parts)
        department = _clean(department)

        phone_td = tds[-1]
        tel_a = phone_td.select_one('a[href^="tel:"]')
        if tel_a and tel_a.get("href"):
            phone = tel_a.get("href").replace("tel:", "").strip()
        else:
            last_text = _clean(phone_td.get_text(" ", strip=True))
            m = PHONE_RE.search(last_text)
            phone = m.group(0) if m else last_text

        task = (
            " ".join(_clean(td.get_text(" ", strip=True)) for td in tds[3:-1]).strip()
            if len(tds) > 4
            else ""
        )
        if phone and task:
            task = task.replace(phone, "").strip()

        # Exclude from upload: rows seconded ("파견") to the Ministry of Economy and Finance ("기획재정부")
        if department == "파견" and "기획재정부" in task:
            logger.debug("skip secondment to MOEF: %s/%s", name, position)
            continue

        if not any([name, position, department, task, phone]):
            continue
        rows.append(
            {"name": name, "position": position, "department": department, "task": task, "phone": phone}
        )
    return rows, html

# -----------------------
# DB helpers
# -----------------------
def _count_cur(sb) -> int:
    """현재(open) 스냅샷 수 — 뷰(motie_org_cur)에서 읽기"""
    try:
        res = sb.table("motie_org_cur").select("key_hash", count="exact").execute()
        return res.count or 0
    except Exception:
        return 0

def _validate(stg_count: int, cur_count: int, *, min_abs: int, min_ratio: float) -> Tuple[bool, str]:
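    """Sanity-check the staged row count against an absolute floor and the current snapshot.

    e.g. _validate(150, 2000, min_abs=200, min_ratio=0.1) → (False, "too_few_rows: 150 < 200")
         _validate(500, 2000, min_abs=200, min_ratio=0.1) → (True, "ok")  # 500 >= int(2000 * 0.1)
    """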
    if stg_count < min_abs:
        return False, f"too_few_rows: {stg_count} < {min_abs}"
    if cur_count > 0 and stg_count < int(cur_count * min_ratio):
        return False, f"ratio_drop: {stg_count} < {int(cur_count * min_ratio)} (cur={cur_count})"
    return True, "ok"

# -----------------------
# Staging preparation / helpers
# -----------------------
def _merge_tasks(a: Optional[str], b: Optional[str]) -> str:
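    """Order-preserving union of '/'-separated task fragments.

    e.g. _merge_tasks("예산 / 총괄", "총괄 / 인사") → "예산 / 총괄 / 인사"
    """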
    parts = [p.strip() for p in (a or "").split("/") if p.strip()] + [p.strip() for p in (b or "").split("/") if p.strip()]
    seen, out = set(), []
    for p in parts:
        if p not in seen:
            seen.add(p); out.append(p)
    return " / ".join(out)

def _pick_better_phone(a: Optional[str], b: Optional[str]) -> str:
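    """Prefer the value that fully matches PHONE_RE; otherwise the longer non-empty one.

    e.g. _pick_better_phone("내선 5000", "044-203-5000") → "044-203-5000"  (numbers hypothetical)
    """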
    a = (a or "").strip()
    b = (b or "").strip()
    a_ok = bool(PHONE_RE.fullmatch(a))
    b_ok = bool(PHONE_RE.fullmatch(b))
    if a_ok and not b_ok:
        return a
    if b_ok and not a_ok:
        return b
    if not a and b:
        return b
    if not b and a:
        return a
    return a if len(a) >= len(b) else b

def _dedupe_stage_rows(rows: List[Dict]) -> List[Dict]:
    by_key: Dict[str, Dict] = {}
    for r in rows:
        k = r["key_hash"]
        if k not in by_key:
            by_key[k] = dict(r)
            continue
        base = by_key[k]
        base["task"] = _merge_tasks(base.get("task"), r.get("task"))
        base["phone"] = _pick_better_phone(base.get("phone"), r.get("phone"))
    for v in by_key.values():
        v["row_hash"] = _row_hash(
            v["name"], v["position"], v["department"], v.get("task", ""), v.get("phone", "")
        )
    return list(by_key.values())

def _chunked_upsert(sb, table: str, rows: List[Dict], *, chunk: int = 1000):
    for i in range(0, len(rows), chunk):
        part = rows[i : i + chunk]
        if not part:
            continue
        sb.table(table).upsert(part).execute()

def _prepare_stage_batch_from_view(run_id: str, dept_name: str, rows: List[Dict]) -> List[Dict]:
    staged: List[Dict] = []
    for r in rows:
        k = _key_hash(r["name"], r["position"], dept_name)
        staged.append(
            {
                "run_id": run_id,
                "name": r["name"],
                "position": r["position"],
                "department": dept_name,
                "task": r.get("task", ""),
                "phone": r.get("phone", ""),
                "key_hash": k,
            }
        )
    return _dedupe_stage_rows(staged)

def _prepare_stage_batch_from_emp(run_id: str, rows: List[Dict]) -> List[Dict]:
    staged: List[Dict] = []
    for r in rows:
        dep = r.get("department", "")
        k = _key_hash(r["name"], r["position"], dep)
        staged.append(
            {
                "run_id": run_id,
                "name": r["name"],
                "position": r["position"],
                "department": dep,
                "task": r.get("task", ""),
                "phone": r.get("phone", ""),
                "key_hash": k,
            }
        )
    return _dedupe_stage_rows(staged)

# -----------------------
# Main entry point
# -----------------------
def run_once(sleep_sec: float = 0.2, min_abs: int = 200, min_ratio: float = 0.1) -> Dict:
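    """Run one full crawl: stage rows, validate counts, then apply SCD2 via finalize_motie_run.

    sleep_sec : delay between org-chart /view requests.
    min_abs   : absolute minimum of staged rows for the run to pass validation.
    min_ratio : minimum staged/current ratio, guarding against partial crawls.
    """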
    sb = get_client()
    run_id = str(uuid.uuid4())
    sb.table("crawler_run").insert({"id": run_id, "target": "motie_org", "status": "running"}).execute()

    pages = 0
    collected = 0
    try:
        with requests.Session() as sess:
            # 1) Collect org-chart targets
            hq_targets   = _parse_org_tree(sess, HEADQUARTERS_URL, "headquarters")
            inst_targets = _parse_org_tree(sess, INSTITUTION_URL, "institution")
            targets = hq_targets + inst_targets

            # 2) Org-chart detail (POST /view)
            for t in targets:
                dept_name = t["name"]
                rows_view, html, tried = _fetch_dept_detail(sess, t)

                # Record a MISS (no rows parsed) for later inspection
                if not rows_view and tried:
                    try:
                        sb.table("motie_org_raw").upsert({
                            "run_id": run_id,
                            "page": f"MISS:{t['scope']}:{t['deptCdV'] or dept_name}",
                            "html": tried[:8000],
                        }).execute()
                    except Exception:
                        pass

                if rows_view:
                    stage = _prepare_stage_batch_from_view(run_id, dept_name, rows_view)
                    if stage:
                        _chunked_upsert(sb, "motie_org_stg", stage, chunk=1000)
                        collected += len(stage)

                if html is not None:
                    sb.table("motie_org_raw").upsert(
                        {"run_id": run_id, "page": f"{t['scope']}:{t['deptCdV'] or dept_name}", "html": html}
                    ).execute()

                pages += 1
                time.sleep(sleep_sec)

            # 3) Crawl empSearch alongside the org chart
            last = _emp_fetch_last_page(sess)
            for p in range(1, last + 1):
                rows_emp, html_emp = _emp_parse_page(sess, p)

                # Store raw HTML (per page)
                sb.table("motie_org_raw").upsert(
                    {"run_id": run_id, "page": f"emp:{p}", "html": html_emp}
                ).execute()

                if rows_emp:
                    stage_emp = _prepare_stage_batch_from_emp(run_id, rows_emp)
                    if stage_emp:
                        _chunked_upsert(sb, "motie_org_stg", stage_emp, chunk=1000)
                        collected += len(stage_emp)

                pages += 1
                time.sleep(0.1)

        # 4) Snapshot / validation
        sb.table("motie_org_snapshot").insert(
            {"run_id": run_id, "rows": collected, "status": "collected", "note": f"pages={pages}"}
        ).execute()
        cur_count = _count_cur(sb)
        ok, reason = _validate(collected, cur_count, min_abs=min_abs, min_ratio=min_ratio)
        if not ok:
            now = datetime.now(timezone.utc).isoformat()
            sb.table("crawler_run").update(
                {"status": "failed", "finished_at": now, "pages": pages, "rows": collected, "fail_reason": reason}
            ).eq("id", run_id).execute()
            sb.table("motie_org_snapshot").update({"status": "failed", "note": reason}).eq("run_id", run_id).execute()
            logger.error("[motie_org] validation failed: %s", reason)
            return {"run_id": run_id, "status": "failed", "reason": reason}

        # 5) Apply SCD2: call the atomic DB function (never write to cur; it is a view)
        res = sb.rpc("finalize_motie_run", {"p_run_id": run_id}).execute()
        logger.info("[motie_org] finalize_motie_run: %s", res.data)

        # Best-effort: attribute hist rows of this run with open/close_run_id using motie_org_raw window
        try:
            raw = (
                sb.table("motie_org_raw")
                .select("fetched_at")
                .eq("run_id", run_id)
                .order("fetched_at", desc=False)
                .limit(50000)
                .execute()
                .data
                or []
            )
            if raw:
                start_at = raw[0].get("fetched_at")
                end_at = raw[-1].get("fetched_at")
                # Widen by a small buffer to be safe. (timedelta comes from the
                # module-level import; a local `from datetime import datetime, timedelta`
                # here would make `datetime` function-local for the whole body and break
                # the earlier datetime.now() calls with UnboundLocalError.)
                def _to_dt(s: str):
                    try:
                        return datetime.fromisoformat(str(s).replace("Z", "+00:00"))
                    except Exception:
                        return None
                st = _to_dt(start_at); en = _to_dt(end_at)
                if st and en:
                    stb = (st - timedelta(hours=2)).isoformat()
                    enb = (en + timedelta(hours=2)).isoformat()
                    # open_run_id backfill for this run's window
                    try:
                        sb.table("motie_org_hist").update({"open_run_id": run_id}).is_("open_run_id", "null").gte("valid_from", stb).lte("valid_from", enb).execute()
                    except Exception:
                        pass
                    # close_run_id backfill for this run's window
                    try:
                        sb.table("motie_org_hist").update({"close_run_id": run_id}).is_("close_run_id", "null").not_.is_("valid_to", "null").gte("valid_to", stb).lte("valid_to", enb).execute()
                    except Exception:
                        pass
        except Exception:
            logger.warning("[motie_org] run_id attribution post-finalize skipped (no raw window)")

        now = datetime.now(timezone.utc).isoformat()
        sb.table("motie_org_snapshot").update({"status": "passed"}).eq("run_id", run_id).execute()
        sb.table("crawler_run").update(
            {"status": "passed", "finished_at": now, "pages": pages, "rows": collected}
        ).eq("id", run_id).execute()
        logger.info("[motie_org] run passed: run_id=%s, rows=%d, pages=%d", run_id, collected, pages)
        return {"run_id": run_id, "status": "passed", "rows": collected, "pages": pages}

    except Exception as e:
        now = datetime.now(timezone.utc).isoformat()
        try:
            sb.table("crawler_run").update(
                {"status": "aborted", "finished_at": now, "pages": pages, "rows": collected, "fail_reason": str(e)}
            ).eq("id", run_id).execute()
        except Exception:
            pass
        logger.exception("[motie_org] run aborted: %s", e)
        return {"run_id": run_id, "status": "aborted", "error": str(e)}

if __name__ == "__main__":
    print(run_once())
