import type { TrackType } from '../domain/configuration'
import { setTimeout } from '../tools/timer'
import { clocksNow, ONE_MINUTE, ONE_SECOND } from '../tools/utils/timeUtils'
import { ONE_MEBI_BYTE, ONE_KIBI_BYTE } from '../tools/utils/byteUtils'
import { isServerError } from '../tools/utils/responseUtils'
import type { RawError } from '../domain/error/error.types'
import { ErrorSource } from '../domain/error/error.types'
import type { Observable } from '../tools/observable'
import type { Payload, HttpRequestEvent, HttpResponse, BandwidthStats } from './httpRequest'
// Maximum total bytes allowed across all concurrently ongoing requests
export const MAX_ONGOING_BYTES_COUNT = 80 * ONE_KIBI_BYTE
// Maximum number of concurrently ongoing requests
export const MAX_ONGOING_REQUESTS = 32
// Maximum bytes queued for retry; beyond this, new payloads are dropped (see newPayloadQueue)
export const MAX_QUEUE_BYTES_COUNT = 20 * ONE_MEBI_BYTE
// Upper bound for the exponential retry backoff delay
export const MAX_BACKOFF_TIME = ONE_MINUTE
// Initial retry backoff delay, doubled after each consecutive failure
export const INITIAL_BACKOFF_TIME = ONE_SECOND
// Health of the transport, updated from each request outcome
const enum TransportStatus {
// Requests are sent as they come
UP,
// A request failed, but other ongoing requests may still succeed
FAILURE_DETECTED,
// No ongoing request can succeed: retries are scheduled with backoff
DOWN,
}
// Why queued payloads are being flushed back through the send strategy
const enum RetryReason {
// A request succeeded while the transport was already up
AFTER_SUCCESS,
// The transport just recovered after being down
AFTER_RESUME,
}
export interface RetryState
{
transportStatus: TransportStatus
currentBackoffTime: number
bandwidthMonitor: ReturnType
queuedPayloads: ReturnType>
queueFullReported: boolean
}
type SendStrategy = (payload: Body, onResponse: (r: HttpResponse) => void) => void
export function sendWithRetryStrategy(
payload: Body,
state: RetryState,
sendStrategy: SendStrategy,
trackType: TrackType,
reportError: (error: RawError) => void,
requestObservable: Observable>
) {
if (
state.transportStatus === TransportStatus.UP &&
state.queuedPayloads.size() === 0 &&
state.bandwidthMonitor.canHandle(payload)
) {
send(payload, state, sendStrategy, requestObservable, {
onSuccess: () =>
retryQueuedPayloads(RetryReason.AFTER_SUCCESS, state, sendStrategy, trackType, reportError, requestObservable),
onFailure: () => {
if (!state.queuedPayloads.enqueue(payload)) {
requestObservable.notify({ type: 'queue-full', bandwidth: state.bandwidthMonitor.stats(), payload })
}
scheduleRetry(state, sendStrategy, trackType, reportError, requestObservable)
},
})
} else {
if (!state.queuedPayloads.enqueue(payload)) {
requestObservable.notify({ type: 'queue-full', bandwidth: state.bandwidthMonitor.stats(), payload })
}
}
}
function scheduleRetry(
state: RetryState,
sendStrategy: SendStrategy,
trackType: TrackType,
reportError: (error: RawError) => void,
requestObservable: Observable>
) {
if (state.transportStatus !== TransportStatus.DOWN) {
return
}
setTimeout(() => {
const payload = state.queuedPayloads.first()
send(payload, state, sendStrategy, requestObservable, {
onSuccess: () => {
state.queuedPayloads.dequeue()
state.currentBackoffTime = INITIAL_BACKOFF_TIME
retryQueuedPayloads(RetryReason.AFTER_RESUME, state, sendStrategy, trackType, reportError, requestObservable)
},
onFailure: () => {
state.currentBackoffTime = Math.min(MAX_BACKOFF_TIME, state.currentBackoffTime * 2)
scheduleRetry(state, sendStrategy, trackType, reportError, requestObservable)
},
})
}, state.currentBackoffTime)
}
function send(
payload: Body,
state: RetryState,
sendStrategy: SendStrategy,
requestObservable: Observable>,
{ onSuccess, onFailure }: { onSuccess: () => void; onFailure: () => void }
) {
state.bandwidthMonitor.add(payload)
sendStrategy(payload, (response) => {
state.bandwidthMonitor.remove(payload)
if (!shouldRetryRequest(response)) {
state.transportStatus = TransportStatus.UP
requestObservable.notify({ type: 'success', bandwidth: state.bandwidthMonitor.stats(), payload })
onSuccess()
} else {
// do not consider transport down if another ongoing request could succeed
state.transportStatus =
state.bandwidthMonitor.ongoingRequestCount > 0 ? TransportStatus.FAILURE_DETECTED : TransportStatus.DOWN
payload.retry = {
count: payload.retry ? payload.retry.count + 1 : 1,
lastFailureStatus: response.status,
}
requestObservable.notify({ type: 'failure', bandwidth: state.bandwidthMonitor.stats(), payload })
onFailure()
}
})
}
function retryQueuedPayloads(
reason: RetryReason,
state: RetryState,
sendStrategy: SendStrategy,
trackType: TrackType,
reportError: (error: RawError) => void,
requestObservable: Observable>
) {
if (reason === RetryReason.AFTER_SUCCESS && state.queuedPayloads.isFull() && !state.queueFullReported) {
reportError({
message: `Reached max ${trackType} events size queued for upload: ${MAX_QUEUE_BYTES_COUNT / ONE_MEBI_BYTE}MiB`,
source: ErrorSource.AGENT,
startClocks: clocksNow(),
})
state.queueFullReported = true
}
const previousQueue = state.queuedPayloads
state.queuedPayloads = newPayloadQueue()
while (previousQueue.size() > 0) {
sendWithRetryStrategy(previousQueue.dequeue()!, state, sendStrategy, trackType, reportError, requestObservable)
}
}
/**
 * A request is worth retrying when the response is readable (not opaque) and
 * its status indicates a transient condition: network failure while offline,
 * request timeout (408), rate limiting (429), or a server error.
 */
function shouldRetryRequest(response: HttpResponse) {
  if (response.type === 'opaque') {
    return false
  }
  const failedWhileOffline = response.status === 0 && !navigator.onLine
  return failedWhileOffline || response.status === 408 || response.status === 429 || isServerError(response.status)
}
export function newRetryState(): RetryState {
return {
transportStatus: TransportStatus.UP,
currentBackoffTime: INITIAL_BACKOFF_TIME,
bandwidthMonitor: newBandwidthMonitor(),
queuedPayloads: newPayloadQueue(),
queueFullReported: false,
}
}
function newPayloadQueue() {
const queue: Body[] = []
return {
bytesCount: 0,
enqueue(payload: Body) {
if (this.isFull()) {
return false
}
queue.push(payload)
this.bytesCount += payload.bytesCount
return true
},
first() {
return queue[0]
},
dequeue() {
const payload = queue.shift()
if (payload) {
this.bytesCount -= payload.bytesCount
}
return payload
},
size() {
return queue.length
},
isFull() {
return this.bytesCount >= MAX_QUEUE_BYTES_COUNT
},
}
}
/**
 * Tracks the number and total size of in-flight requests so the retry
 * strategy can decide whether a new request may start right away.
 */
function newBandwidthMonitor() {
  return {
    ongoingRequestCount: 0,
    ongoingByteCount: 0,
    /**
     * The very first request is always allowed; afterwards, both the ongoing
     * byte total and the ongoing request count must stay under their limits.
     */
    canHandle(payload: Payload) {
      if (this.ongoingRequestCount === 0) {
        return true
      }
      const fitsByteBudget = this.ongoingByteCount + payload.bytesCount <= MAX_ONGOING_BYTES_COUNT
      const fitsRequestBudget = this.ongoingRequestCount < MAX_ONGOING_REQUESTS
      return fitsByteBudget && fitsRequestBudget
    },
    /** Registers a request as ongoing. */
    add(payload: Payload) {
      this.ongoingRequestCount += 1
      this.ongoingByteCount += payload.bytesCount
    },
    /** Unregisters a finished request. */
    remove(payload: Payload) {
      this.ongoingRequestCount -= 1
      this.ongoingByteCount -= payload.bytesCount
    },
    /** Snapshot of current bandwidth usage, for observability events. */
    stats(): BandwidthStats {
      return {
        ongoingByteCount: this.ongoingByteCount,
        ongoingRequestCount: this.ongoingRequestCount,
      }
    },
  }
}