import { writable } from 'svelte/store'
import { encode, decode } from '@msgpack/msgpack'
import { WebsocketMessage, type MessageFns, protoMetadata as websocket_md } from '$lib/platform_shared/websocket_message'
import * as WebsocketMessages from '$lib/platform_shared/websocket_message'
import type { BinaryWriter } from '@bufbuild/protobuf/wire'

// -------- START PARSING PROTO DATA --------

// The lookup tables are keyed by heterogeneous message codecs, so the payload
// type parameter is deliberately erased here (and only here).
// eslint-disable-next-line @typescript-eslint/no-explicit-any
type AnyMessageFns = MessageFns<any>

// Reverse mappings from a MessageFns object to the `WebsocketMessage` field
// that carries it: its JSON field name (event key) and its field number (tag).
const MESSAGE_TYPE_TO_KEY = new Map<AnyMessageFns, string>()
const MESSAGE_TYPE_TO_TAG = new Map<AnyMessageFns, number>()

// Build the mappings from the generated metadata: locate the descriptor of
// `WebsocketMessage` and resolve each field's type name through `references`.
const websocketMessageType = websocket_md.fileDescriptor.messageType?.find(
  msg => msg.name === 'WebsocketMessage'
)

if (websocketMessageType?.field) {
  for (const field of websocketMessageType.field) {
    // Fields without a type name have no MessageFns to look up.
    if (!field.typeName) continue

    const messageFns = websocket_md.references[field.typeName]
    if (messageFns && field.jsonName && field.number) {
      MESSAGE_TYPE_TO_KEY.set(messageFns, field.jsonName)
      MESSAGE_TYPE_TO_TAG.set(messageFns, field.number)
    }
  }
}

/**
 * Returns the `WebsocketMessage` field name (event key) registered for a message type.
 *
 * @param event_type - The MessageFns object of the payload message.
 * @returns The JSON name of the `WebsocketMessage` field holding that payload.
 * @throws Error if the MessageFns is not registered as a `WebsocketMessage` field.
 */
function get_name_from_messagetype(event_type: AnyMessageFns): string {
  const event = MESSAGE_TYPE_TO_KEY.get(event_type)
  if (!event) {
    throw new Error("Event type not found in 'WebsocketMessage'. The MessageFns you passed doesn't correspond to any WebsocketMessage field.")
  }
  return event
}

/**
 * Returns the `WebsocketMessage` field number (tag) registered for a message type.
 *
 * @param event_type - The MessageFns object of the payload message.
 * @returns The field number of the `WebsocketMessage` field holding that payload.
 * @throws Error if the MessageFns is not registered as a `WebsocketMessage` field.
 */
function get_tag_from_messagetype(event_type: AnyMessageFns): number {
  const fieldNumber = MESSAGE_TYPE_TO_TAG.get(event_type)
  if (fieldNumber === undefined) {
    throw new Error("Tag not found in 'WebsocketMessage'. The MessageFns you passed doesn't correspond to any WebsocketMessage field.")
  }
  return fieldNumber
}

// -------- END PARSING PROTO DATA --------

const socketEvents = ['open', 'close', 'error', 'message', 'unresponsive'] as const
type SocketEvent = (typeof socketEvents)[number]
type TaggedSocketMessage = [string, WebsocketMessage]
type Listener = (data: unknown) => void

/**
 * Decodes a binary frame into `[event key, decoded WebsocketMessage]`.
 *
 * The event key is the name of the single field that is set on the decoded message.
 *
 * NOTE(review): the second tuple element is the whole decoded envelope, not the
 * payload stored under `event`, while `on()` types its listeners as receiving
 * the payload type. Confirm which one callers rely on before changing it.
 *
 * @throws Error if the decoded message does not have exactly one defined field.
 */
const decodeMessage = (data: ArrayBuffer): TaggedSocketMessage => {
  const decoded = WebsocketMessage.decode(new Uint8Array(data))
  // Keep only the fields that are actually set.
  const populated = Object.entries(decoded).filter(([, value]) => value !== undefined)
  const [only] = populated
  if (populated.length !== 1 || only === undefined) {
    throw new Error("Message included either 0 or more than 1 data point")
  }
  return [only[0], decoded]
}

/** Serializes a `WebsocketMessage` to its binary wire form. */
const encodeMessage = (data: WebsocketMessage): Uint8Array =>
  WebsocketMessage.encode(data).finish()

/**
 * Creates the WebSocket client.
 *
 * The returned object exposes a Svelte store `subscribe` (true while the socket
 * is open), `init` to connect, `sendEvent` to send a payload, and `on`/`off`
 * to manage listeners keyed by message type. After a disconnect a new
 * connection attempt is scheduled after `reconnectTimeoutTime` milliseconds.
 */
function createWebSocket() {
  const listeners = new Map<string, Set<Listener>>()
  // Message types that currently have listeners, keyed by event name, so the
  // subscriptions can be replayed whenever a connection opens.
  const subscribedTypes = new Map<string, AnyMessageFns>()
  const { subscribe, set } = writable(false)
  const reconnectTimeoutTime = 5000
  let unresponsiveTimeoutId: ReturnType<typeof setTimeout> | undefined
  let reconnectTimeoutId: ReturnType<typeof setTimeout> | undefined
  let ws: WebSocket | undefined
  let socketUrl: string | URL

  /** Stores the target URL and opens the first connection. */
  function init(url: string | URL) {
    socketUrl = url
    connect()
  }

  /**
   * Returns the listeners registered for an event key or message type.
   * An empty set is returned when nothing is registered.
   */
  function getListeners(event: AnyMessageFns | string): Set<Listener> {
    const key = typeof event === 'string' ? event : get_name_from_messagetype(event)
    return listeners.get(key) ?? new Set<Listener>()
  }

  /**
   * Closes the current socket, marks the store as disconnected, notifies the
   * listeners registered under `reason` and schedules a reconnect.
   */
  function disconnect(reason: SocketEvent, event?: Event) {
    ws?.close()
    set(false)
    clearTimeout(unresponsiveTimeoutId)
    clearTimeout(reconnectTimeoutId)
    getListeners(reason).forEach(listener => listener(event))
    reconnectTimeoutId = setTimeout(connect, reconnectTimeoutTime)
  }

  /** Opens a new socket to `socketUrl` and wires up its event handlers. */
  function connect() {
    // A pending reconnect would otherwise replace the socket created below.
    clearTimeout(reconnectTimeoutId)
    // Make sure a socket that is being replaced does not stay open.
    ws?.close()

    const current = new WebSocket(socketUrl)
    ws = current
    current.binaryType = 'arraybuffer'

    // Events of a socket that has since been replaced must be ignored:
    // disconnect() acts on `ws`, so a late `close` from the old socket would
    // otherwise tear down the new connection.
    const isStale = () => ws !== current

    current.onopen = ev => {
      if (isStale()) return
      ping()
      set(true)
      clearTimeout(reconnectTimeoutId)
      getListeners('open').forEach(listener => listener(ev))
      // subscribeToEvent() is a no-op while the socket is not open, so
      // listeners registered before this point (or carried over from a
      // previous connection) have not been announced on this connection yet.
      for (const eventType of subscribedTypes.values()) {
        subscribeToEvent(eventType)
      }
    }

    current.onmessage = frame => {
      if (isStale()) return
      resetUnresponsiveCheck()
      const [event, message] = decodeMessage(frame.data)
      if (event) getListeners(event).forEach(listener => listener(message))
    }

    current.onerror = ev => {
      if (!isStale()) disconnect('error', ev)
    }

    current.onclose = ev => {
      if (!isStale()) disconnect('close', ev)
    }
  }

  /**
   * Removes one listener (or all listeners when `listener` is omitted) for a
   * message type. When no listener is left, the entry is dropped and an
   * unsubscribe notification is sent.
   */
  function unsubscribe(event_type: AnyMessageFns, listener?: Listener) {
    const event = get_name_from_messagetype(event_type)
    const eventListeners = listeners.get(event)
    if (!eventListeners) return

    if (listener) {
      eventListeners.delete(listener)
    } else {
      eventListeners.clear()
    }

    // Check the size only after removal, and drop the empty set so that a later
    // on() is treated as the first listener again and re-subscribes.
    if (eventListeners.size === 0) {
      listeners.delete(event)
      subscribedTypes.delete(event)
      unsubscribeToEvent(event_type)
    }
  }

  /** Restarts the timer that declares the connection unresponsive. */
  function resetUnresponsiveCheck() {
    clearTimeout(unresponsiveTimeoutId)
    unresponsiveTimeoutId = setTimeout(() => disconnect('unresponsive'), reconnectTimeoutTime)
  }

  /**
   * Sends `data` wrapped in a `WebsocketMessage` under the field belonging to
   * `event`. Does nothing while the socket is not open.
   *
   * T must extend a type of WebsocketMessages.
   *
   * @throws Error if `event` is not registered as a `WebsocketMessage` field.
   */
  function sendEvent<T>(event: MessageFns<T>, data: T) {
    if (!ws || ws.readyState !== WebSocket.OPEN) return
    const type = get_name_from_messagetype(event)
    // The field name is only known at runtime, hence the computed key.
    const wsm = Object.assign(WebsocketMessage.create(), { [type]: data })
    send(wsm)
  }

  /** Sends an unsubscribe notification carrying the tag of `event_type`; no-op unless the socket is open. */
  function unsubscribeToEvent(event_type: AnyMessageFns) {
    if (!ws || ws.readyState !== WebSocket.OPEN) return
    const unsub_msg = WebsocketMessages.UnsubscribeNotification.create({
      tag: get_tag_from_messagetype(event_type)
    })
    send(WebsocketMessage.create({ unsubNotif: unsub_msg }))
  }

  /** Sends a subscribe notification carrying the tag of `event_type`; no-op unless the socket is open. */
  function subscribeToEvent(event_type: AnyMessageFns) {
    if (!ws || ws.readyState !== WebSocket.OPEN) return
    const sub_msg = WebsocketMessages.SubscribeNotification.create({
      tag: get_tag_from_messagetype(event_type)
    })
    send(WebsocketMessage.create({ subNotif: sub_msg }))
  }

  /** Encodes and sends a message; silently dropped while the socket is not open. */
  function send(data: WebsocketMessage) {
    if (!ws || ws.readyState !== WebSocket.OPEN) return
    ws.send(encodeMessage(data))
  }

  /** Sends a ping message. */
  function ping() {
    send(WebsocketMessage.create({ pingmsg: {} }))
  }

  return {
    subscribe,
    sendEvent,
    init,
    /**
     * Registers `listener` for a message type and returns a function that
     * removes it again.
     *
     * @throws Error if `event_type` is not registered as a `WebsocketMessage` field.
     */
    on: <T>(event_type: MessageFns<T>, listener: (data: T) => void): (() => void) => {
      const event = get_name_from_messagetype(event_type)
      let eventListeners = listeners.get(event)
      if (!eventListeners) {
        // If this is the first listener to this event, also call subscribe to the server
        if (!(socketEvents as readonly string[]).includes(event)) {
          subscribedTypes.set(event, event_type)
          subscribeToEvent(event_type)
        }
        eventListeners = new Set<Listener>()
        listeners.set(event, eventListeners)
      }
      eventListeners.add(listener as Listener)
      return () => {
        unsubscribe(event_type, listener as Listener)
      }
    },
    /** Removes a listener previously registered with `on`. */
    off: <T>(event_type: MessageFns<T>, listener: (data: T) => void) => {
      unsubscribe(event_type, listener as Listener)
    }
  }
}

export const socket = createWebSocket()