Prettified listeners -> split event and msg events + unit testing

This commit is contained in:
Niklas Jensen
2026-01-01 16:35:08 +01:00
committed by nikguin04
parent 73aa38951d
commit 4633d2eb09
2 changed files with 62 additions and 43 deletions
+48 -40
View File
@@ -7,12 +7,13 @@ import type { BinaryWriter } from '@bufbuild/protobuf/wire'
// -------- START PARSING PROTO DATA -------- // -------- START PARSING PROTO DATA --------
// Auto-build reverse mapping from MessageFns to event key and tag // Auto-build reverse mapping from MessageFns to event key and tag
const MESSAGE_TYPE_TO_KEY = new Map<MessageFns<any>, string>() export const MESSAGE_TYPE_TO_KEY = new Map<MessageFns<any>, string>()
const MESSAGE_TYPE_TO_TAG = new Map<MessageFns<any>, number>() export const MESSAGE_TYPE_TO_TAG = new Map<MessageFns<any>, number>()
export const MESSAGE_KEY_TO_TAG = new Map<string, number>()
// Build the mapping using references from metadata // Build the mapping using references from metadata
const websocketMessageType = websocket_md.fileDescriptor.messageType?.find( const websocketMessageType = websocket_md.fileDescriptor.messageType?.find(
msg => msg.name === 'WebsocketMessage' ( msg: { name: string } ) => msg.name === 'WebsocketMessage'
) )
if (websocketMessageType?.field) { if (websocketMessageType?.field) {
@@ -23,6 +24,7 @@ if (websocketMessageType?.field) {
if (messageFns && field.jsonName && field.number) { if (messageFns && field.jsonName && field.number) {
MESSAGE_TYPE_TO_KEY.set(messageFns, field.jsonName) MESSAGE_TYPE_TO_KEY.set(messageFns, field.jsonName)
MESSAGE_TYPE_TO_TAG.set(messageFns, field.number) MESSAGE_TYPE_TO_TAG.set(messageFns, field.number)
MESSAGE_KEY_TO_TAG.set(field.jsonName, field.number)
} }
} }
} }
@@ -50,29 +52,34 @@ function get_tag_from_messagetype(event_type: MessageFns<any>): number {
const socketEvents = ['open', 'close', 'error', 'message', 'unresponsive'] as const const socketEvents = ['open', 'close', 'error', 'message', 'unresponsive'] as const
type SocketEvent = (typeof socketEvents)[number] type SocketEvent = (typeof socketEvents)[number]
type TaggedSocketMessage = [string, WebsocketMessage] type TaggedSocketMessage = {"tag": number, "msg": WebsocketMessage}
// Only exported for socket test
const decodeMessage = (data: ArrayBuffer): TaggedSocketMessage => { export const decodeMessage = (data: ArrayBuffer): TaggedSocketMessage => {
const decoded = WebsocketMessage.decode(new Uint8Array(data)); const decoded = WebsocketMessage.decode(new Uint8Array(data));
const values = Object.entries(decoded).filter(([, value]) => value !== undefined) // Filter all values which are not undefined const values = Object.entries(decoded).filter(([, value]) => value !== undefined) // Filter all values which are not undefined
if (values.length != 1) { if (values.length != 1) {
throw new Error("Message included either 0 or more than 1 data point") throw new Error("Message included either 0 or more than 1 data point")
} }
const [event, value] = values[0] const fieldName = values[0][0]
return [event, decoded] const tag = MESSAGE_KEY_TO_TAG.get(fieldName)
if (tag === undefined) {
throw new Error(`Tag not found for field: ${fieldName}`)
}
return {"tag": tag, "msg": decoded}
} }
const encodeMessage = (data: WebsocketMessage): Uint8Array<ArrayBuffer> => { export const encodeMessage = (data: WebsocketMessage): Uint8Array<ArrayBuffer> => {
const encoded = WebsocketMessage.encode(data).finish(); const encoded = WebsocketMessage.encode(data).finish();
return encoded; return encoded;
} }
function createWebSocket() { function createWebSocket() {
const listeners = new Map<string, Set<(data?: unknown) => void>>() const message_listeners = new Map<number, Set<(data?: unknown) => void>>()
const event_listeners = new Map<string, Set<(data?: unknown) => void>>()
const { subscribe, set } = writable(false) const { subscribe, set } = writable(false)
const reconnectTimeoutTime = 5000 const reconnectTimeoutTime = 5000
let unresponsiveTimeoutId: ReturnType<typeof setTimeout> let unresponsiveTimeoutId: ReturnType<typeof setTimeout>
@@ -85,16 +92,22 @@ function createWebSocket() {
connect() connect()
} }
function getListeners<MT>(event: MessageFns<MT> | string): Set<(data?: unknown) => void> { function getMsgListeners<MT>(event_type: MessageFns<MT>): Set<(data?: unknown) => void> {
if (typeof event != "string") { // Parse messagefns to string const type_tag = get_tag_from_messagetype(event_type)
event = get_name_from_messagetype(event)
}
const event_listeners = listeners.get(event); const type_listeners = message_listeners.get(type_tag);
if (event_listeners == undefined) { if (type_listeners == undefined) {
return new Set() return new Set()
} }
return event_listeners; return type_listeners;
}
function getListeners<MT>(event: string): Set<(data?: unknown) => void> {
const event_listeners_forevent = event_listeners.get(event);
if (event_listeners_forevent == undefined) {
return new Set()
}
return event_listeners_forevent;
} }
function disconnect(reason: SocketEvent, event?: Event) { function disconnect(reason: SocketEvent, event?: Event) {
@@ -102,7 +115,7 @@ function createWebSocket() {
set(false) set(false)
clearTimeout(unresponsiveTimeoutId) clearTimeout(unresponsiveTimeoutId)
clearTimeout(reconnectTimeoutId) clearTimeout(reconnectTimeoutId)
listeners.get(reason)?.forEach(listener => listener(event)) event_listeners.get(reason)?.forEach(listener => listener(event))
reconnectTimeoutId = setTimeout(connect, reconnectTimeoutTime) reconnectTimeoutId = setTimeout(connect, reconnectTimeoutTime)
} }
@@ -113,7 +126,7 @@ function createWebSocket() {
ping() ping()
set(true) set(true)
clearTimeout(reconnectTimeoutId) clearTimeout(reconnectTimeoutId)
listeners.get('open')?.forEach(listener => listener(ev)) event_listeners.get('open')?.forEach(listener => listener(ev))
// TODO: Check if this makes sense? we also call subscribe to event when a new listen calls the "on" function // TODO: Check if this makes sense? we also call subscribe to event when a new listen calls the "on" function
// for (const event of listeners.keys()) { // for (const event of listeners.keys()) {
// if (socketEvents.includes(event as SocketEvent)) continue // if (socketEvents.includes(event as SocketEvent)) continue
@@ -122,26 +135,24 @@ function createWebSocket() {
} }
ws.onmessage = frame => { ws.onmessage = frame => {
resetUnresponsiveCheck() resetUnresponsiveCheck()
const [event, message] = decodeMessage(frame.data) const {tag, msg} = decodeMessage(frame.data)
if (event) listeners.get(event)?.forEach(listener => listener(message)) if (tag) message_listeners.get(tag)?.forEach(listener => listener(msg))
} }
ws.onerror = ev => disconnect('error', ev) ws.onerror = ev => disconnect('error', ev)
ws.onclose = ev => disconnect('close', ev) ws.onclose = ev => disconnect('close', ev)
} }
function unsubscribe(event_type: MessageFns<any>, listener?: (data: unknown) => void) { function unsubscribe(event_type: MessageFns<any>, listener: (data: unknown) => void) {
const event = get_name_from_messagetype(event_type) const tag = get_tag_from_messagetype(event_type)
const eventListeners = listeners.get(event) const message_listeners_totag = message_listeners.get(tag)
if (!eventListeners) return if (!message_listeners_totag) return
if (!eventListeners.size) { // TODO: This looks like it deletes an individual listener, but unsubscribe unsubscribes for everyone. Not sure what it is supposed to do right now
message_listeners_totag?.delete(listener)
if (message_listeners_totag.size == 0) { // No more listeners, so we can unsubscribe
unsubscribeToEvent(event_type) unsubscribeToEvent(event_type)
} }
if (listener) {
eventListeners?.delete(listener)
} else {
listeners.delete(event)
}
} }
function resetUnresponsiveCheck() { function resetUnresponsiveCheck() {
@@ -191,18 +202,15 @@ function createWebSocket() {
sendEvent, sendEvent,
init, init,
on: <MT, T>(event_type: MessageFns<MT>, listener: (data: T) => void): (() => void) => { on: <MT, T>(event_type: MessageFns<MT>, listener: (data: T) => void): (() => void) => {
const event = get_name_from_messagetype(event_type); const tag = get_tag_from_messagetype(event_type);
let eventListeners = listeners.get(event) let message_listeners_totag = message_listeners.get(tag)
if (!eventListeners) { if (!message_listeners_totag) {
// If this is the first listener to this event, also call subscribe to the server // If this is the first listener to this event, also call subscribe to the server
if (!socketEvents.includes(event as SocketEvent)) { message_listeners_totag = new Set()
subscribeToEvent(event_type) message_listeners.set(tag, message_listeners_totag)
} }
eventListeners = new Set() message_listeners_totag.add(listener as (data: unknown) => void)
listeners.set(event, eventListeners)
}
eventListeners.add(listener as (data: unknown) => void)
return () => { return () => {
unsubscribe(event_type, listener as (data: unknown) => void) unsubscribe(event_type, listener as (data: unknown) => void)
+13 -2
View File
@@ -1,7 +1,7 @@
import { describe, it, expect, beforeEach, afterEach } from 'vitest' import { describe, it, expect, beforeEach, afterEach } from 'vitest'
import { WebSocketServer } from 'ws' import { WebSocketServer } from 'ws'
import { socket } from '../../src/lib/stores/socket' import { decodeMessage, MESSAGE_KEY_TO_TAG, socket } from '../../src/lib/stores/socket'
import { IMUData, RSSIData, WebsocketMessage } from '../../src/lib/platform_shared/websocket_message' import { IMUData, PingMsg, PongMsg, RSSIData, WebsocketMessage, protoMetadata as websocket_md } from '../../src/lib/platform_shared/websocket_message'
// Helper function to create encoded WebSocket messages // Helper function to create encoded WebSocket messages
function createEncodedMessage(messageType: 'imu' | 'rssi' | 'mode', data: any): Uint8Array { function createEncodedMessage(messageType: 'imu' | 'rssi' | 'mode', data: any): Uint8Array {
@@ -207,6 +207,17 @@ describe('WebsocketMessage Protobuf Encoding/Decoding', () => {
expect(decoded.temp).toBe(imuData.temp) expect(decoded.temp).toBe(imuData.temp)
}) })
it('should encode and decode two empty types correctly', () => {
const encoded_ping = WebsocketMessage.encode(WebsocketMessage.create({ pingmsg: PingMsg.create() })).finish()
const decoded_ping = decodeMessage(encoded_ping.buffer)
expect(decoded_ping.tag).toBe(MESSAGE_KEY_TO_TAG.get("pingmsg"))
const encoded_pong = WebsocketMessage.encode(WebsocketMessage.create({ pongmsg: PongMsg.create() })).finish()
const decoded_pong = decodeMessage(encoded_pong.buffer)
expect(decoded_pong.tag).toBe(MESSAGE_KEY_TO_TAG.get("pongmsg"))
})
it('should encode and decode complete WebsocketMessage', () => { it('should encode and decode complete WebsocketMessage', () => {