"""Index local folder tool - walk, parse, summarize, save."""

from collections.abc import Generator
import hashlib
import logging
import os
import threading
import time
from collections import defaultdict
from functools import lru_cache
from pathlib import Path
from typing import Optional
import re

import pathspec

logger = logging.getLogger(__name__)

from .. import config as _config
from ..parser import parse_file, LANGUAGE_EXTENSIONS, get_language_for_path
from ..parser.context import discover_providers, enrich_symbols, collect_metadata, collect_extra_imports
from ..parser.context.framework_profiles import detect_framework, profile_to_meta
from ..parser.imports import extract_imports, _alias_map_cache as _imap_cache, _LANGUAGE_EXTRACTORS as _IMPORT_EXTRACTORS
from ..security import (
    validate_path,
    is_symlink_escape,
    is_secret_file,
    is_binary_file,
    should_exclude_file,
    DEFAULT_MAX_FILE_SIZE,
    get_max_folder_files,
    get_extra_ignore_patterns,
    get_skip_directories,
    SKIP_FILES
)
from ..storage import IndexStore
from ..storage.index_store import _file_hash, _file_hash_bytes, _get_git_head
from ..summarizer import summarize_symbols
from ..reindex_state import WatcherChange
from ..path_map import parse_path_map, remap

SKIP_FILES_REGEX = re.compile("(" + "|".join(re.escape(p) for p in SKIP_FILES) + ")$")

def _build_skip_dirs_regex() -> re.Pattern:
    """Build regex from config-filtered skip directories (called per-index)."""
    dirs = get_skip_directories()
    return re.compile("^(" + "|".join(dirs) + ")$")


def _maybe_apply_adaptive(folder_path: str, result: dict) -> None:
    """Apply adaptive language config if enabled. Never raises."""
    if not isinstance(result, dict) or not result.get("success"):
        return
    detected = set(result.get("languages", {}).keys())
    if not detected:
        return
    try:
        from ..config import apply_adaptive_languages
        apply_adaptive_languages(str(folder_path), detected)
    except Exception:
        logger.debug("adaptive language update skipped", exc_info=True)


def get_filtered_files(path: str) -> Generator[str, None, None]:
    """Generator function to filter directories and files"""
    skip_dirs_regex = _build_skip_dirs_regex()
    # Use os.walk with followlinks=False to avoid infinite loops caused by
    # NTFS junctions or symlinks pointing back to ancestor directories.
    for dirpath, dirnames, filenames in os.walk(path, followlinks=False):
        # Don't walk directories that should be skipped
        dirnames[:] = [dir for dir in dirnames if not skip_dirs_regex.match(dir)]
        dpath = Path(dirpath)
        for file in filenames:
            if not SKIP_FILES_REGEX.search(file):
                yield dpath / file


def _load_gitignore(folder_path: Path) -> Optional[pathspec.PathSpec]:
    """Load .gitignore from the folder root if it exists."""
    gitignore_path = folder_path / ".gitignore"
    if gitignore_path.is_file():
        try:
            content = gitignore_path.read_text(encoding="utf-8", errors="replace")
            return pathspec.PathSpec.from_lines("gitignore", content.splitlines())
        except Exception:
            pass
    return None


def _load_all_gitignores(root: Path) -> dict[Path, pathspec.PathSpec]:
    """Load all .gitignore files in the tree, keyed by their directory.

    Supports monorepos and poncho-style projects where subdirectories each
    have their own .gitignore (e.g. cap/.gitignore, core/.gitignore).

    Uses os.walk(followlinks=False) to avoid infinite loops caused by
    NTFS junctions or symlinks pointing back to ancestor directories.
    """
    specs: dict[Path, pathspec.PathSpec] = {}
    for dirpath, dirnames, filenames in os.walk(str(root), followlinks=False):
        if ".gitignore" in filenames:
            gitignore_path = Path(dirpath) / ".gitignore"
            try:
                content = gitignore_path.read_text(encoding="utf-8", errors="replace")
                spec = pathspec.PathSpec.from_lines("gitignore", content.splitlines())
                specs[gitignore_path.parent.resolve()] = spec
            except Exception:
                pass
    return specs


@lru_cache(maxsize=512)
def _is_trusted(
    folder_path: Path, trusted_folders: tuple, whitelist_mode: bool = True
) -> bool:
    """Return True when folder_path is trusted.

    whitelist_mode=True (default): trusted_folders contains trusted paths
    whitelist_mode=False: trusted_folders contains untrusted paths (blacklist)

    Empty list returns False (nothing explicitly trusted) for backward compatibility.
    The trust check is skipped for empty list, but the broad check uses this value.
    """
    if not trusted_folders:
        # Empty list: nothing explicitly trusted (backward compatible)
        return False

    is_in_list = any(
        folder_path == Path(trusted_folder)
        or Path(trusted_folder) in folder_path.parents
        for trusted_folder in trusted_folders
    )

    return is_in_list if whitelist_mode else not is_in_list

def _is_gitignored(file_path: Path, gitignore_specs: dict[Path, pathspec.PathSpec]) -> bool:
    """Check if a file is excluded by any .gitignore in its ancestor chain.

    Each spec is applied relative to its own directory, matching standard git behaviour.
    """
    for gitignore_dir, spec in gitignore_specs.items():
        try:
            rel = file_path.relative_to(gitignore_dir)
            if spec.match_file(rel.as_posix()):
                return True
        except ValueError:
            continue
    return False


def _is_gitignored_fast(resolved_str: str, specs: list[tuple[str, "pathspec.PathSpec"]]) -> bool:
    """String-based gitignore check — avoids Path.relative_to() overhead.

    Same semantics as _is_gitignored but uses string prefix matching instead
    of Path operations (~10x faster in the inner loop). Uses os.path.normcase
    for the prefix comparison so the check is case-insensitive on Windows.
    """
    resolved_norm = os.path.normcase(resolved_str)
    for dir_prefix, spec in specs:
        if not resolved_norm.startswith(os.path.normcase(dir_prefix)):
            continue
        rel = resolved_str[len(dir_prefix):].replace("\\", "/")
        if spec.match_file(rel):
            return True
    return False


def _local_repo_name(folder_path: Path) -> str:
    """Stable local repo id derived from basename + resolved path hash."""
    digest = hashlib.sha1(str(folder_path).encode("utf-8")).hexdigest()[:8]
    return f"{folder_path.name}-{digest}"


from ._indexing_pipeline import (
    file_languages_for_paths as _file_languages_for_paths,
    language_counts as _language_counts,
    complete_file_summaries as _complete_file_summaries,
    parse_and_prepare_incremental,
    parse_and_prepare_full,
    parse_immediate,
    deferred_summarize,
)
from .package_registry import extract_package_names as _extract_package_names


def discover_local_files(
    folder_path: Path,
    max_files: Optional[int] = None,
    max_size: int = DEFAULT_MAX_FILE_SIZE,
    extra_ignore_patterns: Optional[list[str]] = None,
    follow_symlinks: bool = False,
) -> tuple[list[Path], list[str], dict[str, int]]:
    """Discover source files in a local folder with security filtering.

    Args:
        folder_path: Root folder to scan (must be resolved).
        max_files: Maximum number of files to index.
        max_size: Maximum file size in bytes.
        extra_ignore_patterns: Additional gitignore-style patterns to exclude.
        follow_symlinks: Whether to include symlinked files in indexing.
            Symlinked directories are never followed to prevent infinite
            loops from circular symlinks. Default False for safety.

    Returns:
        Tuple of (list of Path objects for source files, list of warning strings).
    """
    max_files = get_max_folder_files(max_files)
    files = []
    warnings = []
    root = folder_path.resolve()

    skip_counts: dict[str, int] = {
        "skip_dir": 0,
        "skip_file": 0,
        "symlink": 0,
        "symlink_escape": 0,
        "path_traversal": 0,
        "gitignore": 0,
        "extra_ignore": 0,
        "secret": 0,
        "wrong_extension": 0,
        "too_large": 0,
        "unreadable": 0,
        "binary": 0,
        "file_limit": 0,
    }

    # Pre-compute string-based gitignore specs — built incrementally during
    # the walk below (P8: single os.walk pass instead of two).
    gitignore_str_specs: list[tuple[str, pathspec.PathSpec]] = []

    # Pre-compute root path strings (root is already resolved above).
    # Normalized variants use os.path.normcase for case-insensitive comparison
    # on Windows (no-op on POSIX).
    root_str = str(root)
    root_prefix = root_str + os.sep
    root_str_norm = os.path.normcase(root_str)
    root_prefix_norm = os.path.normcase(root_prefix)

    # Merge env-var global patterns with per-call patterns, then build spec
    effective_extra = get_extra_ignore_patterns(extra_ignore_patterns)
    extra_spec = None
    if effective_extra:
        try:
            extra_spec = pathspec.PathSpec.from_lines("gitignore", effective_extra)
        except Exception:
            pass

    skip_dirs_regex = _build_skip_dirs_regex()
    for dirpath, dirnames, filenames in os.walk(str(root), followlinks=False):
        # Prune directories that should always be skipped before descending.
        pruned = []
        kept = []
        for d in dirnames:
            if skip_dirs_regex.match(d):
                pruned.append(d)
            else:
                kept.append(d)
        if pruned:
            rel_dir = os.path.relpath(dirpath, root_str)
            for d in pruned:
                skip_counts["skip_dir"] += 1
                logger.debug("SKIP skip_dir: %s", os.path.join(rel_dir, d))
        dirnames[:] = kept
        dpath = Path(dirpath)

        # Load .gitignore for this directory BEFORE filtering its files so
        # that patterns defined here apply to siblings in the same directory.
        if ".gitignore" in filenames:
            gitignore_path = dpath / ".gitignore"
            try:
                content = gitignore_path.read_text(encoding="utf-8", errors="replace")
                spec = pathspec.PathSpec.from_lines("gitignore", content.splitlines())
                gitignore_str_specs.append((str(dpath.resolve()) + os.sep, spec))
            except Exception:
                pass

        for filename in filenames:
            if SKIP_FILES_REGEX.search(filename):
                skip_counts["skip_file"] += 1
                logger.debug("SKIP skip_file: %s", os.path.join(os.path.relpath(dirpath, root_str), filename))
                continue
            file_path = dpath / filename
            # Symlink protection
            if not follow_symlinks and file_path.is_symlink():
                skip_counts["symlink"] += 1
                logger.debug("SKIP symlink: %s", file_path)
                continue
            if file_path.is_symlink() and is_symlink_escape(root, file_path):
                skip_counts["symlink_escape"] += 1
                warnings.append(f"Skipped symlink escape: {file_path}")
                continue

            # Resolve once per file — reused for traversal check, relative path,
            # and gitignore matching (was resolved 2-3x before this optimization).
            try:
                resolved = file_path.resolve()
            except OSError:
                skip_counts["unreadable"] += 1
                logger.debug("SKIP unreadable (resolve failed): %s", file_path)
                continue
            resolved_str = str(resolved)
            resolved_norm = os.path.normcase(resolved_str)

            # Path traversal check (same logic as validate_path but avoids
            # re-resolving root on every iteration). Uses normcase so the check
            # is case-insensitive on Windows.
            if not (resolved_norm == root_str_norm or resolved_norm.startswith(root_prefix_norm)):
                skip_counts["path_traversal"] += 1
                warnings.append(f"Skipped path traversal: {file_path}")
                continue

            # Get relative path via string slicing (avoids Path.relative_to)
            rel_path = resolved_str[len(root_prefix):].replace("\\", "/") if resolved_norm != root_str_norm else ""
            if not rel_path:
                continue

            # .gitignore matching (string-based, avoids Path.relative_to per spec)
            if gitignore_str_specs and _is_gitignored_fast(resolved_str, gitignore_str_specs):
                skip_counts["gitignore"] += 1
                logger.debug("SKIP gitignore: %s", rel_path)
                continue

            # Extra ignore patterns
            if extra_spec and extra_spec.match_file(rel_path):
                skip_counts["extra_ignore"] += 1
                logger.debug("SKIP extra_ignore: %s", rel_path)
                continue

            # Secret detection
            if is_secret_file(rel_path):
                skip_counts["secret"] += 1
                logger.debug("SKIP secret: %s", rel_path)
                warnings.append(f"Skipped secret file: {rel_path}")
                continue

            # Extension filter
            ext = file_path.suffix
            if ext not in LANGUAGE_EXTENSIONS and get_language_for_path(str(file_path)) is None:
                skip_counts["wrong_extension"] += 1
                logger.debug("SKIP wrong_extension: %s", rel_path)
                continue

            # Size limit
            try:
                if file_path.stat().st_size > max_size:
                    skip_counts["too_large"] += 1
                    logger.debug("SKIP too_large: %s", rel_path)
                    continue
            except OSError:
                skip_counts["unreadable"] += 1
                logger.debug("SKIP unreadable (stat failed): %s", rel_path)
                continue

            # Binary detection (content sniff for files with source extensions)
            if is_binary_file(file_path):
                skip_counts["binary"] += 1
                warnings.append(f"Skipped binary file: {rel_path}")
                continue

            logger.debug("ACCEPT: %s", rel_path)
            files.append(file_path)

    logger.info(
        "Discovery complete — accepted: %d, skipped by reason: %s",
        len(files),
        skip_counts,
    )

    # File count limit with prioritization
    if len(files) > max_files:
        skip_counts["file_limit"] = len(files) - max_files
        # Prioritize: src/, lib/, pkg/, cmd/, internal/ first
        priority_dirs = ["src/", "lib/", "pkg/", "cmd/", "internal/"]

        def priority_key(file_path: Path) -> tuple:
            try:
                rel_path = file_path.relative_to(root).as_posix()
            except ValueError:
                return (999, 999, str(file_path))

            # Check if in priority dir
            for i, prefix in enumerate(priority_dirs):
                if rel_path.startswith(prefix):
                    return (i, rel_path.count("/"), rel_path)
            # Not in priority dir - sort after
            return (len(priority_dirs), rel_path.count("/"), rel_path)

        files.sort(key=priority_key)
        files = files[:max_files]

    return files, warnings, skip_counts


def index_folder(
    path: str,
    use_ai_summaries: bool = True,
    storage_path: Optional[str] = None,
    extra_ignore_patterns: Optional[list[str]] = None,
    follow_symlinks: bool = False,
    incremental: bool = True,
    context_providers: bool = True,
    changed_paths: Optional[list[WatcherChange]] = None,
) -> dict:
    """Index a local folder containing source code.

    Args:
        path: Path to local folder (absolute or relative).
        use_ai_summaries: Whether to use AI for symbol summaries.
        storage_path: Custom storage path (default: ~/.code-index/).
        extra_ignore_patterns: Additional gitignore-style patterns to exclude.
        follow_symlinks: Whether to include symlinked files. Symlinked directories
            are never followed (prevents infinite loops). Default False.
        context_providers: Whether to run context providers (default True).
            Set to False or set JCODEMUNCH_CONTEXT_PROVIDERS=0 to disable.
        incremental: When True and an existing index exists, only re-index changed files.
        changed_paths: Optional pre-known change set from the watcher, as a list of
            (change_type, absolute_path) tuples where change_type is one of
            "added", "modified", "deleted".  When provided with incremental=True
            and an existing index, skips full directory discovery (~3s → ~50ms).

    Returns:
        Dict with indexing results.
    """
    # Resolve folder path
    folder_path = Path(path).expanduser().resolve()

    if not folder_path.exists():
        return {"success": False, "error": f"Folder not found: {path}"}

    if not folder_path.is_dir():
        return {"success": False, "error": f"Path is not a directory: {path}"}

    # Evict stale tsconfig alias map so re-indexing picks up edited tsconfig.json (C6-A)
    _imap_cache.pop(str(folder_path), None)

    # Load and cache project-level config (.jcodemunch.jsonc) so subsequent
    # config.get() calls within this indexing run use project overrides.
    # This handles both first-time indexing and re-indexing of existing projects.
    _config.load_project_config(str(folder_path))

    warnings = []
    trusted_folders = _config.get("trusted_folders", [], repo=str(folder_path))
    whitelist_mode = _config.get(
        "trusted_folders_whitelist_mode", True, repo=str(folder_path)
    )

    # Handle empty blacklist as error
    if not whitelist_mode and not trusted_folders:
        error_msg = (
            "trusted_folders_whitelist_mode is False (blacklist mode) but "
            "trusted_folders is empty. No folders would be trusted. "
            "Add entries to trusted_folders to specify which folders should be untrusted."
        )
        logger.error(error_msg)
        return {"success": False, "error": error_msg}

    is_trusted = _is_trusted(folder_path, tuple(trusted_folders), whitelist_mode)
    if trusted_folders and not is_trusted:
        return {
            "success": False,
            "error": f"Resolved path '{folder_path}' is not under trusted_folders.",
        }

    # Guard against dangerously broad roots.  A relative path like "." resolves
    # against the MCP server's CWD (not the caller's project directory), which
    # can be "/" or "~" when the server is launched by a system launcher.
    # Reject paths with fewer than 3 parts (e.g. "/", "/home", "C:\Users") and
    # warn whenever the caller supplied a relative path so the resolved value is
    # always visible in the tool response.
    _MIN_PATH_PARTS = 3
    if len(folder_path.parts) < _MIN_PATH_PARTS:
        if not is_trusted:
            error_msg = (
                f"Resolved path '{folder_path}' is too broad to index safely "
                f"(fewer than {_MIN_PATH_PARTS} path components). "
                "Pass an absolute path to the specific project directory instead of a "
                "relative path like '.' — relative paths resolve against the MCP "
                "server's working directory, which may not be your project root."
            )
            logger.error(error_msg)
            return {"success": False, "error": error_msg}

        warning_msg = (
            f"Resolved path '{folder_path}' would normally be rejected as too broad, "
            "but it matched trusted_folders and was allowed."
        )
        logger.warning(warning_msg)
        warnings.append(warning_msg)

    # Warn when a relative path was given so callers can see what it resolved to.
    if not Path(path).expanduser().is_absolute():
        warning_msg = (
            f"Relative path '{path}' resolved to '{folder_path}' (MCP server CWD). "
            "Prefer passing an absolute path to avoid unexpected behaviour."
        )
        logger.warning(warning_msg)
        warnings.append(warning_msg)

    # Redact absolute path from responses when redact_source_root is enabled
    _redact = _config.get("redact_source_root", False)
    _folder_display = folder_path.name if _redact else str(folder_path)

    max_files = get_max_folder_files()

    try:
        t0 = time.monotonic()

        # ── Deferred summarization helper (defined before fast path so it is in scope) ──

        def _run_deferred_summarize(
            gen: int,
            repo_full: str,
            symbols: list,
            file_contents: dict,
            store: "IndexStore",
            owner: str,
            repo_name: str,
        ) -> None:
            """Fill in AI summaries and update the store. Checks generation counter to abandon stale work."""
            from ..reindex_state import _get_state, get_deferred_save_lock
            from ._indexing_pipeline import deferred_summarize

            # Check 1: has a newer reindex started while we were parsing?
            if _get_state(repo_full).deferred_generation != gen:
                logger.debug(
                    "Deferred summarize gen=%d abandoned for %s (generation advanced before summarize)",
                    gen, repo_full,
                )
                return

            summarized = deferred_summarize(symbols, file_contents, use_ai_summaries=True)
            if not summarized:
                return

            # Check 2 + save are held under the deferred-save lock (T7).
            # mark_reindex_start also acquires this lock before bumping the generation,
            # so the check and the write are atomic with respect to new reindexes:
            # either we write before the new gen is bumped, or we see the new gen and abort.
            save_lock = get_deferred_save_lock(repo_full)
            with save_lock:
                if _get_state(repo_full).deferred_generation != gen:
                    logger.debug(
                        "Deferred summarize gen=%d abandoned for %s (generation advanced before save)",
                        gen, repo_full,
                    )
                    return

                # Update only the symbol summaries (empty change lists → INSERT OR REPLACE updates existing rows)
                try:
                    store.incremental_save(
                        owner=owner, name=repo_name,
                        changed_files=[], new_files=[], deleted_files=[],
                        new_symbols=summarized,
                        raw_files={},
                    )
                    logger.info(
                        "Deferred AI summarization gen=%d saved %d symbols for %s",
                        gen, len(summarized), repo_full,
                    )
                except Exception as e:
                    logger.warning("Deferred summarization failed for %s: %s", repo_full, e)

        # ── Fast path: watcher-driven incremental reindex ──
        # When the watcher provides the exact change set, skip full directory
        # discovery (~3s on Windows) and only process the affected files.
        if changed_paths and incremental:
            _pairs = parse_path_map()
            repo_name = _local_repo_name(Path(remap(str(folder_path), _pairs, reverse=True)))
            owner = "local"
            store = IndexStore(base_path=storage_path)

            # Determine if watcher provided old_hash via WatcherChange objects.
            # If so, we can skip loading the index and use the memory-cached hashes.
            watcher_changes_with_hashes = [
                c for c in changed_paths
                if isinstance(c, WatcherChange) and c.old_hash
            ]
            use_memory_hash_cache = bool(watcher_changes_with_hashes)

            existing_index = store.load_index(owner, repo_name) if not use_memory_hash_cache else None

            # Build memory hash map from WatcherChange objects (from watcher memory cache)
            _old_hash_map: dict[str, str] = {}
            if use_memory_hash_cache:
                for wc in watcher_changes_with_hashes:
                    # Use index access for both WatcherChange and legacy tuple compat
                    change_type = wc[0]
                    abs_path_str = wc[1]
                    old_hash = wc[2]
                    abs_path = Path(abs_path_str)
                    try:
                        rel_path = abs_path.relative_to(folder_path).as_posix()
                    except ValueError:
                        continue
                    _old_hash_map[rel_path] = old_hash

            if existing_index is not None or use_memory_hash_cache:
                # Skip discover_providers on the watcher fast path — provider
                # detection walks the tree (~500ms) and providers don't change
                # between file edits.  The initial index_folder call (without
                # changed_paths) already ran provider detection.
                active_providers = []

                # Classify watcher events into changed/new/deleted rel_paths
                changed_files: list[str] = []
                new_files: list[str] = []
                deleted_files: list[str] = []
                rel_path_map_fast: dict[str, Path] = {}

                for wc_item in changed_paths:
                    # Support both WatcherChange (with .change_type/.path/.old_hash)
                    # and legacy (change_type, path) or (change_type, path, old_hash) tuples
                    if isinstance(wc_item, WatcherChange):
                        change_type = wc_item.change_type
                        abs_path_str = wc_item.path
                        old_hash = wc_item.old_hash
                    else:
                        change_type = wc_item[0]
                        abs_path_str = wc_item[1]
                        old_hash = wc_item[2] if len(wc_item) > 2 else ""

                    abs_path = Path(abs_path_str)
                    try:
                        rel_path = abs_path.relative_to(folder_path).as_posix()
                    except ValueError:
                        continue
                    # Skip non-source files
                    ext = abs_path.suffix
                    if ext not in LANGUAGE_EXTENSIONS and get_language_for_path(str(abs_path)) is None:
                        continue

                    if change_type == "deleted":
                        if use_memory_hash_cache:
                            # Memory cache path: the watcher confirmed this file was
                            # in the index (it was in the hash cache), so trust it.
                            deleted_files.append(rel_path)
                        elif existing_index is not None and existing_index.has_source_file(rel_path):
                            deleted_files.append(rel_path)
                    elif change_type == "added":
                        if existing_index is None or not existing_index.has_source_file(rel_path):
                            new_files.append(rel_path)
                            rel_path_map_fast[rel_path] = abs_path
                        else:
                            # File exists in index but watcher says "added" (e.g. recreated)
                            changed_files.append(rel_path)
                            rel_path_map_fast[rel_path] = abs_path
                    else:  # modified
                        changed_files.append(rel_path)
                        rel_path_map_fast[rel_path] = abs_path

                if not changed_files and not new_files and not deleted_files:
                    return {
                        "success": True,
                        "message": "No changes detected",
                        "repo": f"{owner}/{repo_name}",
                        "folder_path": _folder_display,
                        "changed": 0, "new": 0, "deleted": 0,
                        "duration_seconds": round(time.monotonic() - t0, 2),
                    }

                # Read and hash only the changed/new files.
                # For "modified" files, compare hash against stored hash —
                # if content is identical (e.g. touch, save-without-change),
                # skip re-parsing and just update the mtime.
                # Use memory cache (_old_hash_map) if available, otherwise fall back to
                # the index's stored hashes.
                old_hashes: dict[str, str]
                if use_memory_hash_cache:
                    old_hashes = _old_hash_map
                else:
                    _idx = existing_index  # type: ignore[assignment]
                    old_hashes = _idx.file_hashes or {}
                actually_changed: list[str] = []
                raw_files_subset: dict[str, str] = {}
                subset_hashes: dict[str, str] = {}
                fast_mtimes: dict[str, int] = {}
                fast_warnings: list[str] = []
                mtime_only_updates: dict[str, int] = {}

                for rel_path in set(changed_files) | set(new_files):
                    abs_path = rel_path_map_fast[rel_path]
                    try:
                        with open(abs_path, "r", encoding="utf-8", errors="replace", newline="") as f:
                            content = f.read()
                    except Exception as e:
                        fast_warnings.append(f"Failed to read {abs_path}: {e}")
                        continue
                    new_hash = _file_hash(content)
                    try:
                        cur_mtime = os.stat(abs_path).st_mtime_ns
                    except OSError:
                        cur_mtime = None

                    # Content unchanged — skip parse, just record new mtime
                    if rel_path in changed_files and new_hash == old_hashes.get(rel_path, ""):
                        if cur_mtime is not None:
                            mtime_only_updates[rel_path] = cur_mtime
                        continue

                    raw_files_subset[rel_path] = content
                    subset_hashes[rel_path] = new_hash
                    if cur_mtime is not None:
                        fast_mtimes[rel_path] = cur_mtime
                    if rel_path in changed_files:
                        actually_changed.append(rel_path)

                # Replace changed_files with only the truly changed ones
                changed_files = actually_changed

                # If only mtimes changed (no content changes, no new, no deleted),
                # update mtimes in DB and return early — no parsing needed.
                if not changed_files and not new_files and not deleted_files:
                    if mtime_only_updates:
                        # Update mtimes directly via incremental_save with empty deltas
                        store.incremental_save(
                            owner=owner, name=repo_name,
                            changed_files=[], new_files=[], deleted_files=[],
                            new_symbols=[], raw_files={},
                            file_mtimes=mtime_only_updates,
                        )
                    return {
                        "success": True,
                        "message": "No changes detected",
                        "repo": f"{owner}/{repo_name}",
                        "folder_path": _folder_display,
                        "fast_path": True,
                        "changed": 0, "new": 0, "deleted": 0,
                        "duration_seconds": round(time.monotonic() - t0, 2),
                    }

                files_to_parse = set(changed_files) | set(new_files)
                # Split pipeline: parse immediately (no AI), fire summarization thread.
                new_symbols, incr_file_summaries, incr_file_languages, incr_file_imports, incremental_no_symbols = (
                    parse_immediate(
                        files_to_parse=files_to_parse,
                        file_contents=raw_files_subset,
                        active_providers=active_providers,
                        warnings=fast_warnings,
                        repo=str(folder_path),
                    )
                )

                git_head = _get_git_head(folder_path) or ""
                incr_context_metadata = collect_metadata(active_providers) if active_providers else None

                # Merge mtime-only updates so they're persisted alongside real changes
                all_mtimes = {**mtime_only_updates, **fast_mtimes}

                # Capture deferred generation BEFORE incremental_save to avoid a race:
                # if mark_reindex_start fires between save and read, the deferred thread
                # would incorrectly think it belongs to the newer generation.
                _repo_full = f"{owner}/{repo_name}"
                from ..reindex_state import _get_state
                _deferred_gen = _get_state(_repo_full).deferred_generation

                updated = store.incremental_save(
                    owner=owner, name=repo_name,
                    changed_files=changed_files, new_files=new_files, deleted_files=deleted_files,
                    new_symbols=new_symbols,
                    raw_files=raw_files_subset,
                    git_head=git_head,
                    file_summaries=incr_file_summaries,
                    file_languages=incr_file_languages,
                    imports=incr_file_imports,
                    context_metadata=incr_context_metadata,
                    file_hashes=subset_hashes,
                    file_mtimes=all_mtimes,
                )

                # Fire daemon thread for deferred summarization — index is already saved
                # with empty summaries; this fills them in without blocking the response.
                _summarization_deferred = False
                if new_symbols and use_ai_summaries:
                    _summaries_copy = list(new_symbols)
                    _contents_copy = dict(raw_files_subset)
                    _daemon = threading.Thread(
                        target=lambda _g=_deferred_gen, _s=_summaries_copy, _c=_contents_copy: _run_deferred_summarize(
                            _g, _repo_full, _s, _c, store, owner, repo_name,
                        ),
                        daemon=True,
                        name="deferred-summarizer",
                    )
                    _daemon.start()
                    _summarization_deferred = True
                    logger.info(
                        "Deferred AI summarization started for %s/%s (%d symbols)",
                        owner, repo_name, len(new_symbols),
                    )

                result = {
                    "success": True,
                    "repo": f"{owner}/{repo_name}",
                    "folder_path": _folder_display,
                    "incremental": True,
                    "fast_path": True,
                    "changed": len(changed_files), "new": len(new_files), "deleted": len(deleted_files),
                    "symbol_count": len(updated.symbols) if updated else 0,
                    "indexed_at": updated.indexed_at if updated else "",
                    "duration_seconds": round(time.monotonic() - t0, 2),
                }
                if _summarization_deferred:
                    result["summarization_deferred"] = True
                    result["summarization_note"] = (
                        "AI summarization is running in the background. "
                        "Call summarize_repo to run it synchronously if summaries are missing."
                    )
                if fast_warnings:
                    result["warnings"] = fast_warnings
                _maybe_apply_adaptive(folder_path, result)
                return result

        # ── Standard path: full directory discovery ──
        # Detect framework profile and merge its ignore patterns before discovery
        _framework_profile = detect_framework(folder_path)
        _profile_ignore: list[str] = []
        if _framework_profile:
            _profile_ignore = _framework_profile.ignore_patterns
            logger.info(
                "Framework profile '%s' active — adding %d ignore patterns",
                _framework_profile.name,
                len(_profile_ignore),
            )

        _merged_ignore = list(extra_ignore_patterns or []) + _profile_ignore

        # Discover source files (with security filtering)
        source_files, discover_warnings, skip_counts = discover_local_files(
            folder_path,
            max_files=max_files,
            extra_ignore_patterns=_merged_ignore or None,
            follow_symlinks=follow_symlinks,
        )
        warnings.extend(discover_warnings)
        logger.info("Discovery skip counts: %s", skip_counts)

        # Warn when no root .gitignore is present and the file count is large —
        # a common cause of bloated indexes that then overflow get_file_tree.
        gitignore_warn_threshold = _config.get("gitignore_warn_threshold", 500)
        if (
            gitignore_warn_threshold > 0
            and not (folder_path / ".gitignore").exists()
            and len(source_files) >= gitignore_warn_threshold
        ):
            gitignore_warning = (
                f"No .gitignore found in {folder_path}. "
                f"{len(source_files)} files were indexed — this may include unintended files "
                f"(build artifacts, vendored dependencies, etc.). "
                f"Add a .gitignore and re-run index_folder to exclude them."
            )
            logger.warning(gitignore_warning)
            warnings.append(gitignore_warning)

        if not source_files:
            result = {"success": False, "error": "No source files found"}
            if warnings:
                result["warnings"] = warnings
            return result

        # Discover context providers (dbt, terraform, etc.)
        _providers_enabled = context_providers and _config.get("context_providers", True)
        active_providers = discover_providers(folder_path) if _providers_enabled else []
        # Gate SQL-dependent providers: when SQL is removed from languages config,
        # filter out the dbt provider to avoid unnecessary detection overhead.
        if active_providers and not _config.is_language_enabled("sql"):
            active_providers = [p for p in active_providers if p.name != "dbt"]
            if active_providers:
                names = ", ".join(p.name for p in active_providers)
                logger.info("Active context providers (SQL disabled): %s", names)
            else:
                logger.info("Active context providers: none (SQL disabled)")
        elif active_providers:
            names = ", ".join(p.name for p in active_providers)
            logger.info("Active context providers: %s", names)

        # Create repo identifier from folder path
        _pairs = parse_path_map()
        repo_name = _local_repo_name(Path(remap(str(folder_path), _pairs, reverse=True)))
        owner = "local"
        store = IndexStore(base_path=storage_path)
        existing_index = store.load_index(owner, repo_name)

        if existing_index is None and store.has_index(owner, repo_name):
            logger.warning(
                "index_folder version_mismatch — %s/%s: on-disk index is a newer version; full re-index required",
                owner, repo_name,
            )
            warnings.append(
                "Existing index was created by a newer version of jcodemunch-mcp "
                "and cannot be read — performing a full re-index. "
                "If you downgraded the package, delete ~/.code-index/ (or your "
                "CODE_INDEX_PATH directory) to remove the stale index."
            )

        # Discovery pass — resolve rel_paths and collect mtimes without
        # reading file contents (P2-5: avoids 200MB-1GB allocation
        # for large projects). Content is read on-demand later.
        file_mtimes: dict[str, int] = {}
        rel_path_map: dict[str, Path] = {}  # rel_path -> absolute Path
        for file_path in source_files:
            if not validate_path(folder_path, file_path):
                continue
            try:
                rel_path = file_path.relative_to(folder_path).as_posix()
            except ValueError:
                continue
            ext = file_path.suffix
            if ext not in LANGUAGE_EXTENSIONS and get_language_for_path(str(file_path)) is None:
                continue
            try:
                file_mtimes[rel_path] = os.stat(file_path).st_mtime_ns
            except OSError as e:
                warnings.append(f"Failed to stat {file_path}: {e}")
                continue
            rel_path_map[rel_path] = file_path

        def _read_file(rel_path: str) -> str | None:
            """Re-read a file by its rel_path. Returns content or None on error."""
            abs_path = rel_path_map[rel_path]
            try:
                with open(abs_path, "r", encoding="utf-8", errors="replace", newline="") as f:
                    return f.read()
            except Exception as e:
                warnings.append(f"Failed to read {abs_path}: {e}")
                return None

        _hash_file_cache: dict[str, str] = {}  # rel_path -> content

        def _hash_file(rel_path: str) -> str:
            """Read and hash a single file on demand; cache content for parse step."""
            abs_path = rel_path_map[rel_path]
            with open(abs_path, "r", encoding="utf-8", errors="replace", newline="") as f:
                content = f.read()
            _hash_file_cache[rel_path] = content
            return _file_hash(content)

        # Incremental path: detect changes using mtime fast-path
        if incremental and existing_index is not None:
            changed, new, deleted, computed_hashes, updated_mtimes = (
                store.detect_changes_with_mtimes(
                    owner, repo_name, file_mtimes, _hash_file
                )
            )

            if not changed and not new and not deleted:
                return {
                    "success": True,
                    "message": "No changes detected",
                    "repo": f"{owner}/{repo_name}",
                    "folder_path": _folder_display,
                    "changed": 0, "new": 0, "deleted": 0,
                    "duration_seconds": round(time.monotonic() - t0, 2),
                }

            # Read changed + new files into memory
            files_to_parse = set(changed) | set(new)
            raw_files_subset: dict[str, str] = {}
            subset_hashes: dict[str, str] = {}
            for rel_path in files_to_parse:
                # Use content cached by _hash_file if available (avoids second read)
                content = _hash_file_cache.pop(rel_path, None) or _read_file(rel_path)
                if content is None:
                    continue
                raw_files_subset[rel_path] = content
                subset_hashes[rel_path] = computed_hashes.get(rel_path, _file_hash(content))

            # Shared pipeline: parse, enrich, summarize, extract metadata
            new_symbols, incr_file_summaries, incr_file_languages, incr_file_imports, incremental_no_symbols = (
                parse_and_prepare_incremental(
                    files_to_parse=files_to_parse,
                    file_contents=raw_files_subset,
                    active_providers=active_providers,
                    use_ai_summaries=use_ai_summaries,
                    warnings=warnings,
                    repo=str(folder_path),
                )
            )

            git_head = _get_git_head(folder_path) or ""
            incr_context_metadata = collect_metadata(active_providers) if active_providers else None

            updated = store.incremental_save(
                owner=owner, name=repo_name,
                changed_files=changed, new_files=new, deleted_files=deleted,
                new_symbols=new_symbols,
                raw_files=raw_files_subset,
                git_head=git_head,
                file_summaries=incr_file_summaries,
                file_languages=incr_file_languages,
                imports=incr_file_imports,
                context_metadata=incr_context_metadata,
                file_hashes=subset_hashes,
                file_mtimes=updated_mtimes,
            )

            result = {
                "success": True,
                "repo": f"{owner}/{repo_name}",
                "folder_path": _folder_display,
                "incremental": True,
                "changed": len(changed), "new": len(new), "deleted": len(deleted),
                "symbol_count": len(updated.symbols) if updated else 0,
                "indexed_at": updated.indexed_at if updated else "",
                "duration_seconds": round(time.monotonic() - t0, 2),
                "discovery_skip_counts": skip_counts,
                "no_symbols_count": len(incremental_no_symbols),
                "no_symbols_files": incremental_no_symbols[:50],
            }
            if warnings:
                result["warnings"] = warnings
            _maybe_apply_adaptive(folder_path, result)
            return result

        # Full index path — stream through files one at a time to avoid
        # loading all contents into memory simultaneously.
        # Compute hashes and collect mtimes during the per-file loop.
        file_hashes: dict[str, str] = {}
        all_symbols = []
        symbols_by_file: dict[str, list] = defaultdict(list)
        source_file_list = sorted(file_mtimes)
        file_imports: dict[str, list[dict]] = {}
        content_dir = store._content_dir(owner, repo_name)
        content_dir.mkdir(parents=True, exist_ok=True)

        no_symbols_files: list[str] = []
        _languages_with_symbols: set[str] = set()
        for rel_path in source_file_list:
            content = _read_file(rel_path)
            if content is None:
                continue

            # Encode once — reused for both hashing and tree-sitter parsing
            content_bytes = content.encode("utf-8")
            file_hashes[rel_path] = _file_hash_bytes(content_bytes)

            # Write raw content to cache immediately, then process
            file_dest = store._safe_content_path(content_dir, rel_path)
            if file_dest:
                file_dest.parent.mkdir(parents=True, exist_ok=True)
                store._write_cached_text(file_dest, content)

            language = get_language_for_path(rel_path)
            if not language:
                no_symbols_files.append(rel_path)
                # content eligible for GC after this iteration
                continue
            try:
                symbols = parse_file(content, rel_path, language, source_bytes=content_bytes, repo=str(folder_path))
                if symbols:
                    all_symbols.extend(symbols)
                    symbols_by_file[rel_path].extend(symbols)
                    _languages_with_symbols.add(language)
                else:
                    no_symbols_files.append(rel_path)
                    logger.debug("NO SYMBOLS: %s", rel_path)
            except Exception as e:
                warnings.append(f"Failed to parse {rel_path}: {e}")
                logger.debug("PARSE ERROR: %s — %s", rel_path, e)

            # Extract imports while content is in scope
            imps = extract_imports(content, rel_path, language)
            if imps:
                file_imports[rel_path] = imps
            # content is discarded at end of iteration

        logger.info(
            "Parsing complete — with symbols: %d, no symbols: %d",
            len(symbols_by_file),
            len(no_symbols_files),
        )

        # Enrich with context providers before summarization
        if active_providers and all_symbols:
            enrich_symbols(all_symbols, active_providers)

        # Merge extra imports from context providers (Blade refs, facades, etc.)
        if active_providers:
            collect_extra_imports(active_providers, file_imports)

        # Generate summaries — preserve existing summaries for unchanged files
        if all_symbols:
            _folder_existing_summaries: dict[tuple[str, str, str], str] | None = None
            _folder_unchanged_files: set[str] | None = None
            if (
                existing_index is not None
                and existing_index.file_hashes
                and existing_index.symbols
            ):
                _folder_unchanged_files = {
                    f for f, h in file_hashes.items()
                    if existing_index.file_hashes.get(f) == h
                }
                if _folder_unchanged_files:
                    _folder_existing_summaries = {
                        (s["file"], s["name"], s["kind"]): s["summary"]
                        for s in existing_index.symbols
                        if s.get("summary") and s.get("file") in _folder_unchanged_files
                    }
                    logger.info(
                        "index_folder full — %d/%d files unchanged, %d summaries preserved",
                        len(_folder_unchanged_files), len(file_hashes),
                        len(_folder_existing_summaries) if _folder_existing_summaries else 0,
                    )

            if _folder_existing_summaries and _folder_unchanged_files:
                from ._indexing_pipeline import _split_for_summarization
                _needs_summary, _already_summarized = _split_for_summarization(
                    all_symbols, _folder_existing_summaries, _folder_unchanged_files
                )
                _summarized = summarize_symbols(_needs_summary, use_ai=use_ai_summaries) if _needs_summary else []
                all_symbols = _summarized + _already_summarized
            else:
                all_symbols = summarize_symbols(all_symbols, use_ai=use_ai_summaries)

        # Generate file-level summaries (single-pass grouping) using shared helpers
        file_symbols_map = defaultdict(list)
        for s in all_symbols:
            file_symbols_map[s.file].append(s)
        file_languages = _file_languages_for_paths(source_file_list, file_symbols_map)
        languages = _language_counts(file_languages)
        file_summaries = _complete_file_summaries(source_file_list, file_symbols_map, context_providers=active_providers)

        # Collect structured metadata from providers
        full_context_metadata = collect_metadata(active_providers) if active_providers else None

        # Merge framework profile metadata into context_metadata
        if _framework_profile:
            profile_meta = profile_to_meta(_framework_profile)
            if full_context_metadata:
                full_context_metadata.update(profile_meta)
            else:
                full_context_metadata = profile_meta

        # Extract package names from manifest files
        _pkg_names: list[str] = []
        try:
            _pkg_names = _extract_package_names(str(folder_path))
        except Exception:
            logger.debug("extract_package_names failed for %s", folder_path, exc_info=True)

        # Save index — raw files already written to content dir above,
        # pass empty dict to skip duplicate writes.
        index = store.save_index(
            owner=owner,
            name=repo_name,
            source_files=source_file_list,
            symbols=all_symbols,
            raw_files={},
            languages=languages,
            file_hashes=file_hashes,
            file_summaries=file_summaries,
            git_head=_get_git_head(folder_path) or "",
            source_root=str(folder_path),
            file_languages=file_languages,
            display_name=folder_path.name,
            imports=file_imports,
            context_metadata=full_context_metadata,
            file_mtimes=file_mtimes,
            package_names=_pkg_names,
        )

        # Identify languages that were indexed (symbols found) but have no import extractor
        _missing_import_extractors = sorted(
            lang for lang in _languages_with_symbols
            if lang not in _IMPORT_EXTRACTORS
        )

        result = {
            "success": True,
            "repo": index.repo,
            "folder_path": _folder_display,
            "indexed_at": index.indexed_at,
            "file_count": len(source_file_list),
            "symbol_count": len(all_symbols),
            "file_summary_count": sum(1 for v in file_summaries.values() if v),
            "languages": languages,
            "files": source_file_list[:20],  # Limit files in response
            "duration_seconds": round(time.monotonic() - t0, 2),
            "discovery_skip_counts": skip_counts,
            "no_symbols_count": len(no_symbols_files),
            "no_symbols_files": no_symbols_files[:50],  # Show up to 50 for inspection
        }
        if _missing_import_extractors:
            result["missing_extractors"] = _missing_import_extractors
            result.setdefault("parse_warnings", []).append(
                f"Import graph incomplete for: {', '.join(_missing_import_extractors)}. "
                "Dead code and dependency analysis may be less accurate for these languages."
            )

        # Report context enrichment stats from all active providers
        if active_providers:
            enrichment = {}
            for provider in active_providers:
                enrichment[provider.name] = provider.stats()
            result["context_enrichment"] = enrichment

        if _framework_profile:
            result["framework_profile"] = _framework_profile.name

        if warnings:
            result["warnings"] = warnings

        files_skipped_cap = skip_counts.get("file_limit", 0)
        if files_skipped_cap > 0:
            files_discovered = max_files + files_skipped_cap
            result["files_discovered"] = files_discovered
            result["files_indexed"] = max_files
            result["files_skipped_cap"] = files_skipped_cap
            cap_warning = (
                f"File cap reached: {files_discovered} files discovered, {max_files} indexed, "
                f"{files_skipped_cap} dropped. Raise JCODEMUNCH_MAX_FOLDER_FILES or narrow the path."
            )
            result.setdefault("warnings", []).append(cap_warning)

        _maybe_apply_adaptive(folder_path, result)
        return result

    except Exception as e:
        return {"success": False, "error": f"Indexing failed: {str(e)}"}
