#!/usr/bin/env python3
"""
Swing Shift customer / CRM analysis

Reads a customer CSV export and produces:
- likely duplicate profiles
- customer segments
- likely membership prospects
- likely punch card prospects
- likely slow-day promo targets

Designed for compatibility with the current observed uSchedule customer export.
"""

from __future__ import annotations

import csv
import math
import re
import sys
from collections import Counter, defaultdict
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, Iterable, List, Optional, Tuple

# CSV analyzed when no path is given on the command line.
INPUT_DEFAULT = Path("12_Output/customer_master_summary.csv")
# Directory that receives every report CSV written by analyze().
OUTPUT_DIR = Path("12_Output")


@dataclass
class RowProfile:
    """Cleaned, typed view of a single customer row from the input CSV.

    Instances are created by ``build_profile``, which fills each field from
    the first non-empty column among a list of header aliases. Text fields
    are ``""`` when no alias matched; numeric and date fields are ``None``
    when the value is missing or could not be parsed.
    """

    # Row identifier supplied by the caller (analyze() passes index + 2,
    # presumably the spreadsheet line number with the header on line 1).
    row_number: int
    # The row dict (normalized header -> cell text) the profile was built from.
    raw: Dict[str, str]
    external_id: str
    account_id: str
    # Trimmed and lowercased.
    email: str
    # Digits only; a leading "1" is dropped from 11-digit numbers.
    phone: str
    first_name: str
    last_name: str
    # "first last" when either part exists, otherwise a full-name column.
    full_name: str
    # Lowercased free text, searched by substring in the classifiers.
    tags: str
    # Lowercased.
    membership_status: str
    # Original casing preserved.
    membership_plan: str
    # Lowercased.
    punch_card_status: str
    # Lowercased.
    lead_status: str
    total_visits: Optional[float]
    total_bookings: Optional[float]
    total_spend: Optional[float]
    avg_booking_hours: Optional[float]
    # Taken from the CSV when present; backfill_days() derives them from
    # last_visit / last_booking when they are missing.
    days_since_last_visit: Optional[float]
    days_since_last_booking: Optional[float]
    # Timezone-aware datetimes (naive inputs are treated as UTC by parse_datetime).
    last_visit: Optional[datetime]
    last_booking: Optional[datetime]
    created_date: Optional[datetime]
    notes: str
    # Lowercased flag text; "yes" / "true" / "1" mean opted out (see can_market).
    opt_out: str
    # Lowercased flag text; "false" / "no" / "0" mean invalid (see has_valid_email).
    is_email_valid: str
    # Lowercased.
    email_verification_result: str
    email_verification_level: Optional[float]


def normalize_header(header: str) -> str:
    """Convert a CSV header into a lowercase snake_case lookup key.

    Surrounding whitespace is dropped, every run of characters outside
    ``a-z0-9`` becomes a single underscore, and leading/trailing underscores
    are removed. A falsy header (``None`` or ``""``) yields ``""``.
    """
    lowered = (header or "").strip().lower()
    collapsed = re.sub(r"[^a-z0-9]+", "_", lowered)
    return collapsed.strip("_")


def clean_string(value: str) -> str:
    """Return *value* without surrounding whitespace; falsy input gives ``""``."""
    if not value:
        return ""
    return value.strip()


def clean_lower(value: str) -> str:
    """Return *value* trimmed via ``clean_string`` and lowercased."""
    trimmed = clean_string(value)
    return trimmed.lower()


def normalize_email(value: str) -> str:
    """Return the email trimmed and lowercased so equal addresses compare equal."""
    canonical = clean_lower(value)
    return canonical


def normalize_phone(value: str) -> str:
    """Reduce a phone number to its digits.

    Every non-digit character is removed. When exactly 11 digits remain and
    the first one is ``1``, that leading digit is dropped so the number
    compares equal to its 10-digit form. Falsy input gives ``""``.
    """
    only_digits = re.sub(r"\D", "", value or "")
    has_leading_one = len(only_digits) == 11 and only_digits.startswith("1")
    return only_digits[1:] if has_leading_one else only_digits


def parse_float(value: Optional[str]) -> Optional[float]:
    """Parse a numeric CSV cell into a float.

    Surrounding whitespace, ``$`` signs and thousands separators (``,``) are
    ignored.

    Args:
        value: Raw cell text, or ``None``.

    Returns:
        The parsed number, or ``None`` when the cell is empty, is not a
        number, or spells a non-finite value such as ``nan`` or ``inf``.
    """
    text = (value or "").strip()
    if not text:
        return None
    text = text.replace("$", "").replace(",", "")
    try:
        number = float(text)
    except ValueError:
        return None
    # float() happily parses "nan" / "inf"; treat those as missing so they do
    # not poison the threshold comparisons and number formatting downstream.
    if not math.isfinite(number):
        return None
    return number


def parse_datetime(value: str) -> Optional[datetime]:
    """Parse a date/time cell into a timezone-aware datetime.

    A fixed list of ``strptime`` layouts is tried in order; if none fits,
    ``datetime.fromisoformat`` is attempted with ``Z`` rewritten to
    ``+00:00``. Results without timezone information are tagged as UTC.

    Returns:
        The parsed datetime, or ``None`` for empty or unparseable text.
    """
    text = clean_string(value)
    if not text:
        return None

    known_layouts = (
        "%Y-%m-%d",
        "%Y-%m-%d %H:%M:%S",
        "%Y-%m-%d %H:%M",
        "%m/%d/%Y",
        "%m/%d/%Y %H:%M",
        "%m/%d/%Y %H:%M:%S",
        "%m/%d/%Y %I:%M %p",
        "%m/%d/%Y %I:%M:%S %p",
        "%Y-%m-%dT%H:%M:%S",
        "%Y-%m-%dT%H:%M:%S.%f",
        "%Y-%m-%dT%H:%M:%S%z",
        "%Y-%m-%dT%H:%M:%S.%f%z",
    )

    parsed = None
    for layout in known_layouts:
        try:
            parsed = datetime.strptime(text, layout)
        except ValueError:
            continue
        break

    if parsed is None:
        # Last resort: let the ISO parser handle anything the layouts missed.
        try:
            parsed = datetime.fromisoformat(text.replace("Z", "+00:00"))
        except ValueError:
            return None

    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def pick(row: Dict[str, str], *candidates: str) -> str:
    """Return the first non-blank, trimmed value among the candidate columns.

    Candidates are checked in the order given; ``""`` is returned when none
    of them is present in *row* with a non-blank value.
    """
    for column in candidates:
        if column not in row:
            continue
        cleaned = clean_string(row[column])
        if cleaned:
            return cleaned
    return ""


def pick_number(row: Dict[str, str], *candidates: str) -> Optional[float]:
    """Pick the first non-blank candidate column and parse it as a float."""
    chosen = pick(row, *candidates)
    return parse_float(chosen)


def pick_datetime(row: Dict[str, str], *candidates: str) -> Optional[datetime]:
    """Pick the first non-blank candidate column and parse it as a datetime."""
    chosen = pick(row, *candidates)
    return parse_datetime(chosen)


def derive_days_since(dt: Optional[datetime]) -> Optional[float]:
    """Return the days elapsed between *dt* and the current UTC time.

    The result is rounded to one decimal place. ``None`` in gives ``None``
    out. *dt* must be timezone-aware, because it is subtracted from an
    aware "now".
    """
    if not dt:
        return None
    elapsed = datetime.now(timezone.utc) - dt
    seconds_per_day = 86400
    return round(elapsed.total_seconds() / seconds_per_day, 1)


def load_rows(csv_path: Path) -> Tuple[List[Dict[str, str]], List[str], List[str]]:
    """Read the customer CSV and re-key every row by normalized header.

    The file is opened as ``utf-8-sig`` so a leading byte-order mark does not
    end up inside the first header name.

    Args:
        csv_path: Path of the CSV export to read.

    Returns:
        ``(rows, normalized_headers, original_headers)``. Each row maps a
        normalized header to its cell text; the two header lists are parallel.

    Raises:
        ValueError: If the file has no header row.
    """
    with csv_path.open("r", encoding="utf-8-sig", newline="") as f:
        reader = csv.DictReader(f)
        if not reader.fieldnames:
            raise ValueError("CSV has no header row.")
        original_headers = list(reader.fieldnames)
        normalized_headers = [normalize_header(h) for h in original_headers]
        rows: List[Dict[str, str]] = []
        for raw_row in reader:
            normalized_row: Dict[str, str] = {}
            for original, normalized in zip(original_headers, normalized_headers):
                # DictReader fills cells missing from a short row with None;
                # keep the row strictly str -> str.
                value = raw_row.get(original) or ""
                # Distinct headers can normalize to the same key. Do not let
                # a later empty cell wipe out an earlier populated one.
                if value or normalized not in normalized_row:
                    normalized_row[normalized] = value
            rows.append(normalized_row)
        return rows, normalized_headers, original_headers


def build_profile(row: Dict[str, str], row_number: int) -> RowProfile:
    """Build a ``RowProfile`` from one normalized CSV row.

    Every field is read from the first non-blank column among a list of
    known header aliases, so exports with different column names map onto
    the same profile shape.

    Args:
        row: Mapping of normalized header to cell text.
        row_number: Identifier stored on the profile for reporting.

    Returns:
        The populated profile. Missing text fields are ``""``; missing or
        unparseable numbers and dates are ``None``.
    """

    def text(*aliases: str) -> str:
        return pick(row, *aliases)

    def lowered(*aliases: str) -> str:
        return clean_lower(pick(row, *aliases))

    def number(*aliases: str) -> Optional[float]:
        return pick_number(row, *aliases)

    def moment(*aliases: str) -> Optional[datetime]:
        return pick_datetime(row, *aliases)

    first_name = text("first_name", "firstname", "given_name", "customer_first_name")
    last_name = text("last_name", "lastname", "surname", "family_name", "customer_last_name")
    # Prefer "first last"; fall back to a single full-name column only when
    # neither part is available.
    joined_name = " ".join(filter(None, (first_name, last_name))).strip()
    full_name = joined_name or text("full_name", "name", "customer_name", "display_name")

    return RowProfile(
        row_number=row_number,
        raw=row,
        external_id=text("customer_id", "customerid", "id", "profile_id", "contact_id", "member_id"),
        account_id=text("account_id", "accountid"),
        email=normalize_email(text("email", "email_address", "emailaddress", "e_mail")),
        phone=normalize_phone(
            text(
                "phone",
                "phone_number",
                "mobile",
                "cell",
                "mobile_phone",
                "home_phone",
                "cellphone",
                "homephone",
            )
        ),
        first_name=first_name,
        last_name=last_name,
        full_name=full_name,
        tags=lowered("tags", "tag_list", "segments", "labels", "customer_tags"),
        membership_status=lowered(
            "membership_status",
            "member_status",
            "is_member",
            "membership",
            "active_membership_status",
            "membership_state",
        ),
        membership_plan=text(
            "membership_plan",
            "member_plan",
            "plan_name",
            "membership_name",
            "active_membership_name",
            "membership_type",
            "membershipplan",
        ),
        punch_card_status=lowered(
            "punch_card_status",
            "punchcard_status",
            "punch_card",
            "package_status",
            "package_name",
            "package_type",
            "haspackage",
            "packagenames",
        ),
        lead_status=lowered(
            "lead_status",
            "status",
            "customer_status",
            "crm_status",
            "lifecycle_stage",
            "contact_status",
        ),
        total_visits=number("total_visits", "visit_count", "visits", "lifetime_visits", "check_in_count"),
        total_bookings=number(
            "total_bookings",
            "booking_count",
            "bookings",
            "lifetime_bookings",
            "appointment_count",
            "reservation_count",
            "bookingcount",
        ),
        total_spend=number(
            "total_spend",
            "lifetime_value",
            "revenue",
            "amount_spent",
            "gross_spend",
            "total_revenue",
            "grossrevenueproxy",
        ),
        avg_booking_hours=number(
            "avg_booking_hours",
            "average_booking_hours",
            "avg_hours",
            "average_hours",
            "average_booking_length",
        ),
        days_since_last_visit=number("days_since_last_visit"),
        days_since_last_booking=number("days_since_last_booking"),
        last_visit=moment(
            "last_visit_date",
            "last_visit",
            "most_recent_visit",
            "last_checkin",
            "last_checked_in_at",
            "recent_visit_date",
        ),
        last_booking=moment(
            "last_booking_date",
            "last_booking",
            "most_recent_booking",
            "recent_booking_date",
            "last_appointment_date",
            "lastbooking",
        ),
        created_date=moment("created_date", "createddate", "created_at", "date_created", "signup_date"),
        notes=text("notes", "internal_notes", "customer_notes", "comment", "comments"),
        opt_out=lowered("opt_out", "optout", "email_opt_out", "marketing_opt_out"),
        is_email_valid=lowered("is_email_valid", "isemailvalid"),
        email_verification_result=lowered(
            "email_verification_result",
            "emailverificationresult",
            "email_result",
            "emailresult",
        ),
        email_verification_level=number("email_verification_level", "emailverificationlevel"),
    )


def backfill_days(profile: RowProfile) -> None:
    """Fill missing ``days_since_*`` fields from the matching ``last_*`` date.

    A value already present on the profile is left untouched. The profile
    is modified in place.
    """
    derived_from = (
        ("days_since_last_visit", "last_visit"),
        ("days_since_last_booking", "last_booking"),
    )
    for days_field, date_field in derived_from:
        if getattr(profile, days_field) is None:
            setattr(profile, days_field, derive_days_since(getattr(profile, date_field)))


def days_since_created(profile: RowProfile) -> Optional[float]:
    """Return the age of the profile in days, or ``None`` without a created date."""
    created_on = profile.created_date
    return derive_days_since(created_on)


def is_active_member(profile: RowProfile) -> bool:
    """Decide whether the profile shows signs of an active membership.

    The membership status is checked first as an explicit flag, using the
    same spellings the other flag helpers in this file use: ``no`` /
    ``false`` / ``0`` mean "not a member" and ``yes`` / ``true`` / ``1`` mean
    "member". Otherwise the status, plan name and tags are searched for
    membership keywords, after removing negated phrases such as
    ``non-member`` or ``not a member`` that would otherwise match "member".

    Returns:
        ``True`` when the profile looks like an active member.
    """
    status = profile.membership_status.strip().lower()
    # An explicit negative flag wins even if a plan name is still on record.
    if status in {"no", "false", "0"}:
        return False
    if status in {"yes", "true", "1"}:
        return True

    text = " ".join([status, profile.membership_plan, profile.tags]).lower()
    # Blank out negations so they cannot satisfy the "member" keyword below.
    text = re.sub(r"\b(?:non[-\s]?|not\s+(?:an?\s+)?)member\w*", " ", text)
    return any(
        term in text
        for term in ("member", "full swing", "solo swing", "first swing", "perfect swing", "active membership")
    )


def is_punch_card_customer(profile: RowProfile) -> bool:
    """Decide whether the profile already holds a punch card or package.

    The punch-card status may come from a flag-style column, so an
    affirmative flag (``yes`` / ``true`` / ``1``, the spellings used by the
    other flag helpers in this file) counts on its own. Otherwise the status,
    tags and lead status are searched for punch-card/package keywords.

    Returns:
        ``True`` when punch card or package indicators are present.
    """
    status = profile.punch_card_status.strip().lower()
    # A bare affirmative flag contains none of the keywords below, so it has
    # to be recognized explicitly.
    if status in {"yes", "true", "1"}:
        return True
    text = " ".join([status, profile.tags, profile.lead_status]).lower()
    return any(term in text for term in ("punch", "package", "swing savers"))


def safe_number(value: Optional[float], default: float = 0.0) -> float:
    """Return *value* as a float, or *default* when *value* is ``None``."""
    if value is None:
        return default
    return float(value)


def can_market(profile: RowProfile) -> bool:
    """Return ``True`` unless the profile's opt-out flag is ``yes``/``true``/``1``."""
    opted_out = profile.opt_out in {"yes", "true", "1"}
    return not opted_out


def has_valid_email(profile: RowProfile) -> bool:
    """Return ``True`` when an email exists and is not flagged as invalid.

    An email counts as invalid only when the validity flag is explicitly
    ``false``, ``no`` or ``0``; a blank flag is treated as valid.
    """
    if not profile.email:
        return False
    return profile.is_email_valid not in {"false", "no", "0"}


def classify_segment(profile: RowProfile) -> Tuple[str, str]:
    """Assign the profile to one customer segment.

    Rules are evaluated in priority order and the first match wins:
    member, punch card customer, event prospect, guest prospect, repeat
    customer (2+ visits), first-time customer (1+ visits), warm lead, and
    finally new lead. Visits fall back to bookings when the visit count is
    missing.

    Returns:
        ``(segment, reason)`` where *reason* explains the match.
    """
    bookings_fallback = safe_number(profile.total_bookings)
    visits = safe_number(profile.total_visits, bookings_fallback)
    status_text = " ".join(
        (profile.lead_status, profile.tags, profile.notes, profile.email_verification_result)
    ).lower()
    created_recency = days_since_created(profile)

    def mentions(*terms: str) -> bool:
        """True when any term occurs as a substring of the CRM text."""
        return any(term in status_text for term in terms)

    if is_active_member(profile):
        return "member", "Active membership indicators present"
    if is_punch_card_customer(profile):
        return "punch_card_customer", "Punch card/package indicators present"
    if mentions("event", "corporate", "birthday", "private event", "group booking"):
        return "event_prospect", "Event/group intent found in available CRM text"
    if mentions("guest"):
        return "guest_prospect", "Guest indicator found in available CRM text"
    if visits >= 2:
        return "repeat_customer", f"{visits:.0f} visits/bookings suggests repeat behavior"
    if visits >= 1:
        return "first_time_customer", f"{visits:.0f} visit/booking suggests first-time customer"
    if mentions("warm", "lead", "interested", "inquiry", "prospect"):
        return "warm_lead", "Lead/warm intent found in CRM fields"

    # Both remaining outcomes are "new_lead"; only the explanation differs.
    is_recent = created_recency is not None and created_recency <= 45
    if is_recent:
        explanation = "Recently created profile with no richer behavior data yet"
    else:
        explanation = "Profile exists but export lacks behavior/status fields for stronger classification"
    return "new_lead", explanation


def membership_prospect(profile: RowProfile, segment: str) -> Tuple[str, int, str]:
    """Score how likely a non-member is to convert to a membership.

    Existing members are rejected outright. Otherwise points are added for
    holding a punch card (45), visit count (35 for 6+, 20 for 3+), cumulative
    spend (20 for 300+, 10 for 150+), activity within 30 days (15) and
    average sessions of 1.5 hours or more (10). A total of 60 or more is
    flagged as a prospect.

    Args:
        profile: The customer profile to score.
        segment: Segment previously assigned by ``classify_segment``. It is
            reported in the reason text but adds no points.

    Returns:
        ``(flag, score, reason)`` where *flag* is ``"yes"`` or ``"no"`` and
        *reason* lists every signal that was noted, joined by ``"; "``.
    """
    if is_active_member(profile):
        return "no", 0, "Already a member"

    score = 0
    reasons: List[str] = []
    if is_punch_card_customer(profile):
        # Recorded as a regular reason so the 45 points stay explained even
        # when other signals are also present.
        score += 45
        reasons.append("Punch card customer may convert if usage becomes frequent")

    visits = safe_number(profile.total_visits, safe_number(profile.total_bookings))
    spend = safe_number(profile.total_spend)
    # Prefer visit recency; fall back to booking recency when it is missing.
    recency = profile.days_since_last_visit
    if recency is None:
        recency = profile.days_since_last_booking

    if segment in {"repeat_customer", "first_time_customer"}:
        reasons.append(f"segment={segment}")
    if visits >= 6:
        score += 35
        reasons.append("6+ visits/bookings")
    elif visits >= 3:
        score += 20
        reasons.append("3+ visits/bookings")
    if spend >= 300:
        score += 20
        reasons.append("high cumulative spend")
    elif spend >= 150:
        score += 10
        reasons.append("moderate cumulative spend")
    if recency is not None and recency <= 30:
        score += 15
        reasons.append("recent activity")
    if profile.avg_booking_hours is not None and profile.avg_booking_hours >= 1.5:
        score += 10
        reasons.append("longer average sessions")

    if score >= 60:
        return "yes", score, "; ".join(reasons) or "Strong repeat-use pattern"
    return "no", score, "; ".join(reasons) or "Insufficient repeat-use signal in this export"


def punch_card_prospect(profile: RowProfile, segment: str) -> Tuple[str, int, str]:
    """Score how well a customer fits a punch card / package offer.

    Members and existing punch card customers are rejected outright. Points
    are then added per matching signal, and a total of 50 or more is flagged
    as a prospect.

    Returns:
        ``(flag, score, reason)`` where *flag* is ``"yes"`` or ``"no"`` and
        *reason* joins the matched signals with ``"; "``.
    """
    if is_active_member(profile):
        return "no", 0, "Already a member"
    if is_punch_card_customer(profile):
        return "no", 0, "Already a punch card/package customer"

    visits = safe_number(profile.total_visits, safe_number(profile.total_bookings))
    spend = safe_number(profile.total_spend)
    recency = profile.days_since_last_visit
    if recency is None:
        recency = profile.days_since_last_booking
    hours = profile.avg_booking_hours

    # (applies, points, explanation) in reporting order. The two visit bands
    # cannot both apply, so they are listed as independent signals.
    signals = (
        (segment in {"repeat_customer", "first_time_customer", "warm_lead"}, 10, f"segment={segment}"),
        (2 <= visits <= 5, 30, "moderate repeat-use pattern"),
        (visits >= 6, 15, "high usage may fit membership better, but punch card still possible"),
        (60 <= spend <= 300, 20, "spend range fits prepay upsell"),
        (recency is not None and recency <= 45, 20, "recent activity"),
        (hours is not None and 1.0 <= hours <= 2.5, 10, "session length fits flexible package use"),
    )
    matched = [(points, note) for applies, points, note in signals if applies]
    score = sum(points for points, _ in matched)
    summary = "; ".join(note for _, note in matched)

    if score >= 50:
        return "yes", score, summary or "Flexible value-focused customer"
    return "no", score, summary or "Insufficient behavior/spend signal in this export"


def slow_day_target(profile: RowProfile, segment: str) -> Tuple[str, int, str]:
    """Score whether a customer should receive a slow-day promotion.

    Profiles without a usable email or with an opt-out are rejected with
    score 0, and members are rejected with score 10. Remaining profiles earn
    points per signal (event/corporate tags subtract points), and a total of
    40 or more is flagged as a target.

    Returns:
        ``(flag, score, reason)`` where *flag* is ``"yes"`` or ``"no"`` and
        *reason* joins the matched signals with ``"; "``.
    """
    if not (has_valid_email(profile) and can_market(profile)):
        return "no", 0, "Unusable for promo outreach due to invalid email or opt-out"
    if is_active_member(profile):
        return "no", 10, "Members usually need a different retention workflow"

    recency = profile.days_since_last_visit
    if recency is None:
        recency = profile.days_since_last_booking
    created_recency = days_since_created(profile)
    visits = safe_number(profile.total_visits, safe_number(profile.total_bookings))
    verification = profile.email_verification_level

    score = 0
    reasons = []

    def award(points: int, note: str) -> None:
        nonlocal score
        score += points
        reasons.append(note)

    if segment in {"warm_lead", "first_time_customer", "repeat_customer", "guest_prospect", "new_lead"}:
        award(20, f"segment={segment}")

    # Only the first applicable timing rule counts; profile age is consulted
    # only when none of the activity-recency rules applies.
    timing_rules = (
        (recency is not None and 14 <= recency <= 120, 30, "not too recent, not fully cold"),
        (recency is not None and recency < 14, 10, "recent enough to re-engage"),
        (recency is not None and recency > 120, 10, "cold reactivation candidate"),
        (created_recency is not None and created_recency <= 120, 20, "recently created lead with marketable email"),
    )
    timing_hit = next(((points, note) for applies, points, note in timing_rules if applies), None)
    if timing_hit is not None:
        award(*timing_hit)

    if 0 <= visits <= 3:
        award(20, "good nurture / reactivation candidate")
    if verification is not None and verification >= 4:
        award(10, "strong email verification level")
    if any(word in profile.tags for word in ("event", "corporate")):
        award(-15, "event-oriented contact, not ideal for standard slow-day offer")

    summary = "; ".join(reasons)
    if score >= 40:
        return "yes", score, summary or "Fits slow-day activation profile"
    return "no", score, summary or "Not a strong slow-day promo target from this export"


def duplicate_candidates(profiles: List[RowProfile]) -> List[Dict[str, str]]:
    """Find profile pairs that are likely duplicates of one another.

    Profiles are grouped by exact email, exact phone, name + email and
    name + phone (names compared case-insensitively). Within each group the
    profile with the lowest row number is the primary and every other
    profile is reported as its duplicate.

    Each (primary, duplicate) pair is reported once. When several groupings
    produce the same pair, their reasons are merged into one ``"; "``
    separated string instead of emitting the pair again.

    Args:
        profiles: Profiles to compare; blank emails, phones and names are
            never used as match keys.

    Returns:
        One dict per duplicate pair, in order of first detection, with the
        row numbers, customer ids, names, emails and phones of both sides
        plus ``confidence`` and ``reason``.
    """
    by_email: Dict[str, List[RowProfile]] = defaultdict(list)
    by_phone: Dict[str, List[RowProfile]] = defaultdict(list)
    by_name_email: Dict[Tuple[str, str], List[RowProfile]] = defaultdict(list)
    by_name_phone: Dict[Tuple[str, str], List[RowProfile]] = defaultdict(list)

    for p in profiles:
        if p.email:
            by_email[p.email].append(p)
        if p.phone:
            by_phone[p.phone].append(p)
        if p.full_name and p.email:
            by_name_email[(p.full_name.lower(), p.email)].append(p)
        if p.full_name and p.phone:
            by_name_phone[(p.full_name.lower(), p.phone)].append(p)

    # Keyed by (primary_row, duplicate_row); dicts keep insertion order, so
    # the output follows the order in which pairs were first detected.
    pair_rows: Dict[Tuple[int, int], Dict[str, str]] = {}
    pair_reasons: Dict[Tuple[int, int], List[str]] = {}

    def add_group(group: List[RowProfile], reason: str, confidence: str) -> None:
        if len(group) < 2:
            return
        ordered = sorted(group, key=lambda x: x.row_number)
        primary = ordered[0]
        for dup in ordered[1:]:
            key = (primary.row_number, dup.row_number)
            if key in pair_rows:
                # Same pair found through another grouping: record the extra
                # evidence rather than a second output row.
                if reason not in pair_reasons[key]:
                    pair_reasons[key].append(reason)
                continue
            pair_reasons[key] = [reason]
            pair_rows[key] = {
                "primary_row": str(primary.row_number),
                "duplicate_row": str(dup.row_number),
                "primary_customer_id": primary.external_id,
                "duplicate_customer_id": dup.external_id,
                "primary_name": primary.full_name,
                "duplicate_name": dup.full_name,
                "primary_email": primary.email,
                "duplicate_email": dup.email,
                "primary_phone": primary.phone,
                "duplicate_phone": dup.phone,
                "confidence": confidence,
                "reason": reason,
            }

    for group in by_email.values():
        add_group(group, "Exact same email address", "high")
    for group in by_phone.values():
        add_group(group, "Exact same phone number", "high")
    for group in by_name_email.values():
        add_group(group, "Matching full name + email", "high")
    for group in by_name_phone.values():
        add_group(group, "Matching full name + phone", "high")

    for key, row in pair_rows.items():
        row["reason"] = "; ".join(pair_reasons[key])
    return list(pair_rows.values())


def write_csv(path: Path, rows: Iterable[Dict[str, object]], fieldnames: List[str]) -> None:
    """Write *rows* to *path* as a UTF-8 CSV with a header line.

    Parent directories are created as needed. Only the columns named in
    *fieldnames* are written, in that order; keys a row lacks are written
    as empty cells and extra keys are ignored.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8", newline="") as handle:
        writer = csv.DictWriter(handle, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(
            {column: record.get(column, "") for column in fieldnames}
            for record in rows
        )


def analyze(csv_path: Path) -> Dict[str, int]:
    """Run the full analysis on one CSV export and write every report.

    The rows are loaded and turned into profiles, each profile is segmented
    and scored, and the results are written to ``OUTPUT_DIR`` as
    ``customer_segments.csv``, ``likely_duplicates.csv``,
    ``membership_prospects.csv``, ``punch_card_prospects.csv``,
    ``slow_day_promo_targets.csv``, ``analysis_summary.csv`` and
    ``detected_input_headers.csv``. A short summary is printed to stdout.

    Args:
        csv_path: Path of the customer CSV to analyze.

    Returns:
        Headline counts: profiles, duplicates, the three prospect/target
        lists, valid emails and marketable contacts.
    """
    raw_rows, headers, original_headers = load_rows(csv_path)
    # Row numbers start at 2 (index + 2), as in the reports' row_number column.
    profiles = [build_profile(raw, line_no) for line_no, raw in enumerate(raw_rows, start=2)]
    for profile in profiles:
        backfill_days(profile)

    duplicate_rows = duplicate_candidates(profiles)
    segment_rows: List[Dict[str, object]] = []
    membership_rows: List[Dict[str, object]] = []
    punch_rows: List[Dict[str, object]] = []
    slow_rows: List[Dict[str, object]] = []

    for p in profiles:
        segment, segment_reason = classify_segment(p)
        member_flag, member_score, member_reason = membership_prospect(p, segment)
        punch_flag, punch_score, punch_reason = punch_card_prospect(p, segment)
        slow_flag, slow_score, slow_reason = slow_day_target(p, segment)

        record = {
            "row_number": p.row_number,
            "external_id": p.external_id,
            "account_id": p.account_id,
            "full_name": p.full_name,
            "email": p.email,
            "phone": p.phone,
            "customer_segment": segment,
            "segment_reason": segment_reason,
            "lead_status": p.lead_status,
            "membership_status": p.membership_status,
            "membership_plan": p.membership_plan,
            "punch_card_status": p.punch_card_status,
            "total_visits": p.total_visits,
            "total_bookings": p.total_bookings,
            "total_spend": p.total_spend,
            "avg_booking_hours": p.avg_booking_hours,
            "days_since_last_visit": p.days_since_last_visit,
            "days_since_last_booking": p.days_since_last_booking,
            "created_days_ago": days_since_created(p),
            "opt_out": p.opt_out,
            "is_email_valid": p.is_email_valid,
            "email_verification_result": p.email_verification_result,
            "email_verification_level": p.email_verification_level,
            "tags": p.tags,
        }
        segment_rows.append(record)

        # Each positive outcome adds the record, plus its score and reason,
        # to the matching report.
        outcomes = (
            (member_flag, membership_rows, "membership_prospect_score", member_score, member_reason),
            (punch_flag, punch_rows, "punch_card_prospect_score", punch_score, punch_reason),
            (slow_flag, slow_rows, "slow_day_target_score", slow_score, slow_reason),
        )
        for flag, bucket, score_column, score, reason in outcomes:
            if flag == "yes":
                bucket.append({**record, score_column: score, "reason": reason})

    valid_email_total = sum(1 for p in profiles if has_valid_email(p))
    marketable_total = sum(1 for p in profiles if has_valid_email(p) and can_market(p))

    segment_counts = Counter(row["customer_segment"] for row in segment_rows)
    summary_rows = [{"metric": "total_profiles", "value": len(profiles)}]
    summary_rows += [
        {"metric": f"segment_{name}", "value": total} for name, total in segment_counts.items()
    ]
    summary_rows += [
        {"metric": "likely_duplicates", "value": len(duplicate_rows)},
        {"metric": "membership_prospects", "value": len(membership_rows)},
        {"metric": "punch_card_prospects", "value": len(punch_rows)},
        {"metric": "slow_day_targets", "value": len(slow_rows)},
        {"metric": "valid_emails", "value": valid_email_total},
        {"metric": "marketable_contacts", "value": marketable_total},
    ]

    # Column layouts: every report starts with the identity columns and ends
    # with the shared detail columns; only the middle part differs.
    identity_columns = ["row_number", "external_id", "account_id", "full_name", "email", "phone"]
    detail_columns = [
        "lead_status", "membership_status", "membership_plan", "punch_card_status",
        "total_visits", "total_bookings", "total_spend", "avg_booking_hours",
        "days_since_last_visit", "days_since_last_booking", "created_days_ago", "opt_out",
        "is_email_valid", "email_verification_result", "email_verification_level", "tags",
    ]
    segment_columns = identity_columns + ["customer_segment", "segment_reason"] + detail_columns

    def prospect_columns(score_column: str) -> List[str]:
        return identity_columns + [score_column, "reason", "customer_segment"] + detail_columns

    header_pairs = list(zip(original_headers, headers))

    write_csv(OUTPUT_DIR / "customer_segments.csv", segment_rows, segment_columns)
    write_csv(OUTPUT_DIR / "likely_duplicates.csv", duplicate_rows, [
        "primary_row", "duplicate_row", "primary_customer_id", "duplicate_customer_id",
        "primary_name", "duplicate_name", "primary_email", "duplicate_email",
        "primary_phone", "duplicate_phone", "confidence", "reason",
    ])
    write_csv(
        OUTPUT_DIR / "membership_prospects.csv",
        membership_rows,
        prospect_columns("membership_prospect_score"),
    )
    write_csv(
        OUTPUT_DIR / "punch_card_prospects.csv",
        punch_rows,
        prospect_columns("punch_card_prospect_score"),
    )
    write_csv(
        OUTPUT_DIR / "slow_day_promo_targets.csv",
        slow_rows,
        prospect_columns("slow_day_target_score"),
    )
    write_csv(OUTPUT_DIR / "analysis_summary.csv", summary_rows, ["metric", "value"])
    write_csv(
        OUTPUT_DIR / "detected_input_headers.csv",
        [{"original_header": oh, "normalized_header": nh} for oh, nh in header_pairs],
        ["original_header", "normalized_header"],
    )

    print(f"Analyzed {len(profiles)} profiles from {csv_path}")
    print(f"Wrote outputs to {OUTPUT_DIR.resolve()}")
    print(f"Detected {len(duplicate_rows)} likely duplicates")
    print(f"Found {len(membership_rows)} likely membership prospects")
    print(f"Found {len(punch_rows)} likely punch card prospects")
    print(f"Found {len(slow_rows)} likely slow-day promo targets")
    print("CSV headers detected:")
    for original, normalized in header_pairs:
        print(f"- {original} -> {normalized}")

    return {
        "profiles": len(profiles),
        "duplicates": len(duplicate_rows),
        "membership_prospects": len(membership_rows),
        "punch_card_prospects": len(punch_rows),
        "slow_day_targets": len(slow_rows),
        "valid_emails": valid_email_total,
        "marketable_contacts": marketable_total,
    }


if __name__ == "__main__":
    # Usage: customer_analysis.py [path/to/export.csv]
    # Without an argument the default input file is analyzed.
    cli_args = sys.argv[1:]
    csv_path = Path(cli_args[0]) if cli_args else INPUT_DEFAULT
    if csv_path.exists():
        analyze(csv_path)
    else:
        # Explain how to supply an export, then exit with a failure status.
        for message in (
            f"CSV not found: {csv_path}",
            "Put your export in 11_Input/ and run:",
            "  python3 13_Scripts/customer_analysis.py 11_Input/your_export.csv",
        ):
            print(message)
        sys.exit(1)
