#define MS_CLASS "RTC::Transport"
// #define MS_LOG_DEV_LEVEL 3

#include "RTC/Transport.hpp"
#include "Logger.hpp"
#include "MediaSoupErrors.hpp"
#include "Utils.hpp"
#ifdef MS_LIBURING_SUPPORTED
#include "DepLibUring.hpp"
#endif
#include "FBS/sctpAssociation.h"
#include "FBS/transport.h"
#include "RTC/BweType.hpp"
#include "RTC/Consts.hpp"
#include "RTC/PipeConsumer.hpp"
#include "RTC/RTCP/FeedbackPs.hpp"
#include "RTC/RTCP/FeedbackPsAfb.hpp"
#include "RTC/RTCP/FeedbackPsRemb.hpp"
#include "RTC/RTCP/FeedbackRtpNack.hpp"
#include "RTC/RTCP/FeedbackRtpTransport.hpp"
#include "RTC/RTCP/XrDelaySinceLastRr.hpp"
#include "RTC/RtpDictionaries.hpp"
#include "RTC/SCTP/association/Association.hpp"
#include "RTC/SCTP/public/SctpOptions.hpp"
#include "RTC/SimpleConsumer.hpp"
#include "RTC/SimulcastConsumer.hpp"
#include "RTC/SvcConsumer.hpp"
#ifdef MS_RTC_LOGGER_RTP
#include "RTC/RtcLogger.hpp"
#endif
#include <libwebrtc/modules/rtp_rtcp/include/rtp_rtcp_defines.h> // webrtc::RtpPacketSendInfo
#include <array>
#include <iterator> // std::ostream_iterator
#include <map>      // std::multimap

namespace RTC
{
	/* Instance methods. */

	Transport::Transport(
	  SharedInterface* shared,
	  const std::string& id,
	  RTC::Transport::Listener* listener,
	  const FBS::Transport::Options* options)
	  : id(id),
	    shared(shared),
	    listener(listener),
	    recvRtpTransmission(shared, /*ignorePaddingOnlyPackets*/ false),
	    sendRtpTransmission(shared, /*ignorePaddingOnlyPackets*/ false),
	    recvRtxTransmission(shared, /*ignorePaddingOnlyPackets*/ false, 1000u),
	    sendRtxTransmission(shared, /*ignorePaddingOnlyPackets*/ false, 1000u),
	    sendProbationTransmission(shared, /*ignorePaddingOnlyPackets*/ false, 100u)
	{
		MS_TRACE();

		this->maxSendMessageSize    = options->maxSendMessageSize();
		this->maxReceiveMessageSize = options->maxReceiveMessageSize();

		if (options->direct())
		{
			this->direct = true;
		}
		else
		{
			this->sctpSendBufferSize              = options->sctpSendBufferSize();
			this->sctpPerStreamSendQueueLimit     = options->sctpPerStreamSendQueueLimit();
			this->sctpMaxReceiverWindowBufferSize = options->sctpMaxReceiverWindowBufferSize();
		}

		if (
		  auto initialAvailableOutgoingBitrate = options->initialAvailableOutgoingBitrate();
		  initialAvailableOutgoingBitrate.has_value())
		{
			this->initialAvailableOutgoingBitrate = initialAvailableOutgoingBitrate.value();
		}

		if (options->enableSctp())
		{
			if (this->direct)
			{
				MS_THROW_TYPE_ERROR("cannot enable SCTP in a direct Transport");
			}

			const RTC::SCTP::SctpOptions sctpOptions = {
				.mtu                         = RTC::Consts::MaxSafeMtuSizeForSctp,
				.maxSendMessageSize          = this->maxSendMessageSize,
				.maxSendBufferSize           = this->sctpSendBufferSize,
				.perStreamSendQueueLimit     = this->sctpPerStreamSendQueueLimit,
				.maxReceiveMessageSize       = this->maxReceiveMessageSize,
				.maxReceiverWindowBufferSize = this->sctpMaxReceiverWindowBufferSize
			};

			this->sctpAssociation = std::make_unique<RTC::SCTP::Association>(
			  sctpOptions, this, this->shared, options->isDataChannel());
		}

		// Create the RTCP timer.
		this->rtcpTimer = this->shared->CreateTimer(this);
	}

	Transport::~Transport()
	{
		MS_TRACE();

		// The destructor must delete and clear everything silently.

		// Delete all Producers.
		for (auto& kv : this->mapProducers)
		{
			auto* producer = kv.second;

			delete producer;
		}
		this->mapProducers.clear();

		// Delete all Consumers.
		for (auto& kv : this->mapConsumers)
		{
			auto* consumer = kv.second;

			delete consumer;
		}
		this->mapConsumers.clear();
		this->mapSsrcConsumer.clear();
		this->mapRtxSsrcConsumer.clear();

		// Delete all DataProducers.
		for (auto& kv : this->mapDataProducers)
		{
			auto* dataProducer = kv.second;

			delete dataProducer;
		}
		this->mapDataProducers.clear();

		// Delete all DataConsumers.
		for (auto& kv : this->mapDataConsumers)
		{
			auto* dataConsumer = kv.second;

			delete dataConsumer;
		}
		this->mapDataConsumers.clear();
		this->mapSctpStreamIdDataConsumers.clear();

		// NOTE: We don't close `this->sctpAssociation` here since the
		// `SetDestroying()` method has already been called by the Transport
		// subclass and it closed the SCTP Association.
		//
		// NOTE: We cannot do it here in the destructor because here we are no longer
		// the Transport subclass but Transport parent (this is how the destruction
		// chain works in C++).

		// Delete the RTCP timer.
		delete this->rtcpTimer;
		this->rtcpTimer = nullptr;
	}

	void Transport::CloseProducersAndConsumers()
	{
		MS_TRACE();

		// This method is called by the Router and must notify him about all Producers
		// and Consumers that we are gonna close.
		//
		// The caller is supposed to delete this Transport instance after calling
		// this method.

		// Close all Producers.
		for (auto& kv : this->mapProducers)
		{
			auto* producer = kv.second;

			// Notify the listener.
			this->listener->OnTransportProducerClosed(this, producer);

			delete producer;
		}
		this->mapProducers.clear();

		// Delete all Consumers.
		for (auto& kv : this->mapConsumers)
		{
			auto* consumer = kv.second;

			// Notify the listener.
			this->listener->OnTransportConsumerClosed(this, consumer);

			delete consumer;
		}
		this->mapConsumers.clear();
		this->mapSsrcConsumer.clear();
		this->mapRtxSsrcConsumer.clear();

		// Delete all DataProducers.
		for (auto& kv : this->mapDataProducers)
		{
			auto* dataProducer = kv.second;

			// Notify the listener.
			this->listener->OnTransportDataProducerClosed(this, dataProducer);

			delete dataProducer;
		}
		this->mapDataProducers.clear();

		// Delete all DataConsumers.
		for (auto& kv : this->mapDataConsumers)
		{
			auto* dataConsumer = kv.second;

			// Notify the listener.
			this->listener->OnTransportDataConsumerClosed(this, dataConsumer);

			delete dataConsumer;
		}
		this->mapDataConsumers.clear();
		this->mapSctpStreamIdDataConsumers.clear();
	}

	void Transport::ListenServerClosed()
	{
		MS_TRACE();

		// Ask our parent Router to close/delete us.
		this->listener->OnTransportListenServerClosed(this);
	}

	flatbuffers::Offset<FBS::Transport::Dump> Transport::FillBuffer(
	  flatbuffers::FlatBufferBuilder& builder) const
	{
		MS_TRACE();

		// Add producerIds.
		std::vector<flatbuffers::Offset<flatbuffers::String>> producerIds;

		for (const auto& kv : this->mapProducers)
		{
			const auto& producerId = kv.first;

			producerIds.emplace_back(builder.CreateString(producerId));
		}

		// Add consumerIds.
		std::vector<flatbuffers::Offset<flatbuffers::String>> consumerIds;

		for (const auto& kv : this->mapConsumers)
		{
			const auto& consumerId = kv.first;

			consumerIds.emplace_back(builder.CreateString(consumerId));
		}

		// Add mapSsrcConsumerId.
		std::vector<flatbuffers::Offset<FBS::Common::Uint32String>> mapSsrcConsumerId;

		for (const auto& kv : this->mapSsrcConsumer)
		{
			auto ssrc      = kv.first;
			auto* consumer = kv.second;

			mapSsrcConsumerId.emplace_back(
			  FBS::Common::CreateUint32StringDirect(builder, ssrc, consumer->id.c_str()));
		}

		// Add mapRtxSsrcConsumerId.
		std::vector<flatbuffers::Offset<FBS::Common::Uint32String>> mapRtxSsrcConsumerId;

		for (const auto& kv : this->mapRtxSsrcConsumer)
		{
			auto ssrc      = kv.first;
			auto* consumer = kv.second;

			mapRtxSsrcConsumerId.emplace_back(
			  FBS::Common::CreateUint32StringDirect(builder, ssrc, consumer->id.c_str()));
		}

		// Add dataProducerIds.
		std::vector<flatbuffers::Offset<flatbuffers::String>> dataProducerIds;

		for (const auto& kv : this->mapDataProducers)
		{
			const auto& dataProducerId = kv.first;

			dataProducerIds.emplace_back(builder.CreateString(dataProducerId));
		}

		// Add dataConsumerIds.
		std::vector<flatbuffers::Offset<flatbuffers::String>> dataConsumerIds;

		for (const auto& kv : this->mapDataConsumers)
		{
			const auto& dataConsumerId = kv.first;

			dataConsumerIds.emplace_back(builder.CreateString(dataConsumerId));
		}

		// Add headerExtensionIds.
		auto recvRtpHeaderExtensions = FBS::Transport::CreateRecvRtpHeaderExtensions(
		  builder,
		  this->recvRtpHeaderExtensionIds.mid != 0u
		    ? flatbuffers::Optional<uint8_t>(this->recvRtpHeaderExtensionIds.mid)
		    : flatbuffers::nullopt,
		  this->recvRtpHeaderExtensionIds.rid != 0u
		    ? flatbuffers::Optional<uint8_t>(this->recvRtpHeaderExtensionIds.rid)
		    : flatbuffers::nullopt,
		  this->recvRtpHeaderExtensionIds.rrid != 0u
		    ? flatbuffers::Optional<uint8_t>(this->recvRtpHeaderExtensionIds.rrid)
		    : flatbuffers::nullopt,
		  this->recvRtpHeaderExtensionIds.absSendTime != 0u
		    ? flatbuffers::Optional<uint8_t>(this->recvRtpHeaderExtensionIds.absSendTime)
		    : flatbuffers::nullopt,
		  this->recvRtpHeaderExtensionIds.transportWideCc01 != 0u
		    ? flatbuffers::Optional<uint8_t>(this->recvRtpHeaderExtensionIds.transportWideCc01)
		    : flatbuffers::nullopt);

		auto rtpListenerOffset = this->rtpListener.FillBuffer(builder);

		// Add sctpParameters.
		flatbuffers::Offset<FBS::SctpParameters::SctpParameters> sctpParameters;
		// Add sctpState.
		FBS::SctpAssociation::SctpState sctpState{ FBS::SctpAssociation::SctpState::NEW };
		// Add sctpListener.
		flatbuffers::Offset<FBS::Transport::SctpListener> sctpListener;

		if (this->sctpAssociation)
		{
			// Add sctpParameters.
			sctpParameters = this->sctpAssociation->FillBuffer(builder);

			// NOTE: There is never permanent FAILED state.
			switch (this->sctpAssociation->GetAssociationState())
			{
				case RTC::SCTP::Types::AssociationState::NEW:
				{
					sctpState = FBS::SctpAssociation::SctpState::NEW;
					break;
				}

				case RTC::SCTP::Types::AssociationState::CONNECTING:
				{
					sctpState = FBS::SctpAssociation::SctpState::CONNECTING;
					break;
				}

				case RTC::SCTP::Types::AssociationState::CONNECTED:
				{
					sctpState = FBS::SctpAssociation::SctpState::CONNECTED;
					break;
				}

				case RTC::SCTP::Types::AssociationState::SHUTTING_DOWN:
				case RTC::SCTP::Types::AssociationState::CLOSED:
				{
					sctpState = FBS::SctpAssociation::SctpState::CLOSED;
					break;
				}
			}

			sctpListener = this->sctpListener.FillBuffer(builder);
		}

		// Add traceEventTypes.
		std::vector<FBS::Transport::TraceEventType> traceEventTypes;

		if (this->traceEventTypes.probation)
		{
			traceEventTypes.emplace_back(FBS::Transport::TraceEventType::PROBATION);
		}
		if (this->traceEventTypes.bwe)
		{
			traceEventTypes.emplace_back(FBS::Transport::TraceEventType::BWE);
		}

		return FBS::Transport::CreateDumpDirect(
		  builder,
		  this->id.c_str(),
		  this->direct,
		  &producerIds,
		  &consumerIds,
		  &mapSsrcConsumerId,
		  &mapRtxSsrcConsumerId,
		  &dataProducerIds,
		  &dataConsumerIds,
		  recvRtpHeaderExtensions,
		  rtpListenerOffset,
		  this->maxSendMessageSize,
		  this->maxReceiveMessageSize,
		  sctpParameters,
		  this->sctpAssociation ? flatbuffers::Optional<FBS::SctpAssociation::SctpState>(sctpState)
		                        : flatbuffers::nullopt,
		  sctpListener,
		  &traceEventTypes);
	}

	flatbuffers::Offset<FBS::Transport::Stats> Transport::FillBufferStats(
	  flatbuffers::FlatBufferBuilder& builder)
	{
		MS_TRACE();

		auto nowMs = this->shared->GetTimeMs();

		// Add sctpState.
		FBS::SctpAssociation::SctpState sctpState{ FBS::SctpAssociation::SctpState::NEW };

		// Add sctpState.
		if (this->sctpAssociation)
		{
			// NOTE: There is never permanent FAILED state.
			switch (this->sctpAssociation->GetAssociationState())
			{
				case RTC::SCTP::Types::AssociationState::NEW:
				{
					sctpState = FBS::SctpAssociation::SctpState::NEW;
					break;
				}

				case RTC::SCTP::Types::AssociationState::CONNECTING:
				{
					sctpState = FBS::SctpAssociation::SctpState::CONNECTING;
					break;
				}

				case RTC::SCTP::Types::AssociationState::CONNECTED:
				{
					sctpState = FBS::SctpAssociation::SctpState::CONNECTED;
					break;
				}

				case RTC::SCTP::Types::AssociationState::SHUTTING_DOWN:
				case RTC::SCTP::Types::AssociationState::CLOSED:
				{
					sctpState = FBS::SctpAssociation::SctpState::CLOSED;
					break;
				}
			}
		}

		return FBS::Transport::CreateStatsDirect(
		  builder,
		  // transportId.
		  this->id.c_str(),
		  // timestamp.
		  nowMs,
		  // sctpState.
		  this->sctpAssociation ? flatbuffers::Optional<FBS::SctpAssociation::SctpState>(sctpState)
		                        : flatbuffers::nullopt,
		  // bytesReceived.
		  this->recvTransmission.GetBytes(),
		  // recvBitrate.
		  this->recvTransmission.GetRate(nowMs),
		  // bytesSent.
		  this->sendTransmission.GetBytes(),
		  // sendBitrate.
		  this->sendTransmission.GetRate(nowMs),
		  // rtpBytesReceived.
		  this->recvRtpTransmission.GetBytes(),
		  // rtpRecvBitrate.
		  this->recvRtpTransmission.GetBitrate(nowMs),
		  // rtpBytesSent.
		  this->sendRtpTransmission.GetBytes(),
		  // rtpSendBitrate.
		  this->sendRtpTransmission.GetBitrate(nowMs),
		  // rtxBytesReceived.
		  this->recvRtxTransmission.GetBytes(),
		  // rtxRecvBitrate.
		  this->recvRtxTransmission.GetBitrate(nowMs),
		  // rtxBytesSent.
		  this->sendRtxTransmission.GetBytes(),
		  // rtxSendBitrate.
		  this->sendRtxTransmission.GetBitrate(nowMs),
		  // probationBytesSent.
		  this->sendProbationTransmission.GetBytes(),
		  // probationSendBitrate.
		  this->sendProbationTransmission.GetBitrate(nowMs),
		  // availableOutgoingBitrate.
		  this->tccClient ? flatbuffers::Optional<uint32_t>(this->tccClient->GetAvailableBitrate())
		                  : flatbuffers::nullopt,
		  // availableIncomingBitrate.
		  this->tccServer ? flatbuffers::Optional<uint32_t>(this->tccServer->GetAvailableBitrate())
		                  : flatbuffers::nullopt,
		  // maxIncomingBitrate.
		  this->maxIncomingBitrate ? flatbuffers::Optional<uint32_t>(this->maxIncomingBitrate)
		                           : flatbuffers::nullopt,
		  // maxOutgoingBitrate.
		  this->maxOutgoingBitrate ? flatbuffers::Optional<uint32_t>(this->maxOutgoingBitrate)
		                           : flatbuffers::nullopt,
		  // minOutgoingBitrate.
		  this->minOutgoingBitrate ? flatbuffers::Optional<uint32_t>(this->minOutgoingBitrate)
		                           : flatbuffers::nullopt,
		  // rtpPacketLossReceived.
		  this->tccServer ? flatbuffers::Optional<double>(this->tccServer->GetPacketLoss())
		                  : flatbuffers::nullopt,
		  // rtpPacketLossSent.
		  this->tccClient ? flatbuffers::Optional<double>(this->tccClient->GetPacketLoss())
		                  : flatbuffers::nullopt);
	}

	void Transport::HandleRequest(Channel::ChannelRequest* request)
	{
		MS_TRACE();

		switch (request->method)
		{
			case Channel::ChannelRequest::Method::TRANSPORT_SET_MAX_INCOMING_BITRATE:
			{
				const auto* body = request->data->body_as<FBS::Transport::SetMaxIncomingBitrateRequest>();

				this->maxIncomingBitrate = body->maxIncomingBitrate();

				MS_DEBUG_TAG(bwe, "maximum incoming bitrate set to %" PRIu32, this->maxIncomingBitrate);

				request->Accept();

				if (this->tccServer)
				{
					this->tccServer->SetMaxIncomingBitrate(this->maxIncomingBitrate);
				}

				break;
			}

			case Channel::ChannelRequest::Method::TRANSPORT_SET_MAX_OUTGOING_BITRATE:
			{
				const auto* body = request->data->body_as<FBS::Transport::SetMaxOutgoingBitrateRequest>();
				const uint32_t bitrate = body->maxOutgoingBitrate();

				if (bitrate > 0u && bitrate < RTC::TransportCongestionControlMinOutgoingBitrate)
				{
					MS_THROW_TYPE_ERROR(
					  "bitrate must be >= %" PRIu32 " or 0 (unlimited)",
					  RTC::TransportCongestionControlMinOutgoingBitrate);
				}
				else if (bitrate > 0u && bitrate < this->minOutgoingBitrate)
				{
					MS_THROW_TYPE_ERROR(
					  "bitrate must be >= current min outgoing bitrate (%" PRIu32 ") or 0 (unlimited)",
					  this->minOutgoingBitrate);
				}

				if (this->tccClient)
				{
					// NOTE: This may throw so don't update things before calling this
					// method.
					this->tccClient->SetMaxOutgoingBitrate(bitrate);
					this->maxOutgoingBitrate = bitrate;

					MS_DEBUG_TAG(bwe, "maximum outgoing bitrate set to %" PRIu32, this->maxOutgoingBitrate);

					ComputeOutgoingDesiredBitrate();
				}
				else
				{
					this->maxOutgoingBitrate = bitrate;
				}

				request->Accept();

				break;
			}

			case Channel::ChannelRequest::Method::TRANSPORT_SET_MIN_OUTGOING_BITRATE:
			{
				const auto* body = request->data->body_as<FBS::Transport::SetMinOutgoingBitrateRequest>();
				const uint32_t bitrate = body->minOutgoingBitrate();

				if (bitrate > 0u && bitrate < RTC::TransportCongestionControlMinOutgoingBitrate)
				{
					MS_THROW_TYPE_ERROR(
					  "bitrate must be >= %" PRIu32 " or 0 (unlimited)",
					  RTC::TransportCongestionControlMinOutgoingBitrate);
				}
				else if (bitrate > 0u && this->maxOutgoingBitrate > 0 && bitrate > this->maxOutgoingBitrate)
				{
					MS_THROW_TYPE_ERROR(
					  "bitrate must be <= current max outgoing bitrate (%" PRIu32 ") or 0 (unlimited)",
					  this->maxOutgoingBitrate);
				}

				if (this->tccClient)
				{
					// NOTE: This may throw so don't update things before calling this
					// method.
					this->tccClient->SetMinOutgoingBitrate(bitrate);
					this->minOutgoingBitrate = bitrate;

					MS_DEBUG_TAG(bwe, "minimum outgoing bitrate set to %" PRIu32, this->minOutgoingBitrate);

					ComputeOutgoingDesiredBitrate();
				}
				else
				{
					this->minOutgoingBitrate = bitrate;
				}

				request->Accept();

				break;
			}

			case Channel::ChannelRequest::Method::TRANSPORT_PRODUCE:
			{
				const auto* body = request->data->body_as<FBS::Transport::ProduceRequest>();
				auto producerId  = body->producerId()->str();

				if (this->mapProducers.find(producerId) != this->mapProducers.end())
				{
					MS_THROW_ERROR("a Producer with same producerId already exists");
				}

				// This may throw.
				auto* producer = new RTC::Producer(this->shared, producerId, this, body);

				// Insert the Producer into the RtpListener.
				// This may throw. If so, delete the Producer and throw.
				try
				{
					this->rtpListener.AddProducer(producer);
				}
				catch (const MediaSoupError& error)
				{
					delete producer;

					throw;
				}

				// Notify the listener.
				// This may throw if a Producer with same id already exists.
				try
				{
					this->listener->OnTransportNewProducer(this, producer);
				}
				catch (const MediaSoupError& error)
				{
					this->rtpListener.RemoveProducer(producer);

					delete producer;

					throw;
				}

				// Insert into the map.
				this->mapProducers[producerId] = producer;

				MS_DEBUG_DEV("Producer created [producerId:%s]", producerId.c_str());

				// Take the transport related RTP header extensions of the Producer and
				// add them to the Transport.
				// NOTE: Producer::GetRtpHeaderExtensionIds() returns the original
				// header extension ids of the Producer (and not their mapped values).
				const auto& producerRtpHeaderExtensionIds = producer->GetRtpHeaderExtensionIds();

				if (producerRtpHeaderExtensionIds.mid != 0u)
				{
					this->recvRtpHeaderExtensionIds.mid = producerRtpHeaderExtensionIds.mid;
				}

				if (producerRtpHeaderExtensionIds.rid != 0u)
				{
					this->recvRtpHeaderExtensionIds.rid = producerRtpHeaderExtensionIds.rid;
				}

				if (producerRtpHeaderExtensionIds.rrid != 0u)
				{
					this->recvRtpHeaderExtensionIds.rrid = producerRtpHeaderExtensionIds.rrid;
				}

				if (producerRtpHeaderExtensionIds.absSendTime != 0u)
				{
					this->recvRtpHeaderExtensionIds.absSendTime = producerRtpHeaderExtensionIds.absSendTime;
				}

				if (producerRtpHeaderExtensionIds.transportWideCc01 != 0u)
				{
					this->recvRtpHeaderExtensionIds.transportWideCc01 =
					  producerRtpHeaderExtensionIds.transportWideCc01;
				}

				if (producerRtpHeaderExtensionIds.dependencyDescriptor != 0u)
				{
					this->recvRtpHeaderExtensionIds.dependencyDescriptor =
					  producerRtpHeaderExtensionIds.dependencyDescriptor;
				}

				// Create status response.
				auto responseOffset = FBS::Transport::CreateProduceResponse(
				  request->GetBufferBuilder(), FBS::RtpParameters::Type(producer->GetType()));

				request->Accept(FBS::Response::Body::Transport_ProduceResponse, responseOffset);

				// Check if TransportCongestionControlServer or REMB server must be
				// created.
				const auto& rtpHeaderExtensionIds = producer->GetRtpHeaderExtensionIds();
				const auto& codecs                = producer->GetRtpParameters().codecs;

				// Set TransportCongestionControlServer.
				if (!this->tccServer)
				{
					bool createTccServer{ false };
					RTC::BweType bweType;

					// Use transport-cc if:
					// - there is transport-wide-cc-01 RTP header extension, and
					// - there is "transport-cc" in codecs RTCP feedback.
					//
					if (
					  rtpHeaderExtensionIds.transportWideCc01 != 0u &&
					  std::any_of(
					    codecs.begin(),
					    codecs.end(),
					    [](const RTC::RtpCodecParameters& codec)
					    {
						    return std::any_of(
						      codec.rtcpFeedback.begin(),
						      codec.rtcpFeedback.end(),
						      [](const RTC::RtcpFeedback& fb)
						      {
							      return fb.type == "transport-cc";
						      });
					    }))
					{
						MS_DEBUG_TAG(bwe, "enabling TransportCongestionControlServer with transport-cc");

						createTccServer = true;
						bweType         = RTC::BweType::TRANSPORT_CC;
					}
					// Use REMB if:
					// - there is abs-send-time RTP header extension, and
					// - there is "remb" in codecs RTCP feedback.
					//
					else if (
					  rtpHeaderExtensionIds.absSendTime != 0u && std::any_of(
					                                               codecs.begin(),
					                                               codecs.end(),
					                                               [](const RTC::RtpCodecParameters& codec)
					                                               {
						                                               return std::any_of(
						                                                 codec.rtcpFeedback.begin(),
						                                                 codec.rtcpFeedback.end(),
						                                                 [](const RTC::RtcpFeedback& fb)
						                                                 {
							                                                 return fb.type == "goog-remb";
						                                                 });
					                                               }))
					{
						MS_DEBUG_TAG(bwe, "enabling TransportCongestionControlServer with REMB");

						createTccServer = true;
						bweType         = RTC::BweType::REMB;
					}

					if (createTccServer)
					{
						this->tccServer = std::make_shared<RTC::TransportCongestionControlServer>(
						  this, this->shared, bweType, RTC::Consts::RtcpPacketMaxSize);

						if (this->maxIncomingBitrate != 0u)
						{
							this->tccServer->SetMaxIncomingBitrate(this->maxIncomingBitrate);
						}

						if (IsConnected())
						{
							this->tccServer->TransportConnected();
						}
					}
				}

				break;
			}

			case Channel::ChannelRequest::Method::TRANSPORT_CONSUME:
			{
				const auto* body             = request->data->body_as<FBS::Transport::ConsumeRequest>();
				const std::string producerId = body->producerId()->str();
				const std::string consumerId = body->consumerId()->str();

				if (this->mapConsumers.find(consumerId) != this->mapConsumers.end())
				{
					MS_THROW_ERROR("a Consumer with same consumerId already exists");
				}

				auto type = RTC::RtpParameters::Type(body->type());

				RTC::Consumer* consumer{ nullptr };

				switch (type)
				{
					case RTC::RtpParameters::Type::SIMPLE:
					{
						// This may throw.
						consumer = new RTC::SimpleConsumer(this->shared, consumerId, producerId, this, body);

						break;
					}

					case RTC::RtpParameters::Type::SIMULCAST:
					{
						// This may throw.
						consumer = new RTC::SimulcastConsumer(this->shared, consumerId, producerId, this, body);

						break;
					}

					case RTC::RtpParameters::Type::SVC:
					{
						// This may throw.
						consumer = new RTC::SvcConsumer(this->shared, consumerId, producerId, this, body);

						break;
					}

					case RTC::RtpParameters::Type::PIPE:
					{
						// This may throw.
						consumer = new RTC::PipeConsumer(this->shared, consumerId, producerId, this, body);

						break;
					}
				}

				// Notify the listener.
				// This may throw if no Producer is found.
				try
				{
					this->listener->OnTransportNewConsumer(this, consumer, producerId);
				}
				catch (const MediaSoupError& error)
				{
					delete consumer;

					throw;
				}

				// Insert into the maps.
				this->mapConsumers[consumerId] = consumer;

				for (auto ssrc : consumer->GetMediaSsrcs())
				{
					this->mapSsrcConsumer[ssrc] = consumer;
				}

				for (auto ssrc : consumer->GetRtxSsrcs())
				{
					this->mapRtxSsrcConsumer[ssrc] = consumer;
				}

				MS_DEBUG_DEV(
				  "Consumer created [consumerId:%s, producerId:%s]", consumerId.c_str(), producerId.c_str());

				flatbuffers::Offset<FBS::Consumer::ConsumerLayers> preferredLayersOffset;
				auto preferredLayers = consumer->GetPreferredLayers();

				if (preferredLayers.spatial > -1 && preferredLayers.temporal > -1)
				{
					const flatbuffers::Optional<int16_t> preferredTemporalLayer{ preferredLayers.temporal };
					preferredLayersOffset = FBS::Consumer::CreateConsumerLayers(
					  request->GetBufferBuilder(), preferredLayers.spatial, preferredTemporalLayer);
				}

				auto scoreOffset    = consumer->FillBufferScore(request->GetBufferBuilder());
				auto responseOffset = FBS::Transport::CreateConsumeResponse(
				  request->GetBufferBuilder(),
				  consumer->IsPaused(),
				  consumer->IsProducerPaused(),
				  scoreOffset,
				  preferredLayersOffset);

				request->Accept(FBS::Response::Body::Transport_ConsumeResponse, responseOffset);

				// Check if Transport Congestion Control client must be created.
				const auto& rtpHeaderExtensionIds = consumer->GetRtpHeaderExtensionIds();
				const auto& codecs                = consumer->GetRtpParameters().codecs;

				// Set TransportCongestionControlClient.
				if (!this->tccClient)
				{
					bool createTccClient{ false };
					RTC::BweType bweType;

					// Use transport-cc if:
					// - it's a video Consumer, and
					// - there is transport-wide-cc-01 RTP header extension, and
					// - there is "transport-cc" in codecs RTCP feedback.
					//
					if (
					  consumer->GetKind() == RTC::Media::Kind::VIDEO &&
					  rtpHeaderExtensionIds.transportWideCc01 != 0u &&
					  std::any_of(
					    codecs.begin(),
					    codecs.end(),
					    [](const RTC::RtpCodecParameters& codec)
					    {
						    return std::any_of(
						      codec.rtcpFeedback.begin(),
						      codec.rtcpFeedback.end(),
						      [](const RTC::RtcpFeedback& fb)
						      {
							      return fb.type == "transport-cc";
						      });
					    }))
					{
						MS_DEBUG_TAG(bwe, "enabling TransportCongestionControlClient with transport-cc");

						createTccClient = true;
						bweType         = RTC::BweType::TRANSPORT_CC;
					}
					// Use REMB if:
					// - it's a video Consumer, and
					// - there is abs-send-time RTP header extension, and
					// - there is "remb" in codecs RTCP feedback.
					//
					else if (
					  consumer->GetKind() == RTC::Media::Kind::VIDEO &&
					  rtpHeaderExtensionIds.absSendTime != 0u &&
					  std::any_of(
					    codecs.begin(),
					    codecs.end(),
					    [](const RTC::RtpCodecParameters& codec)
					    {
						    return std::any_of(
						      codec.rtcpFeedback.begin(),
						      codec.rtcpFeedback.end(),
						      [](const RTC::RtcpFeedback& fb)
						      {
							      return fb.type == "goog-remb";
						      });
					    }))
					{
						MS_DEBUG_TAG(bwe, "enabling TransportCongestionControlClient with REMB");

						createTccClient = true;
						bweType         = RTC::BweType::REMB;
					}

					if (createTccClient)
					{
						// Tell all the Consumers that we are gonna manage their bitrate.
						for (auto& kv : this->mapConsumers)
						{
							auto* consumer = kv.second;

							consumer->SetExternallyManagedBitrate();
						};

						this->tccClient = std::make_shared<RTC::TransportCongestionControlClient>(
						  this,
						  this->shared,
						  bweType,
						  this->initialAvailableOutgoingBitrate,
						  this->maxOutgoingBitrate,
						  this->minOutgoingBitrate);

						if (IsConnected())
						{
							this->tccClient->TransportConnected();
						}
					}
				}

				// If applicable, tell the new Consumer that we are gonna manage its
				// bitrate.
				if (this->tccClient)
				{
					consumer->SetExternallyManagedBitrate();
				}

#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
				// Create SenderBandwidthEstimator if:
				// - not already created,
				// - it's a video Consumer, and
				// - there is transport-wide-cc-01 RTP header extension, and
				// - there is "transport-cc" in codecs RTCP feedback.
				//
				if (
				  !this->senderBwe && consumer->GetKind() == RTC::Media::Kind::VIDEO &&
				  rtpHeaderExtensionIds.transportWideCc01 != 0u &&
				  std::any_of(
				    codecs.begin(),
				    codecs.end(),
				    [](const RTC::RtpCodecParameters& codec)
				    {
					    return std::any_of(
					      codec.rtcpFeedback.begin(),
					      codec.rtcpFeedback.end(),
					      [](const RTC::RtcpFeedback& fb)
					      {
						      return fb.type == "transport-cc";
					      });
				    }))
				{
					MS_DEBUG_TAG(bwe, "enabling SenderBandwidthEstimator");

					// Tell all the Consumers that we are gonna manage their bitrate.
					for (auto& kv : this->mapConsumers)
					{
						auto* consumer = kv.second;

						consumer->SetExternallyManagedBitrate();
					};

					this->senderBwe = std::make_shared<RTC::SenderBandwidthEstimator>(
					  this, this->shared, this->initialAvailableOutgoingBitrate);

					if (IsConnected())
					{
						this->senderBwe->TransportConnected();
					}
				}

				// If applicable, tell the new Consumer that we are gonna manage its
				// bitrate.
				if (this->senderBwe)
				{
					consumer->SetExternallyManagedBitrate();
				}
#endif

				if (IsConnected())
				{
					consumer->TransportConnected();
				}

				break;
			}

			case Channel::ChannelRequest::Method::TRANSPORT_PRODUCE_DATA:
			{
				// Early check. The Transport must support SCTP or be direct.
				if (!this->sctpAssociation && !this->direct)
				{
					MS_THROW_ERROR("SCTP not enabled and not a direct Transport");
				}

				const auto* body = request->data->body_as<FBS::Transport::ProduceDataRequest>();

				auto dataProducerId = body->dataProducerId()->str();

				// This may throw.
				CheckNoDataProducer(dataProducerId);

				// This may throw.
				auto* dataProducer = new RTC::DataProducer(
				  this->shared, dataProducerId, this->maxReceiveMessageSize, this, body);

				// Verify the type of the DataProducer.
				switch (dataProducer->GetType())
				{
					case RTC::DataProducer::Type::SCTP:
					{
						if (!this->sctpAssociation)
						{
							delete dataProducer;

							MS_THROW_TYPE_ERROR(
							  "cannot create a DataProducer of type 'sctp', SCTP not enabled in this Transport");
							;
						}

						break;
					}

					case RTC::DataProducer::Type::DIRECT:
					{
						if (!this->direct)
						{
							delete dataProducer;

							MS_THROW_TYPE_ERROR(
							  "cannot create a DataProducer of type 'direct', not a direct Transport");
							;
						}

						break;
					}
				}

				if (dataProducer->GetType() == RTC::DataProducer::Type::SCTP)
				{
					// Insert the DataProducer into the SctpListener.
					// This may throw. If so, delete the DataProducer and throw.
					try
					{
						this->sctpListener.AddDataProducer(dataProducer);
					}
					catch (const MediaSoupError& error)
					{
						delete dataProducer;

						throw;
					}
				}

				// Notify the listener.
				// This may throw if a DataProducer with same id already exists.
				try
				{
					this->listener->OnTransportNewDataProducer(this, dataProducer);
				}
				catch (const MediaSoupError& error)
				{
					if (dataProducer->GetType() == RTC::DataProducer::Type::SCTP)
					{
						this->sctpListener.RemoveDataProducer(dataProducer);
					}

					delete dataProducer;

					throw;
				}

				// Insert into the map.
				this->mapDataProducers[dataProducerId] = dataProducer;

				MS_DEBUG_DEV("DataProducer created [dataProducerId:%s]", dataProducerId.c_str());

				auto dumpOffset = dataProducer->FillBuffer(request->GetBufferBuilder());

				request->Accept(FBS::Response::Body::DataProducer_DumpResponse, dumpOffset);

				if (dataProducer->GetType() == RTC::DataProducer::Type::SCTP)
				{
					// Tell to the SCTP association.
					this->sctpAssociation->MayConnect();
				}

				break;
			}

			case Channel::ChannelRequest::Method::TRANSPORT_CONSUME_DATA:
			{
				// Early check. The Transport must support SCTP or be direct.
				if (!this->sctpAssociation && !this->direct)
				{
					MS_THROW_ERROR("SCTP not enabled and not a direct Transport");
				}

				const auto* body = request->data->body_as<FBS::Transport::ConsumeDataRequest>();

				auto dataProducerId = body->dataProducerId()->str();
				auto dataConsumerId = body->dataConsumerId()->str();

				// This may throw.
				CheckNoDataConsumer(dataConsumerId);

				// This may throw.
				auto* dataConsumer = new RTC::DataConsumer(
				  this->shared, dataConsumerId, dataProducerId, this, body, this->maxSendMessageSize);

				// Verify the type of the DataConsumer.
				switch (dataConsumer->GetType())
				{
					case RTC::DataConsumer::Type::SCTP:
					{
						if (!this->sctpAssociation)
						{
							delete dataConsumer;

							MS_THROW_TYPE_ERROR(
							  "cannot create a DataConsumer of type 'sctp', SCTP not enabled in this Transport");
							;
						}

						try
						{
							// This may throw.
							CheckNoSctpDataConsumer(dataConsumer->GetSctpStreamParameters().streamId);
						}
						catch (const MediaSoupError& error)
						{
							delete dataConsumer;

							throw;
						}

						break;
					}

					case RTC::DataConsumer::Type::DIRECT:
					{
						if (!this->direct)
						{
							delete dataConsumer;

							MS_THROW_TYPE_ERROR(
							  "cannot create a DataConsumer of type 'direct', not a direct Transport");
							;
						}

						break;
					}
				}

				// Notify the listener.
				// This may throw if no DataProducer is found.
				try
				{
					this->listener->OnTransportNewDataConsumer(this, dataConsumer, dataProducerId);
				}
				catch (const MediaSoupError& error)
				{
					delete dataConsumer;

					throw;
				}

				// Insert into the maps.
				this->mapDataConsumers[dataConsumerId] = dataConsumer;

				if (dataConsumer->GetType() == RTC::DataConsumer::Type::SCTP)
				{
					this->mapSctpStreamIdDataConsumers[dataConsumer->GetSctpStreamParameters().streamId] =
					  dataConsumer;
				}

				MS_DEBUG_DEV(
				  "DataConsumer created [dataConsumerId:%s, dataProducerId:%s]",
				  dataConsumerId.c_str(),
				  dataProducerId.c_str());

				auto dumpOffset = dataConsumer->FillBuffer(request->GetBufferBuilder());

				request->Accept(FBS::Response::Body::DataConsumer_DumpResponse, dumpOffset);

				if (IsConnected())
				{
					dataConsumer->TransportConnected();
				}

				if (dataConsumer->GetType() == RTC::DataConsumer::Type::SCTP)
				{
					if (this->sctpAssociation->GetAssociationState() == RTC::SCTP::Types::AssociationState::CONNECTED)
					{
						// Tell to the DataConsumer.
						dataConsumer->SctpAssociationConnected();
					}

					// Tell to the SCTP association.
					this->sctpAssociation->MayConnect();
				}

				break;
			}

			case Channel::ChannelRequest::Method::TRANSPORT_ENABLE_TRACE_EVENT:
			{
				const auto* body = request->data->body_as<FBS::Transport::EnableTraceEventRequest>();

				// Reset traceEventTypes.
				struct TraceEventTypes newTraceEventTypes;

				for (const auto& type : *body->events())
				{
					switch (type)
					{
						case FBS::Transport::TraceEventType::PROBATION:
						{
							newTraceEventTypes.probation = true;

							break;
						}

						case FBS::Transport::TraceEventType::BWE:
						{
							newTraceEventTypes.bwe = true;

							break;
						}
					}
				}

				this->traceEventTypes = newTraceEventTypes;

				request->Accept();

				break;
			}

			case Channel::ChannelRequest::Method::TRANSPORT_CLOSE_PRODUCER:
			{
				const auto* body = request->data->body_as<FBS::Transport::CloseProducerRequest>();

				// This may throw.
				RTC::Producer* producer = AssertAndGetProducerById(body->producerId()->str());

				// Remove it from the RtpListener.
				this->rtpListener.RemoveProducer(producer);

				// Remove it from the map.
				this->mapProducers.erase(producer->id);

				// Tell the child class to clear associated SSRCs.
				for (const auto& kv : producer->GetRtpStreams())
				{
					auto* rtpStream = kv.first;

					RecvStreamClosed(rtpStream->GetSsrc());

					if (rtpStream->HasRtx())
					{
						RecvStreamClosed(rtpStream->GetRtxSsrc());
					}
				}

				// Notify the listener.
				this->listener->OnTransportProducerClosed(this, producer);

				MS_DEBUG_DEV("Producer closed [producerId:%s]", producer->id.c_str());

				// Delete it.
				delete producer;

				request->Accept();

				break;
			}

			case Channel::ChannelRequest::Method::TRANSPORT_CLOSE_CONSUMER:
			{
				const auto* body = request->data->body_as<FBS::Transport::CloseConsumerRequest>();

				// This may throw.
				RTC::Consumer* consumer = AssertAndGetConsumerById(body->consumerId()->str());

				// Remove it from the maps.
				this->mapConsumers.erase(consumer->id);

				for (auto ssrc : consumer->GetMediaSsrcs())
				{
					this->mapSsrcConsumer.erase(ssrc);

					// Tell the child class to clear associated SSRCs.
					SendStreamClosed(ssrc);
				}

				for (auto ssrc : consumer->GetRtxSsrcs())
				{
					this->mapRtxSsrcConsumer.erase(ssrc);

					// Tell the child class to clear associated SSRCs.
					SendStreamClosed(ssrc);
				}

				// Notify the listener.
				this->listener->OnTransportConsumerClosed(this, consumer);

				MS_DEBUG_DEV("Consumer closed [consumerId:%s]", consumer->id.c_str());

				// Delete it.
				delete consumer;

				request->Accept();

				// This may be the latest active Consumer with BWE. If so we have to stop
				// probation.
				if (this->tccClient)
				{
					ComputeOutgoingDesiredBitrate(/*forceBitrate*/ true);
				}

				break;
			}

			case Channel::ChannelRequest::Method::TRANSPORT_CLOSE_DATAPRODUCER:
			{
				if (!this->sctpAssociation && !this->direct)
				{
					MS_THROW_ERROR("cannot close DataProducer, SCTP not enabled and not a direct Transport");
				}

				const auto* body = request->data->body_as<FBS::Transport::CloseDataProducerRequest>();

				// This may throw.
				RTC::DataProducer* dataProducer = AssertAndGetDataProducerById(body->dataProducerId()->str());

				if (dataProducer->GetType() == RTC::DataProducer::Type::SCTP)
				{
					// Remove it from the SctpListener.
					this->sctpListener.RemoveDataProducer(dataProducer);
				}

				// Remove it from the map.
				this->mapDataProducers.erase(dataProducer->id);

				// https://datatracker.ietf.org/doc/html/rfc8831#section-6.7
				//
				// "Closing of a data channel MUST be signaled by resetting the corresponding
				// outgoing streams [RFC6525]. This means that if one side decides to close
				// the data channel, it resets the corresponding outgoing stream. When the
				// peer sees that an incoming stream was reset, it also resets its
				// corresponding outgoing stream."
				if (this->sctpAssociation && this->sctpAssociation->IsDataChannel())
				{
					this->sctpAssociation->ResetStreams(
					  std::array<uint16_t, 1>{ dataProducer->GetSctpStreamParameters().streamId });
				}

				// Notify the listener.
				this->listener->OnTransportDataProducerClosed(this, dataProducer);

				MS_DEBUG_DEV("DataProducer closed [dataProducerId:%s]", dataProducer->id.c_str());

				// Delete it.
				delete dataProducer;

				request->Accept();

				break;
			}

			case Channel::ChannelRequest::Method::TRANSPORT_CLOSE_DATACONSUMER:
			{
				if (!this->sctpAssociation && !this->direct)
				{
					MS_THROW_ERROR("cannot close DataConsumer, SCTP not enabled and not a direct Transport");
				}

				const auto* body = request->data->body_as<FBS::Transport::CloseDataConsumerRequest>();

				// This may throw.
				RTC::DataConsumer* dataConsumer = AssertAndGetDataConsumerById(body->dataConsumerId()->str());

				// Remove it from the maps.
				this->mapDataConsumers.erase(dataConsumer->id);

				if (dataConsumer->GetType() == RTC::DataConsumer::Type::SCTP)
				{
					this->mapSctpStreamIdDataConsumers.erase(dataConsumer->GetSctpStreamParameters().streamId);
				}

				if (this->sctpAssociation)
				{
					this->sctpAssociation->ResetStreams(
					  std::array<uint16_t, 1>{ dataConsumer->GetSctpStreamParameters().streamId });
				}

				// Notify the listener.
				this->listener->OnTransportDataConsumerClosed(this, dataConsumer);

				MS_DEBUG_DEV("DataConsumer closed [dataConsumerId:%s]", dataConsumer->id.c_str());

				// Delete it.
				delete dataConsumer;

				request->Accept();

				break;
			}

			default:
			{
				MS_THROW_ERROR("unknown method '%s'", request->methodCStr);
			}
		}

		return;

		switch (request->method)
		{
			default:
			{
				MS_ERROR("unknown method");
			}
		}
	}

	void Transport::HandleNotification(Channel::ChannelNotification* notification)
	{
		MS_TRACE();

		switch (notification->event)
		{
			default:
			{
				MS_ERROR("unknown event '%s'", notification->eventCStr);
			}
		}
	}

	void Transport::SetDestroying()
	{
		MS_TRACE();

		if (this->sctpAssociation)
		{
			// NOTE: We don't invoke `Shutdown()` but `Close()` in the SCTP Association
			// because at this point we are closing everything and we won't have any
			// chance to complete the SCTP SHUTDOWN + SHUTDOWN_ACK + SHUTDOWN_COMPLETE
			// dance, so we invoke `Close()` which just sends a SCTP ABORT.
			this->sctpAssociation->Close();
		}

		this->isDestroying = true;
	}

	void Transport::Connected()
	{
		MS_TRACE();

		// Tell all Consumers.
		for (auto& kv : this->mapConsumers)
		{
			auto* consumer = kv.second;

			consumer->TransportConnected();
		}

		// Tell all DataConsumers.
		for (auto& kv : this->mapDataConsumers)
		{
			auto* dataConsumer = kv.second;

			dataConsumer->TransportConnected();
		}

		// Tell the SctpAssociation.
		if (this->sctpAssociation)
		{
			this->sctpAssociation->MayConnect();
		}

		// Start the RTCP timer.
		this->rtcpTimer->Start(static_cast<uint64_t>(RTC::RTCP::MaxVideoIntervalMs / 2));

		// Tell the TransportCongestionControlClient.
		if (this->tccClient)
		{
			this->tccClient->TransportConnected();
		}

		// Tell the TransportCongestionControlServer.
		if (this->tccServer)
		{
			this->tccServer->TransportConnected();
		}

#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
		// Tell the SenderBandwidthEstimator.
		if (this->senderBwe)
		{
			this->senderBwe->TransportConnected();
		}
#endif
	}

	void Transport::Disconnected()
	{
		MS_TRACE();

		// Tell all Consumers.
		for (auto& kv : this->mapConsumers)
		{
			auto* consumer = kv.second;

			consumer->TransportDisconnected();
		}

		// Tell all DataConsumers.
		for (auto& kv : this->mapDataConsumers)
		{
			auto* dataConsumer = kv.second;

			dataConsumer->TransportDisconnected();
		}

		// Stop the RTCP timer.
		this->rtcpTimer->Stop();

		// Tell the TransportCongestionControlClient.
		if (this->tccClient)
		{
			this->tccClient->TransportDisconnected();
		}

		// Tell the TransportCongestionControlServer.
		if (this->tccServer)
		{
			this->tccServer->TransportDisconnected();
		}

#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
		// Tell the SenderBandwidthEstimator.
		if (this->senderBwe)
		{
			this->senderBwe->TransportDisconnected();
		}
#endif
	}

	void Transport::ReceiveRtpPacket(RTC::RTP::Packet* packet)
	{
		MS_TRACE();

#ifdef MS_RTC_LOGGER_RTP
		packet->logger.recvTransportId = this->id;
#endif

		// Apply the Transport RTP header extension ids so the RTP listener can use
		// them.
		packet->AssignExtensionIds(this->recvRtpHeaderExtensionIds);

		auto nowMs = this->shared->GetTimeMs();

		// Feed the TransportCongestionControlServer.
		if (this->tccServer)
		{
			this->tccServer->IncomingPacket(nowMs, packet);
		}

		// Get the associated Producer.
		RTC::Producer* producer = this->rtpListener.GetProducer(packet);

		if (!producer)
		{
#ifdef MS_RTC_LOGGER_RTP
			packet->logger.Discarded(RTC::RtcLogger::RtpPacket::DiscardReason::PRODUCER_NOT_FOUND);
#endif

			MS_WARN_TAG(
			  rtp,
			  "no suitable Producer for received RTP packet [ssrc:%" PRIu32 ", payloadType:%" PRIu8 "]",
			  packet->GetSsrc(),
			  packet->GetPayloadType());

			// Tell the child class to remove this SSRC.
			RecvStreamClosed(packet->GetSsrc());

			delete packet;

			return;
		}

		// MS_DEBUG_DEV(
		//   "RTP packet received [ssrc:%" PRIu32 ", payloadType:%" PRIu8 ", producerId:%s]",
		//   packet->GetSsrc(),
		//   packet->GetPayloadType(),
		//   producer->id.c_str());

		// Pass the RTP packet to the corresponding Producer.
		auto result = producer->ReceiveRtpPacket(packet);

		switch (result)
		{
			case RTC::Producer::ReceiveRtpPacketResult::MEDIA:
			{
				this->recvRtpTransmission.Update(packet);
				break;
			}

			case RTC::Producer::ReceiveRtpPacketResult::RETRANSMISSION:
			{
				this->recvRtxTransmission.Update(packet);
				break;
			}

			case RTC::Producer::ReceiveRtpPacketResult::DISCARDED:
			{
				// Tell the child class to remove this SSRC.
				RecvStreamClosed(packet->GetSsrc());
				break;
			}

			default:;
		}

		delete packet;
	}

	void Transport::ReceiveRtcpPacket(RTC::RTCP::Packet* packet)
	{
		MS_TRACE();

		// Handle each RTCP packet.
		while (packet)
		{
			HandleRtcpPacket(packet);

			auto* previousPacket = packet;

			packet = packet->GetNext();

			delete previousPacket;
		}
	}

	void Transport::ReceiveSctpData(const uint8_t* data, size_t len)
	{
		MS_TRACE();

		if (!this->sctpAssociation)
		{
			MS_DEBUG_TAG(sctp, "ignoring SCTP packet (SCTP not enabled)");

			return;
		}

		// Pass it to the SctpAssociation.
		this->sctpAssociation->ReceiveSctpData(data, len);
	}

	void Transport::SendSctpMessage(
	  RTC::DataConsumer* dataConsumer, RTC::SCTP::Message message, onQueuedCallback* cb)
	{
		MS_TRACE();

		// NOTE: The `message` must already have its `streamId` pointing to the same
		// as in the `dataConsumer` if its type is "sctp", or 0 otherwise.

		if (!this->sctpAssociation)
		{
			MS_THROW_ERROR("SCTP not enabled");

			if (cb)
			{
				(*cb)(false, false);
				delete cb;
			}

			return;
		}

		const auto& sctpStreamParameters = dataConsumer->GetSctpStreamParameters();
		const RTC::SCTP::SendMessageOptions sendMessageOptions{
			.unordered          = !sctpStreamParameters.ordered,
			.lifetimeMs         = sctpStreamParameters.ordered
			                        ? std::nullopt
			                        : std::optional<uint64_t>(sctpStreamParameters.maxPacketLifeTime),
			.maxRetransmissions = sctpStreamParameters.ordered
			                        ? std::nullopt
			                        : std::optional<uint64_t>(sctpStreamParameters.maxRetransmits),
			// NOTE: We don't set `lifecyleId` in production.
		};

		const auto sendStatus =
		  this->sctpAssociation->SendMessage(std::move(message), sendMessageOptions);

		switch (sendStatus)
		{
			case RTC::SCTP::Types::SendMessageStatus::SUCCESS:
			{
				if (cb)
				{
					(*cb)(true, /*sctpSendBufferFull*/ false);
				}

				break;
			}

			case RTC::SCTP::Types::SendMessageStatus::ERROR_RESOURCE_EXHAUSTION:
			{
				const auto sendStatusStringView = RTC::SCTP::Types::SendMessageStatusToString(sendStatus);

				MS_WARN_TAG(
				  sctp,
				  "failed to send SCTP message [sendStatus:%.*s]",
				  static_cast<int>(sendStatusStringView.size()),
				  sendStatusStringView.data());

				if (cb)
				{
					(*cb)(false, /*sctpSendBufferFull*/ true);
				}

				dataConsumer->SctpSendBufferFull();

				break;
			}

			default:
			{
				const auto sendStatusStringView = RTC::SCTP::Types::SendMessageStatusToString(sendStatus);

				MS_WARN_TAG(
				  sctp,
				  "failed to send SCTP message [sendStatus:%.*s]",
				  static_cast<int>(sendStatusStringView.size()),
				  sendStatusStringView.data());

				if (cb)
				{
					(*cb)(false, /*sctpSendBufferFull*/ false);
				}

				break;
			}
		}

		delete cb;
	}

	void Transport::CheckNoDataProducer(const std::string& dataProducerId) const
	{
		if (this->mapDataProducers.find(dataProducerId) != this->mapDataProducers.end())
		{
			MS_THROW_ERROR("a DataProducer with same dataProducerId already exists");
		}
	}

	void Transport::CheckNoDataConsumer(const std::string& dataConsumerId) const
	{
		MS_TRACE();

		if (this->mapDataConsumers.find(dataConsumerId) != this->mapDataConsumers.end())
		{
			MS_THROW_ERROR("a DataConsumer with same dataConsumerId already exists");
		}
	}

	void Transport::CheckNoSctpDataConsumer(uint16_t streamId) const
	{
		MS_TRACE();

		if (this->mapSctpStreamIdDataConsumers.find(streamId) != this->mapSctpStreamIdDataConsumers.end())
		{
			MS_THROW_ERROR("an SCTP DataConsumer with same streamId %" PRIu16 " already exists", streamId);
		}
	}

	RTC::Producer* Transport::AssertAndGetProducerById(const std::string& producerId) const
	{
		MS_TRACE();

		auto it = this->mapProducers.find(producerId);

		if (it == this->mapProducers.end())
		{
			MS_THROW_ERROR("Producer not found");
		}

		return it->second;
	}

	RTC::Consumer* Transport::AssertAndGetConsumerById(const std::string& consumerId) const
	{
		MS_TRACE();

		auto it = this->mapConsumers.find(consumerId);

		if (it == this->mapConsumers.end())
		{
			MS_THROW_ERROR("Consumer not found");
		}

		return it->second;
	}

	inline RTC::Consumer* Transport::GetConsumerByMediaSsrc(uint32_t ssrc) const
	{
		MS_TRACE();

		auto mapSsrcConsumerIt = this->mapSsrcConsumer.find(ssrc);

		if (mapSsrcConsumerIt == this->mapSsrcConsumer.end())
		{
			return nullptr;
		}

		auto* consumer = mapSsrcConsumerIt->second;

		return consumer;
	}

	inline RTC::Consumer* Transport::GetConsumerByRtxSsrc(uint32_t ssrc) const
	{
		MS_TRACE();

		auto mapRtxSsrcConsumerIt = this->mapRtxSsrcConsumer.find(ssrc);

		if (mapRtxSsrcConsumerIt == this->mapRtxSsrcConsumer.end())
		{
			return nullptr;
		}

		auto* consumer = mapRtxSsrcConsumerIt->second;

		return consumer;
	}

	RTC::DataProducer* Transport::AssertAndGetDataProducerById(const std::string& dataProducerId) const
	{
		MS_TRACE();

		auto it = this->mapDataProducers.find(dataProducerId);

		if (it == this->mapDataProducers.end())
		{
			MS_THROW_ERROR("DataProducer not found");
		}

		return it->second;
	}

	RTC::DataConsumer* Transport::AssertAndGetDataConsumerById(const std::string& dataConsumerId) const
	{
		MS_TRACE();

		auto it = this->mapDataConsumers.find(dataConsumerId);

		if (it == this->mapDataConsumers.end())
		{
			MS_THROW_ERROR("DataConsumer not found");
		}

		return it->second;
	}

	RTC::DataConsumer* Transport::GetSctpDataConsumerByStreamId(uint16_t streamId) const
	{
		MS_TRACE();

		auto it = this->mapSctpStreamIdDataConsumers.find(streamId);

		if (it == this->mapSctpStreamIdDataConsumers.end())
		{
			MS_THROW_ERROR("SCTP DataConsumer with streamId %" PRIu16 " not found", streamId);
		}

		return it->second;
	}

	void Transport::HandleRtcpPacket(RTC::RTCP::Packet* packet)
	{
		MS_TRACE();

		switch (packet->GetType())
		{
			case RTC::RTCP::Type::RR:
			{
				auto* rr = static_cast<RTC::RTCP::ReceiverReportPacket*>(packet);

				for (auto it = rr->Begin(); it != rr->End(); ++it)
				{
					auto& report   = *it;
					auto* consumer = GetConsumerByMediaSsrc(report->GetSsrc());

					if (!consumer)
					{
						// Special case for the RTP probator.
						if (report->GetSsrc() == RTC::RTP::ProbationGenerator::Ssrc)
						{
							continue;
						}

						// Special case for (unused) RTCP-RR from the RTX stream.
						if (GetConsumerByRtxSsrc(report->GetSsrc()) != nullptr)
						{
							continue;
						}

						MS_DEBUG_TAG(
						  rtcp,
						  "no Consumer found for received Receiver Report [ssrc:%" PRIu32 "]",
						  report->GetSsrc());

						continue;
					}

					consumer->ReceiveRtcpReceiverReport(report);
				}

				if (this->tccClient && !this->mapConsumers.empty())
				{
					float rtt = 0;

					// Retrieve the RTT from the first active consumer.
					for (auto& kv : this->mapConsumers)
					{
						auto* consumer = kv.second;

						if (consumer->IsActive())
						{
							rtt = consumer->GetRtt();

							break;
						}
					}

					this->tccClient->ReceiveRtcpReceiverReport(rr, rtt, this->shared->GetTimeMsInt64());
				}

				break;
			}

			case RTC::RTCP::Type::PSFB:
			{
				auto* feedback = static_cast<RTC::RTCP::FeedbackPsPacket*>(packet);

				switch (feedback->GetMessageType())
				{
					case RTC::RTCP::FeedbackPs::MessageType::PLI:
					{
						auto* consumer = GetConsumerByMediaSsrc(feedback->GetMediaSsrc());

						if (feedback->GetMediaSsrc() == RTC::RTP::ProbationGenerator::Ssrc)
						{
							break;
						}
						else if (!consumer)
						{
							MS_DEBUG_TAG(
							  rtcp,
							  "no Consumer found for received PLI Feedback packet "
							  "[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 "]",
							  feedback->GetSenderSsrc(),
							  feedback->GetMediaSsrc());

							break;
						}

						MS_DEBUG_TAG(
						  rtcp,
						  "PLI received, requesting key frame for Consumer "
						  "[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 "]",
						  feedback->GetSenderSsrc(),
						  feedback->GetMediaSsrc());

						consumer->ReceiveKeyFrameRequest(
						  RTC::RTCP::FeedbackPs::MessageType::PLI, feedback->GetMediaSsrc());

						break;
					}

					case RTC::RTCP::FeedbackPs::MessageType::FIR:
					{
						// Must iterate FIR items.
						auto* fir = static_cast<RTC::RTCP::FeedbackPsFirPacket*>(packet);

						for (auto it = fir->Begin(); it != fir->End(); ++it)
						{
							auto& item     = *it;
							auto* consumer = GetConsumerByMediaSsrc(item->GetSsrc());

							if (item->GetSsrc() == RTC::RTP::ProbationGenerator::Ssrc)
							{
								continue;
							}
							else if (!consumer)
							{
								MS_DEBUG_TAG(
								  rtcp,
								  "no Consumer found for received FIR Feedback packet "
								  "[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 ", item ssrc:%" PRIu32 "]",
								  feedback->GetSenderSsrc(),
								  feedback->GetMediaSsrc(),
								  item->GetSsrc());

								continue;
							}

							MS_DEBUG_TAG(
							  rtcp,
							  "FIR received, requesting key frame for Consumer "
							  "[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 ", item ssrc:%" PRIu32 "]",
							  feedback->GetSenderSsrc(),
							  feedback->GetMediaSsrc(),
							  item->GetSsrc());

							consumer->ReceiveKeyFrameRequest(feedback->GetMessageType(), item->GetSsrc());
						}

						break;
					}

					case RTC::RTCP::FeedbackPs::MessageType::AFB:
					{
						auto* afb = static_cast<RTC::RTCP::FeedbackPsAfbPacket*>(feedback);

						// Store REMB info.
						if (afb->GetApplication() == RTC::RTCP::FeedbackPsAfbPacket::Application::REMB)
						{
							auto* remb = static_cast<RTC::RTCP::FeedbackPsRembPacket*>(afb);

							// Pass it to the TCC client.
							if (this->tccClient && this->tccClient->GetBweType() == RTC::BweType::REMB)
							{
								this->tccClient->ReceiveEstimatedBitrate(remb->GetBitrate());
							}

							break;
						}
						else
						{
							MS_DEBUG_TAG(
							  rtcp,
							  "ignoring unsupported %s Feedback PS AFB packet "
							  "[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 "]",
							  RTC::RTCP::FeedbackPsPacket::MessageTypeToString(feedback->GetMessageType()).c_str(),
							  feedback->GetSenderSsrc(),
							  feedback->GetMediaSsrc());

							break;
						}
					}

					default:
					{
						MS_DEBUG_TAG(
						  rtcp,
						  "ignoring unsupported %s Feedback packet "
						  "[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 "]",
						  RTC::RTCP::FeedbackPsPacket::MessageTypeToString(feedback->GetMessageType()).c_str(),
						  feedback->GetSenderSsrc(),
						  feedback->GetMediaSsrc());
					}
				}

				break;
			}

			case RTC::RTCP::Type::RTPFB:
			{
				auto* feedback = static_cast<RTC::RTCP::FeedbackRtpPacket*>(packet);
				auto* consumer = GetConsumerByMediaSsrc(feedback->GetMediaSsrc());

				// If no Consumer is found and this is not a Transport Feedback for the
				// probation SSRC or any Consumer RTX SSRC, ignore it.
				if (
				  !consumer && feedback->GetMessageType() != RTC::RTCP::FeedbackRtp::MessageType::TCC &&
				  (feedback->GetMediaSsrc() != RTC::RTP::ProbationGenerator::Ssrc ||
				   !GetConsumerByRtxSsrc(feedback->GetMediaSsrc())))
				{
					MS_DEBUG_TAG(
					  rtcp,
					  "no Consumer found for received Feedback packet "
					  "[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 "]",
					  feedback->GetSenderSsrc(),
					  feedback->GetMediaSsrc());

					break;
				}

				switch (feedback->GetMessageType())
				{
					case RTC::RTCP::FeedbackRtp::MessageType::NACK:
					{
						if (!consumer)
						{
							MS_DEBUG_TAG(
							  rtcp,
							  "no Consumer found for received NACK Feedback packet "
							  "[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 "]",
							  feedback->GetSenderSsrc(),
							  feedback->GetMediaSsrc());

							break;
						}

						auto* nackPacket = static_cast<RTC::RTCP::FeedbackRtpNackPacket*>(packet);

						consumer->ReceiveNack(nackPacket);

						break;
					}

					case RTC::RTCP::FeedbackRtp::MessageType::TCC:
					{
						auto* feedback = static_cast<RTC::RTCP::FeedbackRtpTransportPacket*>(packet);

						if (this->tccClient)
						{
							this->tccClient->ReceiveRtcpTransportFeedback(feedback);
						}

#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
						// Pass it to the SenderBandwidthEstimator client.
						if (this->senderBwe)
						{
							this->senderBwe->ReceiveRtcpTransportFeedback(feedback);
						}
#endif

						break;
					}

					default:
					{
						MS_DEBUG_TAG(
						  rtcp,
						  "ignoring unsupported %s Feedback packet "
						  "[sender ssrc:%" PRIu32 ", media ssrc:%" PRIu32 "]",
						  RTC::RTCP::FeedbackRtpPacket::MessageTypeToString(feedback->GetMessageType()).c_str(),
						  feedback->GetSenderSsrc(),
						  feedback->GetMediaSsrc());
					}
				}

				break;
			}

			case RTC::RTCP::Type::SR:
			{
				auto* sr = static_cast<RTC::RTCP::SenderReportPacket*>(packet);

				// Even if Sender Report packet can only contains one report.
				for (auto it = sr->Begin(); it != sr->End(); ++it)
				{
					auto& report   = *it;
					auto* producer = this->rtpListener.GetProducer(report->GetSsrc());

					if (!producer)
					{
						MS_DEBUG_TAG(
						  rtcp,
						  "no Producer found for received Sender Report [ssrc:%" PRIu32 "]",
						  report->GetSsrc());

						continue;
					}

					producer->ReceiveRtcpSenderReport(report);
				}

				break;
			}

			case RTC::RTCP::Type::SDES:
			{
				// According to RFC 3550 section 6.1 "a CNAME item MUST be included in
				// each compound RTCP packet". So this is true even for compound
				// packets sent by endpoints that are not sending any RTP stream to us
				// (thus chunks in such a SDES will have an SSCR does not match with
				// any Producer created in this Transport).
				// Therefore, and given that we do nothing with SDES, just ignore them.

				break;
			}

			case RTC::RTCP::Type::BYE:
			{
				MS_DEBUG_TAG(rtcp, "ignoring received RTCP BYE");

				break;
			}

			case RTC::RTCP::Type::XR:
			{
				auto* xr = static_cast<RTC::RTCP::ExtendedReportPacket*>(packet);

				for (auto it = xr->Begin(); it != xr->End(); ++it)
				{
					auto& report = *it;

					switch (report->GetType())
					{
						case RTC::RTCP::ExtendedReportBlock::Type::DLRR:
						{
							auto* dlrr = static_cast<RTC::RTCP::DelaySinceLastRr*>(report);

							for (auto it2 = dlrr->Begin(); it2 != dlrr->End(); ++it2)
							{
								auto& ssrcInfo = *it2;

								// SSRC should be filled in the sub-block.
								if (ssrcInfo->GetSsrc() == 0)
								{
									ssrcInfo->SetSsrc(xr->GetSsrc());
								}

								auto* producer = this->rtpListener.GetProducer(ssrcInfo->GetSsrc());

								if (!producer)
								{
									MS_DEBUG_TAG(
									  rtcp,
									  "no Producer found for received Sender Extended Report [ssrc:%" PRIu32 "]",
									  ssrcInfo->GetSsrc());

									continue;
								}

								producer->ReceiveRtcpXrDelaySinceLastRr(ssrcInfo);
							}

							break;
						}

						case RTC::RTCP::ExtendedReportBlock::Type::RRT:
						{
							auto* rrt = static_cast<RTC::RTCP::ReceiverReferenceTime*>(report);

							for (auto& kv : this->mapConsumers)
							{
								auto* consumer = kv.second;

								consumer->ReceiveRtcpXrReceiverReferenceTime(rrt);
							}

							break;
						}

						default:;
					}
				}

				break;
			}

			default:
			{
				MS_DEBUG_TAG(
				  rtcp,
				  "unhandled RTCP type received [type:%" PRIu8 "]",
				  static_cast<uint8_t>(packet->GetType()));
			}
		}
	}

	void Transport::SendRtcp(uint64_t nowMs)
	{
		MS_TRACE();

		std::unique_ptr<RTC::RTCP::CompoundPacket> packet{ new RTC::RTCP::CompoundPacket() };

#ifdef MS_LIBURING_SUPPORTED
		if (DepLibUring::IsEnabled())
		{
			// Activate liburing usage.
			DepLibUring::SetActive();
		}
#endif

		for (auto& kv : this->mapConsumers)
		{
			auto* consumer = kv.second;
			auto rtcpAdded = consumer->GetRtcp(packet.get(), nowMs);

			// RTCP data couldn't be added because the Compound packet is full.
			// Send the RTCP compound packet and request for RTCP again.
			if (!rtcpAdded)
			{
				SendRtcpCompoundPacket(packet.get());

				// Create a new compount packet.
				packet.reset(new RTC::RTCP::CompoundPacket());

				// Retrieve the RTCP again.
				consumer->GetRtcp(packet.get(), nowMs);
			}
		}

		for (auto& kv : this->mapProducers)
		{
			auto* producer = kv.second;
			auto rtcpAdded = producer->GetRtcp(packet.get(), nowMs);

			// RTCP data couldn't be added because the Compound packet is full.
			// Send the RTCP compound packet and request for RTCP again.
			if (!rtcpAdded)
			{
				SendRtcpCompoundPacket(packet.get());

				// Create a new compount packet.
				packet.reset(new RTC::RTCP::CompoundPacket());

				// Retrieve the RTCP again.
				producer->GetRtcp(packet.get(), nowMs);
			}
		}

		// Send the RTCP compound packet if there is any sender or receiver report.
		if (packet->GetReceiverReportCount() > 0u || packet->GetSenderReportCount() > 0u)
		{
			SendRtcpCompoundPacket(packet.get());
		}

#ifdef MS_LIBURING_SUPPORTED
		if (DepLibUring::IsEnabled())
		{
			// Submit all prepared submission entries.
			DepLibUring::Submit();
		}
#endif
	}

	void Transport::DistributeAvailableOutgoingBitrate()
	{
		MS_TRACE();

		MS_ASSERT(this->tccClient, "no TransportCongestionClient");

		std::multimap<uint8_t, RTC::Consumer*> multimapPriorityConsumer;

		// Fill the map with Consumers and their priority (if > 0).
		for (auto& kv : this->mapConsumers)
		{
			auto* consumer = kv.second;
			auto priority  = consumer->GetBitratePriority();

			if (priority > 0u)
			{
				multimapPriorityConsumer.emplace(priority, consumer);
			}
		}

		// Nobody wants bitrate. Exit.
		if (multimapPriorityConsumer.empty())
		{
			return;
		}

		bool baseAllocation       = true;
		uint32_t availableBitrate = this->tccClient->GetAvailableBitrate();

		this->tccClient->RescheduleNextAvailableBitrateEvent();

		MS_DEBUG_DEV("before layer-by-layer iterations [availableBitrate:%" PRIu32 "]", availableBitrate);

		// Redistribute the available bitrate by allowing Consumers to increase
		// layer by layer. Initially try to spread the bitrate across all
		// consumers. Then allocate the excess bitrate to Consumers starting
		// with the highest priorty.
		while (availableBitrate > 0u)
		{
			auto previousAvailableBitrate = availableBitrate;

			for (auto it = multimapPriorityConsumer.rbegin(); it != multimapPriorityConsumer.rend(); ++it)
			{
				auto priority  = it->first;
				auto* consumer = it->second;
				auto bweType   = this->tccClient->GetBweType();

				// NOLINTNEXTLINE(bugprone-too-small-loop-variable)
				for (uint8_t i{ 1u }; i <= (baseAllocation ? 1u : priority); ++i)
				{
					uint32_t usedBitrate{ 0u };
					const bool considerLoss = (bweType == RTC::BweType::REMB);

					usedBitrate = consumer->IncreaseLayer(availableBitrate, considerLoss);

					MS_ASSERT(usedBitrate <= availableBitrate, "Consumer used more layer bitrate than given");

					availableBitrate -= usedBitrate;

					// Exit the loop fast if used bitrate is 0.
					if (usedBitrate == 0u)
					{
						break;
					}
				}
			}

			// If no Consumer used bitrate, exit the loop.
			if (availableBitrate == previousAvailableBitrate)
			{
				break;
			}

			baseAllocation = false;
		}

		MS_DEBUG_DEV("after layer-by-layer iterations [availableBitrate:%" PRIu32 "]", availableBitrate);

		// Finally instruct Consumers to apply their computed layers.
		for (auto it = multimapPriorityConsumer.rbegin(); it != multimapPriorityConsumer.rend(); ++it)
		{
			auto* consumer = it->second;

			consumer->ApplyLayers();
		}
	}

	void Transport::ComputeOutgoingDesiredBitrate(bool forceBitrate)
	{
		MS_TRACE();

		MS_ASSERT(this->tccClient, "no TransportCongestionClient");

		uint32_t totalDesiredBitrate{ 0u };

		for (auto& kv : this->mapConsumers)
		{
			auto* consumer      = kv.second;
			auto desiredBitrate = consumer->GetDesiredBitrate();

			totalDesiredBitrate += desiredBitrate;
		}

		MS_DEBUG_DEV("total desired bitrate: %" PRIu32, totalDesiredBitrate);

		this->tccClient->SetDesiredBitrate(totalDesiredBitrate, forceBitrate);
	}

	inline void Transport::EmitTraceEventProbationType(RTC::RTP::Packet* /*packet*/) const
	{
		MS_TRACE();

		if (!this->traceEventTypes.probation)
		{
			return;
		}

		// TODO: Missing trace info (RTP packet dump).
		auto notification = FBS::Transport::CreateTraceNotification(
		  this->shared->GetChannelNotifier()->GetBufferBuilder(),
		  FBS::Transport::TraceEventType::PROBATION,
		  this->shared->GetTimeMs(),
		  FBS::Common::TraceDirection::DIRECTION_OUT);

		this->shared->GetChannelNotifier()->Emit(
		  this->id,
		  FBS::Notification::Event::TRANSPORT_TRACE,
		  FBS::Notification::Body::Transport_TraceNotification,
		  notification);
	}

	inline void Transport::EmitTraceEventBweType(
	  RTC::TransportCongestionControlClient::Bitrates& bitrates) const
	{
		MS_TRACE();

		if (!this->traceEventTypes.bwe)
		{
			return;
		}

		auto traceInfo = FBS::Transport::CreateBweTraceInfo(
		  this->shared->GetChannelNotifier()->GetBufferBuilder(),
		  this->tccClient->GetBweType() == RTC::BweType::TRANSPORT_CC
		    ? FBS::Transport::BweType::TRANSPORT_CC
		    : FBS::Transport::BweType::REMB,
		  bitrates.desiredBitrate,
		  bitrates.effectiveDesiredBitrate,
		  bitrates.minBitrate,
		  bitrates.maxBitrate,
		  bitrates.startBitrate,
		  bitrates.maxPaddingBitrate,
		  bitrates.availableBitrate);

		auto notification = FBS::Transport::CreateTraceNotification(
		  this->shared->GetChannelNotifier()->GetBufferBuilder(),
		  FBS::Transport::TraceEventType::BWE,
		  this->shared->GetTimeMs(),
		  FBS::Common::TraceDirection::DIRECTION_OUT,
		  FBS::Transport::TraceInfo::BweTraceInfo,
		  traceInfo.Union());

		this->shared->GetChannelNotifier()->Emit(
		  this->id,
		  FBS::Notification::Event::TRANSPORT_TRACE,
		  FBS::Notification::Body::Transport_TraceNotification,
		  notification);
	}

	void Transport::OnProducerPaused(RTC::Producer* producer)
	{
		MS_TRACE();

		this->listener->OnTransportProducerPaused(this, producer);
	}

	void Transport::OnProducerResumed(RTC::Producer* producer)
	{
		MS_TRACE();

		this->listener->OnTransportProducerResumed(this, producer);
	}

	void Transport::OnProducerNewRtpStream(
	  RTC::Producer* producer, RTC::RTP::RtpStreamRecv* rtpStream, uint32_t mappedSsrc)
	{
		MS_TRACE();

		this->listener->OnTransportProducerNewRtpStream(this, producer, rtpStream, mappedSsrc);
	}

	void Transport::OnProducerRtpStreamScore(
	  RTC::Producer* producer, RTC::RTP::RtpStreamRecv* rtpStream, uint8_t score, uint8_t previousScore)
	{
		MS_TRACE();

		this->listener->OnTransportProducerRtpStreamScore(this, producer, rtpStream, score, previousScore);
	}

	void Transport::OnProducerRtcpSenderReport(
	  RTC::Producer* producer, RTC::RTP::RtpStreamRecv* rtpStream, bool first)
	{
		MS_TRACE();

		this->listener->OnTransportProducerRtcpSenderReport(this, producer, rtpStream, first);
	}

	void Transport::OnProducerRtpPacketReceived(RTC::Producer* producer, RTC::RTP::Packet* packet)
	{
		MS_TRACE();

		this->listener->OnTransportProducerRtpPacketReceived(this, producer, packet);
	}

	void Transport::OnProducerSendRtcpPacket(RTC::Producer* /*producer*/, RTC::RTCP::Packet* packet)
	{
		MS_TRACE();

		SendRtcpPacket(packet);
	}

	void Transport::OnProducerNeedWorstRemoteFractionLost(
	  RTC::Producer* producer, uint32_t mappedSsrc, uint8_t& worstRemoteFractionLost)
	{
		MS_TRACE();

		this->listener->OnTransportNeedWorstRemoteFractionLost(
		  this, producer, mappedSsrc, worstRemoteFractionLost);
	}

	void Transport::OnConsumerSendRtpPacket(RTC::Consumer* consumer, RTC::RTP::Packet* packet)
	{
		MS_TRACE();

#ifdef MS_RTC_LOGGER_RTP
		packet->logger.sendTransportId = this->id;
		packet->logger.Sent();
#endif

		// Update abs-send-time if present.
		packet->UpdateAbsSendTime(this->shared->GetTimeMs());

		// Update transport wide sequence number if present.
		if (
		  this->tccClient && this->tccClient->GetBweType() == RTC::BweType::TRANSPORT_CC &&
		  packet->UpdateTransportWideCc01(this->transportWideCcSeq + 1))
		{
			this->transportWideCcSeq++;

			webrtc::RtpPacketSendInfo packetInfo;

			packetInfo.ssrc                      = packet->GetSsrc();
			packetInfo.transport_sequence_number = this->transportWideCcSeq;
			packetInfo.has_rtp_sequence_number   = true;
			packetInfo.rtp_sequence_number       = packet->GetSequenceNumber();
			packetInfo.length                    = packet->GetLength();
			packetInfo.pacing_info               = this->tccClient->GetPacingInfo();

			// Indicate the pacer (and prober) that a packet is to be sent.
			this->tccClient->InsertPacket(packetInfo);

			// When using WebRtcServer, the lifecycle of a RTC::UdpSocket maybe longer
			// than WebRtcTransport so there is a chance for the send callback to be
			// invoked *after* the WebRtcTransport has been closed (freed). To avoid
			// invalid memory access we need to use weak_ptr. Same applies in other
			// send callbacks.
			const std::weak_ptr<RTC::TransportCongestionControlClient> tccClientWeakPtr(this->tccClient);

			auto* shared = this->shared;

#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
			std::weak_ptr<RTC::SenderBandwidthEstimator> senderBweWeakPtr(this->senderBwe);
			RTC::SenderBandwidthEstimator::SentInfo sentInfo;

			sentInfo.wideSeq     = this->transportWideCcSeq;
			sentInfo.size        = packet->GetLength();
			sentInfo.sendingAtMs = this->shared->GetTimeMs();

			const auto* cb = new onSendCallback(
			  [tccClientWeakPtr, shared, packetInfo, senderBweWeakPtr, sentInfo](bool sent) mutable
			  {
				  if (sent)
				  {
					  auto tccClient = tccClientWeakPtr.lock();

					  if (tccClient)
					  {
						  tccClient->PacketSent(packetInfo, shared->GetTimeMsInt64());
					  }

					  auto senderBwe = senderBweWeakPtr.lock();

					  if (senderBwe)
					  {
						  sentInfo.sentAtMs = shared->GetTimeMs();
						  senderBwe->RtpPacketSent(sentInfo);
					  }
				  }
			  });

			SendRtpPacket(consumer, packet, cb);
#else
			const auto* cb = new onSendCallback(
			  [tccClientWeakPtr, shared, packetInfo](bool sent)
			  {
				  if (sent)
				  {
					  auto tccClient = tccClientWeakPtr.lock();

					  if (tccClient)
					  {
						  tccClient->PacketSent(packetInfo, shared->GetTimeMsInt64());
					  }
				  }
			  });

			SendRtpPacket(consumer, packet, cb);
#endif
		}
		else
		{
			SendRtpPacket(consumer, packet);
		}

		this->sendRtpTransmission.Update(packet);
	}

	void Transport::OnConsumerRetransmitRtpPacket(RTC::Consumer* consumer, RTC::RTP::Packet* packet)
	{
		MS_TRACE();

		// Update abs-send-time if present.
		packet->UpdateAbsSendTime(this->shared->GetTimeMs());

		// Update transport wide sequence number if present.
		if (
		  this->tccClient && this->tccClient->GetBweType() == RTC::BweType::TRANSPORT_CC &&
		  packet->UpdateTransportWideCc01(this->transportWideCcSeq + 1))
		{
			this->transportWideCcSeq++;

			webrtc::RtpPacketSendInfo packetInfo;

			packetInfo.ssrc                      = packet->GetSsrc();
			packetInfo.transport_sequence_number = this->transportWideCcSeq;
			packetInfo.has_rtp_sequence_number   = true;
			packetInfo.rtp_sequence_number       = packet->GetSequenceNumber();
			packetInfo.length                    = packet->GetLength();
			packetInfo.pacing_info               = this->tccClient->GetPacingInfo();

			// Indicate the pacer (and prober) that a packet is to be sent.
			this->tccClient->InsertPacket(packetInfo);

			const std::weak_ptr<RTC::TransportCongestionControlClient> tccClientWeakPtr(this->tccClient);

			auto* shared = this->shared;

#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
			std::weak_ptr<RTC::SenderBandwidthEstimator> senderBweWeakPtr = this->senderBwe;
			RTC::SenderBandwidthEstimator::SentInfo sentInfo;

			sentInfo.wideSeq     = this->transportWideCcSeq;
			sentInfo.size        = packet->GetLength();
			sentInfo.sendingAtMs = this->shared->GetTimeMs();

			const auto* cb = new onSendCallback(
			  [tccClientWeakPtr, shared, packetInfo, senderBweWeakPtr, sentInfo](bool sent) mutable
			  {
				  if (sent)
				  {
					  auto tccClient = tccClientWeakPtr.lock();

					  if (tccClient)
					  {
						  tccClient->PacketSent(packetInfo, shared->GetTimeMsInt64());
					  }

					  auto senderBwe = senderBweWeakPtr.lock();

					  if (senderBwe)
					  {
						  sentInfo.sentAtMs = shared->GetTimeMs();
						  senderBwe->RtpPacketSent(sentInfo);
					  }
				  }
			  });

			SendRtpPacket(consumer, packet, cb);
#else
			const auto* cb = new onSendCallback(
			  [tccClientWeakPtr, shared, packetInfo](bool sent)
			  {
				  if (sent)
				  {
					  auto tccClient = tccClientWeakPtr.lock();

					  if (tccClient)
					  {
						  tccClient->PacketSent(packetInfo, shared->GetTimeMsInt64());
					  }
				  }
			  });

			SendRtpPacket(consumer, packet, cb);
#endif
		}
		else
		{
			SendRtpPacket(consumer, packet);
		}

		this->sendRtxTransmission.Update(packet);
	}

	void Transport::OnConsumerKeyFrameRequested(RTC::Consumer* consumer, uint32_t mappedSsrc)
	{
		MS_TRACE();

		if (!IsConnected())
		{
			MS_WARN_TAG(rtcp, "ignoring key rame request (transport not connected)");

			return;
		}

		this->listener->OnTransportConsumerKeyFrameRequested(this, consumer, mappedSsrc);
	}

	void Transport::OnConsumerNeedBitrateChange(RTC::Consumer* /*consumer*/)
	{
		MS_TRACE();

		MS_ASSERT(this->tccClient, "no TransportCongestionClient");

		DistributeAvailableOutgoingBitrate();
		ComputeOutgoingDesiredBitrate();
	}

	void Transport::OnConsumerNeedZeroBitrate(RTC::Consumer* /*consumer*/)
	{
		MS_TRACE();

		MS_ASSERT(this->tccClient, "no TransportCongestionClient");

		DistributeAvailableOutgoingBitrate();

		// This may be the latest active Consumer with BWE. If so we have to stop probation.
		ComputeOutgoingDesiredBitrate(/*forceBitrate*/ true);
	}

	void Transport::OnConsumerProducerClosed(RTC::Consumer* consumer)
	{
		MS_TRACE();

		// Remove it from the maps.
		this->mapConsumers.erase(consumer->id);

		for (auto ssrc : consumer->GetMediaSsrcs())
		{
			this->mapSsrcConsumer.erase(ssrc);

			// Tell the child class to clear associated SSRCs.
			SendStreamClosed(ssrc);
		}

		for (auto ssrc : consumer->GetRtxSsrcs())
		{
			this->mapRtxSsrcConsumer.erase(ssrc);

			// Tell the child class to clear associated SSRCs.
			SendStreamClosed(ssrc);
		}

		// Notify the listener.
		this->listener->OnTransportConsumerProducerClosed(this, consumer);

		MS_DEBUG_DEV("Consumer closed [consumerId:%s]", consumer->id.c_str());

		// Delete it.
		delete consumer;

		// This may be the latest active Consumer with BWE. If so we have to stop probation.
		if (this->tccClient)
		{
			ComputeOutgoingDesiredBitrate(/*forceBitrate*/ true);
		}
	}

	void Transport::OnDataProducerMessageReceived(
	  RTC::DataProducer* dataProducer,
	  RTC::SCTP::Message message,
	  std::vector<uint16_t>& subchannels,
	  std::optional<uint16_t> requiredSubchannel)
	{
		MS_TRACE();

		this->listener->OnTransportDataProducerMessageReceived(
		  this, dataProducer, std::move(message), subchannels, requiredSubchannel);
	}

	void Transport::OnDataProducerPaused(RTC::DataProducer* dataProducer)
	{
		MS_TRACE();

		this->listener->OnTransportDataProducerPaused(this, dataProducer);
	}

	void Transport::OnDataProducerResumed(RTC::DataProducer* dataProducer)
	{
		MS_TRACE();

		this->listener->OnTransportDataProducerResumed(this, dataProducer);
	}

	void Transport::OnDataConsumerSendMessage(
	  RTC::DataConsumer* dataConsumer, RTC::SCTP::Message message, onQueuedCallback* cb)
	{
		MS_TRACE();

		SendMessage(dataConsumer, std::move(message), cb);
	}

	void Transport::OnDataConsumerNeedBufferedAmount(
	  const RTC::DataConsumer* dataConsumer, uint32_t& bufferedAmount) const
	{
		MS_TRACE();

		if (this->sctpAssociation)
		{
			bufferedAmount = static_cast<uint32_t>(this->sctpAssociation->GetStreamBufferedAmount(
			  dataConsumer->GetSctpStreamParameters().streamId));
		}
		else
		{
			bufferedAmount = 0;
		}
	}

	void Transport::OnDataConsumerNeedBufferedAmountLowThreshold(
	  const RTC::DataConsumer* dataConsumer, uint32_t& bufferedAmountLowThreshold) const
	{
		if (this->sctpAssociation)
		{
			bufferedAmountLowThreshold =
			  static_cast<uint32_t>(this->sctpAssociation->GetStreamBufferedAmountLowThreshold(
			    dataConsumer->GetSctpStreamParameters().streamId));
		}
		else
		{
			bufferedAmountLowThreshold = 0;
		}
	}

	void Transport::OnDataConsumerSetBufferedAmountLowThreshold(
	  const RTC::DataConsumer* dataConsumer, uint32_t bytes) const
	{
		MS_TRACE();

		MS_ASSERT(
		  dataConsumer->GetType() == RTC::DataConsumer::Type::SCTP, "DataConsumer must have type SCTP");

		if (this->sctpAssociation)
		{
			this->sctpAssociation->SetStreamBufferedAmountLowThreshold(
			  dataConsumer->GetSctpStreamParameters().streamId, static_cast<size_t>(bytes));
		}
	}

	void Transport::OnDataConsumerDataProducerClosed(RTC::DataConsumer* dataConsumer)
	{
		MS_TRACE();

		// Remove it from the maps.
		this->mapDataConsumers.erase(dataConsumer->id);

		if (dataConsumer->GetType() == RTC::DataConsumer::Type::SCTP)
		{
			this->mapSctpStreamIdDataConsumers.erase(dataConsumer->GetSctpStreamParameters().streamId);
		}

		if (this->sctpAssociation)
		{
			this->sctpAssociation->ResetStreams(
			  std::array<uint16_t, 1>{ dataConsumer->GetSctpStreamParameters().streamId });
		}

		// Notify the listener.
		this->listener->OnTransportDataConsumerDataProducerClosed(this, dataConsumer);

		MS_DEBUG_DEV("DataConsumer closed [dataConsumerId:%s]", dataConsumer->id.c_str());

		// Delete it.
		delete dataConsumer;
	}

	bool Transport::OnAssociationSendData(const uint8_t* data, size_t len)
	{
		MS_TRACE();

		// Ignore if destroying.
		// NOTE: This is because when the child class (i.e. WebRtcTransport) is deleted,
		// its destructor is called first and then the parent Transport's destructor,
		// and we would end here calling SendData() which is an abstract method.
		if (this->isDestroying)
		{
			MS_WARN_DEV("ignoring sending data because Transport is being destroying");

			return false;
		}

		return SendData(data, len);
	}

	void Transport::OnAssociationConnecting()
	{
		MS_TRACE();

		// Notify the Node Transport.
		auto sctpStateChangeNotification = FBS::Transport::CreateSctpStateChangeNotification(
		  this->shared->GetChannelNotifier()->GetBufferBuilder(),
		  FBS::SctpAssociation::SctpState::CONNECTING);

		this->shared->GetChannelNotifier()->Emit(
		  this->id,
		  FBS::Notification::Event::TRANSPORT_SCTP_STATE_CHANGE,
		  FBS::Notification::Body::Transport_SctpStateChangeNotification,
		  sctpStateChangeNotification);
	}

	void Transport::OnAssociationConnected()
	{
		MS_TRACE();

		// Tell all DataConsumers.
		for (auto& kv : this->mapDataConsumers)
		{
			auto* dataConsumer = kv.second;

			if (dataConsumer->GetType() == RTC::DataConsumer::Type::SCTP)
			{
				dataConsumer->SctpAssociationConnected();
			}
		}

		// Notify the upper layer.

		// First tell it about the SCTP negotiated capabilities.
		auto sctpNegotiatedCapabilitiesOffset = FBS::SctpAssociation::CreateSctpNegotiatedCapabilities(
		  this->shared->GetChannelNotifier()->GetBufferBuilder(),
		  this->sctpAssociation->GetNegotiatedMaxOutboundStreams(),
		  this->sctpAssociation->GetNegotiatedMaxInboundStreams());

		auto sctpNegotiatedCapabilitiesNotification =
		  FBS::Transport::CreateSctpNegotiatedCapabilitiesNotification(
		    this->shared->GetChannelNotifier()->GetBufferBuilder(), sctpNegotiatedCapabilitiesOffset);

		this->shared->GetChannelNotifier()->Emit(
		  this->id,
		  FBS::Notification::Event::TRANSPORT_SCTP_NEGOTIATED_CAPABILITIES,
		  FBS::Notification::Body::Transport_SctpNegotiatedCapabilitiesNotification,
		  sctpNegotiatedCapabilitiesNotification);

		// Then announce "connected" SCTP state.
		auto sctpStateChangeNotification = FBS::Transport::CreateSctpStateChangeNotification(
		  this->shared->GetChannelNotifier()->GetBufferBuilder(),
		  FBS::SctpAssociation::SctpState::CONNECTED);

		this->shared->GetChannelNotifier()->Emit(
		  this->id,
		  FBS::Notification::Event::TRANSPORT_SCTP_STATE_CHANGE,
		  FBS::Notification::Body::Transport_SctpStateChangeNotification,
		  sctpStateChangeNotification);

// For debugging purposes.
#if MS_LOG_DEV_LEVEL == 3
		MS_DUMP("--- SCTP association connected:");
		this->sctpAssociation->Dump();
#endif
	}

	void Transport::OnAssociationFailed(RTC::SCTP::Types::ErrorKind errorKind, std::string_view errorMessage)
	{
		MS_TRACE();

		const auto errorKindStringView = RTC::SCTP::Types::ErrorKindToString(errorKind);

		MS_WARN_TAG(
		  sctp,
		  "SCTP association failed [errorKind:%.*s, message:%.*s]",
		  static_cast<int>(errorKindStringView.size()),
		  errorKindStringView.data(),
		  static_cast<int>(errorMessage.size()),
		  errorMessage.data());

		// Tell all DataConsumers.
		for (auto& kv : this->mapDataConsumers)
		{
			auto* dataConsumer = kv.second;

			if (dataConsumer->GetType() == RTC::DataConsumer::Type::SCTP)
			{
				dataConsumer->SctpAssociationClosed();
			}
		}

		// Notify the Node Transport.
		auto sctpStateChangeNotification = FBS::Transport::CreateSctpStateChangeNotification(
		  this->shared->GetChannelNotifier()->GetBufferBuilder(),
		  FBS::SctpAssociation::SctpState::FAILED);

		this->shared->GetChannelNotifier()->Emit(
		  this->id,
		  FBS::Notification::Event::TRANSPORT_SCTP_STATE_CHANGE,
		  FBS::Notification::Body::Transport_SctpStateChangeNotification,
		  sctpStateChangeNotification);
	}

	void Transport::OnAssociationClosed(RTC::SCTP::Types::ErrorKind errorKind, std::string_view errorMessage)
	{
		MS_TRACE();

		if (errorKind != RTC::SCTP::Types::ErrorKind::SUCCESS)
		{
			const auto errorKindStringView = RTC::SCTP::Types::ErrorKindToString(errorKind);

			MS_WARN_TAG(
			  sctp,
			  "SCTP association closed [errorKind:%.*s, message:%.*s]",
			  static_cast<int>(errorKindStringView.size()),
			  errorKindStringView.data(),
			  static_cast<int>(errorMessage.size()),
			  errorMessage.data());
		}

		// Tell all DataConsumers.
		for (auto& kv : this->mapDataConsumers)
		{
			auto* dataConsumer = kv.second;

			if (dataConsumer->GetType() == RTC::DataConsumer::Type::SCTP)
			{
				dataConsumer->SctpAssociationClosed();
			}
		}

		// Notify the Node Transport.
		auto sctpStateChangeNotification = FBS::Transport::CreateSctpStateChangeNotification(
		  this->shared->GetChannelNotifier()->GetBufferBuilder(),
		  FBS::SctpAssociation::SctpState::CLOSED);

		this->shared->GetChannelNotifier()->Emit(
		  this->id,
		  FBS::Notification::Event::TRANSPORT_SCTP_STATE_CHANGE,
		  FBS::Notification::Body::Transport_SctpStateChangeNotification,
		  sctpStateChangeNotification);
	}

	void Transport::OnAssociationRestarted()
	{
		MS_TRACE();

		MS_DEBUG_TAG(sctp, "SCTP association restarted");
	}

	void Transport::OnAssociationError(RTC::SCTP::Types::ErrorKind errorKind, std::string_view errorMessage)
	{
		MS_TRACE();

		const auto errorKindStringView = RTC::SCTP::Types::ErrorKindToString(errorKind);

		MS_WARN_TAG(
		  sctp,
		  "SCTP association error [errorKind:%.*s, message:%.*s]",
		  static_cast<int>(errorKindStringView.size()),
		  errorKindStringView.data(),
		  static_cast<int>(errorMessage.size()),
		  errorMessage.data());
	}

	void Transport::OnAssociationMessageReceived(RTC::SCTP::Message message)
	{
		MS_TRACE();

		RTC::DataProducer* dataProducer = this->sctpListener.GetDataProducer(message.GetStreamId());

		if (!dataProducer)
		{
			MS_WARN_TAG(
			  sctp,
			  "no suitable DataProducer for received SCTP message [streamId:%" PRIu16 "]",
			  message.GetStreamId());

			return;
		}

		// Pass the SCTP message to the corresponding DataProducer.
		try
		{
			static thread_local std::vector<uint16_t> emptySubchannels;

			dataProducer->ReceiveMessage(
			  std::move(message), emptySubchannels, /*requiredSubchannel*/ std::nullopt);
		}
		catch (std::exception& error)
		{
			MS_WARN_TAG(
			  sctp,
			  "DataProducer::ReceiveMessage() failed for received SCTP message [streamId:%" PRIu16 "]: %s",
			  message.GetStreamId(),
			  error.what());
		}
	}

	void Transport::OnAssociationStreamsResetPerformed(std::span<const uint16_t> /*outboundStreamIds*/)
	{
		MS_TRACE();

		MS_DEBUG_DEV("SCTP association streams reset performed");
	}

	void Transport::OnAssociationStreamsResetFailed(
	  std::span<const uint16_t> /*outboundStreamIds*/, std::string_view errorMessage)
	{
		MS_TRACE();

		MS_WARN_TAG(
		  sctp,
		  "SCTP association streams reset failed [message:%.*s]",
		  static_cast<int>(errorMessage.size()),
		  errorMessage.data());
	}

	void Transport::OnAssociationInboundStreamsReset(std::span<const uint16_t> inboundStreamIds)
	{
		MS_TRACE();

		// https://datatracker.ietf.org/doc/html/rfc8831#section-6.7
		//
		// "Closing of a data channel MUST be signaled by resetting the corresponding
		// outgoing streams [RFC6525]. This means that if one side decides to close
		// the data channel, it resets the corresponding outgoing stream. When the
		// peer sees that an incoming stream was reset, it also resets its
		// corresponding outgoing stream."
		if (this->sctpAssociation->IsDataChannel())
		{
			std::vector<RTC::DataConsumer*> dataConsumersToClose;
			std::vector<uint16_t> streamsToReset;

			for (const auto streamId : inboundStreamIds)
			{
				// Only reset the outgoing stream if there is a live DataConsumer
				// using it. If the DataChannel was closed by the app, the DataConsumer
				// will have already been closed and removed.
				const auto it = this->mapSctpStreamIdDataConsumers.find(streamId);

				if (it != this->mapSctpStreamIdDataConsumers.end())
				{
					auto* dataConsumer = it->second;

					dataConsumersToClose.push_back(dataConsumer);
					streamsToReset.push_back(streamId);
				}
			}

			if (!dataConsumersToClose.empty())
			{
				this->sctpAssociation->ResetStreams(streamsToReset);

				for (auto* dataConsumer : dataConsumersToClose)
				{
					// Remove it from the maps.
					this->mapDataConsumers.erase(dataConsumer->id);

					if (dataConsumer->GetType() == RTC::DataConsumer::Type::SCTP)
					{
						this->mapSctpStreamIdDataConsumers.erase(dataConsumer->GetSctpStreamParameters().streamId);
					}

					// Notify the listener.
					this->listener->OnTransportDataConsumerClosed(this, dataConsumer);

					MS_DEBUG_DEV(
					  "SCTP DataConsumer closed via SCTP inbound stream reset [dataConsumerId:%s, streamId:%" PRIu16
					  "]",
					  dataConsumer->id.c_str(),
					  dataConsumer->GetSctpStreamParameters().streamId);

					// Delete it.
					delete dataConsumer;
				}
			}
		}
	}

	void Transport::OnAssociationStreamBufferedAmountLow(uint16_t streamId)
	{
		MS_TRACE();

		const auto* dataConsumer = GetSctpDataConsumerByStreamId(streamId);

		if (!dataConsumer)
		{
			return;
		}

		dataConsumer->SctpBufferedAmountLow(this->sctpAssociation->GetStreamBufferedAmount(streamId));
	}

	void Transport::OnAssociationTotalBufferedAmountLow()
	{
		MS_TRACE();

		// TODO: SCTP: Here we should emit a new event to the upper layer saying
		// that the transport SCTP total buffered amount is low. However we don't
		// expose `SctpOptions::totalBufferedAmountLowThreshold` to Transport.
	}

	bool Transport::OnAssociationIsTransportReadyForSctp()
	{
		MS_TRACE();

		// We are ready for SCTP traffic if the transport is connected (e.g. the
		// WebRtcTransport has ICE and DTLS connected) and there is at least a
		// DataProducer or DataConsumer.
		//
		// NOTE: We don't want to start SCTP connection if there are no DataProducers
		// and DataConsumers because the peer (e.g. a browser) may have not started
		// its SCTP stack (e.g. no "m=application" media section in its SDP) so if we
		// initiate the SCTP connection it would fail after some time.
		return IsConnected() && (!this->mapDataProducers.empty() || !this->mapDataConsumers.empty());
	}

	void Transport::OnTransportCongestionControlClientBitrates(
	  RTC::TransportCongestionControlClient* /*tccClient*/,
	  RTC::TransportCongestionControlClient::Bitrates& bitrates)
	{
		MS_TRACE();

		MS_DEBUG_DEV("outgoing available bitrate:%" PRIu32, bitrates.availableBitrate);

		DistributeAvailableOutgoingBitrate();
		ComputeOutgoingDesiredBitrate();

		// May emit 'trace' event.
		EmitTraceEventBweType(bitrates);
	}

	void Transport::OnTransportCongestionControlClientSendRtpPacket(
	  RTC::TransportCongestionControlClient* /*tccClient*/,
	  RTC::RTP::Packet* packet,
	  const webrtc::PacedPacketInfo& pacingInfo)
	{
		MS_TRACE();

		// Update abs-send-time if present.
		packet->UpdateAbsSendTime(this->shared->GetTimeMs());

		// Update transport wide sequence number if present.
		if (
		  this->tccClient->GetBweType() == RTC::BweType::TRANSPORT_CC &&
		  packet->UpdateTransportWideCc01(this->transportWideCcSeq + 1))
		{
			this->transportWideCcSeq++;

			// May emit 'trace' event.
			EmitTraceEventProbationType(packet);

			webrtc::RtpPacketSendInfo packetInfo;

			packetInfo.ssrc                      = packet->GetSsrc();
			packetInfo.transport_sequence_number = this->transportWideCcSeq;
			packetInfo.has_rtp_sequence_number   = true;
			packetInfo.rtp_sequence_number       = packet->GetSequenceNumber();
			packetInfo.length                    = packet->GetLength();
			packetInfo.pacing_info               = pacingInfo;

			// Indicate the pacer (and prober) that a packet is to be sent.
			this->tccClient->InsertPacket(packetInfo);

			const std::weak_ptr<RTC::TransportCongestionControlClient> tccClientWeakPtr(this->tccClient);

			auto* shared = this->shared;

#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
			std::weak_ptr<RTC::SenderBandwidthEstimator> senderBweWeakPtr = this->senderBwe;
			RTC::SenderBandwidthEstimator::SentInfo sentInfo;

			sentInfo.wideSeq     = this->transportWideCcSeq;
			sentInfo.size        = packet->GetLength();
			sentInfo.isProbation = true;
			sentInfo.sendingAtMs = this->shared->GetTimeMs();

			const auto* cb = new onSendCallback(
			  [tccClientWeakPtr, shared, packetInfo, senderBweWeakPtr, sentInfo](bool sent) mutable
			  {
				  if (sent)
				  {
					  auto tccClient = tccClientWeakPtr.lock();

					  if (tccClient)
					  {
						  tccClient->PacketSent(packetInfo, shared->GetTimeMsInt64());
					  }

					  auto senderBwe = senderBweWeakPtr.lock();

					  if (senderBwe)
					  {
						  sentInfo.sentAtMs = shared->GetTimeMs();
						  senderBwe->RtpPacketSent(sentInfo);
					  }
				  }
			  });

			SendRtpPacket(nullptr, packet, cb);
#else
			const auto* cb = new onSendCallback(
			  [tccClientWeakPtr, shared, packetInfo](bool sent)
			  {
				  if (sent)
				  {
					  auto tccClient = tccClientWeakPtr.lock();

					  if (tccClient)
					  {
						  tccClient->PacketSent(packetInfo, shared->GetTimeMsInt64());
					  }
				  }
			  });

			SendRtpPacket(nullptr, packet, cb);
#endif
		}
		else
		{
			// May emit 'trace' event.
			EmitTraceEventProbationType(packet);

			SendRtpPacket(nullptr, packet);
		}

		this->sendProbationTransmission.Update(packet);

		MS_DEBUG_DEV(
		  "probation sent [seq:%" PRIu16 ", wideSeq:%" PRIu16 ", size:%zu, bitrate:%" PRIu32 "]",
		  packet->GetSequenceNumber(),
		  this->transportWideCcSeq,
		  packet->GetLength(),
		  this->sendProbationTransmission.GetBitrate(this->shared->GetTimeMs()));
	}

	void Transport::OnTransportCongestionControlServerSendRtcpPacket(
	  RTC::TransportCongestionControlServer* /*tccServer*/, RTC::RTCP::Packet* packet)
	{
		MS_TRACE();

		packet->Serialize(RTC::RTCP::SerializationBuffer);

		SendRtcpPacket(packet);
	}

#ifdef ENABLE_RTC_SENDER_BANDWIDTH_ESTIMATOR
	void Transport::OnSenderBandwidthEstimatorAvailableBitrate(
	  RTC::SenderBandwidthEstimator* /*senderBwe*/,
	  uint32_t availableBitrate,
	  uint32_t previousAvailableBitrate)
	{
		MS_TRACE();

		MS_DEBUG_DEV(
		  "outgoing available bitrate [now:%" PRIu32 ", before:%" PRIu32 "]",
		  availableBitrate,
		  previousAvailableBitrate);

		// TODO: Uncomment once just SenderBandwidthEstimator is used.
		// DistributeAvailableOutgoingBitrate();
		// ComputeOutgoingDesiredBitrate();
	}
#endif

	void Transport::OnTimer(TimerHandleInterface* timer)
	{
		MS_TRACE();

		// RTCP timer.
		if (timer == this->rtcpTimer)
		{
			auto interval        = static_cast<uint64_t>(RTC::RTCP::MaxVideoIntervalMs);
			const uint64_t nowMs = this->shared->GetTimeMs();

			SendRtcp(nowMs);

			/*
			 * The interval between RTCP packets is varied randomly over the range
			 * [1.0, 1.5] times the calculated interval to avoid unintended
			 * synchronization of all participants.
			 */
			interval *= static_cast<float>(Utils::Crypto::GetRandomUInt<uint16_t>(10, 15)) / 10;

			this->rtcpTimer->Start(interval);
		}
	}
} // namespace RTC
