🪄 Formats EventSocket

This commit is contained in:
Rune Harlyk
2024-07-09 20:04:12 +02:00
committed by Rune Harlyk
parent 3c8775de3d
commit 2478e9a77b
2 changed files with 63 additions and 94 deletions
+38 -63
View File
@@ -3,7 +3,7 @@
SemaphoreHandle_t clientSubscriptionsMutex = xSemaphoreCreateMutex(); SemaphoreHandle_t clientSubscriptionsMutex = xSemaphoreCreateMutex();
message_type_t char_to_message_type(char c) { message_type_t char_to_message_type(char c) {
switch(c) { switch (c) {
case '0': return CONNECT; case '0': return CONNECT;
case '1': return DISCONNECT; case '1': return DISCONNECT;
case '2': return EVENT; case '2': return EVENT;
@@ -14,11 +14,11 @@ message_type_t char_to_message_type(char c) {
} }
} }
const char* getEventName(const char* msg) { const char *getEventName(const char *msg) {
const char* start = strchr(msg, '/'); const char *start = strchr(msg, '/');
if (!start) return nullptr; if (!start) return nullptr;
start++; start++;
const char* end = strchr(start, '['); const char *end = strchr(start, '[');
if (!end) return start; if (!end) return start;
static char eventName[32]; static char eventName[32];
@@ -28,30 +28,25 @@ const char* getEventName(const char* msg) {
return eventName; return eventName;
} }
const char* getEventPayload(const char* msg) { const char *getEventPayload(const char *msg) {
const char* start = strchr(msg + 2, '['); const char *start = strchr(msg + 2, '[');
const char* end = msg + strlen(msg) - 1; const char *end = msg + strlen(msg) - 1;
if (*start == '[') { if (*start == '[') {
start++; start++;
} }
int len = end - start; int len = end - start;
if (len < 0) return nullptr; if (len < 0) return nullptr;
char* payload = new char[len + 1]; char *payload = new char[len + 1];
strncpy(payload, start, len); strncpy(payload, start, len);
payload[len] = '\0'; payload[len] = '\0';
return payload; return payload;
} }
EventSocket::EventSocket(PsychicHttpServer *server, EventSocket::EventSocket(PsychicHttpServer *server, SecurityManager *securityManager,
SecurityManager *securityManager, AuthenticationPredicate authenticationPredicate)
AuthenticationPredicate authenticationPredicate) : _server(server), : _server(server), _securityManager(securityManager), _authenticationPredicate(authenticationPredicate) {}
_securityManager(securityManager),
_authenticationPredicate(authenticationPredicate)
{
}
void EventSocket::begin() void EventSocket::begin() {
{
_socket.setFilter(_securityManager->filterRequest(_authenticationPredicate)); _socket.setFilter(_securityManager->filterRequest(_authenticationPredicate));
_socket.onOpen((std::bind(&EventSocket::onWSOpen, this, std::placeholders::_1))); _socket.onOpen((std::bind(&EventSocket::onWSOpen, this, std::placeholders::_1)));
_socket.onClose(std::bind(&EventSocket::onWSClose, this, std::placeholders::_1)); _socket.onClose(std::bind(&EventSocket::onWSClose, this, std::placeholders::_1));
@@ -61,33 +56,28 @@ void EventSocket::begin()
ESP_LOGV("EventSocket", "Registered event socket endpoint: %s", EVENT_SERVICE_PATH); ESP_LOGV("EventSocket", "Registered event socket endpoint: %s", EVENT_SERVICE_PATH);
} }
void EventSocket::onWSOpen(PsychicWebSocketClient *client) void EventSocket::onWSOpen(PsychicWebSocketClient *client) {
{
ESP_LOGI("EventSocket", "ws[%s][%u] connect", client->remoteIP().toString().c_str(), client->socket()); ESP_LOGI("EventSocket", "ws[%s][%u] connect", client->remoteIP().toString().c_str(), client->socket());
} }
void EventSocket::onWSClose(PsychicWebSocketClient *client) void EventSocket::onWSClose(PsychicWebSocketClient *client) {
{ for (auto &event_subscriptions : client_subscriptions) {
for (auto &event_subscriptions : client_subscriptions)
{
event_subscriptions.second.remove(client->socket()); event_subscriptions.second.remove(client->socket());
} }
ESP_LOGI("EventSocket", "ws[%s][%u] disconnect", client->remoteIP().toString().c_str(), client->socket()); ESP_LOGI("EventSocket", "ws[%s][%u] disconnect", client->remoteIP().toString().c_str(), client->socket());
} }
esp_err_t EventSocket::onFrame(PsychicWebSocketRequest *request, httpd_ws_frame *frame) esp_err_t EventSocket::onFrame(PsychicWebSocketRequest *request, httpd_ws_frame *frame) {
{
ESP_LOGV("EventSocket", "ws[%s][%u] opcode[%d]", request->client()->remoteIP().toString().c_str(), ESP_LOGV("EventSocket", "ws[%s][%u] opcode[%d]", request->client()->remoteIP().toString().c_str(),
request->client()->socket(), frame->type); request->client()->socket(), frame->type);
if (frame->type != HTTPD_WS_TYPE_TEXT) if (frame->type != HTTPD_WS_TYPE_TEXT) {
{
ESP_LOGE("EventSocket", "Unsupported frame type"); ESP_LOGE("EventSocket", "Unsupported frame type");
return ESP_OK; return ESP_OK;
} }
ESP_LOGV("EventSocket", "Received message: %s", (char *)frame->payload); ESP_LOGV("EventSocket", "Received message: %s", (char *)frame->payload);
char* msg = (char *)frame->payload; char *msg = (char *)frame->payload;
message_type_t message_type = char_to_message_type(msg[0]); message_type_t message_type = char_to_message_type(msg[0]);
@@ -100,7 +90,7 @@ esp_err_t EventSocket::onFrame(PsychicWebSocketRequest *request, httpd_ws_frame
return ESP_OK; return ESP_OK;
} }
const char* event = getEventName(msg); const char *event = getEventName(msg);
if (!event) { if (!event) {
ESP_LOGE("EventSocket", "Invalid event name"); ESP_LOGE("EventSocket", "Invalid event name");
@@ -115,7 +105,7 @@ esp_err_t EventSocket::onFrame(PsychicWebSocketRequest *request, httpd_ws_frame
ESP_LOGV("EventSocket", "Disconnect: %s", event); ESP_LOGV("EventSocket", "Disconnect: %s", event);
client_subscriptions[event].remove(request->client()->socket()); client_subscriptions[event].remove(request->client()->socket());
} else if (message_type == EVENT) { } else if (message_type == EVENT) {
const char* payload = getEventPayload(msg); const char *payload = getEventPayload(msg);
if (!payload) { if (!payload) {
ESP_LOGE("EventSocket", "Invalid event payload"); ESP_LOGE("EventSocket", "Invalid event payload");
return ESP_OK; return ESP_OK;
@@ -133,13 +123,11 @@ esp_err_t EventSocket::onFrame(PsychicWebSocketRequest *request, httpd_ws_frame
return ESP_OK; return ESP_OK;
} }
void EventSocket::emit(const char *event, const char *payload, const char *originId, bool onlyToSameOrigin) void EventSocket::emit(const char *event, const char *payload, const char *originId, bool onlyToSameOrigin) {
{
int originSubscriptionId = originId[0] ? atoi(originId) : -1; int originSubscriptionId = originId[0] ? atoi(originId) : -1;
xSemaphoreTake(clientSubscriptionsMutex, portMAX_DELAY); xSemaphoreTake(clientSubscriptionsMutex, portMAX_DELAY);
auto &subscriptions = client_subscriptions[event]; auto &subscriptions = client_subscriptions[event];
if (subscriptions.empty()) if (subscriptions.empty()) {
{
xSemaphoreGive(clientSubscriptionsMutex); xSemaphoreGive(clientSubscriptionsMutex);
return; return;
} }
@@ -147,58 +135,45 @@ void EventSocket::emit(const char *event, const char *payload, const char *origi
snprintf(msg, sizeof(msg), "2/%s[%s]", event, payload); snprintf(msg, sizeof(msg), "2/%s[%s]", event, payload);
// if onlyToSameOrigin == true, send the message back to the origin // if onlyToSameOrigin == true, send the message back to the origin
if (onlyToSameOrigin && originSubscriptionId > 0) if (onlyToSameOrigin && originSubscriptionId > 0) {
{
auto *client = _socket.getClient(originSubscriptionId); auto *client = _socket.getClient(originSubscriptionId);
if (client) if (client) {
{ ESP_LOGV("EventSocket", "Emitting event: %s to %s, Message: %s", event,
ESP_LOGV("EventSocket", "Emitting event: %s to %s, Message: %s", event, client->remoteIP().toString().c_str(), msg); client->remoteIP().toString().c_str(), msg);
client->sendMessage(msg); client->sendMessage(msg);
} }
} } else { // else send the message to all other clients
else
{ // else send the message to all other clients
for (int subscription : client_subscriptions[event]) for (int subscription : client_subscriptions[event]) {
{ if (subscription == originSubscriptionId) continue;
if (subscription == originSubscriptionId)
continue;
auto *client = _socket.getClient(subscription); auto *client = _socket.getClient(subscription);
if (!client) if (!client) {
{
subscriptions.remove(subscription); subscriptions.remove(subscription);
continue; continue;
} }
ESP_LOGV("EventSocket", "Emitting event: %s to %s, Message: %s", event, client->remoteIP().toString().c_str(), msg); ESP_LOGV("EventSocket", "Emitting event: %s to %s, Message: %s", event,
client->remoteIP().toString().c_str(), msg);
client->sendMessage(msg); client->sendMessage(msg);
} }
} }
xSemaphoreGive(clientSubscriptionsMutex); xSemaphoreGive(clientSubscriptionsMutex);
} }
void EventSocket::handleEventCallbacks(String event, JsonObject &jsonObject, int originId) void EventSocket::handleEventCallbacks(String event, JsonObject &jsonObject, int originId) {
{ for (auto &callback : event_callbacks[event]) {
for (auto &callback : event_callbacks[event])
{
callback(jsonObject, originId); callback(jsonObject, originId);
} }
} }
void EventSocket::handleSubscribeCallbacks(String event, const String &originId) void EventSocket::handleSubscribeCallbacks(String event, const String &originId) {
{ for (auto &callback : subscribe_callbacks[event]) {
for (auto &callback : subscribe_callbacks[event])
{
callback(originId, true); callback(originId, true);
} }
} }
void EventSocket::onEvent(String event, EventCallback callback) void EventSocket::onEvent(String event, EventCallback callback) { event_callbacks[event].push_back(callback); }
{
event_callbacks[event].push_back(callback);
}
void EventSocket::onSubscribe(String event, SubscribeCallback callback) void EventSocket::onSubscribe(String event, SubscribeCallback callback) {
{
subscribe_callbacks[event].push_back(callback); subscribe_callbacks[event].push_back(callback);
ESP_LOGD("EventSocket", "onSubscribe for event: %s", event.c_str()); ESP_LOGD("EventSocket", "onSubscribe for event: %s", event.c_str());
} }
+8 -14
View File
@@ -10,22 +10,15 @@
#define EVENT_SERVICE_PATH "/ws/events" #define EVENT_SERVICE_PATH "/ws/events"
enum message_type_t { enum message_type_t { CONNECT = 0, DISCONNECT = 1, EVENT = 2, PING = 3, PONG = 4, BINARY_EVENT = 5 };
CONNECT = 0,
DISCONNECT = 1,
EVENT = 2,
PING = 3,
PONG = 4,
BINARY_EVENT = 5
};
typedef std::function<void(JsonObject &root, int originId)> EventCallback; typedef std::function<void(JsonObject &root, int originId)> EventCallback;
typedef std::function<void(const String &originId, bool sync)> SubscribeCallback; typedef std::function<void(const String &originId, bool sync)> SubscribeCallback;
class EventSocket class EventSocket {
{ public:
public: EventSocket(PsychicHttpServer *server, SecurityManager *_securityManager,
EventSocket(PsychicHttpServer *server, SecurityManager *_securityManager, AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_AUTHENTICATED); AuthenticationPredicate authenticationPredicate = AuthenticationPredicates::IS_AUTHENTICATED);
void begin(); void begin();
@@ -34,9 +27,10 @@ public:
void onSubscribe(String event, SubscribeCallback callback); void onSubscribe(String event, SubscribeCallback callback);
void emit(const char *event, const char *payload, const char *originId = "", bool onlyToSameOrigin = false); void emit(const char *event, const char *payload, const char *originId = "", bool onlyToSameOrigin = false);
// if onlyToSameOrigin == true, the message will be sent to the originId only, otherwise it will be broadcasted to all clients except the originId // if onlyToSameOrigin == true, the message will be sent to the originId only, otherwise it will be broadcasted to
// all clients except the originId
private: private:
PsychicHttpServer *_server; PsychicHttpServer *_server;
PsychicWebSocketHandler _socket; PsychicWebSocketHandler _socket;
SecurityManager *_securityManager; SecurityManager *_securityManager;