♻️ Centralizes socket serialization

This commit is contained in:
Rune Harlyk
2025-07-10 17:13:19 +02:00
committed by Rune Harlyk
parent a43c250ed1
commit aae16335b3
9 changed files with 54 additions and 127 deletions
+1 -55
View File
@@ -93,61 +93,7 @@ esp_err_t EventSocket::onFrame(PsychicWebSocketRequest *request, httpd_ws_frame
bool EventSocket::hasSubscribers(const char *event) { return !client_subscriptions[event].empty(); }
void EventSocket::emit(const char *event, const char *payload, const char *originId, bool onlyToSameOrigin) {
int originSubscriptionId = originId[0] ? atoi(originId) : -1;
xSemaphoreTake(clientSubscriptionsMutex, portMAX_DELAY);
auto &subscriptions = client_subscriptions[event];
if (subscriptions.empty()) {
xSemaphoreGive(clientSubscriptionsMutex);
return;
}
JsonDocument doc;
auto a = doc.to<JsonArray>();
a.add(static_cast<uint8_t>(message_type_t::EVENT));
a.add(event);
JsonDocument payloadDoc;
if (deserializeJson(payloadDoc, payload) == DeserializationError::Ok)
a.add(payloadDoc.as<JsonVariant>());
else
a.add(payload); // fallback: insert as plain string if not valid JSON
String out;
#if USE_MSGPACK
serializeMsgPack(doc, out);
#else
serializeJson(doc, out);
#endif
const char *msg = out.c_str();
// if onlyToSameOrigin == true, send the message back to the origin
if (onlyToSameOrigin && originSubscriptionId > 0) {
auto *client = _socket.getClient(originSubscriptionId);
if (client) {
ESP_LOGV("EventSocket", "Emitting event: %s to %s, Message: %s", event,
client->remoteIP().toString().c_str(), msg);
send(client, msg, strlen(msg));
}
} else { // else send the message to all other clients
for (int subscription : client_subscriptions[event]) {
if (subscription == originSubscriptionId) continue;
auto *client = _socket.getClient(subscription);
if (!client) {
subscriptions.remove(subscription);
continue;
}
ESP_LOGV("EventSocket", "Emitting event: %s to %s, Message: %s", event,
client->remoteIP().toString().c_str(), msg);
send(client, msg, strlen(msg));
}
}
xSemaphoreGive(clientSubscriptionsMutex);
}
void EventSocket::emit(const char *event, JsonObject &payload, const char *originId, bool onlyToSameOrigin) {
void EventSocket::emit(const char *event, JsonVariant &payload, const char *originId, bool onlyToSameOrigin) {
int originSubscriptionId = originId[0] ? atoi(originId) : -1;
xSemaphoreTake(clientSubscriptionsMutex, portMAX_DELAY);
auto &subscriptions = client_subscriptions[event];
+1 -3
View File
@@ -24,9 +24,7 @@ class EventSocket {
void onSubscribe(String event, SubscribeCallback callback);
void emit(const char *event, const char *payload, const char *originId = "", bool onlyToSameOrigin = false);
void emit(const char *event, JsonObject &root, const char *originId = "", bool onlyToSameOrigin = false);
void emit(const char *event, JsonVariant &payload, const char *originId = "", bool onlyToSameOrigin = false);
private:
PsychicWebSocketHandler _socket;
@@ -16,22 +16,19 @@
extern const uint8_t rootca_crt_bundle_start[] asm("_binary_src_certs_x509_crt_bundle_bin_start");
static int previousProgress = 0;
JsonDocument doc;
JsonVariant obj;
void update_started() {
String output;
doc["status"] = "preparing";
serializeJson(doc, output);
socket.emit(EVENT_DOWNLOAD_OTA, output.c_str());
obj["status"] = "preparing";
socket.emit(EVENT_DOWNLOAD_OTA, obj);
}
void update_progress(int currentBytes, int totalBytes) {
String output;
doc["status"] = "progress";
obj["status"] = "progress";
int progress = ((currentBytes * 100) / totalBytes);
if (progress > previousProgress) {
doc["progress"] = progress;
socket.emit(EVENT_DOWNLOAD_OTA, output.c_str());
obj["progress"] = progress;
socket.emit(EVENT_DOWNLOAD_OTA, obj);
ESP_LOGV("Download OTA", "HTTP update process at %d of %d bytes... (%d %%)", currentBytes, totalBytes,
progress);
}
@@ -39,10 +36,8 @@ void update_progress(int currentBytes, int totalBytes) {
}
void update_finished() {
String output;
doc["status"] = "finished";
serializeJson(doc, output);
socket.emit(EVENT_DOWNLOAD_OTA, output.c_str());
obj["status"] = "finished";
socket.emit(EVENT_DOWNLOAD_OTA, obj);
// delay to allow the event to be sent out
vTaskDelay(100 / portTICK_PERIOD_MS);
@@ -57,7 +52,6 @@ void updateTask(void *param) {
httpUpdate.rebootOnUpdate(true);
String url = *((String *)param);
String output;
// httpUpdate.onStart(update_started);
// httpUpdate.onProgress(update_progress);
// httpUpdate.onEnd(update_finished);
@@ -66,21 +60,18 @@ void updateTask(void *param) {
switch (ret) {
case HTTP_UPDATE_FAILED:
doc["status"] = "error";
doc["error"] = httpUpdate.getLastErrorString().c_str();
serializeJson(doc, output);
socket.emit(EVENT_DOWNLOAD_OTA, output.c_str());
obj["status"] = "error";
obj["error"] = httpUpdate.getLastErrorString().c_str();
socket.emit(EVENT_DOWNLOAD_OTA, obj);
ESP_LOGE("Download OTA", "HTTP Update failed with error (%d): %s", httpUpdate.getLastError(),
httpUpdate.getLastErrorString().c_str());
break;
case HTTP_UPDATE_NO_UPDATES:
doc["status"] = "error";
doc["error"] = "Update failed, has same firmware version";
serializeJson(doc, output);
socket.emit(EVENT_DOWNLOAD_OTA, output.c_str());
obj["status"] = "error";
obj["error"] = "Update failed, has same firmware version";
socket.emit(EVENT_DOWNLOAD_OTA, obj);
ESP_LOGE("Download OTA", "HTTP Update failed, has same firmware version");
break;
@@ -99,14 +90,10 @@ esp_err_t DownloadFirmwareService::handleDownloadUpdate(PsychicRequest *request,
String downloadURL = json["download_url"];
ESP_LOGI("Download OTA", "Starting OTA from: %s", downloadURL.c_str());
doc["status"] = "preparing";
doc["progress"] = 0;
doc["error"] = "";
String output;
serializeJson(doc, output);
socket.emit(EVENT_DOWNLOAD_OTA, output.c_str());
obj["status"] = "preparing";
obj["progress"] = 0;
obj["error"] = "";
socket.emit(EVENT_DOWNLOAD_OTA, obj);
const BaseType_t taskResult = g_taskManager.createTask(&updateTask, "Firmware download", OTA_TASK_STACK_SIZE,
&downloadURL, (configMAX_PRIORITIES - 1), NULL, 1);
@@ -85,17 +85,19 @@ esp_err_t FirmwareUploadService::handleUpload(PsychicRequest *request, const Str
if (Update.write(data, len) != len) {
handleError(request, 500);
} else {
char buffer[64];
snprintf(buffer, sizeof(buffer), "{\"status\":\"progress\",\"progress\":%.1f}",
(float)Update.progress() / (float)fsize * 100.f);
socket.emit("otastatus", buffer);
JsonVariant obj;
obj["status"] = "progress";
obj["progress"] = (float)Update.progress() / (float)fsize * 100.f;
socket.emit("otastatus", obj);
delay(20);
}
if (final) {
if (!Update.end(true)) {
handleError(request, 500);
} else {
socket.emit("otastatus", "{\"status\":\"finished\",\"progress\":100}");
JsonVariant obj;
obj["status"] = "finished", obj["progress"] = 100;
socket.emit("otastatus", obj);
ESP_LOGI(TAG, "Finish writing update");
}
}
@@ -134,9 +136,9 @@ esp_err_t FirmwareUploadService::uploadComplete(PsychicRequest *request) {
}
esp_err_t FirmwareUploadService::handleError(PsychicRequest *request, int code) {
char buffer[64];
snprintf(buffer, sizeof(buffer), "{\"status\":\"error\",\"error\":\"%d\"}", Update.getError());
socket.emit("otastatus", buffer);
JsonVariant obj;
obj["status"] = "error", obj["error"] = Update.getError();
socket.emit("otastatus", obj);
// if we have had an error already, do nothing
if (request->_tempObject) {
return ESP_OK;
+10 -9
View File
@@ -82,20 +82,21 @@ class MotionService {
}
void handleMode(JsonObject &root, int originId) {
motionState = (MOTION_STATE)root["data"].as<int>();
motionState = static_cast<MOTION_STATE>(root["data"].as<int>());
ESP_LOGV("MotionService", "Mode %d", motionState);
char output[2];
itoa((int)motionState, output, 10);
motionState == MOTION_STATE::DEACTIVATED ? _servoController->deactivate() : _servoController->activate();
socket.emit(MODE_EVENT, output, String(originId).c_str());
JsonDocument doc;
doc.set(static_cast<int>(motionState));
JsonVariant data = doc.as<JsonVariant>();
socket.emit(MODE_EVENT, data, String(originId).c_str());
}
void emitAngles(const String &originId = "", bool sync = false) {
char output[100];
snprintf(output, sizeof(output), "[%.1f,%.1f,%.1f,%.1f,%.1f,%.1f,%.1f,%.1f,%.1f,%.1f,%.1f,%.1f]", angles[0],
angles[1], angles[2], angles[3], angles[4], angles[5], angles[6], angles[7], angles[8], angles[9],
angles[10], angles[11]);
socket.emit(ANGLES_EVENT, output, originId.c_str());
JsonDocument doc;
auto arr = doc.to<JsonArray>();
for (int i = 0; i < 12; i++) arr.add(angles[i]);
JsonVariant data = doc.as<JsonVariant>();
socket.emit(ANGLES_EVENT, data, originId.c_str());
}
void syncAngles(const String &originId = "", bool sync = false) {
@@ -113,9 +113,9 @@ class Peripherals : public StatefulService<PeripheralsConfiguration> {
for (auto &address : addressList) {
addresses.add(address);
}
serializeJson(root, output);
ESP_LOGI("Peripherals", "Emitting I2C scan results, %s %d", originId.c_str(), sync);
socket.emit(EVENT_I2C_SCAN, output, originId.c_str(), sync);
JsonVariant data = doc.as<JsonVariant>();
socket.emit(EVENT_I2C_SCAN, data, originId.c_str(), sync);
}
void scanI2C(uint8_t lower = 1, uint8_t higher = 127) {
@@ -190,15 +190,17 @@ class Peripherals : public StatefulService<PeripheralsConfiguration> {
#if FT_ENABLED(USE_BMP180)
_bmp.readBarometer(root);
#endif
serializeJson(doc, message);
socket.emit(EVENT_IMU, message);
JsonVariant data = doc.as<JsonVariant>();
socket.emit(EVENT_IMU, data);
}
void emitSonar() {
#if FT_ENABLED(USE_USS)
char output[16];
snprintf(output, sizeof(output), "[%.1f,%.1f]", _left_distance, _right_distance);
socket.emit("sonar", output);
doc.clear();
JsonArray root = doc.to<JsonArray>();
root[0] = _left_distance, root[1] = _right_distance;
JsonVariant data = doc.as<JsonVariant>();
socket.emit("sonar", data);
#endif
}
@@ -86,14 +86,6 @@ class ServoController : public StatefulService<ServoSettings> {
ESP_LOGI("SERVO_CONTROLLER", "Setting servo %d to %d", servo_id, pwm);
}
void syncAngles(const String &originId) {
char output[100];
snprintf(output, sizeof(output), "[%.1f,%.1f,%.1f,%.1f,%.1f,%.1f,%.1f,%.1f,%.1f,%.1f,%.1f,%.1f]", angles[0],
angles[1], angles[2], angles[3], angles[4], angles[5], angles[6], angles[7], angles[8], angles[9],
angles[10], angles[11]);
socket.emit("angles", output, String(originId).c_str());
}
void updateActiveState() { is_active ? activate() : deactivate(); }
void setAngles(float new_angles[12]) {
+3 -3
View File
@@ -140,8 +140,8 @@ void emitMetrics() {
analyticsDoc.clear();
JsonObject root = analyticsDoc.to<JsonObject>();
system_service::metrics(root);
serializeJson(analyticsDoc, analyticsMessage);
socket.emit(EVENT_ANALYTICS, analyticsMessage);
JsonVariant data = analyticsDoc.as<JsonVariant>();
socket.emit(EVENT_ANALYTICS, data);
}
const char *resetReason(esp_reset_reason_t reason) {
@@ -173,7 +173,7 @@ const char *resetReason(esp_reset_reason_t reason) {
case ESP_RST_CPU_LOCKUP: return "Reset due to CPU lock up (double exception)";
#endif
default:
char buffer[50];
static char buffer[48];
snprintf(buffer, sizeof(buffer), "Unknown reset reason (%d)", reason);
return buffer;
}
@@ -36,8 +36,7 @@ class EventEndpoint {
JsonObject root = jsonDocument.to<JsonObject>();
String output;
_statefulService->read(root, _stateReader);
serializeJson(root, output);
ESP_LOGV("EventEndpoint", "Syncing state: %s", output.c_str());
socket.emit(_event, output.c_str(), originId.c_str(), sync);
JsonVariant obj = jsonDocument.as<JsonVariant>();
socket.emit(_event, obj, originId.c_str(), sync);
}
};