🪄 Updates eventsocket protocol

This commit is contained in:
Rune Harlyk
2024-06-18 16:03:07 +02:00
committed by Rune Harlyk
parent e4ea3992b3
commit 283c420f98
4 changed files with 1443 additions and 1367 deletions
+22 -17
View File
@@ -34,26 +34,26 @@ function createWebSocket() {
listeners.get('open')?.forEach((listener) => listener(ev)); listeners.get('open')?.forEach((listener) => listener(ev));
for (const event of listeners.keys()) { for (const event of listeners.keys()) {
if (socketEvents.includes(event as SocketEvent)) continue; if (socketEvents.includes(event as SocketEvent)) continue;
sendEvent('subscribe', event); subscribeToEvent(event);
} }
}; };
ws.onmessage = (message) => { ws.onmessage = (message) => {
resetUnresponsiveCheck(); resetUnresponsiveCheck();
let data = message.data; let data = message.data;
if (data instanceof ArrayBuffer) { if (data instanceof ArrayBuffer) {
listeners.get('binary')?.forEach((listener) => listener(data)); listeners.get('binary')?.forEach((listener) => listener(data));
return; return;
} }
listeners.get('message')?.forEach((listener) => listener(data)); data = data.substring(1);
if (!data) return;
let event = data.substring(data.indexOf('/') + 1, data.indexOf('['));
let payload = data.substring(data.indexOf('[') + 1, data.lastIndexOf(']'));
try { try {
data = JSON.parse(message.data); payload = JSON.parse(payload);
} catch (error) { } catch (error) {}
listeners.get('error')?.forEach((listener) => listener(error));
return;
}
listeners.get('json')?.forEach((listener) => listener(data));
const [event, payload] = data;
if (event) listeners.get(event)?.forEach((listener) => listener(payload)); if (event) listeners.get(event)?.forEach((listener) => listener(payload));
}; };
ws.onerror = (ev) => disconnect('error', ev); ws.onerror = (ev) => disconnect('error', ev);
@@ -65,7 +65,7 @@ function createWebSocket() {
if (!eventListeners) return; if (!eventListeners) return;
if (!eventListeners.size) { if (!eventListeners.size) {
sendEvent('unsubscribe', event); unsubscribeToEvent(event);
} }
if (listener) { if (listener) {
eventListeners?.delete(listener); eventListeners?.delete(listener);
@@ -79,25 +79,30 @@ function createWebSocket() {
unresponsiveTimeoutId = setTimeout(() => disconnect('unresponsive'), reconnectTimeoutTime); unresponsiveTimeoutId = setTimeout(() => disconnect('unresponsive'), reconnectTimeoutTime);
} }
function send(msg: unknown) { function sendEvent(event: string, data: unknown) {
if (!ws || ws.readyState !== WebSocket.OPEN) return; if (!ws || ws.readyState !== WebSocket.OPEN) return;
ws.send(JSON.stringify(msg)); ws.send(`2/${event}[${JSON.stringify(data)}]`);
} }
function sendEvent(event: string, data: unknown) { function unsubscribeToEvent(event: string) {
send({ event, data }); if (!ws || ws.readyState !== WebSocket.OPEN) return;
ws.send('1/' + event);
}
function subscribeToEvent(event: string) {
if (!ws || ws.readyState !== WebSocket.OPEN) return;
ws.send('0/' + event);
} }
return { return {
subscribe, subscribe,
send,
sendEvent, sendEvent,
init, init,
on: <T>(event: string, listener: (data: T) => void): (() => void) => { on: <T>(event: string, listener: (data: T) => void): (() => void) => {
let eventListeners = listeners.get(event); let eventListeners = listeners.get(event);
if (!eventListeners) { if (!eventListeners) {
if (!socketEvents.includes(event as SocketEvent)) { if (!socketEvents.includes(event as SocketEvent)) {
sendEvent('subscribe', event); subscribeToEvent(event);
} }
eventListeners = new Set(); eventListeners = new Set();
listeners.set(event, eventListeners); listeners.set(event, eventListeners);
+82 -21
View File
@@ -2,6 +2,49 @@
SemaphoreHandle_t clientSubscriptionsMutex = xSemaphoreCreateMutex(); SemaphoreHandle_t clientSubscriptionsMutex = xSemaphoreCreateMutex();
message_type_t char_to_message_type(char c) {
switch(c) {
case '0': return CONNECT;
case '1': return DISCONNECT;
case '2': return EVENT;
case '3': return PING;
case '4': return PONG;
case '5': return BINARY_EVENT;
default: throw std::invalid_argument("Invalid message type");
}
}
const char* getEventName(const char* msg) {
const char* start = strchr(msg, '/');
if (!start) return nullptr;
start++;
const char* end = strchr(start, '[');
if (!end) return start;
static char eventName[32];
int len = end - start;
strncpy(eventName, start, len);
eventName[len] = '\0';
return eventName;
}
const char* getEventPayload(const char* msg) {
const char* start = strchr(msg + 4, '\"') - 1;
const char* end = msg + strlen(msg) - 1;
if (*start == '\"') {
start++;
}
if (*(end - 1) == '\"') {
end--;
}
int len = end - start;
if (len < 0) return nullptr;
char* payload = new char[len + 1];
strncpy(payload, start, len);
payload[len] = '\0';
return payload;
}
EventSocket::EventSocket(PsychicHttpServer *server, EventSocket::EventSocket(PsychicHttpServer *server,
SecurityManager *securityManager, SecurityManager *securityManager,
AuthenticationPredicate authenticationPredicate) : _server(server), AuthenticationPredicate authenticationPredicate) : _server(server),
@@ -42,30 +85,48 @@ esp_err_t EventSocket::onFrame(PsychicWebSocketRequest *request, httpd_ws_frame
if (frame->type == HTTPD_WS_TYPE_TEXT) if (frame->type == HTTPD_WS_TYPE_TEXT)
{ {
ESP_LOGV("EventSocket", "ws[%s][%u] request: %s", request->client()->remoteIP().toString().c_str(), ESP_LOGV("EventSocket", "Received message: %s", (char *)frame->payload);
request->client()->socket(), (char *)frame->payload); char* msg = (char *)frame->payload;
message_type_t message_type = char_to_message_type(msg[0]);
if (message_type == PING) {
ESP_LOGV("EventSocket", "Ping");
request->client()->sendMessage("3");
return ESP_OK;
} else if (message_type == PONG) {
ESP_LOGV("EventSocket", "Pong");
return ESP_OK;
}
const char* event = getEventName(msg);
if (!event) {
ESP_LOGE("EventSocket", "Invalid event name");
return ESP_OK;
}
if (message_type == CONNECT) {
ESP_LOGV("EventSocket", "Connect: %s", event);
client_subscriptions[event].push_back(request->client()->socket());
handleSubscribeCallbacks(event, String(request->client()->socket()));
} else if (message_type == DISCONNECT) {
ESP_LOGV("EventSocket", "Disconnect: %s", event);
client_subscriptions[event].remove(request->client()->socket());
} else if (message_type == EVENT) {
const char* payload = getEventPayload(msg);
if (!payload) {
ESP_LOGE("EventSocket", "Invalid event payload");
return ESP_OK;
}
JsonDocument doc; JsonDocument doc;
DeserializationError error = deserializeJson(doc, (char *)frame->payload, frame->len); DeserializationError error = deserializeJson(doc, payload);
if (error) {
if (!error && doc.is<JsonObject>()) ESP_LOGE("EventSocket", "Failed to parse JSON payload");
{ return ESP_OK;
String event = doc["event"];
if (event == "subscribe")
{
// only subscribe to events that are registered
client_subscriptions[doc["data"]].push_back(request->client()->socket());
handleSubscribeCallbacks(doc["data"], String(request->client()->socket()));
} }
else if (event == "unsubscribe") JsonObject jsonObject = doc.as<JsonObject>();
{
client_subscriptions[doc["data"]].remove(request->client()->socket());
}
else
{
JsonObject jsonObject = doc["data"].as<JsonObject>();
handleEventCallbacks(event, jsonObject, request->client()->socket()); handleEventCallbacks(event, jsonObject, request->client()->socket());
}
return ESP_OK; return ESP_OK;
} }
} }
@@ -83,7 +144,7 @@ void EventSocket::emit(const char *event, const char *payload, const char *origi
return; return;
} }
char msg[strlen(event) + strlen(payload) + 10]; char msg[strlen(event) + strlen(payload) + 10];
snprintf(msg, sizeof(msg), "[\"%s\",%s]", event, payload); snprintf(msg, sizeof(msg), "2/%s[%s]", event, payload);
// if onlyToSameOrigin == true, send the message back to the origin // if onlyToSameOrigin == true, send the message back to the origin
if (onlyToSameOrigin && originSubscriptionId > 0) if (onlyToSameOrigin && originSubscriptionId > 0)
+9
View File
@@ -10,6 +10,15 @@
#define EVENT_SERVICE_PATH "/ws/events" #define EVENT_SERVICE_PATH "/ws/events"
enum message_type_t {
CONNECT = 0,
DISCONNECT = 1,
EVENT = 2,
PING = 3,
PONG = 4,
BINARY_EVENT = 5
};
typedef std::function<void(JsonObject &root, int originId)> EventCallback; typedef std::function<void(JsonObject &root, int originId)> EventCallback;
typedef std::function<void(const String &originId, bool sync)> SubscribeCallback; typedef std::function<void(const String &originId, bool sync)> SubscribeCallback;
File diff suppressed because it is too large Load Diff