diff --git a/app/src/lib/stores/socket.ts b/app/src/lib/stores/socket.ts index 59a346e..2d76637 100644 --- a/app/src/lib/stores/socket.ts +++ b/app/src/lib/stores/socket.ts @@ -85,6 +85,14 @@ function createWebSocket() { const message_listeners = new Map void>>() const event_listeners = new Map void>>() const pending_requests = new Map() + const queued_requests = new Map< + string, + { + data: CorrelationRequestData + resolve: (r: CorrelationResponse) => void + reject: (e: Error) => void + } + >() const { subscribe, set } = writable(false) const reconnectTimeoutTime = 500000 const requestTimeoutTime = 10000 @@ -94,6 +102,13 @@ function createWebSocket() { let ws: WebSocket let socketUrl: string | URL + function getRequestKey(data: CorrelationRequestData): string { + return ( + Object.keys(data).find(k => data[k as keyof CorrelationRequestData] !== undefined) ?? + 'unknown' + ) + } + function init(url: string | URL) { socketUrl = url connect() @@ -116,6 +131,7 @@ function createWebSocket() { set(true) clearTimeout(reconnectTimeoutId) resubscribeAll() + flushQueuedRequests() event_listeners.get('open')?.forEach(listener => listener(ev)) } ws.onmessage = frame => { @@ -205,6 +221,30 @@ function createWebSocket() { send(WebsocketMessage.create({ pingmsg: {} })) } + function sendRequest( + data: CorrelationRequestData, + resolve: (r: CorrelationResponse) => void, + reject: (e: Error) => void + ) { + const correlationId = ++correlationIdCounter + const timeoutId = setTimeout(() => { + pending_requests.delete(correlationId) + reject(new Error(`Request timeout (id: ${correlationId})`)) + }, requestTimeoutTime) + + pending_requests.set(correlationId, { resolve, reject, timeoutId }) + + const request = CorrelationRequest.create({ correlationId, ...data }) + send(WebsocketMessage.create({ correlationRequest: request })) + } + + function flushQueuedRequests() { + for (const [, { data, resolve, reject }] of queued_requests) { + sendRequest(data, resolve, reject) + } + queued_requests.clear() + } + return { subscribe, sendEvent, @@ -237,21 +277,16 @@ function createWebSocket() { }, request: (data: CorrelationRequestData): Promise => { return new Promise((resolve, reject) => { - if (!ws || ws.readyState !== WebSocket.OPEN) { - reject(new Error('WebSocket not connected')) - return + if (ws && ws.readyState === WebSocket.OPEN) { + sendRequest(data, resolve, reject) + } else { + const key = getRequestKey(data) + const existing = queued_requests.get(key) + if (existing) { + existing.reject(new Error('Request superseded by newer request')) + } + queued_requests.set(key, { data, resolve, reject }) } - - const correlationId = ++correlationIdCounter - const timeoutId = setTimeout(() => { - pending_requests.delete(correlationId) - reject(new Error(`Request timeout (id: ${correlationId})`)) - }, requestTimeoutTime) - - pending_requests.set(correlationId, { resolve, reject, timeoutId }) - - const request = CorrelationRequest.create({ correlationId, ...data }) - send(WebsocketMessage.create({ correlationRequest: request })) }) } }