//
// Copyright (c) Double Symmetry GmbH
// Commercial use requires a license. See https://rntp.dev/pricing
//

import Foundation
import Network

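/// An HTTP proxy server running on localhost that sits between AVPlayer and
/// the network. It serves fully-cached audio from disk, and streams in-progress
/// downloads as they arrive — with a `Content-Length` when the upstream length
/// is known, falling back to chunked Transfer-Encoding when it is not.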
final class CacheProxyServer {

  /// Starts upstream downloads for resources that are not fully cached yet.
  private let coordinator: DownloadCoordinator
  /// Cache queried for keys, content info, cached byte counts and stored bytes.
  private let cache: AudioCache
  /// TCP listener that hands accepted client connections to the server.
  private let listener: NWListener
  /// Serial queue on which the listener and every accepted connection are
  /// started, so their handlers and callbacks are delivered here.
  private let serverQueue = DispatchQueue(label: "trackplayer.proxy-server")
  /// Connections accepted and not yet cancelled or failed. Mutated on
  /// `serverQueue` only.
  private var connections: [NWConnection] = []

  /// The TCP port the listener is bound to, or `0` while the listener has not
  /// been assigned one yet. Meaningful once `start()` has returned.
  var port: UInt16 {
    guard let boundPort = listener.port else { return 0 }
    return boundPort.rawValue
  }

  /// Creates the proxy and its TCP listener on a system-assigned port.
  ///
  /// The listener is restricted to the loopback interface: the proxy fetches
  /// whatever upstream URL a request names, so it must not be reachable from
  /// other hosts on the network.
  ///
  /// - Parameters:
  ///   - coordinator: Performs upstream downloads for uncached resources.
  ///   - cache: Backing cache used to serve and persist resources.
  /// - Throws: Any error raised while creating the `NWListener`.
  init(coordinator: DownloadCoordinator, cache: AudioCache) throws {
    self.coordinator = coordinator
    self.cache = cache

    // `NWParameters.tcp` returns a fresh instance, so mutating it is local.
    let parameters = NWParameters.tcp
    parameters.requiredInterfaceType = .loopback
    self.listener = try NWListener(using: parameters, on: .any)
  }

  // MARK: - Lifecycle

  /// Starts the listener on `serverQueue` and blocks the caller until the
  /// listener has settled — either ready to accept connections, or terminally
  /// failed/cancelled. Check `port` afterwards: it stays `0` when the listener
  /// did not come up.
  ///
  /// Must not be called from `serverQueue`, since the state updates this
  /// method waits for are delivered on that queue.
  func start() {
    let settled = DispatchSemaphore(value: 0)
    listener.stateUpdateHandler = { state in
      switch state {
      case .ready, .failed, .cancelled:
        // Also wake the caller on terminal states; waiting for `.ready` alone
        // would block forever when the listener cannot start. Extra signals
        // after the first are harmless for this single waiter.
        settled.signal()
      default:
        break
      }
    }
    listener.newConnectionHandler = { [weak self] connection in
      self?.acceptConnection(connection)
    }
    listener.start(queue: serverQueue)
    settled.wait()
  }

  /// Cancels every tracked connection, then cancels the listener.
  func stop() {
    // The connection list is only touched on `serverQueue` (accept and state
    // handlers run there), while callers of `stop()` may be on any thread
    // (e.g. test tearDown). Hop onto the queue synchronously so the teardown
    // cannot race those handlers.
    serverQueue.sync {
      connections.forEach { $0.cancel() }
      connections = []
    }
    listener.cancel()
  }

  // MARK: - URL Building

  /// Builds the localhost URL through which `url` is fetched via this proxy.
  ///
  /// Format: `http://localhost:<port>/<cacheKey>[.<ext>]?url=<encoded>&header_Key=Value`
  ///
  /// The path carries the cache key plus the upstream path extension (when
  /// there is one); the upstream URL and any request headers travel as query
  /// items, headers sorted by name.
  ///
  /// Passing `cacheable: false` appends a `nocache=1` marker so the proxy
  /// streams the resource straight through without persisting it — used for
  /// the segments of a live (non-VOD) playlist, which are write-once-read-once
  /// and would only churn the LRU cache. The marker affects neither the cache
  /// key (path-derived) nor the encoded upstream URL.
  ///
  /// - Parameters:
  ///   - url: The upstream resource URL.
  ///   - headers: Optional headers to send upstream.
  ///   - cacheable: Whether the proxy may persist the resource.
  /// - Returns: The proxy URL for the resource.
  func proxyURL(for url: URL, headers: [String: String]?, cacheable: Bool = true) -> URL {
    let key = cache.cacheKey(for: url)
    let upstreamExtension = url.pathExtension

    var items = [URLQueryItem(name: "url", value: url.absoluteString)]
    if !cacheable {
      items.append(URLQueryItem(name: "nocache", value: "1"))
    }
    if let headers = headers {
      let orderedHeaders = headers.sorted { lhs, rhs in lhs.key < rhs.key }
      items += orderedHeaders.map { URLQueryItem(name: "header_\($0.key)", value: $0.value) }
    }

    var builder = URLComponents()
    builder.scheme = "http"
    builder.host = "localhost"
    builder.port = Int(port)
    if upstreamExtension.isEmpty {
      builder.path = "/\(key)"
    } else {
      builder.path = "/\(key).\(upstreamExtension)"
    }
    builder.queryItems = items

    return builder.url!
  }

  // MARK: - Connection Handling

  /// Tracks a newly accepted connection, starts it on `serverQueue`, and begins
  /// reading its HTTP request head.
  private func acceptConnection(_ connection: NWConnection) {
    connections.append(connection)

    // The connection is captured weakly so the handler it stores does not hold
    // a strong reference back to the connection itself.
    connection.stateUpdateHandler = { [weak self, weak connection] state in
      guard let connection = connection else { return }
      switch state {
      case .failed:
        // A failed connection is not cancelled automatically; cancel it so
        // its resources are released, then stop tracking it.
        connection.cancel()
        self?.connections.removeAll { $0 === connection }
      case .cancelled:
        self?.connections.removeAll { $0 === connection }
      default:
        break
      }
    }

    connection.start(queue: serverQueue)
    receiveRequestHead(on: connection, buffered: Data())
  }

  /// Reads from `connection` until the request head is complete (terminated by
  /// an empty line), then hands the accumulated bytes to `handleRawRequest`.
  ///
  /// TCP gives no guarantee that a request arrives in a single read, so bytes
  /// are accumulated across reads instead of treating the first read as the
  /// whole request. A head larger than 64 KiB is rejected with a 431.
  private func receiveRequestHead(on connection: NWConnection, buffered: Data) {
    let maxHeadBytes = 64 * 1024
    connection.receive(minimumIncompleteLength: 1, maximumLength: 16384) { [weak self] data, _, isComplete, error in
      guard let self = self, let data = data, error == nil else {
        connection.cancel()
        return
      }

      var head = buffered
      head.append(data)

      if head.range(of: Data("\r\n\r\n".utf8)) != nil || isComplete {
        // Either the head is complete, or the peer finished sending — in the
        // latter case let the parser decide what to do with what arrived.
        self.handleRawRequest(head, on: connection)
      } else if head.count > maxHeadBytes {
        self.sendError(status: 431, message: "Request Header Fields Too Large", on: connection)
      } else {
        self.receiveRequestHead(on: connection, buffered: head)
      }
    }
  }

  // MARK: - HTTP Request Parsing

  /// The pieces of an incoming proxy request that the serving paths need.
  private struct ParsedRequest {
    /// Request path without its leading slash and without its file extension.
    let cacheKey: String
    /// Upstream resource named by the `url` query item.
    let upstreamURL: URL
    /// Upstream request headers taken from the `header_*` query items.
    let headers: [String: String]
    /// File extension of the request path (lowercased, without the dot), saved
    /// before it is removed to form `cacheKey`. Empty when there is none.
    let pathExtension: String
    /// First byte of a `Range: bytes=N-` / `Range: bytes=N-M` header, if any.
    let rangeStart: Int64?
    /// Last byte of a `Range: bytes=N-M` header, if any.
    let rangeEnd: Int64?
    /// Whether the resource may be written to the cache. The `nocache=1`
    /// proxy-URL marker turns this off for live-playlist segments, which are
    /// relayed without caching. Manifests disregard it — their cacheability is
    /// decided again per body via `isVOD`.
    let cacheable: Bool

    /// True when the request is for an HLS playlist, recognised by the
    /// proxy-URL path extension (`proxyURL(for:)` keeps the upstream one).
    var isManifest: Bool {
      ["m3u8", "m3u"].contains(pathExtension)
    }
  }

  /// Parses the raw bytes of an HTTP request head into a `ParsedRequest`.
  ///
  /// Only the request target and the `Range` header are interpreted; the HTTP
  /// method is not inspected.
  ///
  /// Range handling: only a single `bytes=N-` or `bytes=N-M` range with
  /// `M >= N` is honoured. Anything else — a suffix range (`bytes=-N`), a
  /// multi-range list, a reversed or non-numeric range — is ignored, which
  /// makes the request behave as if no `Range` header had been sent. This
  /// guarantees `rangeEnd` is never set without `rangeStart`.
  ///
  /// - Parameter data: The bytes received from the client.
  /// - Returns: The parsed request, or `nil` when the bytes are not UTF-8, the
  ///   request line is malformed, the path yields an empty cache key, or the
  ///   `url` query item is missing or not a valid URL.
  private func parseRequest(_ data: Data) -> ParsedRequest? {
    guard let requestString = String(data: data, encoding: .utf8) else { return nil }

    // Parse the request line: GET /path?query HTTP/1.1
    let lines = requestString.components(separatedBy: "\r\n")
    guard let requestLine = lines.first else { return nil }
    let parts = requestLine.split(separator: " ")
    guard parts.count >= 2 else { return nil }

    let rawPath = String(parts[1])
    guard let components = URLComponents(string: rawPath) else { return nil }

    // Cache key is the path component (strip leading / and file extension).
    // Capture the extension before stripping so manifest requests can be
    // detected by it (proxyURL(for:) preserves the upstream path extension).
    var cacheKey = String(components.path.dropFirst())
    var pathExtension = ""
    if let dotIdx = cacheKey.lastIndex(of: ".") {
      pathExtension = String(cacheKey[cacheKey.index(after: dotIdx)...]).lowercased()
      cacheKey = String(cacheKey[cacheKey.startIndex..<dotIdx])
    }
    guard !cacheKey.isEmpty else { return nil }

    // Extract query parameters
    let queryItems = components.queryItems ?? []

    guard let urlParam = queryItems.first(where: { $0.name == "url" })?.value,
          let upstreamURL = URL(string: urlParam) else { return nil }

    var headers: [String: String] = [:]
    for item in queryItems where item.name.hasPrefix("header_") {
      let headerKey = String(item.name.dropFirst("header_".count))
      headers[headerKey] = item.value ?? ""
    }

    let cacheable = queryItems.first(where: { $0.name == "nocache" })?.value != "1"

    // Parse the Range header from AVPlayer's request ("Range: bytes=N-" or
    // "Range: bytes=N-M"). Only the first Range header is considered.
    var rangeStart: Int64?
    var rangeEnd: Int64?
    for line in lines.dropFirst() {
      // An empty line ends the header section; nothing after it is a header.
      if line.isEmpty { break }
      guard line.lowercased().hasPrefix("range:") else { continue }

      let value = line.dropFirst("range:".count).trimmingCharacters(in: .whitespaces)
      guard value.hasPrefix("bytes=") else { break }
      let spec = value.dropFirst("bytes=".count)

      // Multi-range requests are not supported; ignore the header entirely.
      guard !spec.contains(",") else { break }

      // Split on the first dash only, keeping an empty trailing piece so
      // "N-" yields ["N", ""] and "-N" yields ["", "N"].
      let bounds = spec.split(separator: "-", maxSplits: 1, omittingEmptySubsequences: false)
      guard bounds.count == 2,
            let start = Int64(bounds[0].trimmingCharacters(in: .whitespaces)),
            start >= 0 else { break }

      let endText = bounds[1].trimmingCharacters(in: .whitespaces)
      if endText.isEmpty {
        rangeStart = start
      } else if let end = Int64(endText), end >= start {
        rangeStart = start
        rangeEnd = end
      }
      break
    }

    return ParsedRequest(cacheKey: cacheKey, upstreamURL: upstreamURL, headers: headers, pathExtension: pathExtension, rangeStart: rangeStart, rangeEnd: rangeEnd, cacheable: cacheable)
  }

  // MARK: - Request Routing

  /// Parses the received bytes and routes the request to the matching serving
  /// path; unparseable requests are answered with a 400.
  private func handleRawRequest(_ data: Data, on connection: NWConnection) {
    guard let parsed = parseRequest(data) else {
      sendError(status: 400, message: "Bad Request", on: connection)
      return
    }

    switch (parsed.isManifest, parsed.cacheable) {
    case (true, _):
      // Playlists are rewritten every time they are served, so they never take
      // the raw byte-stream paths — otherwise a cached RAW manifest would go
      // out unrewritten.
      serveManifest(request: parsed, on: connection)

    case (false, false):
      // `nocache=1` (live-playlist segments): relay straight through and leave
      // the cache untouched, even if stale bytes exist for this key.
      servePassthrough(request: parsed, on: connection)

    case (false, true):
      if cache.isFullyCached(key: parsed.cacheKey) {
        serveFromCache(request: parsed, on: connection)
      } else {
        serveThroughCoordinator(request: parsed, on: connection)
      }
    }
  }

  // MARK: - Cache Hit (full)

  /// Serves a fully cached resource, honouring a single byte range.
  ///
  /// - A range is honoured only when it has a start offset and (if present) an
  ///   end that is not before the start; otherwise the whole resource is sent
  ///   as a 200. In particular an end without a start is ignored, so the
  ///   announced `Content-Length` always matches the bytes actually sent.
  /// - The range end is clamped to the last cached byte.
  /// - A start at or beyond the resource length is answered with 416.
  /// - Missing content info is answered with 500.
  ///
  /// The body is sent in 64 KiB slices, one write in flight at a time, and the
  /// connection is closed afterwards.
  private func serveFromCache(request: ParsedRequest, on connection: NWConnection) {
    let key = request.cacheKey
    guard let info = cache.contentInfo(for: key) else {
      sendError(status: 500, message: "Cache read error", on: connection)
      return
    }

    let totalLength = info.contentLength
    var startByte: Int64 = 0
    var endByte: Int64 = totalLength - 1
    var isPartial = false

    if let start = request.rangeStart, start >= 0, (request.rangeEnd ?? start) >= start {
      guard start < totalLength else {
        // Unsatisfiable range: report the actual length to the client.
        var rejection = "HTTP/1.1 416 Range Not Satisfiable\r\n"
        rejection += "Content-Range: bytes */\(totalLength)\r\n"
        rejection += "Content-Length: 0\r\n"
        rejection += "Connection: close\r\n"
        rejection += "\r\n"
        connection.send(content: Data(rejection.utf8), completion: .contentProcessed { _ in
          connection.cancel()
        })
        return
      }
      startByte = start
      endByte = min(request.rangeEnd ?? (totalLength - 1), totalLength - 1)
      isPartial = true
    }

    let sliceLength = endByte - startByte + 1

    var header = isPartial ? "HTTP/1.1 206 Partial Content\r\n" : "HTTP/1.1 200 OK\r\n"
    header += "Content-Type: \(info.contentType)\r\n"
    if isPartial {
      header += "Content-Range: bytes \(startByte)-\(endByte)/\(totalLength)\r\n"
    }
    header += "Content-Length: \(sliceLength)\r\n"
    header += "Accept-Ranges: bytes\r\n"
    header += "Connection: close\r\n"
    header += "\r\n"

    connection.send(content: Data(header.utf8), completion: .contentProcessed { _ in })

    let chunkSize = 64 * 1024
    var offset: Int64 = startByte
    let serveEnd = endByte + 1

    func sendNextChunk() {
      guard offset < serveEnd else {
        connection.cancel()
        return
      }
      let length = min(chunkSize, Int(serveEnd - offset))
      // A failed or empty read cannot make progress; close rather than loop on
      // the same offset forever.
      guard let chunk = self.cache.readData(for: key, offset: offset, length: length),
            !chunk.isEmpty else {
        connection.cancel()
        return
      }
      offset += Int64(chunk.count)
      let isComplete = offset >= serveEnd
      connection.send(content: chunk, completion: .contentProcessed { error in
        if error != nil || isComplete {
          connection.cancel()
        } else {
          sendNextChunk()
        }
      })
    }

    sendNextChunk()
  }

  // MARK: - Cache Miss / Partial — Stream via Coordinator

  /// Serves a resource that is not fully cached by starting (or joining) its
  /// download through the coordinator and relaying bytes from the cache as
  /// they become available.
  ///
  /// The response header is sent once the cache has content info for the key:
  /// - known length, no range → 200 with `Content-Length`;
  /// - known length, range    → 206 with `Content-Range`; the body stops at the
  ///   requested end (clamped to the resource), and the connection is closed
  ///   as soon as that range is delivered, without waiting for the download;
  /// - known length, range start at/after the end → 416;
  /// - unknown length → 200 with chunked transfer encoding.
  ///
  /// A download that fails before the header was sent yields a 502; one that
  /// fails afterwards closes the connection once the cached bytes are drained.
  ///
  /// All state below is read and written on `serverQueue` only.
  private func serveThroughCoordinator(request: ParsedRequest, on connection: NWConnection) {
    let key = request.cacheKey
    let headers = request.headers.isEmpty ? nil : request.headers
    let rangeStart = request.rangeStart ?? 0

    // Track how many cache bytes we've sent to this client (absolute offset)
    var bytesSent: Int64 = rangeStart
    // Exclusive absolute offset at which a ranged response ends. `nil` for
    // non-range and chunked responses, which run to the end of the resource.
    var sendLimit: Int64?
    // Whether we're using chunked transfer encoding (no Content-Length known)
    var isChunked = false
    // Whether we've sent the HTTP response header yet
    var headerSent = false
    // Set when a send fails (client disconnected) or a final error response
    // has been issued; prevents further writes.
    var connectionClosed = false
    // Download state — set from coordinator completion on serverQueue.
    var downloadFinished = false
    var downloadResult: Result<Void, Error> = .success(())

    // Try to send the HTTP response header once content info is available.
    func trySendResponseHeader() {
      guard !headerSent else { return }
      guard let contentInfo = self.cache.contentInfo(for: key) else { return }
      headerSent = true

      let contentLength = contentInfo.contentLength
      var header: String
      if contentLength > 0 {
        isChunked = false
        if request.rangeStart != nil {
          guard rangeStart < contentLength else {
            // Nothing to serve from this offset; answer and stop the chain.
            connectionClosed = true
            self.sendError(status: 416, message: "Range Not Satisfiable", on: connection)
            return
          }
          // Clamp the requested end to the resource; ignore an end that lies
          // before the start.
          var endByte = contentLength - 1
          if let requestedEnd = request.rangeEnd, requestedEnd >= rangeStart {
            endByte = min(requestedEnd, endByte)
          }
          sendLimit = endByte + 1
          let responseLength = endByte - rangeStart + 1
          header = "HTTP/1.1 206 Partial Content\r\n"
          header += "Content-Type: \(contentInfo.contentType)\r\n"
          header += "Content-Range: bytes \(rangeStart)-\(endByte)/\(contentLength)\r\n"
          header += "Content-Length: \(responseLength)\r\n"
          header += "Accept-Ranges: bytes\r\n"
          header += "Connection: close\r\n"
          header += "\r\n"
        } else {
          header = "HTTP/1.1 200 OK\r\n"
          header += "Content-Type: \(contentInfo.contentType)\r\n"
          header += "Content-Length: \(contentLength)\r\n"
          header += "Accept-Ranges: bytes\r\n"
          header += "Connection: close\r\n"
          header += "\r\n"
        }
      } else {
        isChunked = true
        header = "HTTP/1.1 200 OK\r\n"
        header += "Content-Type: \(contentInfo.contentType)\r\n"
        header += "Transfer-Encoding: chunked\r\n"
        header += "Connection: close\r\n"
        header += "\r\n"
      }

      connection.send(content: Data(header.utf8), completion: .contentProcessed { error in
        if error != nil { connectionClosed = true }
      })
    }

    // Send ONE chunk, then schedule the next via completion callback.
    // This ensures only one write is in flight at a time, preventing
    // broken pipe spam when the client disconnects.
    func sendNextChunk() {
      guard !connectionClosed else { return }

      // If the download failed before we sent headers, send an error response.
      if downloadFinished, case .failure = downloadResult, !headerSent {
        self.sendError(status: 502, message: "Bad Gateway", on: connection)
        return
      }

      trySendResponseHeader()
      // The header step may have answered with a final error response.
      guard !connectionClosed else { return }
      guard headerSent else {
        // Content info not ready yet — retry shortly.
        self.serverQueue.asyncAfter(deadline: .now() + .milliseconds(5)) {
          sendNextChunk()
        }
        return
      }

      // Never read past the end of a ranged response, even when the cache
      // already holds more bytes.
      let cached = self.cache.cachedBytes(for: key)
      let available = sendLimit.map { min(cached, $0) } ?? cached
      if bytesSent < available {
        let chunkSize = min(Int(available - bytesSent), 64 * 1024)
        guard let chunk = self.cache.readData(for: key, offset: bytesSent, length: chunkSize) else {
          // Read failed — close.
          connection.cancel()
          return
        }
        guard !chunk.isEmpty else {
          // An empty read makes no progress, and in chunked mode would be
          // encoded as the terminating zero-length chunk. Wait and retry.
          self.serverQueue.asyncAfter(deadline: .now() + .milliseconds(5)) {
            sendNextChunk()
          }
          return
        }

        let dataToSend: Data
        if isChunked {
          var chunked = Data(String(chunk.count, radix: 16).utf8)
          chunked.append(Data("\r\n".utf8))
          chunked.append(chunk)
          chunked.append(Data("\r\n".utf8))
          dataToSend = chunked
        } else {
          dataToSend = chunk
        }

        bytesSent += Int64(chunk.count)
        connection.send(content: dataToSend, completion: .contentProcessed { [weak self] error in
          guard self != nil else { return }
          if error != nil {
            connectionClosed = true
            connection.cancel()
            return
          }
          // Send next chunk on the server queue.
          self?.serverQueue.async { sendNextChunk() }
        })
        return
      }

      // A ranged response is complete once its last byte has gone out — no
      // need to wait for the rest of the download.
      if let limit = sendLimit, bytesSent >= limit {
        connection.send(content: Data(), completion: .contentProcessed { _ in
          connection.cancel()
        })
        return
      }

      // No more cached data available right now.
      if downloadFinished {
        switch downloadResult {
        case .success:
          // Verify all bytes were sent (cache flush race). Only meaningful for
          // a successful download: after a failure the missing bytes will
          // never arrive, so retrying would poll forever.
          if !isChunked, let info = self.cache.contentInfo(for: key),
             info.contentLength > 0, bytesSent < info.contentLength {
            // Retry after a brief delay.
            self.serverQueue.asyncAfter(deadline: .now() + .milliseconds(10)) {
              sendNextChunk()
            }
            return
          }

          // All done — close the connection.
          if isChunked {
            connection.send(content: Data("0\r\n\r\n".utf8), completion: .contentProcessed { _ in
              connection.cancel()
            })
          } else {
            connection.send(content: Data(), completion: .contentProcessed { _ in
              connection.cancel()
            })
          }
        case .failure:
          if !headerSent {
            self.sendError(status: 502, message: "Bad Gateway", on: connection)
          } else {
            connection.cancel()
          }
        }
        return
      }

      // Download still in progress — wait for more data.
      self.serverQueue.asyncAfter(deadline: .now() + .milliseconds(5)) {
        sendNextChunk()
      }
    }

    // Start the send chain.
    sendNextChunk()

    // Kick off the download — the coordinator stores contentInfo as soon
    // as the upstream response headers arrive.
    coordinator.download(url: request.upstreamURL, headers: headers, cacheKey: key) { [weak self] result in
      self?.serverQueue.async {
        downloadResult = result
        downloadFinished = true
      }
    }
  }

  // MARK: - Passthrough (no caching) — Live Segments

  /// Fetches an upstream resource and relays it to the client WITHOUT writing
  /// anything to the cache. This is the path for segments of a live (non-VOD)
  /// playlist: they are write-once-read-once, so caching them would only churn
  /// the bounded LRU and evict reusable VOD content.
  ///
  /// The complete upstream body is held in memory before being relayed — HLS
  /// audio segments are small (a few seconds each). The upstream
  /// `Content-Type` is mirrored, and a requested byte range is forwarded
  /// upstream and echoed back as a 206 when the upstream answers with one. Any
  /// transport error or non-2xx upstream status results in a 502, like the
  /// manifest path. The fetch goes through `URLSession.shared` (no delegate),
  /// so neither the streaming `DownloadCoordinator` nor the cache is involved.
  private func servePassthrough(request: ParsedRequest, on connection: NWConnection) {
    var upstreamRequest = URLRequest(url: request.upstreamURL)
    request.headers.forEach { name, value in
      upstreamRequest.setValue(value, forHTTPHeaderField: name)
    }
    if let firstByte = request.rangeStart {
      let rangeValue: String
      if let lastByte = request.rangeEnd {
        rangeValue = "bytes=\(firstByte)-\(lastByte)"
      } else {
        rangeValue = "bytes=\(firstByte)-"
      }
      upstreamRequest.setValue(rangeValue, forHTTPHeaderField: "Range")
    }

    let fetch = URLSession.shared.dataTask(with: upstreamRequest) { [weak self] body, urlResponse, failure in
      guard let self = self else { return }
      // Return to serverQueue so the NWConnection.send is serialized with the
      // rest of the server's work.
      self.serverQueue.async {
        let http = urlResponse as? HTTPURLResponse
        let status = http?.statusCode ?? 0
        guard let body = body, failure == nil, (200...299).contains(status) else {
          self.sendError(status: 502, message: "Bad Gateway", on: connection)
          return
        }

        let contentType = http?.value(forHTTPHeaderField: "Content-Type") ?? "application/octet-stream"

        var headerLines: [String]
        if status == 206, let contentRange = http?.value(forHTTPHeaderField: "Content-Range") {
          headerLines = [
            "HTTP/1.1 206 Partial Content",
            "Content-Type: \(contentType)",
            "Content-Range: \(contentRange)",
          ]
        } else {
          headerLines = [
            "HTTP/1.1 200 OK",
            "Content-Type: \(contentType)",
          ]
        }
        headerLines += [
          "Content-Length: \(body.count)",
          "Accept-Ranges: bytes",
          "Connection: close",
        ]
        let head = headerLines.joined(separator: "\r\n") + "\r\n\r\n"

        var payload = Data(head.utf8)
        payload.append(body)
        connection.send(content: payload, completion: .contentProcessed { _ in
          connection.cancel()
        })
      }
    }
    fetch.resume()
  }

  // MARK: - Manifest (HLS Playlist) Rewrite

  /// Serves an HLS playlist with its child URIs rewritten to point back at the
  /// proxy, so AVPlayer requests every segment/variant through the proxy
  /// (cached per-URL).
  ///
  /// Only the RAW (un-rewritten) bytes of VOD playlists are cached. REWRITTEN
  /// bytes are never cached because they embed the localhost port, which
  /// changes every app launch — the rewrite is therefore redone on each serve.
  /// Live (non-VOD) playlists are never cached.
  private func serveManifest(request: ParsedRequest, on connection: NWConnection) {
    let key = request.cacheKey

    // Cache hit: only VOD playlists are ever cached, so a hit is a confirmed
    // VOD playlist — load its raw bytes and serve them rewritten. When the
    // cached bytes are missing/empty (e.g. a meta-without-data inconsistency
    // from an interrupted write or partial eviction), fall through to a fresh
    // upstream fetch instead of hard-failing playback with a 500.
    var cachedRaw: Data?
    if cache.isFullyCached(key: key),
       let info = cache.contentInfo(for: key),
       info.contentLength > 0 {
      cachedRaw = cache.readData(for: key, offset: 0, length: Int(info.contentLength))
    }
    if let cachedRaw = cachedRaw {
      finishManifest(rawData: cachedRaw, freshlyFetched: false, request: request, on: connection)
      return
    }

    // Cache miss: load the upstream manifest fully into memory.
    // URLSession.shared has no delegate, so this completion-handler task does
    // not touch the streaming DownloadCoordinator.
    var upstreamRequest = URLRequest(url: request.upstreamURL)
    request.headers.forEach { name, value in
      upstreamRequest.setValue(value, forHTTPHeaderField: name)
    }

    let fetch = URLSession.shared.dataTask(with: upstreamRequest) { [weak self] body, urlResponse, failure in
      guard let self = self else { return }
      // The completion handler runs off the server queue — hop back so all
      // NWConnection.send work stays consistent with the rest of the file.
      self.serverQueue.async {
        // Transport errors, non-2xx upstream statuses and empty bodies all
        // count as a bad gateway. Without the status/empty checks a 404 HTML
        // error page would go out as a 200 (masking the upstream failure) and
        // an empty 200 would produce an unparseable manifest rather than a
        // clear 502.
        let http = urlResponse as? HTTPURLResponse
        var upstreamOK = false
        if let http = http {
          upstreamOK = (200...299).contains(http.statusCode)
        }
        guard failure == nil, upstreamOK, let body = body, !body.isEmpty else {
          self.sendError(status: 502, message: "Bad Gateway", on: connection)
          return
        }

        let contentType = http?.value(forHTTPHeaderField: "Content-Type")

        // The extension claimed HLS, but the body has the final say. A body
        // that is not a playlist is served untouched — never corrupt a
        // non-HLS body.
        let looksLikePlaylist = HLSManifestRewriter.isPlaylist(contentType: contentType, url: request.upstreamURL, body: body)
        if looksLikePlaylist {
          self.finishManifest(rawData: body, freshlyFetched: true, request: request, on: connection)
        } else {
          self.serveRawManifest(rawData: body, contentType: contentType ?? "application/octet-stream", on: connection)
        }
      }
    }
    fetch.resume()
  }

  /// Rewrites the raw playlist, caches the raw bytes when they belong to a
  /// freshly fetched VOD playlist, and sends the rewritten body. Runs on
  /// `serverQueue`. The response always carries an HLS content type so
  /// AVPlayer parses the rewritten body.
  ///
  /// - Parameters:
  ///   - rawData: The un-rewritten playlist bytes.
  ///   - freshlyFetched: `true` when `rawData` just came from upstream,
  ///     `false` when it was read back from the cache.
  ///   - request: The parsed client request.
  ///   - connection: The client connection to answer on.
  private func finishManifest(
    rawData: Data,
    freshlyFetched: Bool,
    request: ParsedRequest,
    on connection: NWConnection
  ) {
    let playlistText = String(decoding: rawData, as: UTF8.self)
    let key = request.cacheKey
    let upstreamHeaders: [String: String]? = request.headers.isEmpty ? nil : request.headers

    // A VOD playlist (has `#EXT-X-ENDLIST`) gets cacheable child URLs; the
    // children of a live playlist are tagged `nocache` so its ephemeral
    // segments stream through without persisting. Every manifest level decides
    // this from its own body, so a VOD media playlist reached via a (non-VOD)
    // master still caches its segments.
    let isVOD = HLSManifestRewriter.isVOD(playlistText)

    let rewrittenText = HLSManifestRewriter.rewrite(
      manifest: playlistText,
      baseURL: request.upstreamURL,
      transform: { childURL in
        self.proxyURL(for: childURL, headers: upstreamHeaders, cacheable: isVOD)
      }
    )

    // Persist the RAW (un-rewritten) bytes only for a freshly fetched VOD
    // playlist, and only while nothing is cached for this key yet.
    // `AudioCache.appendData` appends without truncating, so without the empty
    // check a concurrent (de-dup-less) re-fetch of the same manifest would
    // double the data file while `contentLength` stayed single-length. The
    // bytes go in *before* the content info so `isFullyCached` (which needs
    // both the meta and bytes) never flips true against a still-empty file.
    let shouldPersist = freshlyFetched && isVOD && cache.cachedBytes(for: key) == 0
    if shouldPersist {
      cache.appendData(rawData, for: key)
      cache.storeContentInfo(
        AudioCache.ContentInfo(
          contentType: "application/vnd.apple.mpegurl",
          contentLength: Int64(rawData.count),
          isByteRangeAccessSupported: false
        ),
        for: key,
        url: request.upstreamURL
      )
    }

    let body = Data(rewrittenText.utf8)
    let head = [
      "HTTP/1.1 200 OK",
      "Content-Type: application/vnd.apple.mpegurl",
      "Content-Length: \(body.count)",
      "Connection: close",
    ].joined(separator: "\r\n") + "\r\n\r\n"

    var payload = Data(head.utf8)
    payload.append(body)

    connection.send(content: payload, completion: .contentProcessed { _ in
      connection.cancel()
    })
  }

  /// Sends `rawData` untouched as a plain 200 response and closes the
  /// connection. This is the fallback for a request whose extension claimed
  /// HLS while the fetched body turned out not to be a playlist.
  /// Runs on `serverQueue`.
  private func serveRawManifest(rawData: Data, contentType: String, on connection: NWConnection) {
    let head = [
      "HTTP/1.1 200 OK",
      "Content-Type: \(contentType)",
      "Content-Length: \(rawData.count)",
      "Connection: close",
    ].joined(separator: "\r\n") + "\r\n\r\n"

    var payload = Data(head.utf8)
    payload.append(rawData)

    connection.send(content: payload, completion: .contentProcessed { _ in
      connection.cancel()
    })
  }

  // MARK: - Error Response

  /// Sends a plain-text error response whose reason phrase and body are both
  /// `message`, then closes the connection.
  private func sendError(status: Int, message: String, on connection: NWConnection) {
    let head = [
      "HTTP/1.1 \(status) \(message)",
      "Content-Type: text/plain",
      "Content-Length: \(message.utf8.count)",
      "Connection: close",
    ].joined(separator: "\r\n") + "\r\n\r\n"

    var payload = Data(head.utf8)
    payload.append(Data(message.utf8))

    connection.send(content: payload, completion: .contentProcessed { _ in
      connection.cancel()
    })
  }
}