diff --git a/esp32/include/communication/comm_base.hpp b/esp32/include/communication/comm_base.hpp index 53cff81..cd4a468 100644 --- a/esp32/include/communication/comm_base.hpp +++ b/esp32/include/communication/comm_base.hpp @@ -8,6 +8,7 @@ #include #include #include +#include class CommAdapterBase { public: @@ -96,9 +97,11 @@ class CommAdapterBase { } void handleIncoming(const uint8_t* data, size_t len, int cid) { + TIME_IT( if (!decoder_.decode(data, len, cid)) { ESP_LOGE("ProtoComm", "Failed to decode incoming message from client %d", cid); } + , INCOMING_DECODE) } void sendPong(int cid) { diff --git a/esp32/include/filesystem_ws.h b/esp32/include/filesystem_ws.h index 2cb4a15..f2c9410 100644 --- a/esp32/include/filesystem_ws.h +++ b/esp32/include/filesystem_ws.h @@ -6,9 +6,16 @@ #include #include #include +#include +#include +#include +#include -#define FS_MAX_CHUNK_SIZE (1024*64) +#define FS_MAX_CHUNK_SIZE (1024 * 64) #define FS_TRANSFER_TIMEOUT_MS 30000 +#define FS_WRITE_QUEUE_SIZE 4 +#define FS_WRITER_TASK_STACK_SIZE 4096 +#define FS_WRITER_TASK_PRIORITY 5 namespace FileSystemWS { @@ -29,6 +36,7 @@ struct UploadState { uint32_t fileSize; uint32_t totalChunks; uint32_t chunksReceived; + uint32_t chunksWritten; uint32_t bytesReceived; uint32_t lastActivityTime; int clientId; @@ -36,6 +44,14 @@ struct UploadState { std::string errorMessage; }; +struct WriteRequest { + uint32_t transferId; + uint8_t* data; + size_t size; + uint32_t chunkIndex; + bool isLastChunk; +}; + using SendMetadataCallback = std::function; using SendCallback = std::function; using SendCompleteCallback = std::function; @@ -44,6 +60,10 @@ using SendUploadCompleteCallback = std::function uploads_; uint32_t transferIdCounter_; + // Async writer task + QueueHandle_t writeQueue_; + TaskHandle_t writerTaskHandle_; + SemaphoreHandle_t uploadsMutex_; + volatile bool writerTaskRunning_; + inline uint32_t generateTransferId() { return ++transferIdCounter_; } SendMetadataCallback sendMetadataCallback_; @@ -74,6 +100,8 @@ class FileSystemHandler { bool deleteRecursive(const std::string& path); bool sendNextDownloadChunk(uint32_t transferId); void finalizeUpload(uint32_t transferId, bool success, const std::string& error = ""); + void processWriteRequest(const WriteRequest& req); + static void writerTaskFunc(void* param); }; extern FileSystemHandler fsHandler; diff --git a/esp32/src/communication/webserver.cpp b/esp32/src/communication/webserver.cpp index 668519e..1dc0c33 100644 --- a/esp32/src/communication/webserver.cpp +++ b/esp32/src/communication/webserver.cpp @@ -2,6 +2,7 @@ #include #include #include +#include static const char* TAG = "WebServer"; @@ -121,6 +122,10 @@ esp_err_t WebServer::httpHandler(httpd_req_t* req) { } esp_err_t WebServer::wsHandler(httpd_req_t* req) { + esp_err_t result; + httpd_ws_frame_t frame; + esp_err_t ret; + TIME_IT( WebServer* self = static_cast(req->user_ctx); if (req->method == HTTP_GET) { @@ -133,17 +138,21 @@ esp_err_t WebServer::wsHandler(httpd_req_t* req) { return ESP_OK; } - httpd_ws_frame_t frame; + memset(&frame, 0, sizeof(httpd_ws_frame_t)); frame.type = HTTPD_WS_TYPE_BINARY; - esp_err_t ret = httpd_ws_recv_frame(req, &frame, 0); + + TIME_IT( + ret = httpd_ws_recv_frame(req, &frame, 0); if (ret != ESP_OK) { ESP_LOGE(TAG, "Failed to get frame len: %s", esp_err_to_name(ret)); return ret; } + , FRAME_LEN) if (frame.len > 0) { + TIME_IT( frame.payload = (uint8_t*)malloc(frame.len); if (!frame.payload) { ESP_LOGE(TAG, "Failed to allocate frame payload"); @@ -156,6 +165,7 @@ esp_err_t WebServer::wsHandler(httpd_req_t* req) { free(frame.payload); return ret; } + , FRAME_RECEIVE) } if (frame.type == HTTPD_WS_TYPE_CLOSE) { @@ -169,15 +179,18 @@ esp_err_t WebServer::wsHandler(httpd_req_t* req) { return ESP_OK; } - esp_err_t result = ESP_OK; + result = ESP_OK; if (self->wsFrameHandler_) { + TIME_IT( result = self->wsFrameHandler_(req, &frame); + , FRAME_HANDLER) } if (frame.payload) { free(frame.payload); } + , WS_HANDLER) return result; } diff --git a/esp32/src/communication/websocket.cpp b/esp32/src/communication/websocket.cpp index 065a848..2ebefb0 100644 --- a/esp32/src/communication/websocket.cpp +++ b/esp32/src/communication/websocket.cpp @@ -24,8 +24,26 @@ void Websocket::onWsClose(int sockfd) { } esp_err_t Websocket::onFrame(httpd_req_t* req, httpd_ws_frame_t* frame) { + // Handle PING - respond with PONG + if (frame->type == HTTPD_WS_TYPE_PING) { + httpd_ws_frame_t pong = { + .final = true, + .fragmented = false, + .type = HTTPD_WS_TYPE_PONG, + .payload = frame->payload, + .len = frame->len + }; + return httpd_ws_send_frame(req, &pong); + } + + // Ignore PONG frames + if (frame->type == HTTPD_WS_TYPE_PONG) { + return ESP_OK; + } + + // Ignore other non-binary frames if (frame->type != HTTPD_WS_TYPE_BINARY) { - ESP_LOGW(TAG, "Expected binary frame, got type %d", frame->type); + ESP_LOGD(TAG, "Ignoring frame type %d", frame->type); return ESP_OK; } diff --git a/esp32/src/filesystem_ws.cpp b/esp32/src/filesystem_ws.cpp index 6a7d585..39d902f 100644 --- a/esp32/src/filesystem_ws.cpp +++ b/esp32/src/filesystem_ws.cpp @@ -8,6 +8,7 @@ #include #include #include +#include static const char* TAG = "FileSystemWS"; @@ -15,7 +16,137 @@ namespace FileSystemWS { FileSystemHandler fsHandler; -FileSystemHandler::FileSystemHandler() : transferIdCounter_(0) {} +FileSystemHandler::FileSystemHandler() + : transferIdCounter_(0), writeQueue_(nullptr), writerTaskHandle_(nullptr), uploadsMutex_(nullptr), + writerTaskRunning_(false) { + uploadsMutex_ = xSemaphoreCreateMutex(); +} + +FileSystemHandler::~FileSystemHandler() { + stopWriterTask(); + if (uploadsMutex_) { + vSemaphoreDelete(uploadsMutex_); + } +} + +void FileSystemHandler::startWriterTask() { + if (writerTaskHandle_ != nullptr) { + return; + } + + writeQueue_ = xQueueCreate(FS_WRITE_QUEUE_SIZE, sizeof(WriteRequest)); + if (!writeQueue_) { + ESP_LOGE(TAG, "Failed to create write queue"); + return; + } + + writerTaskRunning_ = true; + BaseType_t result = + xTaskCreate(writerTaskFunc, "fs_writer", FS_WRITER_TASK_STACK_SIZE, this, FS_WRITER_TASK_PRIORITY, &writerTaskHandle_); + + if (result != pdPASS) { + ESP_LOGE(TAG, "Failed to create writer task"); + vQueueDelete(writeQueue_); + writeQueue_ = nullptr; + writerTaskRunning_ = false; + } else { + ESP_LOGI(TAG, "Writer task started"); + } +} + +void FileSystemHandler::stopWriterTask() { + if (!writerTaskRunning_) { + return; + } + + writerTaskRunning_ = false; + + // Send a poison pill to wake up the task + WriteRequest poison = {0, nullptr, 0, 0, false}; + if (writeQueue_) { + xQueueSend(writeQueue_, &poison, portMAX_DELAY); + } + + // Wait for task to finish + if (writerTaskHandle_) { + vTaskDelay(pdMS_TO_TICKS(100)); + writerTaskHandle_ = nullptr; + } + + if (writeQueue_) { + // Drain any remaining requests and free their data + WriteRequest req; + while (xQueueReceive(writeQueue_, &req, 0) == pdTRUE) { + if (req.data) { + free(req.data); + } + } + vQueueDelete(writeQueue_); + writeQueue_ = nullptr; + } + + ESP_LOGI(TAG, "Writer task stopped"); +} + +void FileSystemHandler::writerTaskFunc(void* param) { + FileSystemHandler* self = static_cast(param); + WriteRequest req; + + while (self->writerTaskRunning_) { + if (xQueueReceive(self->writeQueue_, &req, pdMS_TO_TICKS(10)) == pdTRUE) { + if (req.data == nullptr) { + // Poison pill - exit + break; + } + self->processWriteRequest(req); + free(req.data); + } + } + + vTaskDelete(nullptr); +} + +void FileSystemHandler::processWriteRequest(const WriteRequest& req) { + xSemaphoreTake(uploadsMutex_, portMAX_DELAY); + + auto it = uploads_.find(req.transferId); + if (it == uploads_.end()) { + xSemaphoreGive(uploadsMutex_); + return; + } + + UploadState& state = it->second; + + if (state.hasError) { + xSemaphoreGive(uploadsMutex_); + return; + } + + size_t bytesWritten = fwrite(req.data, 1, req.size, state.file); + + if (bytesWritten != req.size) { + state.hasError = true; + state.errorMessage = "Failed to write chunk"; + xSemaphoreGive(uploadsMutex_); + finalizeUpload(req.transferId, false, state.errorMessage); + return; + } + + state.chunksWritten++; + ESP_LOGD(TAG, "Async write chunk %u/%u: %u bytes", state.chunksWritten, state.totalChunks, bytesWritten); + + // Periodic flush + if (state.chunksWritten > 0 && state.chunksWritten % 64 == 0) { + fflush(state.file); + } + + bool shouldFinalize = req.isLastChunk; + xSemaphoreGive(uploadsMutex_); + + if (shouldFinalize) { + finalizeUpload(req.transferId, true); + } +} void FileSystemHandler::setSendCallbacks(SendMetadataCallback sendMetadata, SendCallback sendData, SendCompleteCallback sendComplete, @@ -53,29 +184,40 @@ void FileSystemHandler::cleanupExpiredTransfers() { } } + xSemaphoreTake(uploadsMutex_, portMAX_DELAY); auto ulIt = uploads_.begin(); while (ulIt != uploads_.end()) { if (now - ulIt->second.lastActivityTime > FS_TRANSFER_TIMEOUT_MS) { if (ulIt->second.file) { fclose(ulIt->second.file); + ulIt->second.file = nullptr; } - remove(ulIt->second.path.c_str()); - ESP_LOGW(TAG, "Upload %u timed out, deleted partial file", ulIt->first); + std::string path = ulIt->second.path; + uint32_t chunksReceived = ulIt->second.chunksReceived; + int clientId = ulIt->second.clientId; + uint32_t transferId = ulIt->first; + + ulIt = uploads_.erase(ulIt); + xSemaphoreGive(uploadsMutex_); + + remove(path.c_str()); + ESP_LOGW(TAG, "Upload %u timed out, deleted partial file", transferId); if (sendUploadCompleteCallback_) { socket_message_FSUploadComplete complete = socket_message_FSUploadComplete_init_zero; - complete.transfer_id = ulIt->first; + complete.transfer_id = transferId; complete.success = false; strncpy(complete.error, "Transfer timed out", sizeof(complete.error) - 1); - complete.chunks_received = ulIt->second.chunksReceived; - sendUploadCompleteCallback_(complete, ulIt->second.clientId); + complete.chunks_received = chunksReceived; + sendUploadCompleteCallback_(complete, clientId); } - ulIt = uploads_.erase(ulIt); + xSemaphoreTake(uploadsMutex_, portMAX_DELAY); } else { ++ulIt; } } + xSemaphoreGive(uploadsMutex_); } socket_message_FSDeleteResponse FileSystemHandler::handleDelete(const socket_message_FSDeleteRequest& req) { @@ -408,6 +550,9 @@ socket_message_FSUploadStartResponse FileSystemHandler::handleUploadStart(const return response; } + // Set larger buffer for better write performance + setvbuf(file, nullptr, _IOFBF, 32 * 1024); + uint32_t transferId = generateTransferId(); UploadState state; @@ -416,12 +561,15 @@ socket_message_FSUploadStartResponse FileSystemHandler::handleUploadStart(const state.fileSize = req.file_size; state.totalChunks = req.total_chunks; state.chunksReceived = 0; + state.chunksWritten = 0; state.bytesReceived = 0; state.lastActivityTime = esp_timer_get_time() / 1000; state.clientId = clientId; state.hasError = false; + xSemaphoreTake(uploadsMutex_, portMAX_DELAY); uploads_[transferId] = state; + xSemaphoreGive(uploadsMutex_); response.success = true; response.transfer_id = transferId; @@ -434,8 +582,16 @@ socket_message_FSUploadStartResponse FileSystemHandler::handleUploadStart(const void FileSystemHandler::handleUploadData(const socket_message_FSUploadData& req) { uint32_t transferId = req.transfer_id; + // Auto-start writer task if not running + if (!writerTaskRunning_) { + startWriterTask(); + } + + xSemaphoreTake(uploadsMutex_, portMAX_DELAY); + auto it = uploads_.find(transferId); if (it == uploads_.end()) { + xSemaphoreGive(uploadsMutex_); ESP_LOGW(TAG, "Upload data for unknown transfer: %u", transferId); return; } @@ -444,6 +600,7 @@ void FileSystemHandler::handleUploadData(const socket_message_FSUploadData& req) state.lastActivityTime = esp_timer_get_time() / 1000; if (state.hasError) { + xSemaphoreGive(uploadsMutex_); return; } @@ -454,36 +611,58 @@ void FileSystemHandler::handleUploadData(const socket_message_FSUploadData& req) if (!req.data || req.data->size == 0) { state.hasError = true; state.errorMessage = "Empty or invalid data chunk"; + xSemaphoreGive(uploadsMutex_); finalizeUpload(transferId, false, state.errorMessage); return; } - size_t bytesWritten = fwrite(req.data->bytes, 1, req.data->size, state.file); - if (bytesWritten != req.data->size) { + // Copy data for async write + WriteRequest writeReq; + writeReq.transferId = transferId; + writeReq.size = req.data->size; + writeReq.chunkIndex = req.chunk_index; + writeReq.data = static_cast(malloc(req.data->size)); + + if (!writeReq.data) { state.hasError = true; - state.errorMessage = "Failed to write chunk"; + state.errorMessage = "Memory allocation failed"; + xSemaphoreGive(uploadsMutex_); finalizeUpload(transferId, false, state.errorMessage); return; } + memcpy(writeReq.data, req.data->bytes, req.data->size); + state.chunksReceived++; - state.bytesReceived += bytesWritten; + state.bytesReceived += req.data->size; + writeReq.isLastChunk = (state.chunksReceived >= state.totalChunks); - if (state.chunksReceived > 0 && state.chunksReceived % 64 == 0) { - ESP_LOGD(TAG, "Flushing file at chunk %u", state.chunksReceived); - fflush(state.file); + ESP_LOGD(TAG, "Queuing chunk %u/%u: %u bytes", state.chunksReceived, state.totalChunks, req.data->size); + + xSemaphoreGive(uploadsMutex_); + + // Check queue is valid + if (!writeQueue_) { + ESP_LOGE(TAG, "Write queue not initialized"); + free(writeReq.data); + finalizeUpload(transferId, false, "Write queue not initialized"); + return; } - ESP_LOGD(TAG, "Upload chunk %u/%u: %u bytes", state.chunksReceived, state.totalChunks, bytesWritten); - - if (state.chunksReceived >= state.totalChunks) { - finalizeUpload(transferId, true); + // Try to queue (non-blocking) - if full, do sync write to avoid blocking HTTP server + if (xQueueSend(writeQueue_, &writeReq, 0) != pdTRUE) { + ESP_LOGD(TAG, "Queue full, doing sync write for chunk %u", writeReq.chunkIndex); + processWriteRequest(writeReq); + free(writeReq.data); } } void FileSystemHandler::finalizeUpload(uint32_t transferId, bool success, const std::string& error) { + xSemaphoreTake(uploadsMutex_, portMAX_DELAY); + auto it = uploads_.find(transferId); if (it == uploads_.end()) { + xSemaphoreGive(uploadsMutex_); return; } @@ -491,13 +670,22 @@ void FileSystemHandler::finalizeUpload(uint32_t transferId, bool success, const if (state.file) { fclose(state.file); + state.file = nullptr; } + std::string path = state.path; + uint32_t bytesReceived = state.bytesReceived; + uint32_t chunksReceived = state.chunksReceived; + int clientId = state.clientId; + + uploads_.erase(it); + xSemaphoreGive(uploadsMutex_); + if (!success) { - remove(state.path.c_str()); - ESP_LOGW(TAG, "Upload failed, deleted partial file: %s", state.path.c_str()); + remove(path.c_str()); + ESP_LOGW(TAG, "Upload failed, deleted partial file: %s", path.c_str()); } else { - ESP_LOGI(TAG, "Upload completed: %s (%u bytes)", state.path.c_str(), state.bytesReceived); + ESP_LOGI(TAG, "Upload completed: %s (%u bytes)", path.c_str(), bytesReceived); } if (sendUploadCompleteCallback_) { @@ -507,11 +695,9 @@ void FileSystemHandler::finalizeUpload(uint32_t transferId, bool success, const if (!error.empty()) { strncpy(complete.error, error.c_str(), sizeof(complete.error) - 1); } - complete.chunks_received = state.chunksReceived; - sendUploadCompleteCallback_(complete, state.clientId); + complete.chunks_received = chunksReceived; + sendUploadCompleteCallback_(complete, clientId); } - - uploads_.erase(it); } socket_message_FSCancelTransferResponse FileSystemHandler::handleCancelTransfer( @@ -531,17 +717,22 @@ socket_message_FSCancelTransferResponse FileSystemHandler::handleCancelTransfer( return response; } + xSemaphoreTake(uploadsMutex_, portMAX_DELAY); auto ulIt = uploads_.find(transferId); if (ulIt != uploads_.end()) { if (ulIt->second.file) { fclose(ulIt->second.file); + ulIt->second.file = nullptr; } - remove(ulIt->second.path.c_str()); + std::string path = ulIt->second.path; uploads_.erase(ulIt); + xSemaphoreGive(uploadsMutex_); + remove(path.c_str()); response.success = true; ESP_LOGI(TAG, "Upload cancelled: %u", transferId); return response; } + xSemaphoreGive(uploadsMutex_); response.success = false; return response; diff --git a/esp32/src/main.cpp b/esp32/src/main.cpp index 8ac7330..62122e2 100644 --- a/esp32/src/main.cpp +++ b/esp32/src/main.cpp @@ -157,7 +157,7 @@ void setupEventSocket() { }); wsSocket.on( - [&](const socket_message_FSUploadData &data, int clientId) { FileSystemWS::fsHandler.handleUploadData(data); }); + [&](const socket_message_FSUploadData &data, int clientId) { TIME_IT(FileSystemWS::fsHandler.handleUploadData(data), handle_upload) }); using CorrelationHandler = std::function;