✨ Adds promise based request reponse system
This commit is contained in:
@@ -1,8 +1,10 @@
|
||||
import { writable } from 'svelte/store'
|
||||
import {
|
||||
WebsocketMessage,
|
||||
type MessageFns,
|
||||
protoMetadata as websocket_md
|
||||
CorrelationRequest,
|
||||
CorrelationResponse,
|
||||
protoMetadata as websocket_md,
|
||||
type MessageFns
|
||||
} from '$lib/platform_shared/websocket_message'
|
||||
import * as WebsocketMessages from '$lib/platform_shared/websocket_message'
|
||||
|
||||
@@ -11,6 +13,13 @@ export const MESSAGE_TYPE_TO_TAG = new Map<MessageFns<unknown>, number>()
|
||||
export const MESSAGE_KEY_TO_TAG = new Map<string, number>()
|
||||
export const MESSAGE_TAG_TO_KEY = new Map<number, string>()
|
||||
|
||||
type CorrelationRequestData = Omit<CorrelationRequest, 'correlationId'>
|
||||
type PendingRequest = {
|
||||
resolve: (response: CorrelationResponse) => void
|
||||
reject: (error: Error) => void
|
||||
timeoutId: ReturnType<typeof setTimeout>
|
||||
}
|
||||
|
||||
const websocketMessageType = websocket_md.fileDescriptor.messageType?.find(
|
||||
(msg: { name: string }) => msg.name === 'WebsocketMessage'
|
||||
)
|
||||
@@ -75,8 +84,11 @@ export const encodeMessage = (data: WebsocketMessage): Uint8Array<ArrayBuffer> =
|
||||
function createWebSocket() {
|
||||
const message_listeners = new Map<number, Set<(data?: unknown) => void>>()
|
||||
const event_listeners = new Map<string, Set<(data?: unknown) => void>>()
|
||||
const pending_requests = new Map<number, PendingRequest>()
|
||||
const { subscribe, set } = writable(false)
|
||||
const reconnectTimeoutTime = 500000
|
||||
const requestTimeoutTime = 10000
|
||||
let correlationIdCounter = 0
|
||||
let unresponsiveTimeoutId: ReturnType<typeof setTimeout>
|
||||
let reconnectTimeoutId: ReturnType<typeof setTimeout>
|
||||
let ws: WebSocket
|
||||
@@ -109,9 +121,20 @@ function createWebSocket() {
|
||||
ws.onmessage = frame => {
|
||||
resetUnresponsiveCheck()
|
||||
const { tag, msg } = decodeMessage(frame.data)
|
||||
if (msg.correlationResponse) {
|
||||
const pending = pending_requests.get(msg.correlationResponse.correlationId)
|
||||
if (pending) {
|
||||
clearTimeout(pending.timeoutId)
|
||||
pending_requests.delete(msg.correlationResponse.correlationId)
|
||||
pending.resolve(msg.correlationResponse)
|
||||
}
|
||||
return
|
||||
}
|
||||
if (tag) {
|
||||
const key = MESSAGE_TAG_TO_KEY.get(tag)!
|
||||
message_listeners.get(tag)?.forEach(listener => listener(msg[key as keyof typeof msg]))
|
||||
message_listeners
|
||||
.get(tag)
|
||||
?.forEach(listener => listener(msg[key as keyof typeof msg]))
|
||||
}
|
||||
}
|
||||
ws.onerror = ev => disconnect('error', ev)
|
||||
@@ -211,6 +234,25 @@ function createWebSocket() {
|
||||
return () => {
|
||||
unsubscribe_event(event_type, listener)
|
||||
}
|
||||
},
|
||||
request: (data: CorrelationRequestData): Promise<CorrelationResponse> => {
|
||||
return new Promise((resolve, reject) => {
|
||||
if (!ws || ws.readyState !== WebSocket.OPEN) {
|
||||
reject(new Error('WebSocket not connected'))
|
||||
return
|
||||
}
|
||||
|
||||
const correlationId = ++correlationIdCounter
|
||||
const timeoutId = setTimeout(() => {
|
||||
pending_requests.delete(correlationId)
|
||||
reject(new Error(`Request timeout (id: ${correlationId})`))
|
||||
}, requestTimeoutTime)
|
||||
|
||||
pending_requests.set(correlationId, { resolve, reject, timeoutId })
|
||||
|
||||
const request = CorrelationRequest.create({ correlationId, ...data })
|
||||
send(WebsocketMessage.create({ correlationRequest: request }))
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user