From 2478e9a77b998be9d0655930bab88873a6ec4c15 Mon Sep 17 00:00:00 2001 From: Rune Harlyk Date: Tue, 9 Jul 2024 20:04:12 +0200 Subject: [PATCH] =?UTF-8?q?=F0=9F=AA=84=20Formats=20EventSocket?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- esp32/lib/ESP32-sveltekit/EventSocket.cpp | 103 ++++++++-------------- esp32/lib/ESP32-sveltekit/EventSocket.h | 54 +++++------- 2 files changed, 63 insertions(+), 94 deletions(-) diff --git a/esp32/lib/ESP32-sveltekit/EventSocket.cpp b/esp32/lib/ESP32-sveltekit/EventSocket.cpp index 5a51d30..3c8d335 100644 --- a/esp32/lib/ESP32-sveltekit/EventSocket.cpp +++ b/esp32/lib/ESP32-sveltekit/EventSocket.cpp @@ -3,7 +3,7 @@ SemaphoreHandle_t clientSubscriptionsMutex = xSemaphoreCreateMutex(); message_type_t char_to_message_type(char c) { - switch(c) { + switch (c) { case '0': return CONNECT; case '1': return DISCONNECT; case '2': return EVENT; @@ -14,11 +14,11 @@ message_type_t char_to_message_type(char c) { } } -const char* getEventName(const char* msg) { - const char* start = strchr(msg, '/'); +const char *getEventName(const char *msg) { + const char *start = strchr(msg, '/'); if (!start) return nullptr; start++; - const char* end = strchr(start, '['); + const char *end = strchr(start, '['); if (!end) return start; static char eventName[32]; @@ -28,30 +28,25 @@ const char* getEventName(const char* msg) { return eventName; } -const char* getEventPayload(const char* msg) { - const char* start = strchr(msg + 2, '['); - const char* end = msg + strlen(msg) - 1; +const char *getEventPayload(const char *msg) { + const char *start = strchr(msg + 2, '['); + const char *end = msg + strlen(msg) - 1; if (*start == '[') { start++; } int len = end - start; if (len < 0) return nullptr; - char* payload = new char[len + 1]; + char *payload = new char[len + 1]; strncpy(payload, start, len); payload[len] = '\0'; return payload; } -EventSocket::EventSocket(PsychicHttpServer *server, - SecurityManager *securityManager, - AuthenticationPredicate authenticationPredicate) : _server(server), - _securityManager(securityManager), - _authenticationPredicate(authenticationPredicate) -{ -} +EventSocket::EventSocket(PsychicHttpServer *server, SecurityManager *securityManager, + AuthenticationPredicate authenticationPredicate) + : _server(server), _securityManager(securityManager), _authenticationPredicate(authenticationPredicate) {} -void EventSocket::begin() -{ +void EventSocket::begin() { _socket.setFilter(_securityManager->filterRequest(_authenticationPredicate)); _socket.onOpen((std::bind(&EventSocket::onWSOpen, this, std::placeholders::_1))); _socket.onClose(std::bind(&EventSocket::onWSClose, this, std::placeholders::_1)); @@ -61,33 +56,28 @@ void EventSocket::begin() ESP_LOGV("EventSocket", "Registered event socket endpoint: %s", EVENT_SERVICE_PATH); } -void EventSocket::onWSOpen(PsychicWebSocketClient *client) -{ +void EventSocket::onWSOpen(PsychicWebSocketClient *client) { ESP_LOGI("EventSocket", "ws[%s][%u] connect", client->remoteIP().toString().c_str(), client->socket()); } -void EventSocket::onWSClose(PsychicWebSocketClient *client) -{ - for (auto &event_subscriptions : client_subscriptions) - { +void EventSocket::onWSClose(PsychicWebSocketClient *client) { + for (auto &event_subscriptions : client_subscriptions) { event_subscriptions.second.remove(client->socket()); } ESP_LOGI("EventSocket", "ws[%s][%u] disconnect", client->remoteIP().toString().c_str(), client->socket()); } -esp_err_t EventSocket::onFrame(PsychicWebSocketRequest *request, httpd_ws_frame *frame) -{ +esp_err_t EventSocket::onFrame(PsychicWebSocketRequest *request, httpd_ws_frame *frame) { ESP_LOGV("EventSocket", "ws[%s][%u] opcode[%d]", request->client()->remoteIP().toString().c_str(), request->client()->socket(), frame->type); - if (frame->type != HTTPD_WS_TYPE_TEXT) - { + if (frame->type != HTTPD_WS_TYPE_TEXT) { ESP_LOGE("EventSocket", "Unsupported frame type"); return ESP_OK; } ESP_LOGV("EventSocket", "Received message: %s", (char *)frame->payload); - char* msg = (char *)frame->payload; + char *msg = (char *)frame->payload; message_type_t message_type = char_to_message_type(msg[0]); @@ -100,13 +90,13 @@ esp_err_t EventSocket::onFrame(PsychicWebSocketRequest *request, httpd_ws_frame return ESP_OK; } - const char* event = getEventName(msg); + const char *event = getEventName(msg); if (!event) { ESP_LOGE("EventSocket", "Invalid event name"); return ESP_OK; } - + if (message_type == CONNECT) { ESP_LOGV("EventSocket", "Connect: %s", event); client_subscriptions[event].push_back(request->client()->socket()); @@ -115,7 +105,7 @@ esp_err_t EventSocket::onFrame(PsychicWebSocketRequest *request, httpd_ws_frame ESP_LOGV("EventSocket", "Disconnect: %s", event); client_subscriptions[event].remove(request->client()->socket()); } else if (message_type == EVENT) { - const char* payload = getEventPayload(msg); + const char *payload = getEventPayload(msg); if (!payload) { ESP_LOGE("EventSocket", "Invalid event payload"); return ESP_OK; @@ -133,13 +123,11 @@ esp_err_t EventSocket::onFrame(PsychicWebSocketRequest *request, httpd_ws_frame return ESP_OK; } -void EventSocket::emit(const char *event, const char *payload, const char *originId, bool onlyToSameOrigin) -{ +void EventSocket::emit(const char *event, const char *payload, const char *originId, bool onlyToSameOrigin) { int originSubscriptionId = originId[0] ? atoi(originId) : -1; xSemaphoreTake(clientSubscriptionsMutex, portMAX_DELAY); auto &subscriptions = client_subscriptions[event]; - if (subscriptions.empty()) - { + if (subscriptions.empty()) { xSemaphoreGive(clientSubscriptionsMutex); return; } @@ -147,58 +135,45 @@ void EventSocket::emit(const char *event, const char *payload, const char *origi snprintf(msg, sizeof(msg), "2/%s[%s]", event, payload); // if onlyToSameOrigin == true, send the message back to the origin - if (onlyToSameOrigin && originSubscriptionId > 0) - { + if (onlyToSameOrigin && originSubscriptionId > 0) { auto *client = _socket.getClient(originSubscriptionId); - if (client) - { - ESP_LOGV("EventSocket", "Emitting event: %s to %s, Message: %s", event, client->remoteIP().toString().c_str(), msg); + if (client) { + ESP_LOGV("EventSocket", "Emitting event: %s to %s, Message: %s", event, + client->remoteIP().toString().c_str(), msg); client->sendMessage(msg); } - } - else - { // else send the message to all other clients + } else { // else send the message to all other clients - for (int subscription : client_subscriptions[event]) - { - if (subscription == originSubscriptionId) - continue; + for (int subscription : client_subscriptions[event]) { + if (subscription == originSubscriptionId) continue; auto *client = _socket.getClient(subscription); - if (!client) - { + if (!client) { subscriptions.remove(subscription); continue; } - ESP_LOGV("EventSocket", "Emitting event: %s to %s, Message: %s", event, client->remoteIP().toString().c_str(), msg); + ESP_LOGV("EventSocket", "Emitting event: %s to %s, Message: %s", event, + client->remoteIP().toString().c_str(), msg); client->sendMessage(msg); } } xSemaphoreGive(clientSubscriptionsMutex); } -void EventSocket::handleEventCallbacks(String event, JsonObject &jsonObject, int originId) -{ - for (auto &callback : event_callbacks[event]) - { +void EventSocket::handleEventCallbacks(String event, JsonObject &jsonObject, int originId) { + for (auto &callback : event_callbacks[event]) { callback(jsonObject, originId); } } -void EventSocket::handleSubscribeCallbacks(String event, const String &originId) -{ - for (auto &callback : subscribe_callbacks[event]) - { +void EventSocket::handleSubscribeCallbacks(String event, const String &originId) { + for (auto &callback : subscribe_callbacks[event]) { callback(originId, true); } } -void EventSocket::onEvent(String event, EventCallback callback) -{ - event_callbacks[event].push_back(callback); -} +void EventSocket::onEvent(String event, EventCallback callback) { event_callbacks[event].push_back(callback); } -void EventSocket::onSubscribe(String event, SubscribeCallback callback) -{ +void EventSocket::onSubscribe(String event, SubscribeCallback callback) { subscribe_callbacks[event].push_back(callback); ESP_LOGD("EventSocket", "onSubscribe for event: %s", event.c_str()); } \ No newline at end of file diff --git a/esp32/lib/ESP32-sveltekit/EventSocket.h b/esp32/lib/ESP32-sveltekit/EventSocket.h index ca6ec70..4b00783 100644 --- a/esp32/lib/ESP32-sveltekit/EventSocket.h +++ b/esp32/lib/ESP32-sveltekit/EventSocket.h @@ -10,47 +10,41 @@ #define EVENT_SERVICE_PATH "/ws/events" -enum message_type_t { - CONNECT = 0, - DISCONNECT = 1, - EVENT = 2, - PING = 3, - PONG = 4, - BINARY_EVENT = 5 -}; +enum message_type_t { CONNECT = 0, DISCONNECT = 1, EVENT = 2, PING = 3, PONG = 4, BINARY_EVENT = 5 }; typedef std::function EventCallback; typedef std::function SubscribeCallback; -class EventSocket -{ -public: - EventSocket(PsychicHttpServer *server, SecurityManager *_securityManager, AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_AUTHENTICATED); +class EventSocket { + public: + EventSocket(PsychicHttpServer *server, SecurityManager *_securityManager, + AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_AUTHENTICATED); - void begin(); + void begin(); - void onEvent(String event, EventCallback callback); + void onEvent(String event, EventCallback callback); - void onSubscribe(String event, SubscribeCallback callback); + void onSubscribe(String event, SubscribeCallback callback); - void emit(const char *event, const char *payload, const char *originId = "", bool onlyToSameOrigin = false); - // if onlyToSameOrigin == true, the message will be sent to the originId only, otherwise it will be broadcasted to all clients except the originId + void emit(const char *event, const char *payload, const char *originId = "", bool onlyToSameOrigin = false); + // if onlyToSameOrigin == true, the message will be sent to the originId only, otherwise it will be broadcasted to + // all clients except the originId -private: - PsychicHttpServer *_server; - PsychicWebSocketHandler _socket; - SecurityManager *_securityManager; - AuthenticationPredicate _authenticationPredicate; + private: + PsychicHttpServer *_server; + PsychicWebSocketHandler _socket; + SecurityManager *_securityManager; + AuthenticationPredicate _authenticationPredicate; - std::map> client_subscriptions; - std::map> event_callbacks; - std::map> subscribe_callbacks; - void handleEventCallbacks(String event, JsonObject &jsonObject, int originId); - void handleSubscribeCallbacks(String event, const String &originId); + std::map> client_subscriptions; + std::map> event_callbacks; + std::map> subscribe_callbacks; + void handleEventCallbacks(String event, JsonObject &jsonObject, int originId); + void handleSubscribeCallbacks(String event, const String &originId); - void onWSOpen(PsychicWebSocketClient *client); - void onWSClose(PsychicWebSocketClient *client); - esp_err_t onFrame(PsychicWebSocketRequest *request, httpd_ws_frame *frame); + void onWSOpen(PsychicWebSocketClient *client); + void onWSClose(PsychicWebSocketClient *client); + esp_err_t onFrame(PsychicWebSocketRequest *request, httpd_ws_frame *frame); }; #endif