✨ Quest socket request when connection down
This commit is contained in:
@@ -85,6 +85,14 @@ function createWebSocket() {
|
||||
const message_listeners = new Map<number, Set<(data?: unknown) => void>>()
|
||||
const event_listeners = new Map<string, Set<(data?: unknown) => void>>()
|
||||
const pending_requests = new Map<number, PendingRequest>()
|
||||
const queued_requests = new Map<
|
||||
string,
|
||||
{
|
||||
data: CorrelationRequestData
|
||||
resolve: (r: CorrelationResponse) => void
|
||||
reject: (e: Error) => void
|
||||
}
|
||||
>()
|
||||
const { subscribe, set } = writable(false)
|
||||
const reconnectTimeoutTime = 500000
|
||||
const requestTimeoutTime = 10000
|
||||
@@ -94,6 +102,13 @@ function createWebSocket() {
|
||||
let ws: WebSocket
|
||||
let socketUrl: string | URL
|
||||
|
||||
function getRequestKey(data: CorrelationRequestData): string {
|
||||
return (
|
||||
Object.keys(data).find(k => data[k as keyof CorrelationRequestData] !== undefined) ??
|
||||
'unknown'
|
||||
)
|
||||
}
|
||||
|
||||
function init(url: string | URL) {
|
||||
socketUrl = url
|
||||
connect()
|
||||
@@ -116,6 +131,7 @@ function createWebSocket() {
|
||||
set(true)
|
||||
clearTimeout(reconnectTimeoutId)
|
||||
resubscribeAll()
|
||||
flushQueuedRequests()
|
||||
event_listeners.get('open')?.forEach(listener => listener(ev))
|
||||
}
|
||||
ws.onmessage = frame => {
|
||||
@@ -205,6 +221,30 @@ function createWebSocket() {
|
||||
send(WebsocketMessage.create({ pingmsg: {} }))
|
||||
}
|
||||
|
||||
function sendRequest(
|
||||
data: CorrelationRequestData,
|
||||
resolve: (r: CorrelationResponse) => void,
|
||||
reject: (e: Error) => void
|
||||
) {
|
||||
const correlationId = ++correlationIdCounter
|
||||
const timeoutId = setTimeout(() => {
|
||||
pending_requests.delete(correlationId)
|
||||
reject(new Error(`Request timeout (id: ${correlationId})`))
|
||||
}, requestTimeoutTime)
|
||||
|
||||
pending_requests.set(correlationId, { resolve, reject, timeoutId })
|
||||
|
||||
const request = CorrelationRequest.create({ correlationId, ...data })
|
||||
send(WebsocketMessage.create({ correlationRequest: request }))
|
||||
}
|
||||
|
||||
function flushQueuedRequests() {
|
||||
for (const [, { data, resolve, reject }] of queued_requests) {
|
||||
sendRequest(data, resolve, reject)
|
||||
}
|
||||
queued_requests.clear()
|
||||
}
|
||||
|
||||
return {
|
||||
subscribe,
|
||||
sendEvent,
|
||||
@@ -237,21 +277,16 @@ function createWebSocket() {
|
||||
},
|
||||
request: (data: CorrelationRequestData): Promise<CorrelationResponse> => {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (!ws || ws.readyState !== WebSocket.OPEN) {
|
||||
reject(new Error('WebSocket not connected'))
|
||||
return
|
||||
if (ws && ws.readyState === WebSocket.OPEN) {
|
||||
sendRequest(data, resolve, reject)
|
||||
} else {
|
||||
const key = getRequestKey(data)
|
||||
const existing = queued_requests.get(key)
|
||||
if (existing) {
|
||||
existing.reject(new Error('Request superseded by newer request'))
|
||||
}
|
||||
queued_requests.set(key, { data, resolve, reject })
|
||||
}
|
||||
|
||||
const correlationId = ++correlationIdCounter
|
||||
const timeoutId = setTimeout(() => {
|
||||
pending_requests.delete(correlationId)
|
||||
reject(new Error(`Request timeout (id: ${correlationId})`))
|
||||
}, requestTimeoutTime)
|
||||
|
||||
pending_requests.set(correlationId, { resolve, reject, timeoutId })
|
||||
|
||||
const request = CorrelationRequest.create({ correlationId, ...data })
|
||||
send(WebsocketMessage.create({ correlationRequest: request }))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user