/** * @license * Copyright 2025-2026 Open Home Foundation * SPDX-License-Identifier: Apache-2.0 */ import type { WebRtcCallbackData } from "@matter-server/ws-client"; import { Abort, AsyncObservable, camelize, ClientNode, CommissioningClient, FabricId, FabricIndex, isObject, Logger, MatterAggregateError, Millis, Minutes, NodeId, Observable, Seconds, ServerAddress, SoftwareUpdateInfo, SoftwareUpdateManager, Time, Timer, DnsRecordType, } from "@matter/main"; import { OperationalCredentialsClient } from "@matter/main/behaviors"; import { AccessControl, BasicInformation, Binding, BridgedDeviceBasicInformation, GeneralCommissioning, OperationalCredentials, } from "@matter/main/clusters"; import { WebRtcTransportDefinitions } from "@matter/main/clusters/web-rtc-transport-definitions"; import { WebRtcTransportProvider } from "@matter/main/clusters/web-rtc-transport-provider"; import { DeviceAttestationCheck, Invoke, PeerAddress, Read, Specifier, PeerSet } from "@matter/main/protocol"; import { AttributeId, ClusterId, ClusterType, DeviceTypeId, EndpointNumber, GroupId, ManualPairingCodeCodec, QrPairingCodeCodec, SpecificationVersion, Status, StatusResponseError, VendorId, } from "@matter/main/types"; import { Endpoint } from "@matter/node"; import { CameraControllerDevice } from "@matter/node/devices/camera-controller"; import { CommissioningController, NodeCommissioningOptions } from "@project-chip/matter.js"; import type { DecodedAttributeReportValue, DecodedEventReportValue } from "@project-chip/matter.js/cluster"; import { NodeStates } from "@project-chip/matter.js/device"; import { ClusterMap, ClusterMapEntry, GlobalAttributes } from "../model/ModelMapper.js"; import { buildAttributePath, convertCommandDataToMatter, convertMatterToWebSocketTagBased, convertWebSocketTagBasedToMatter, getDateAsString, splitAttributePath, toBigIntAwareJson, } from "../server/Converters.js"; import { AttributeResponseStatus, AttributesData, CommissioningRequest, CommissioningResponse, 
DiscoveryRequest, DiscoveryResponse, InvokeRequest, MatterNodeData, OpenCommissioningWindowRequest, OpenCommissioningWindowResponse, WriteAttributeRequest, } from "../types/CommandHandler.js"; import { AccessControlEntry, AccessControlTarget, AttributeWriteResult, BindingTarget, MatterSoftwareVersion, NodePingResult, ServerError, UpdateSource, } from "../types/WebSocketMessageTypes.js"; import { formatNodeId } from "../util/formatNodeId.js"; import { pingIp } from "../util/network.js"; import { WebRtcTransportRequestorServer } from "./behaviors/WebRtcTransportRequestorServer.js"; import { CustomClusterPoller } from "./CustomClusterPoller.js"; import { Nodes } from "./Nodes.js"; import { attachWebRtcCallbackBridge } from "./WebRtcCallbackBridge.js"; const logger = Logger.get("ControllerCommandHandler"); /** Grace period after leaving Connected before a node is declared unavailable. */ const RECONNECT_TIMEOUT = Minutes(3); /** * Determine the Matter specification version from cached attributes. * Uses SpecificationVersion attribute (0/40/21) if available, otherwise * estimates from DataModelRevision attribute (0/40/0). 
*
 * @param attributes Cached attributes for the node
 * @returns Matter version string (e.g., "1.2.0", "1.3.0"), the literal "<1.2.0" when only a
 *     DataModelRevision of 16 or lower is cached, or undefined if unknown
 */
function determineMatterVersion(attributes: AttributesData): string | undefined {
    // BasicInformation cluster is 0x28 (40 decimal)
    // SpecificationVersion is attribute 0x15 (21 decimal) = path "0/40/21"
    // DataModelRevision is attribute 0x0 (0 decimal) = path "0/40/0"
    const specificationVersion = attributes["0/40/21"];
    const dataModelRevision = attributes["0/40/0"];

    // Prefer the explicit SpecificationVersion when it is cached as a positive number.
    if (typeof specificationVersion === "number" && specificationVersion > 0) {
        const { major, minor, patch } = SpecificationVersion.decode(specificationVersion);
        return `${major}.${minor}.${patch}`;
    }

    // Fall back to estimating from DataModelRevision
    if (typeof dataModelRevision === "number") {
        if (dataModelRevision <= 16) {
            return "<1.2.0";
        } else if (dataModelRevision === 17) {
            return "1.2.0";
        }
        // Revisions above 17 are not mapped here and fall through to undefined.
    }

    return undefined;
}

/**
 * Command handler built around a CommissioningController. Keeps the known nodes and their
 * attribute cache in `#nodes` and publishes controller/node activity through `events`.
 */
export class ControllerCommandHandler {
    #controller: CommissioningController;
    /** Set by start() before the controller is started; later start() calls return immediately. */
    #started = false;
    /** Set by initializeNodes() so that node registration runs only once. */
    #connected = false;
    readonly #bleEnabled: boolean;
    readonly #bleProxyEnabled: boolean;
    readonly #otaEnabled: boolean;
    /** Node management and attribute cache */
    #nodes = new Nodes();
    /** Cache of available updates keyed by nodeId */
    #availableUpdates = new Map();
    /** Poller for custom cluster attributes (Eve energy, etc.)
*/
    #customClusterPoller: CustomClusterPoller;
    /** Per-node timers that fire when Reconnecting state exceeds the timeout */
    #reconnectTimers = new Map();
    /**
     * Track in-flight invoke-commands for deduplication across all WebSocket connections
     *
     * NOTE(review): `new Map>()` below is not valid syntax as shown; the type arguments look like
     * they were lost when this text was extracted — restore them from the original file.
     */
    readonly #inFlightInvokes = new Map>();

    /** Observables through which this handler publishes controller and per-node activity. */
    events = {
        /** Emitted (and awaited) by start() once the controller has been started. */
        started: new AsyncObservable(),
        attributeChanged: new Observable<[nodeId: NodeId, data: DecodedAttributeReportValue]>(),
        eventChanged: new Observable<[nodeId: NodeId, data: DecodedEventReportValue]>(),
        nodeAdded: new Observable<[nodeId: NodeId]>(),
        nodeStateChanged: new Observable<[nodeId: NodeId, state: NodeStates]>(),
        /** Emitted when node availability changes (for sending node_updated events) */
        nodeAvailabilityChanged: new Observable<[nodeId: NodeId, available: boolean]>(),
        nodeStructureChanged: new Observable<[nodeId: NodeId]>(),
        nodeDecommissioned: new Observable<[nodeId: NodeId]>(),
        nodeEndpointAdded: new Observable<[nodeId: NodeId, endpointId: EndpointNumber]>(),
        nodeEndpointRemoved: new Observable<[nodeId: NodeId, endpointId: EndpointNumber]>(),
        webRtcCallback: new Observable<[WebRtcCallbackData]>(),
    };

    /** PeerSet taken from the controller node's environment in start(); undefined before that. */
    #peers?: PeerSet;

    /**
     * @param controllerInstance The controller this handler drives
     * @param bleEnabled Whether BLE is enabled (also exposed via the `bleEnabled` getter)
     * @param bleProxyEnabled Whether BLE runs in proxy mode (only logged and exposed via getter here)
     * @param otaEnabled Whether OTA event handlers are set up during start()
     */
    constructor(
        controllerInstance: CommissioningController,
        bleEnabled: boolean,
        bleProxyEnabled: boolean,
        otaEnabled: boolean,
    ) {
        this.#controller = controllerInstance;
        this.#bleEnabled = bleEnabled;
        this.#bleProxyEnabled = bleProxyEnabled;
        logger.info(`BLE is ${bleEnabled ? "enabled" : "disabled"}${bleProxyEnabled ? " (proxy mode)" : ""}`);
        this.#otaEnabled = otaEnabled;

        // Initialize custom cluster poller for Eve energy attributes etc.
// Reads automatically trigger change events through the normal attribute flow
        this.#customClusterPoller = new CustomClusterPoller({
            // A peer counts as connected only when it is a known node and that node reports isConnected.
            nodeConnected: peer => !!(this.#nodes.has(peer.nodeId) && this.#nodes.get(peer.nodeId).isConnected),
            handleReadAttributes: (peer, paths, fabricFiltered) =>
                this.handleReadAttributes(peer.nodeId, paths, fabricFiltered),
        });
    }

    /**
     * Build the canonical PeerAddress for the given node on this controller's fabric.
     *
     * Throws if the controller's fabric is not yet resolved. Callers must run after
     * controller start; a silent fallback would intern PeerAddressMap entries under the
     * wrong fabric index and leak poller registrations.
     *
     * @param nodeId Node to build the address for
     * @returns PeerAddress combining the controller fabric index with `nodeId`
     * @throws Error when `this.#controller.fabric` is undefined
     */
    #peerOf(nodeId: NodeId): PeerAddress {
        const fabric = this.#controller.fabric;
        if (fabric === undefined) {
            throw new Error(`Cannot resolve PeerAddress for node ${nodeId}: controller fabric is not initialized`);
        }
        return PeerAddress({ fabricIndex: fabric.fabricIndex, nodeId });
    }

    /**
     * Format a NodeId as a PeerAddress string for logging.
     * Uses the controller's fabric index when available, otherwise "?" is used.
     */
    formatNode(nodeId: NodeId): string {
        // Optional chaining: the fabric index is passed as undefined when the fabric is not resolved.
        const fabricIndex = this.#controller.fabric?.fabricIndex;
        return formatNodeId(nodeId, fabricIndex);
    }

    /** Whether start() has been entered. */
    get started() {
        return this.#started;
    }

    /** The BLE flag passed to the constructor. */
    get bleEnabled() {
        return this.#bleEnabled;
    }

    /** The BLE proxy flag passed to the constructor. */
    get bleProxyEnabled() {
        return this.#bleProxyEnabled;
    }

    /**
     * Start the handler once: starts the underlying controller, fetches the PeerSet from the
     * controller node's environment, sets up OTA event handlers when OTA is enabled, emits
     * `events.started`, and finally wires the WebRTC callback bridge. Later calls return immediately.
     *
     * NOTE(review): `#started` is set before the awaited `#controller.start()`, so a rejected start
     * leaves this handler flagged as started and a retry becomes a no-op — confirm this is intended.
     */
    async start() {
        if (this.#started) {
            return;
        }
        this.#started = true;
        await this.#controller.start();
        logger.notice(`Matter Controller started`);

        this.#peers = this.#controller.node.env.get(PeerSet);

        if (this.#otaEnabled) {
            // Subscribe to OTA provider events to track available updates
            await this.#setupOtaEventHandlers();
        }

        await this.events.started.emit();

        await this.#setupWebRtcCallbackBridge();
    }

    /**
     * Set up event handlers for OTA update notifications from the SoftwareUpdateManager.
*/
    async #setupOtaEventHandlers() {
        // Guard repeated here so the method is a no-op when OTA is disabled, whoever calls it.
        if (!this.#otaEnabled) {
            return;
        }
        try {
            const otaProvider = this.#controller.otaProvider;
            if (!otaProvider) {
                logger.info("OTA provider not available");
                return;
            }

            // Fetch the events object of the SoftwareUpdateManager behavior (imported from
            // @matter/main) through the OTA provider's agent.
            const softwareUpdateManagerEvents = await otaProvider.act(agent => agent.get(SoftwareUpdateManager).events);
            if (softwareUpdateManagerEvents === undefined) {
                logger.info("SoftwareUpdateManager not available");
                return;
            }

            // Handle updateAvailable events - cache the update info
            softwareUpdateManagerEvents.updateAvailable.on(
                (peerAddress: PeerAddress, updateDetails: SoftwareUpdateInfo) => {
                    logger.notice(`Update available for node ${this.formatNode(peerAddress.nodeId)}:`, updateDetails);
                    this.#availableUpdates.set(peerAddress.nodeId, updateDetails);
                },
            );

            // Handle updateDone events - clear the cached update info
            softwareUpdateManagerEvents.updateDone.on((peerAddress: PeerAddress) => {
                logger.notice(`Update done for node ${this.formatNode(peerAddress.nodeId)}`);
                this.#availableUpdates.delete(peerAddress.nodeId);
            });

            logger.info("OTA event handlers registered");
        } catch (error) {
            // Best effort: a failure here is logged and does not abort start().
            logger.warn("Failed to setup OTA event handlers:", error);
        }
    }

    /**
     * Forward events of the WebRtcTransportRequestorServer behavior on the camera-controller
     * endpoint to `events.webRtcCallback`. Failures are logged as warnings and not rethrown.
     */
    async #setupWebRtcCallbackBridge() {
        try {
            await this.#cameraControllerEndpoint().act(agent => {
                attachWebRtcCallbackBridge(agent.get(WebRtcTransportRequestorServer).events, data =>
                    this.events.webRtcCallback.emit(data),
                );
            });
            logger.info("WebRTC callback bridge wired");
        } catch (error) {
            logger.warn("Failed to setup WebRTC callback bridge:", error);
        }
    }

    /** Look up the controller node's endpoint registered under the id "camera-controller". */
    #cameraControllerEndpoint(): Endpoint {
        return this.#controller.node.endpoints.for("camera-controller") as Endpoint;
    }

    /** `originatingEndpointId` is server-injected; any client-supplied value in `payload` is overwritten.
*/ async sendWebRtcProviderCommand(args: { nodeId: NodeId; endpointId: EndpointNumber; commandName: "ProvideOffer" | "SolicitOffer"; payload: Record; }): Promise { const { nodeId, endpointId, commandName, payload } = args; if (commandName !== "ProvideOffer" && commandName !== "SolicitOffer") { throw ServerError.invalidArguments( `Unsupported WebRTC provider command "${commandName}"; expected ProvideOffer or SolicitOffer`, ); } const requestorEndpoint = this.#cameraControllerEndpoint(); const originatingEndpointId = EndpointNumber(requestorEndpoint.number); const fabricIndex = this.#controller.fabric.fabricIndex; const node = this.#nodes.get(nodeId); const fields: Record = { ...payload, originatingEndpointId, }; const command = commandName === "ProvideOffer" ? "provideOffer" : "solicitOffer"; const response = (await this.#invokeCommand(node.node, { endpoint: endpointId, cluster: WebRtcTransportProvider, command, fields, })) as WebRtcTransportProvider.ProvideOfferResponse | WebRtcTransportProvider.SolicitOfferResponse | undefined; if (response === undefined || typeof response.webRtcSessionId !== "number") { throw ServerError.sdkStackError( `${commandName} did not return a WebRTCSessionID for node ${this.formatNode(nodeId)}`, ); } const streamUsage = payload.streamUsage as WebRtcTransportDefinitions.WebRtcSession["streamUsage"]; const metadataEnabled = (payload.metadataEnabled as boolean | undefined) ?? false; const videoStreams = response.videoStreamId != null ? [response.videoStreamId] : new Array(); const audioStreams = response.audioStreamId != null ? 
[response.audioStreamId] : new Array(); const session: WebRtcTransportDefinitions.WebRtcSession = { id: response.webRtcSessionId, peerNodeId: nodeId, peerEndpointId: endpointId, streamUsage, metadataEnabled, videoStreams, audioStreams, fabricIndex, }; logger.info( `upserting WebRTC session id=${session.id} peerNodeId=${nodeId} peerEndpointId=${endpointId} fabricIndex=${fabricIndex} streamUsage=${streamUsage} originatingEndpointId=${originatingEndpointId}`, ); await requestorEndpoint.act(agent => { agent.get(WebRtcTransportRequestorServer).upsertSession(session); }); return response; } async close() { for (const timer of this.#reconnectTimers.values()) { timer.stop(); } this.#reconnectTimers.clear(); await this.#customClusterPoller.stop(); if (!this.#started) { return; } return this.#controller.close(); } async #registerNode(nodeId: NodeId) { const node = await this.#controller.getNode(nodeId); const attributeCache = this.#nodes.attributeCache; // Defer the full node_updated until the subscription batch ends (connectionAlive) // so consumers see one update per batch rather than one per attribute. 
let basicInfoChangedInBatch = false; node.events.attributeChanged.on(data => { attributeCache.updateAttribute(nodeId, data); this.events.attributeChanged.emit(nodeId, data); if ( (data.path.clusterId === BasicInformation.id || data.path.clusterId === BridgedDeviceBasicInformation.id) && data.path.attributeId !== BasicInformation.attributes.nodeLabel.id ) { basicInfoChangedInBatch = true; } }); node.events.connectionAlive.on(() => { if (basicInfoChangedInBatch) { basicInfoChangedInBatch = false; logger.info(`Node ${this.formatNode(nodeId)} basic information changed, sending full node_updated`); this.events.nodeStructureChanged.emit(nodeId); } }); node.events.eventTriggered.on(data => this.events.eventChanged.emit(nodeId, data)); node.events.stateChanged.on(state => { if (state === NodeStates.Connected) { attributeCache.update(node); const attributes = attributeCache.get(nodeId); if (attributes) { this.#customClusterPoller.registerNode(this.#peerOf(nodeId), attributes); } } // Arm on Connected->Reconnecting only; keep running across later // non-Connected states; cancel on return to Connected. 
if (state === NodeStates.Connected) { this.#reconnectTimers.get(nodeId)?.stop(); this.#reconnectTimers.delete(nodeId); } else if ( state === NodeStates.Reconnecting && !this.#reconnectTimers.has(nodeId) && this.#nodes.isAvailable(nodeId) ) { const timer = Time.getTimer(`reconnect-timeout-${nodeId}`, RECONNECT_TIMEOUT, () => { this.#reconnectTimers.delete(nodeId); if (this.#nodes.forceUnavailable(nodeId)) { logger.warn( `Node ${this.formatNode(nodeId)} offline grace period expired, marking unavailable`, ); this.events.nodeAvailabilityChanged.emit(nodeId, false); } }); timer.utility = true; timer.start(); this.#reconnectTimers.set(nodeId, timer); } const debouncePending = this.#reconnectTimers.has(nodeId); const result = this.#nodes.processStateChange(nodeId, state, debouncePending); this.events.nodeStateChanged.emit(nodeId, state); if (result.availabilityChanged) { const availabilityMessage = `Node ${this.formatNode(nodeId)} availability changed to ${result.available} (state: ${NodeStates[state]})`; if (result.available) { logger.notice(availabilityMessage); } else { logger.warn(availabilityMessage); } this.events.nodeAvailabilityChanged.emit(nodeId, result.available); } }); node.events.structureChanged.on(() => { if (node.isConnected) { attributeCache.update(node); } basicInfoChangedInBatch = false; this.events.nodeStructureChanged.emit(nodeId); for (const endpointId of this.#nodes.drainPendingEndpointAdds(nodeId)) { this.events.nodeEndpointAdded.emit(nodeId, endpointId); } }); node.events.decommissioned.on(() => { this.#cleanupNodeAfterRemoval(nodeId); this.events.nodeDecommissioned.emit(nodeId); }); node.events.nodeEndpointAdded.on(endpointId => this.#nodes.queueEndpointAdded(nodeId, endpointId)); node.events.nodeEndpointRemoved.on(endpointId => this.events.nodeEndpointRemoved.emit(nodeId, endpointId)); this.#nodes.set(nodeId, node); this.#nodes.seedState(nodeId, node.connectionState); if (node.initialized) { attributeCache.add(node); const attributes = 
attributeCache.get(nodeId); if (attributes) { this.#customClusterPoller.registerNode(this.#peerOf(nodeId), attributes); } } return node; } /** * Initialize the controller, register all commissioned nodes (populates attribute caches), * and start connecting them to the network. * * Guarded by #connected so it runs exactly once, even if called multiple times * (e.g. when WebServer.start() registers handlers for multiple listen addresses). */ async initializeNodes() { if (this.#connected) { return; } this.#connected = true; await this.start(); const nodes = this.#controller.getCommissionedNodes(); logger.info(`Found ${nodes.length} nodes: ${nodes.map(nodeId => this.formatNode(nodeId)).join(", ")}`); for (const nodeId of nodes) { try { logger.info(`Initializing node "${this.formatNode(nodeId)}" ...`); await this.#registerNode(nodeId); } catch (error) { logger.warn(`Failed to initialize node "${this.formatNode(nodeId)}":`, error); } } logger.info(`All ${nodes.length} nodes initialized, starting connections`); // Start connecting nodes to the network (fire-and-forget, actual I/O is async). for (const nodeId of this.#nodes.getIds()) { try { this.#nodes.get(nodeId).connect({ subscribeMinIntervalFloorSeconds: 1, subscribeMaxIntervalCeilingSeconds: undefined, }); } catch (error) { logger.warn(`Failed to connect node "${this.formatNode(nodeId)}":`, error); } } } getNodeIds() { return this.#nodes.getIds(); } hasNode(nodeId: NodeId): boolean { return this.#nodes.has(nodeId); } /** * Whether a node id is already reserved on the fabric, either by a commissioned peer or the commissioner itself. * Authoritative against matter.js rather than the locally tracked node set, which can drift from the fabric. */ isNodeIdInUse(nodeId: NodeId): boolean { return nodeId === this.#controller.nodeId || this.#controller.isNodeCommissioned(nodeId); } /** * Alias for decommissionNode to match NodeCommandHandler interface. 
*/ removeNode(nodeId: NodeId) { return this.decommissionNode(nodeId); } async interviewNode(nodeId: NodeId) { const node = this.#nodes.get(nodeId); // Our nodes are kept up-to-date via attribute subscriptions, so we don't need // to re-read all attributes like the Python server does. The caller is responsible // for broadcasting node_updated after the interview completes. logger.info(`Interview requested for node ${this.formatNode(nodeId)} - do a complete read`); // Do a full Read of the node const read = { ...Read({ fabricFilter: true, attributes: [{}], }), includeKnownVersions: true, // do not send DataVersionFilters, so we do a new clean read }; for await (const _chunk of node.node.interaction.read(read)); } /** * Get full node details in WebSocket API format. * @param nodeId The node ID * @param lastInterviewDate Optional last interview date (tracked externally) */ getNodeDetails(nodeId: NodeId, lastInterviewDate?: Date): MatterNodeData { const node = this.#nodes.get(nodeId); const attributeCache = this.#nodes.attributeCache; let isBridge = false; // Ensure the cache is populated if node is initialized but cache doesn't exist yet if (!attributeCache.has(nodeId)) { attributeCache.add(node); } // Get cached attributes (empty object if node not yet initialized) const attributes = attributeCache.get(nodeId) ?? {}; // Bridge detection: Check endpoint 1's Descriptor cluster (29) DeviceTypeList attribute (0) // for device type 14 (Aggregator), matching Python Matter Server behavior const endpoint1DeviceTypes = attributes["1/29/0"]; if (Array.isArray(endpoint1DeviceTypes)) { isBridge = endpoint1DeviceTypes.some(entry => entry["0"] === 14); } return { node_id: node.nodeId, date_commissioned: getDateAsString(new Date(node.state.commissioning.commissionedAt ?? Date.now())), last_interview: getDateAsString(lastInterviewDate ?? 
new Date()), interview_version: 6, available: this.#nodes.isAvailable(nodeId), is_bridge: isBridge, attributes, attribute_subscriptions: [], matter_version: determineMatterVersion(attributes), }; } /** * Read multiple attributes from a node by path strings. * Supports wildcards in paths. Batches up to 9 paths per read call. */ async handleReadAttributes( nodeId: NodeId, attributePaths: string[], fabricFiltered = false, ): Promise { const result: AttributesData = {}; const node = this.#nodes.get(nodeId); const batchSize = 9; const parsedPaths = attributePaths.map(path => splitAttributePath(path)); for (let i = 0; i < parsedPaths.length; i += batchSize) { const batch = parsedPaths.slice(i, i + batchSize); const readRequest = { ...Read({ fabricFilter: fabricFiltered, attributes: batch.map(({ endpointId, clusterId, attributeId }) => ({ endpointId: endpointId !== undefined ? EndpointNumber(endpointId) : undefined, clusterId: clusterId !== undefined ? ClusterId(clusterId) : undefined, attributeId: attributeId !== undefined ? AttributeId(attributeId) : undefined, })), }), includeKnownVersions: true, }; for await (const chunk of node.node.interaction.read(readRequest)) { for await (const entry of chunk) { if (entry.kind === "attr-value") { const { pathStr, value: wsValue } = this.#convertAttributeToWebSocket( { endpointId: EndpointNumber(entry.path.endpointId), clusterId: ClusterId(entry.path.clusterId), attributeId: entry.path.attributeId, }, entry.value, ); result[pathStr] = wsValue; } else if (entry.kind === "attr-status") { const pathStr = buildAttributePath( entry.path.endpointId, entry.path.clusterId, entry.path.attributeId, ); logger.warn(`Failed to read attribute ${pathStr}: status=${entry.status}`); } } } } return result; } /** * Convert attribute data to WebSocket tag-based format. 
*/
    #convertAttributeToWebSocket(
        path: { endpointId: EndpointNumber; clusterId: ClusterId; attributeId: number },
        value: unknown,
        clusterData?: ClusterMapEntry,
    ) {
        const { endpointId, clusterId, attributeId } = path;
        // Callers may pass the cluster entry to skip the lookup; otherwise resolve it by cluster id.
        if (!clusterData) {
            clusterData = ClusterMap[clusterId];
        }
        return {
            pathStr: buildAttributePath(endpointId, clusterId, attributeId),
            value: convertMatterToWebSocketTagBased(
                value,
                // Attributes not found on the cluster entry fall back to the GlobalAttributes table.
                clusterData?.attributes[attributeId] ?? GlobalAttributes[attributeId],
                clusterData?.model,
            ),
        };
    }

    /**
     * Write a single attribute on a remote node. Uses `setStateOf(string, ...)` (not `set({...})`)
     * because peer cluster behaviors are dynamically registered and aren't on the agent's cached property getters.
     *
     * @param nodeId Node to write to
     * @param endpointId Endpoint on that node
     * @param clusterId Cluster id; must be present in ClusterMap
     * @param attributeName Property name of the attribute inside the cluster state
     * @param value Value to write
     * @returns `{ status: 0 }` when the write resolves, otherwise the status/cluster status of the
     *     StatusResponseError that was raised
     * @throws ServerError (invalid arguments) when the cluster id is unknown; rethrows an aggregate
     *     error that contains no StatusResponseError
     */
    async #writeAttribute(
        nodeId: NodeId,
        endpointId: EndpointNumber,
        clusterId: ClusterId,
        attributeName: string,
        value: unknown,
    ): Promise<{ status: number; clusterStatus?: number }> {
        const node = this.#nodes.get(nodeId);

        const clusterEntry = ClusterMap[clusterId];
        if (!clusterEntry) {
            throw ServerError.invalidArguments(`Cluster Id "${clusterId}" unknown`);
        }
        const clusterProperty = clusterEntry.model.propertyName;

        try {
            await node.node.endpoints.for(endpointId).setStateOf(clusterProperty, { [attributeName]: value });
            return { status: 0 };
        } catch (error) {
            if (error instanceof MatterAggregateError) {
                // Only one status can be reported, so pick the first StatusResponseError of the aggregate.
                const first = error.errors.find((e): e is StatusResponseError => e instanceof StatusResponseError);
                if (first !== undefined) {
                    const dropped = error.errors.filter(e => e !== first);
                    if (dropped.length > 0) {
                        logger.info(
                            `Write aggregate: reporting first error, dropping ${dropped.length} additional`,
                            dropped,
                        );
                    }
                    return { status: first.code, clusterStatus: first.clusterCode };
                }
                throw error;
            }
            // NOTE(review): presumably accept() rethrows anything that is not a StatusResponseError
            // and narrows `error` for the line below — confirm against the matter.js API.
            StatusResponseError.accept(error);
            return { status: error.code, clusterStatus: error.clusterCode };
        }
    }

    /**
     * Set the fabric label. Pass null or empty string to reset to "Home".
     * Note: matter.js requires non-empty labels (1-32 chars), so null/empty resets to default.
     * NOTE(review): the method body forwards `label` to `updateFabricLabel` unchanged and the
     * parameter is typed `string`; presumably the reset happens in the caller or the controller — confirm.
*/
    async setFabricLabel(label: string) {
        await this.#controller.updateFabricLabel(label);
    }

    /**
     * Write one attribute addressed by numeric ids. Resolves the attribute model from ClusterMap,
     * converts the WebSocket tag-based value to the Matter representation and performs the write.
     *
     * NOTE(review): the bare `Promise` return annotation appears to have lost its type argument in
     * this view of the file; it is kept exactly as found.
     *
     * @param data Write request (node, endpoint, cluster, attribute ids and the value)
     * @returns The addressed ids together with the resulting status and optional cluster status
     * @throws ServerError (invalid arguments) when the cluster or attribute is unknown
     */
    async handleWriteAttribute(data: WriteAttributeRequest): Promise {
        const { nodeId, endpointId, clusterId, attributeId } = data;
        let { value } = data;

        const clusterEntry = ClusterMap[clusterId];
        const attributeModel = clusterEntry?.attributes[attributeId];
        if (!clusterEntry || !attributeModel) {
            throw ServerError.invalidArguments(`Attribute ${attributeId} on cluster ${clusterId} unknown`);
        }

        value = convertWebSocketTagBasedToMatter(value, attributeModel, clusterEntry.model);
        const attributeName = attributeModel.propertyName;

        logger.info(`Writing attribute ${clusterId}.${attributeName} (${clusterId}.${attributeId}) with value`, value);

        // Reuse the property name resolved above instead of reading it from the model a second time.
        const { status, clusterStatus } = await this.#writeAttribute(
            nodeId,
            endpointId,
            clusterId,
            attributeName,
            value,
        );

        return { attributeId, clusterId, endpointId, status, clusterStatus };
    }

    /**
     * Invoke exactly one command on a node and return its outcome.
     *
     * NOTE(review): the `Omit` annotation on `options` appears to have lost its type arguments in
     * this view of the file; it is kept exactly as found.
     *
     * @param node Client node to invoke on
     * @param request The single command request
     * @param options Extra fields spread into the Invoke definition after `commands`
     * @returns The response data for a "cmd-response" entry; undefined for a successful
     *     "cmd-status" entry or when the invoke yields no entry at all
     * @throws StatusResponseError when a "cmd-status" entry reports a non-success status
     */
    async #invokeCommand(
        node: ClientNode,
        request: Invoke.ConcreteCommandRequest,
        options: Omit = {},
    ) {
        const invoke = Invoke({
            commands: [request],
            ...options,
        });
        for await (const data of node.interaction.invoke(invoke)) {
            for (const entry of data) {
                // We send only one command, so we only get one response back
                switch (entry.kind) {
                    case "cmd-status":
                        if (entry.status !== Status.Success) {
                            throw StatusResponseError.create(entry.status, undefined, entry.clusterStatus);
                        }
                        return;

                    case "cmd-response":
                        return entry.data;
                }
            }
        }
    }

    /**
     * Invoke a cluster command on a node.
     *
     * Validates cluster and command, converts the payload to the Matter representation and
     * coalesces identical concurrent requests (same node, endpoint, cluster, command and serialized
     * payload) onto one in-flight invoke.
     *
     * NOTE(review): the bare `Promise` return annotation and the `Record>` cast below appear to
     * have lost their type arguments in this view of the file; both are kept exactly as found.
     *
     * @param data Invoke request, including optional timed-interaction and processing timeouts
     * @returns The command response data, or undefined when the command only reports success
     * @throws ServerError (invalid arguments) when the cluster id is unknown, the endpoint does not
     *     expose the cluster, or the command does not exist on the cluster
     */
    async handleInvoke(data: InvokeRequest): Promise {
        const {
            nodeId,
            endpointId,
            clusterId,
            timedInteractionTimeoutMs: timedRequestTimeoutMs,
            interactionTimeoutMs,
        } = data;
        let { data: commandData } = data;

        const clusterEntry = ClusterMap[clusterId];
        if (!clusterEntry) {
            throw ServerError.invalidArguments(`Cluster Id "${clusterId}" unknown`);
        }
        const clusterName = clusterEntry.model.propertyName;

        const commandName = camelize(data.commandName);
        const commands = (
            this.#nodes.get(nodeId).node.endpoints.for(endpointId).commands as Record>
        )[clusterName];
        // The endpoint may not expose this cluster at all, in which case the lookup above yields
        // undefined. Report that as an argument error instead of failing with a TypeError on the
        // command lookup below.
        if (commands === undefined) {
            throw ServerError.invalidArguments(`Cluster "${clusterName}" does not exist on endpoint ${endpointId}`);
        }
        if (!commands[commandName]) {
            throw ServerError.invalidArguments(`Command "${commandName}" does not exist on cluster "${clusterName}"`);
        }

        if (isObject(commandData)) {
            if (Object.keys(commandData).length === 0) {
                // An empty payload object is sent as "no fields".
                commandData = undefined;
            } else {
                // The command model table is keyed by lower-cased command name.
                const model = clusterEntry.commands[commandName.toLowerCase()];
                if (model) {
                    commandData = convertCommandDataToMatter(commandData, model, clusterEntry.model);
                }
            }
        }

        // Build dedup key from validated/converted fields
        const serializedData = commandData !== undefined ? toBigIntAwareJson(commandData as object) : "";
        const dedupKey = `${nodeId}:${endpointId}:${clusterId}:${commandName}:${serializedData}`;

        // Check for in-flight duplicate
        const existing = this.#inFlightInvokes.get(dedupKey);
        if (existing !== undefined) {
            logger.warn(
                `Duplicate command detected for ${this.formatNode(nodeId)}/${endpointId}/${clusterName}/${commandName} - coalescing with in-flight request`,
            );
            return existing;
        }

        // Resolve cluster namespace with command definitions for typed invoke
        const cluster = ClusterType(clusterEntry.model) as Specifier.ClusterLike;

        // Execute and track the command
        const invokePromise = this.#invokeCommand(
            this.#nodes.get(nodeId).node,
            {
                endpoint: endpointId,
                cluster,
                command: commandName,
                fields: commandData,
            },
            {
                timeout: timedRequestTimeoutMs !== undefined ? Millis(timedRequestTimeoutMs) : undefined,
                expectedProcessingTime: interactionTimeoutMs !== undefined ? Millis(interactionTimeoutMs) : undefined,
            },
        );

        // Store the in-flight invoke immediately so concurrent requests can see it
        this.#inFlightInvokes.set(dedupKey, invokePromise);

        // Ensure cleanup of the in-flight entry once the invoke completes
        return invokePromise.finally(() => {
            this.#inFlightInvokes.delete(dedupKey);
        });
    }

    /**
     * Translate a commissioning request into matter.js commissioning options.
     *
     * The passcode and discovery identifier are taken from the manual pairing code, the QR code, or
     * an explicit passcode with optional discriminator / vendor+product ids, in that order of
     * precedence.
     *
     * @param data Commissioning request
     * @throws ServerError (invalid arguments) when no pairing code or passcode is provided
     */
    #determineCommissionOptions(data: CommissioningRequest): NodeCommissioningOptions {
        let passcode: number | undefined = undefined;
        let shortDiscriminator: number | undefined = undefined;
        let longDiscriminator: number | undefined = undefined;
        let productId: number | undefined = undefined;
        let vendorId: VendorId | undefined = undefined;
        let knownAddress: ServerAddress | undefined = undefined;

        if ("manualCode" in data && data.manualCode.length > 0) {
            const pairingCodeCodec = ManualPairingCodeCodec.decode(data.manualCode);
            shortDiscriminator = pairingCodeCodec.shortDiscriminator;
            longDiscriminator = undefined;
            passcode = pairingCodeCodec.passcode;
        } else if ("qrCode" in data && data.qrCode.length > 0) {
            const pairingCodeCodec = QrPairingCodeCodec.decode(data.qrCode);
            // TODO handle the case where multiple devices are included
            longDiscriminator = pairingCodeCodec[0].discriminator;
            shortDiscriminator = undefined;
            passcode = pairingCodeCodec[0].passcode;
        } else if ("passcode" in data) {
            passcode = data.passcode;
            // Check for discriminator-based discovery
            if ("shortDiscriminator" in data) {
                shortDiscriminator = data.shortDiscriminator;
            } else if ("longDiscriminator" in data) {
                longDiscriminator = data.longDiscriminator;
            } else if ("vendorId" in data && "productId" in data) {
                vendorId = VendorId(data.vendorId);
                productId = data.productId;
            }
            // If none of the above, discovers any commissionable device
        } else {
            throw ServerError.invalidArguments("No pairing code provided");
        }

        if (data.knownAddress !== undefined) {
            const { ip, port } = data.knownAddress;
            knownAddress = {
                type: "udp",
                ip,
                port,
            };
        }

        if (passcode == undefined) {
            throw 
ServerError.invalidArguments("No passcode provided");
        }

        const { onNetworkOnly, wifiCredentials: wifiNetwork, threadCredentials: threadNetwork } = data;

        return {
            commissioning: {
                nodeId: data.nodeId,
                regulatoryLocation: GeneralCommissioning.RegulatoryLocationType.IndoorOutdoor,
                regulatoryCountryCode: "XX",
                wifiNetwork,
                threadNetwork,
                // Returns a reason string when a test certificate was seen (checked first), otherwise a
                // boolean that is false as soon as any finding has level "error".
                onAttestationFailure: findings => {
                    let testCertReason: string | undefined;
                    let hardError = false;
                    for (const f of findings) {
                        if (f.type === DeviceAttestationCheck.TrustedAsTestCertificate) {
                            testCertReason =
                                'Device uses a test/development certificate. Enable the "Test Net DCL" option ' +
                                "(--enable-test-net-dcl) to commission test or development devices";
                        } else if (f.level === "error") {
                            hardError = true;
                        }
                        logger.info(`Attestation finding (${f.level}):`, f.type, f.message);
                    }
                    if (testCertReason !== undefined) {
                        logger.notice(`Attestation rejected: ${testCertReason}`);
                        return testCertReason;
                    }
                    logger.info(`Attestation ${hardError ? "rejected" : "accepted"}`);
                    return !hardError;
                },
            },
            discovery: {
                knownAddress,
                // First identifier that is defined wins: long discriminator, short discriminator, vendor/product.
                identifierData:
                    longDiscriminator !== undefined
                        ? { longDiscriminator }
                        : shortDiscriminator !== undefined
                          ? { shortDiscriminator }
                          : vendorId !== undefined
                            ? { vendorId, productId }
                            : {},
                discoveryCapabilities: {
                    ble: this.bleEnabled && !onNetworkOnly,
                    onIpNetwork: true,
                },
            },
            passcode,
        };
    }

    /**
     * Commission a device described by `data`, register the resulting node and emit `nodeAdded`.
     *
     * Every failure raised while building the options or while commissioning is rethrown as
     * `ServerError.nodeCommissionFailed`; its message embeds the original failure text and the original
     * `Error` instance (if it is one) is handed over as second argument.
     *
     * @param data The commissioning request
     * @returns An object carrying the id of the newly commissioned node
     */
    async commissionNode(data: CommissioningRequest): Promise {
        let nodeId: NodeId;
        try {
            // Option building stays inside the try so that its errors are wrapped as well
            const options = this.#determineCommissionOptions(data);
            nodeId = await this.#controller.commissionNode(options, { connectNodeAfterCommissioning: true });
        } catch (error) {
            // Preserve the original error message with context
            if (error instanceof Error) {
                throw ServerError.nodeCommissionFailed(`Commission failed: ${error.message}`, error);
            }
            throw ServerError.nodeCommissionFailed(`Commission failed: ${String(error)}`, undefined);
        }

        await this.#registerNode(nodeId);
        this.events.nodeAdded.emit(nodeId);

        return { nodeId };
    }

    /** Node id of the underlying controller. */
    getCommissionerNodeId() {
        const { nodeId } = this.#controller;
        return nodeId;
    }

    /**
     * Fabric details of the controller. The fabric's `globalId` is reported as `compressedFabricId`.
     */
    async getCommissionerFabricData(): Promise<{
        fabricId: FabricId;
        compressedFabricId: bigint;
        fabricIndex: number;
    }> {
        const fabric = this.#controller.fabric;
        return {
            fabricId: fabric.fabricId,
            compressedFabricId: fabric.globalId,
            fabricIndex: fabric.fabricIndex,
        };
    }

    /**
     * Discover commissionable devices.
     *
     * Runs an IP-network discovery with a 3 second timeout and reports only the LAST discovered device
     * (or an empty list when nothing was found).
     */
    async handleDiscovery({ findBy }: DiscoveryRequest): Promise {
        const discovered = await this.#controller.discoverCommissionableDevices(
            findBy ?? {},
            { onIpNetwork: true },
            undefined,
            Seconds(3), // Just check for 3 sec
        );
        logger.info("Discovered result", discovered);

        // Chip is not removing old discoveries when being stopped, so we still have old and new devices in the result
        // but the expectation is that it was reset and only new devices are in the result
        const newest = discovered[discovered.length - 1];
        if (newest === undefined) {
            return [];
        }

        const { DT, DN, CM, D, RI, PH, PI, T, VP, deviceIdentifier, addresses, SII, SAI } = newest;

        // VP is either "<vendor>" or "<vendor>+<product>"; anything missing is reported as -1
        let vendorId = -1;
        let productId = -1;
        if (VP !== undefined) {
            if (VP.includes("+")) {
                const parts = VP.split("+");
                vendorId = parseInt(parts[0]);
                productId = parseInt(parts[1]);
            } else {
                vendorId = parseInt(VP);
            }
        }

        // The port is taken from the first address, and only if that one is an IP address
        const firstAddress = addresses[0];
        const port = firstAddress && ServerAddress.isIp(firstAddress) ? firstAddress.port : 0;

        const entry = {
            commissioningMode: CM,
            deviceName: DN ?? "",
            deviceType: DT ?? 0,
            hostName: "000000000000", // Right now we do not return real hostname, only used internally
            instanceName: deviceIdentifier,
            longDiscriminator: D,
            numIPs: addresses.length,
            pairingHint: PH ?? -1,
            pairingInstruction: PI ?? "",
            port,
            productId,
            rotatingId: RI ?? "",
            rotatingIdLen: RI?.length ?? 0,
            shortDiscriminator: (D >> 8) & 0x0f,
            vendorId,
            supportsTcpServer: T?.tcpServer ?? false,
            supportsTcpClient: T?.tcpClient ?? false,
            addresses: addresses.filter(ServerAddress.isIp).map(({ ip }) => ip),
            mrpSessionIdleInterval: SII,
            mrpSessionActiveInterval: SAI,
        };
        return [entry];
    }

    /**
     * Collect the IP addresses known for a node, without duplicates.
     *
     * Sources, in the order they are added:
     *  1. addresses of the peer's service record (when a peer set exists),
     *  2. an mDNS lookup - only when `preferCache` is false or step 1 yielded nothing,
     *  3. the IP of the peer's newest session,
     *  4. IP addresses stored in the node's CommissioningClient state (always merged in when present).
     *
     * @param nodeId The node to look up
     * @param preferCache When true (default) the mDNS lookup is skipped if cached addresses exist
     */
    async getNodeIpAddresses(nodeId: NodeId, preferCache = true) {
        const addresses = new Set();
        const peer = this.#peers?.for(this.#controller.fabric.addressOf(nodeId));

        if (peer) {
            for (const { ip } of peer.service.addresses) {
                addresses.add(ip);
            }
        }

        if (!preferCache || addresses.size === 0) {
            // Try mDNS discovery first (like Python matter server does)
            const discovered = await this.#discoverNodeAddressesViaMdns(nodeId);
            for (const ip of discovered) {
                addresses.add(ip);
            }
        }

        if (peer) {
            const sessionIp = peer.newestSession()?.channel.networkAddress?.ip;
            if (sessionIp) {
                addresses.add(sessionIp);
            }
        }

        // Commissioning addresses from the node state are merged in as well
        const known = this.#nodes.get(nodeId).node.maybeStateOf(CommissioningClient)?.addresses;
        if (known?.length) {
            known.filter(ServerAddress.isIp).forEach(({ ip }) => addresses.add(ip));
        }

        return Array.from(addresses);
    }

    /**
     * Discover node IP addresses via mDNS (like Python matter server).
     * Uses 3-second timeout matching Python implementation.
*/
    async #discoverNodeAddressesViaMdns(nodeId: NodeId): Promise {
        try {
            const peers = this.#peers;
            if (peers === undefined) {
                logger.debug(`Node ${this.formatNode(nodeId)}: No PeerSet available, Controller started?`);
                return [];
            }

            const peer = peers.for(this.#controller.fabric.addressOf(nodeId));
            const abort = new Abort({ timeout: Seconds(3) });

            // Solicit SRV records for the peer's service name, then read what the service record holds
            await peer.service.names.solicitor.discover({
                abort,
                name: peer.service.name,
                recordTypes: [DnsRecordType.SRV],
            });

            return [...peer.service.addresses].map(address => address.ip);
        } catch (error) {
            // Any failure is logged and reported as "nothing found"
            logger.info(`Node ${this.formatNode(nodeId)}: mDNS discovery failed`, error);
            return [];
        }
    }

    /**
     * Ping a node on every IP address that can currently be determined for it.
     *
     * The address list is looked up fresh (cache not preferred) and all addresses are pinged in parallel.
     *
     * @param nodeId The node ID to ping
     * @param attempts Number of ping attempts per IP (default: 1)
     * @returns A record mapping each IP address to whether its ping succeeded; empty when no address is known
     */
    async pingNode(nodeId: NodeId, attempts = 1): Promise {
        const node = this.#nodes.get(nodeId);
        const result: NodePingResult = {};

        // Get all IP addresses for the node (fresh lookup, not cached)
        const ipAddresses = await this.getNodeIpAddresses(nodeId, false);
        if (ipAddresses.length === 0) {
            logger.info(`No IP addresses found for node ${this.formatNode(nodeId)}`);
            return result;
        }

        logger.info(`Pinging node ${this.formatNode(nodeId)} on ${ipAddresses.length} addresses:`, ipAddresses);

        // Ping all addresses in parallel
        await Promise.all(
            ipAddresses.map(async ip => {
                // The part before "%" is used for log output only; ping and result key use the address as given
                const [displayIp] = ip.split("%");
                logger.debug(`Pinging ${displayIp}`);
                const success = await pingIp(ip, 10, attempts);
                result[ip] = success;
                logger.debug(`Ping result for ${displayIp}: ${success}`);
            }),
        );

        // Node is connected, but no pings succeeded - this can happen
        // with Thread devices or certain network configurations. Only logged, the result is unchanged.
        if (node.isConnected && ipAddresses.length > 0 && Object.values(result).every(ok => !ok)) {
            logger.info(`Node ${this.formatNode(nodeId)} is connected but no pings succeeded`);
        }

        return result;
    }

    /**
     * Remove a node from the fabric and forget it locally.
     *
     * @param nodeId The node to decommission
     * @throws ServerError.nodeNotExists when the node is unknown
     */
    async decommissionNode(nodeId: NodeId) {
        if (!this.#nodes.has(nodeId)) {
            throw ServerError.nodeNotExists(nodeId);
        }
        const node = this.#nodes.get(nodeId);
        if (node === undefined) {
            throw ServerError.nodeNotExists(nodeId);
        }

        // Second argument tells the controller whether the node is currently connected
        const connected = Boolean(node.isConnected);
        await this.#controller.removeNode(nodeId, connected);
        this.#cleanupNodeAfterRemoval(nodeId);
    }

    /**
     * Drop all references to a removed node so subsequent reads don't reach a
     * destroyed PairedNode. Idempotent — both the `decommissioned` listener
     * (external fabric leave) and `decommissionNode` invoke it, and the
     * listener may have run first.
*/
    #cleanupNodeAfterRemoval(nodeId: NodeId) {
        // Stop a pending reconnect timer (if any) before forgetting it
        const reconnectTimer = this.#reconnectTimers.get(nodeId);
        reconnectTimer?.stop();
        this.#reconnectTimers.delete(nodeId);

        this.#nodes.delete(nodeId);
        this.#customClusterPoller.unregisterNode(this.#peerOf(nodeId));
        this.#availableUpdates.delete(nodeId);
    }

    /**
     * Open an enhanced commissioning window on a node.
     *
     * @param data Node id and window timeout
     * @returns The manual pairing code and the QR pairing code for the opened window
     */
    async openCommissioningWindow(data: OpenCommissioningWindowRequest): Promise {
        const node = this.#nodes.get(data.nodeId);
        const codes = await node.openEnhancedCommissioningWindow(data.timeout);
        return { manualCode: codes.manualPairingCode, qrCode: codes.qrPairingCode };
    }

    /**
     * Read the `fabrics` attribute of the OperationalCredentials cluster (endpoint 0) from a node,
     * without fabric filtering.
     *
     * @returns fabricId, vendorId, fabricIndex and label of every entry of the first array value received
     * @throws ServerError.sdkStackError when the read yields no array value
     */
    async getFabrics(nodeId: NodeId) {
        const node = this.#nodes.get(nodeId);

        const fabricsAttribute = Read.Attribute({
            endpoint: EndpointNumber(0),
            cluster: OperationalCredentials,
            attributes: "fabrics",
        });
        const read = {
            ...Read({ fabricFilter: false }, fabricsAttribute),
            includeKnownVersions: true, // we want to read from device
        };

        for await (const chunk of node.node.interaction.read(read)) {
            for await (const attr of chunk) {
                if (attr.kind !== "attr-value" || !Array.isArray(attr.value)) {
                    logger.warn("Unexpected response from fabrics read", attr);
                    continue;
                }
                // We only expect one array response
                return attr.value.map(fabric => ({
                    fabricId: fabric.fabricId,
                    vendorId: fabric.vendorId,
                    fabricIndex: fabric.fabricIndex,
                    label: fabric.label,
                }));
            }
        }

        throw ServerError.sdkStackError("No or invalid response received while querying fabrics");
    }

    /** Invoke the OperationalCredentials `removeFabric` command on a node for the given fabric index. */
    removeFabric(nodeId: NodeId, fabricIndex: FabricIndex) {
        const { node } = this.#nodes.get(nodeId);
        return node.commandsOf(OperationalCredentialsClient).removeFabric({ fabricIndex });
    }

    /**
     * Fabric index assigned to this controller on the peer.
     * Some peers reject the NO_FABRIC (0) sentinel inside fabric-scoped list entries
     * even though the spec allows server-side auto-fill, so we send the resolved index.
     */
    #currentFabricIndex(nodeId: NodeId): FabricIndex {
        const rootEndpoint = this.#nodes.get(nodeId).node.endpoints.for(EndpointNumber(0));
        const credentials = rootEndpoint.maybeStateOf(OperationalCredentialsClient);
        // NO_FABRIC is only used when no index is available from the state
        return credentials?.currentFabricIndex ?? FabricIndex.NO_FABRIC;
    }

    /**
     * Set Access Control List entries on a node.
     * Writes to the ACL attribute on the AccessControl cluster (endpoint 0).
     *
     * Subjects equal to the node's own id are stripped before writing; entries left without any subject
     * are dropped entirely.
     *
     * @returns A single write result for path 0/AccessControl/0 carrying the status of the write
     */
    async setAclEntry(nodeId: NodeId, entries: AccessControlEntry[]): Promise {
        const fabricIndex = this.#currentFabricIndex(nodeId);

        // Converts one WebSocket-style target; `null` fields stay `null`
        const toTarget = (t: AccessControlTarget) => ({
            cluster: t.cluster === null ? null : ClusterId(t.cluster),
            endpoint: t.endpoint === null ? null : EndpointNumber(t.endpoint),
            deviceType: t.device_type === null ? null : DeviceTypeId(t.device_type),
        });

        const aclEntries: AccessControl.AccessControlEntry[] = entries.map(entry => ({
            privilege: entry.privilege as AccessControl.AccessControlEntryPrivilege,
            authMode: entry.auth_mode as AccessControl.AccessControlEntryAuthMode,
            subjects: entry.subjects?.map(s => NodeId(BigInt(s))) ?? null,
            targets: entry.targets?.map(toTarget) ?? null,
            fabricIndex,
        }));

        // A node always has access to itself, so an ACL entry naming the node's own id as a subject
        // (a "self-ACL") is meaningless. Strip such subjects (and drop entries left with none) rather
        // than write them to the device, but still report success to the caller.
        const nodeKey = nodeId.toString();
        let selfRemoved = false;
        const filteredEntries: AccessControl.AccessControlEntry[] = [];
        for (const entry of aclEntries) {
            if (!entry.subjects) {
                filteredEntries.push(entry);
                continue;
            }
            const subjects = entry.subjects.filter(s => s.toString() !== nodeKey);
            if (subjects.length === entry.subjects.length) {
                // Nothing stripped, keep the entry untouched
                filteredEntries.push(entry);
                continue;
            }
            selfRemoved = true;
            if (subjects.length > 0) {
                filteredEntries.push({ ...entry, subjects });
            }
        }

        if (selfRemoved) {
            logger.warn(
                `ACL for node ${nodeId} named the node's own id as a subject (self-ACL); a node always has access to itself, so those subjects are not written to the device. Reporting success.`,
            );
        }

        logger.info("Setting ACL entries", filteredEntries);

        const { status } = await this.#writeAttribute(
            nodeId,
            EndpointNumber(0),
            AccessControl.id,
            "acl",
            filteredEntries,
        );

        const path = { endpoint_id: 0, cluster_id: AccessControl.id, attribute_id: 0 };
        return [{ path, status }];
    }

    /**
     * Set bindings on a specific endpoint of a node.
     * Writes to the Binding attribute on the Binding cluster.
     *
     * `null` fields of a binding are sent as `undefined`.
     *
     * @returns A single write result for path <endpointId>/Binding/0 carrying the status of the write
     */
    async setNodeBinding(
        nodeId: NodeId,
        endpointId: EndpointNumber,
        bindings: BindingTarget[],
    ): Promise {
        const fabricIndex = this.#currentFabricIndex(nodeId);

        const bindingEntries: Binding.Target[] = bindings.map(({ node, group, endpoint, cluster }) => ({
            node: node === null ? undefined : NodeId(node),
            group: group === null ? undefined : GroupId(group),
            endpoint: endpoint === null ? undefined : EndpointNumber(endpoint),
            cluster: cluster === null ? undefined : ClusterId(cluster),
            fabricIndex,
        }));

        logger.info("Setting bindings on endpoint", endpointId, bindingEntries);

        const { status } = await this.#writeAttribute(nodeId, endpointId, Binding.id, "binding", bindingEntries);

        const path = { endpoint_id: endpointId, cluster_id: Binding.id, attribute_id: 0 };
        return [{ path, status }];
    }

    /**
     * Check if a software update is available for a node.
     * First checks the cached updates from OTA events, then queries the DCL if not found.
*/
    async checkNodeUpdate(nodeId: NodeId): Promise {
        if (!this.#otaEnabled) {
            throw ServerError.updateCheckError("OTA is disabled");
        }

        // First check if we have a cached update from the updateAvailable event
        const cached = this.#availableUpdates.get(nodeId);
        if (cached) {
            return this.#convertToMatterSoftwareVersion(cached);
        }

        // No cached update, query the OTA provider
        const node = this.#nodes.get(nodeId);

        try {
            const otaProvider = this.#controller.otaProvider;
            if (!otaProvider) {
                logger.info("OTA provider not available");
                return null;
            }

            // Query OTA provider for updates using dynamic behavior access
            const candidates = await otaProvider.act(agent =>
                agent.get(SoftwareUpdateManager).queryUpdates({
                    peerToCheck: node.node,
                    includeStoredUpdates: true,
                }),
            );

            // Find update for this specific node
            const ownAddress = this.#controller.fabric.addressOf(nodeId);
            const match = candidates.find(candidate => PeerAddress.is(ownAddress, candidate.peerAddress));
            if (!match) {
                return null;
            }

            // Remember the update so later calls are served from the cache
            this.#availableUpdates.set(nodeId, match.info);
            return this.#convertToMatterSoftwareVersion(match.info);
        } catch (error) {
            // Query failures are logged and reported as "no update"
            logger.warn(`Failed to check for updates for node ${this.formatNode(nodeId)}:`, error);
            return null;
        }
    }

    /**
     * Trigger a software update for a node.
     *
     * Rejects when OTA is disabled, the node is unknown or unavailable, the cached attribute 0/42/2
     * reports a state other than 1, no OTA provider exists, or no update information can be obtained.
     *
     * @param nodeId The node to update
     * @param softwareVersion The target software version to update to
     * @returns The update information the update was started with, in WebSocket API format
     */
    async updateNode(nodeId: NodeId, softwareVersion: number): Promise {
        if (!this.#otaEnabled) {
            throw ServerError.updateError("OTA is disabled");
        }
        if (!this.#nodes.has(nodeId)) {
            throw ServerError.nodeNotExists(nodeId);
        }
        if (!this.#nodes.isAvailable(nodeId)) {
            throw ServerError.updateError(`Node ${this.formatNode(nodeId)} is not connected. Retry later`);
        }

        // Check if node is already updating by checking the OTA Requestor UpdateState attribute
        // Attribute path: 0/42/2 (endpoint 0, OtaSoftwareUpdateRequestor cluster, UpdateState attribute)
        // UpdateState 1 = Idle, anything else means update in progress
        const updateState = this.#nodes.attributeCache.get(nodeId)?.["0/42/2"];
        if (updateState !== undefined && updateState !== 1) {
            throw ServerError.updateError(
                `Node ${this.formatNode(nodeId)} is already in the process of updating (state: ${updateState})`,
            );
        }

        const otaProvider = this.#controller.otaProvider;
        if (!otaProvider) {
            throw ServerError.updateError("OTA provider not available");
        }

        let updateInfo = this.#availableUpdates.get(nodeId);
        if (!updateInfo) {
            // Nothing cached yet: a successful check stores the update info in the cache
            const checked = await this.checkNodeUpdate(nodeId);
            if (!checked) {
                throw ServerError.updateError("No update available for this node");
            }
            updateInfo = this.#availableUpdates.get(nodeId);
            if (!updateInfo) {
                throw ServerError.updateError("Failed to get update info");
            }
        }
        const target = updateInfo;

        logger.info(`Starting update for node ${this.formatNode(nodeId)} to version ${softwareVersion}`);

        await otaProvider.act(agent => {
            const manager = agent.get(SoftwareUpdateManager);
            return manager.forceUpdate(
                this.#controller.fabric.addressOf(nodeId),
                target.vendorId,
                target.productId,
                softwareVersion,
            );
        });

        return this.#convertToMatterSoftwareVersion(target);
    }

    /**
     * Convert SoftwareUpdateInfo to MatterSoftwareVersion format for WebSocket API.
*/
    #convertToMatterSoftwareVersion(updateInfo: SoftwareUpdateInfo): MatterSoftwareVersion {
        const version = updateInfo.softwareVersion;

        // "dcl-prod" and "dcl-test" map to the two DCL sources, every other source is reported as local
        const updateSource =
            updateInfo.source === "dcl-prod"
                ? UpdateSource.MAIN_NET_DCL
                : updateInfo.source === "dcl-test"
                  ? UpdateSource.TEST_NET_DCL
                  : UpdateSource.LOCAL;

        return {
            vid: updateInfo.vendorId,
            pid: updateInfo.productId,
            software_version: version,
            software_version_string: updateInfo.softwareVersionString,
            min_applicable_software_version: 0, // Not available from SoftwareUpdateInfo
            max_applicable_software_version: version - 1,
            release_notes_url: updateInfo.releaseNotesUrl,
            update_source: updateSource,
        };
    }
}