Fix task and added timings for write

This commit is contained in:
Niklas Jensen
2026-02-21 15:24:37 +01:00
parent 0ee459b1a7
commit 41e6ff9ba6
6 changed files with 285 additions and 32 deletions
@@ -8,6 +8,7 @@
#include <map>
#include <type_traits>
#include <communication/proto_helpers.h>
#include <utils/timing.h>
class CommAdapterBase {
public:
@@ -96,9 +97,11 @@ class CommAdapterBase {
}
void handleIncoming(const uint8_t* data, size_t len, int cid) {
TIME_IT(
if (!decoder_.decode(data, len, cid)) {
ESP_LOGE("ProtoComm", "Failed to decode incoming message from client %d", cid);
}
, INCOMING_DECODE)
}
void sendPong(int cid) {
+29 -1
View File
@@ -6,9 +6,16 @@
#include <string>
#include <functional>
#include <cstdio>
#include <freertos/FreeRTOS.h>
#include <freertos/queue.h>
#include <freertos/task.h>
#include <freertos/semphr.h>
#define FS_MAX_CHUNK_SIZE (1024*64)
#define FS_MAX_CHUNK_SIZE (1024 * 64)
#define FS_TRANSFER_TIMEOUT_MS 30000
#define FS_WRITE_QUEUE_SIZE 4
#define FS_WRITER_TASK_STACK_SIZE 4096
#define FS_WRITER_TASK_PRIORITY 5
namespace FileSystemWS {
@@ -29,6 +36,7 @@ struct UploadState {
uint32_t fileSize;
uint32_t totalChunks;
uint32_t chunksReceived;
uint32_t chunksWritten;
uint32_t bytesReceived;
uint32_t lastActivityTime;
int clientId;
@@ -36,6 +44,14 @@ struct UploadState {
std::string errorMessage;
};
struct WriteRequest {
uint32_t transferId;
uint8_t* data;
size_t size;
uint32_t chunkIndex;
bool isLastChunk;
};
using SendMetadataCallback = std::function<void(const socket_message_FSDownloadMetadata&, int clientId)>;
using SendCallback = std::function<void(const socket_message_FSDownloadData&, int clientId)>;
using SendCompleteCallback = std::function<void(const socket_message_FSDownloadComplete&, int clientId)>;
@@ -44,6 +60,10 @@ using SendUploadCompleteCallback = std::function<void(const socket_message_FSUpl
class FileSystemHandler {
public:
FileSystemHandler();
~FileSystemHandler();
void startWriterTask();
void stopWriterTask();
void setSendCallbacks(SendMetadataCallback sendMetadata, SendCallback sendData, SendCompleteCallback sendComplete,
SendUploadCompleteCallback sendUploadComplete);
@@ -63,6 +83,12 @@ class FileSystemHandler {
std::map<uint32_t, UploadState> uploads_;
uint32_t transferIdCounter_;
// Async writer task
QueueHandle_t writeQueue_;
TaskHandle_t writerTaskHandle_;
SemaphoreHandle_t uploadsMutex_;
volatile bool writerTaskRunning_;
inline uint32_t generateTransferId() { return ++transferIdCounter_; }
SendMetadataCallback sendMetadataCallback_;
@@ -74,6 +100,8 @@ class FileSystemHandler {
bool deleteRecursive(const std::string& path);
bool sendNextDownloadChunk(uint32_t transferId);
void finalizeUpload(uint32_t transferId, bool success, const std::string& error = "");
void processWriteRequest(const WriteRequest& req);
static void writerTaskFunc(void* param);
};
extern FileSystemHandler fsHandler;
+16 -3
View File
@@ -2,6 +2,7 @@
#include <esp_log.h>
#include <cstring>
#include <algorithm>
#include <utils/timing.h>
static const char* TAG = "WebServer";
@@ -121,6 +122,10 @@ esp_err_t WebServer::httpHandler(httpd_req_t* req) {
}
esp_err_t WebServer::wsHandler(httpd_req_t* req) {
esp_err_t result;
httpd_ws_frame_t frame;
esp_err_t ret;
TIME_IT(
WebServer* self = static_cast<WebServer*>(req->user_ctx);
if (req->method == HTTP_GET) {
@@ -133,17 +138,21 @@ esp_err_t WebServer::wsHandler(httpd_req_t* req) {
return ESP_OK;
}
httpd_ws_frame_t frame;
memset(&frame, 0, sizeof(httpd_ws_frame_t));
frame.type = HTTPD_WS_TYPE_BINARY;
esp_err_t ret = httpd_ws_recv_frame(req, &frame, 0);
TIME_IT(
ret = httpd_ws_recv_frame(req, &frame, 0);
if (ret != ESP_OK) {
ESP_LOGE(TAG, "Failed to get frame len: %s", esp_err_to_name(ret));
return ret;
}
, FRAME_LEN)
if (frame.len > 0) {
TIME_IT(
frame.payload = (uint8_t*)malloc(frame.len);
if (!frame.payload) {
ESP_LOGE(TAG, "Failed to allocate frame payload");
@@ -156,6 +165,7 @@ esp_err_t WebServer::wsHandler(httpd_req_t* req) {
free(frame.payload);
return ret;
}
, FRAME_RECEIVE)
}
if (frame.type == HTTPD_WS_TYPE_CLOSE) {
@@ -169,15 +179,18 @@ esp_err_t WebServer::wsHandler(httpd_req_t* req) {
return ESP_OK;
}
esp_err_t result = ESP_OK;
result = ESP_OK;
if (self->wsFrameHandler_) {
TIME_IT(
result = self->wsFrameHandler_(req, &frame);
, FRAME_HANDLER)
}
if (frame.payload) {
free(frame.payload);
}
, WS_HANDLER)
return result;
}
+19 -1
View File
@@ -24,8 +24,26 @@ void Websocket::onWsClose(int sockfd) {
}
esp_err_t Websocket::onFrame(httpd_req_t* req, httpd_ws_frame_t* frame) {
// Handle PING - respond with PONG
if (frame->type == HTTPD_WS_TYPE_PING) {
httpd_ws_frame_t pong = {
.final = true,
.fragmented = false,
.type = HTTPD_WS_TYPE_PONG,
.payload = frame->payload,
.len = frame->len
};
return httpd_ws_send_frame(req, &pong);
}
// Ignore PONG frames
if (frame->type == HTTPD_WS_TYPE_PONG) {
return ESP_OK;
}
// Ignore other non-binary frames
if (frame->type != HTTPD_WS_TYPE_BINARY) {
ESP_LOGW(TAG, "Expected binary frame, got type %d", frame->type);
ESP_LOGD(TAG, "Ignoring frame type %d", frame->type);
return ESP_OK;
}
+217 -26
View File
@@ -8,6 +8,7 @@
#include <sys/stat.h>
#include <cstring>
#include <cerrno>
#include <utils/timing.h>
static const char* TAG = "FileSystemWS";
@@ -15,7 +16,137 @@ namespace FileSystemWS {
FileSystemHandler fsHandler;
FileSystemHandler::FileSystemHandler() : transferIdCounter_(0) {}
FileSystemHandler::FileSystemHandler()
: transferIdCounter_(0), writeQueue_(nullptr), writerTaskHandle_(nullptr), uploadsMutex_(nullptr),
writerTaskRunning_(false) {
uploadsMutex_ = xSemaphoreCreateMutex();
}
FileSystemHandler::~FileSystemHandler() {
stopWriterTask();
if (uploadsMutex_) {
vSemaphoreDelete(uploadsMutex_);
}
}
void FileSystemHandler::startWriterTask() {
if (writerTaskHandle_ != nullptr) {
return;
}
writeQueue_ = xQueueCreate(FS_WRITE_QUEUE_SIZE, sizeof(WriteRequest));
if (!writeQueue_) {
ESP_LOGE(TAG, "Failed to create write queue");
return;
}
writerTaskRunning_ = true;
BaseType_t result =
xTaskCreate(writerTaskFunc, "fs_writer", FS_WRITER_TASK_STACK_SIZE, this, FS_WRITER_TASK_PRIORITY, &writerTaskHandle_);
if (result != pdPASS) {
ESP_LOGE(TAG, "Failed to create writer task");
vQueueDelete(writeQueue_);
writeQueue_ = nullptr;
writerTaskRunning_ = false;
} else {
ESP_LOGI(TAG, "Writer task started");
}
}
void FileSystemHandler::stopWriterTask() {
if (!writerTaskRunning_) {
return;
}
writerTaskRunning_ = false;
// Send a poison pill to wake up the task
WriteRequest poison = {0, nullptr, 0, 0, false};
if (writeQueue_) {
xQueueSend(writeQueue_, &poison, portMAX_DELAY);
}
// Wait for task to finish
if (writerTaskHandle_) {
vTaskDelay(pdMS_TO_TICKS(100));
writerTaskHandle_ = nullptr;
}
if (writeQueue_) {
// Drain any remaining requests and free their data
WriteRequest req;
while (xQueueReceive(writeQueue_, &req, 0) == pdTRUE) {
if (req.data) {
free(req.data);
}
}
vQueueDelete(writeQueue_);
writeQueue_ = nullptr;
}
ESP_LOGI(TAG, "Writer task stopped");
}
void FileSystemHandler::writerTaskFunc(void* param) {
FileSystemHandler* self = static_cast<FileSystemHandler*>(param);
WriteRequest req;
while (self->writerTaskRunning_) {
if (xQueueReceive(self->writeQueue_, &req, pdMS_TO_TICKS(10)) == pdTRUE) {
if (req.data == nullptr) {
// Poison pill - exit
break;
}
self->processWriteRequest(req);
free(req.data);
}
}
vTaskDelete(nullptr);
}
void FileSystemHandler::processWriteRequest(const WriteRequest& req) {
xSemaphoreTake(uploadsMutex_, portMAX_DELAY);
auto it = uploads_.find(req.transferId);
if (it == uploads_.end()) {
xSemaphoreGive(uploadsMutex_);
return;
}
UploadState& state = it->second;
if (state.hasError) {
xSemaphoreGive(uploadsMutex_);
return;
}
size_t bytesWritten = fwrite(req.data, 1, req.size, state.file);
if (bytesWritten != req.size) {
state.hasError = true;
state.errorMessage = "Failed to write chunk";
xSemaphoreGive(uploadsMutex_);
finalizeUpload(req.transferId, false, state.errorMessage);
return;
}
state.chunksWritten++;
ESP_LOGD(TAG, "Async write chunk %u/%u: %u bytes", state.chunksWritten, state.totalChunks, bytesWritten);
// Periodic flush
if (state.chunksWritten > 0 && state.chunksWritten % 64 == 0) {
fflush(state.file);
}
bool shouldFinalize = req.isLastChunk;
xSemaphoreGive(uploadsMutex_);
if (shouldFinalize) {
finalizeUpload(req.transferId, true);
}
}
void FileSystemHandler::setSendCallbacks(SendMetadataCallback sendMetadata, SendCallback sendData,
SendCompleteCallback sendComplete,
@@ -53,29 +184,40 @@ void FileSystemHandler::cleanupExpiredTransfers() {
}
}
xSemaphoreTake(uploadsMutex_, portMAX_DELAY);
auto ulIt = uploads_.begin();
while (ulIt != uploads_.end()) {
if (now - ulIt->second.lastActivityTime > FS_TRANSFER_TIMEOUT_MS) {
if (ulIt->second.file) {
fclose(ulIt->second.file);
ulIt->second.file = nullptr;
}
remove(ulIt->second.path.c_str());
ESP_LOGW(TAG, "Upload %u timed out, deleted partial file", ulIt->first);
std::string path = ulIt->second.path;
uint32_t chunksReceived = ulIt->second.chunksReceived;
int clientId = ulIt->second.clientId;
uint32_t transferId = ulIt->first;
ulIt = uploads_.erase(ulIt);
xSemaphoreGive(uploadsMutex_);
remove(path.c_str());
ESP_LOGW(TAG, "Upload %u timed out, deleted partial file", transferId);
if (sendUploadCompleteCallback_) {
socket_message_FSUploadComplete complete = socket_message_FSUploadComplete_init_zero;
complete.transfer_id = ulIt->first;
complete.transfer_id = transferId;
complete.success = false;
strncpy(complete.error, "Transfer timed out", sizeof(complete.error) - 1);
complete.chunks_received = ulIt->second.chunksReceived;
sendUploadCompleteCallback_(complete, ulIt->second.clientId);
complete.chunks_received = chunksReceived;
sendUploadCompleteCallback_(complete, clientId);
}
ulIt = uploads_.erase(ulIt);
xSemaphoreTake(uploadsMutex_, portMAX_DELAY);
} else {
++ulIt;
}
}
xSemaphoreGive(uploadsMutex_);
}
socket_message_FSDeleteResponse FileSystemHandler::handleDelete(const socket_message_FSDeleteRequest& req) {
@@ -408,6 +550,9 @@ socket_message_FSUploadStartResponse FileSystemHandler::handleUploadStart(const
return response;
}
// Set larger buffer for better write performance
setvbuf(file, nullptr, _IOFBF, 32 * 1024);
uint32_t transferId = generateTransferId();
UploadState state;
@@ -416,12 +561,15 @@ socket_message_FSUploadStartResponse FileSystemHandler::handleUploadStart(const
state.fileSize = req.file_size;
state.totalChunks = req.total_chunks;
state.chunksReceived = 0;
state.chunksWritten = 0;
state.bytesReceived = 0;
state.lastActivityTime = esp_timer_get_time() / 1000;
state.clientId = clientId;
state.hasError = false;
xSemaphoreTake(uploadsMutex_, portMAX_DELAY);
uploads_[transferId] = state;
xSemaphoreGive(uploadsMutex_);
response.success = true;
response.transfer_id = transferId;
@@ -434,8 +582,16 @@ socket_message_FSUploadStartResponse FileSystemHandler::handleUploadStart(const
void FileSystemHandler::handleUploadData(const socket_message_FSUploadData& req) {
uint32_t transferId = req.transfer_id;
// Auto-start writer task if not running
if (!writerTaskRunning_) {
startWriterTask();
}
xSemaphoreTake(uploadsMutex_, portMAX_DELAY);
auto it = uploads_.find(transferId);
if (it == uploads_.end()) {
xSemaphoreGive(uploadsMutex_);
ESP_LOGW(TAG, "Upload data for unknown transfer: %u", transferId);
return;
}
@@ -444,6 +600,7 @@ void FileSystemHandler::handleUploadData(const socket_message_FSUploadData& req)
state.lastActivityTime = esp_timer_get_time() / 1000;
if (state.hasError) {
xSemaphoreGive(uploadsMutex_);
return;
}
@@ -454,36 +611,58 @@ void FileSystemHandler::handleUploadData(const socket_message_FSUploadData& req)
if (!req.data || req.data->size == 0) {
state.hasError = true;
state.errorMessage = "Empty or invalid data chunk";
xSemaphoreGive(uploadsMutex_);
finalizeUpload(transferId, false, state.errorMessage);
return;
}
size_t bytesWritten = fwrite(req.data->bytes, 1, req.data->size, state.file);
if (bytesWritten != req.data->size) {
// Copy data for async write
WriteRequest writeReq;
writeReq.transferId = transferId;
writeReq.size = req.data->size;
writeReq.chunkIndex = req.chunk_index;
writeReq.data = static_cast<uint8_t*>(malloc(req.data->size));
if (!writeReq.data) {
state.hasError = true;
state.errorMessage = "Failed to write chunk";
state.errorMessage = "Memory allocation failed";
xSemaphoreGive(uploadsMutex_);
finalizeUpload(transferId, false, state.errorMessage);
return;
}
memcpy(writeReq.data, req.data->bytes, req.data->size);
state.chunksReceived++;
state.bytesReceived += bytesWritten;
state.bytesReceived += req.data->size;
writeReq.isLastChunk = (state.chunksReceived >= state.totalChunks);
if (state.chunksReceived > 0 && state.chunksReceived % 64 == 0) {
ESP_LOGD(TAG, "Flushing file at chunk %u", state.chunksReceived);
fflush(state.file);
ESP_LOGD(TAG, "Queuing chunk %u/%u: %u bytes", state.chunksReceived, state.totalChunks, req.data->size);
xSemaphoreGive(uploadsMutex_);
// Check queue is valid
if (!writeQueue_) {
ESP_LOGE(TAG, "Write queue not initialized");
free(writeReq.data);
finalizeUpload(transferId, false, "Write queue not initialized");
return;
}
ESP_LOGD(TAG, "Upload chunk %u/%u: %u bytes", state.chunksReceived, state.totalChunks, bytesWritten);
if (state.chunksReceived >= state.totalChunks) {
finalizeUpload(transferId, true);
// Try to queue (non-blocking) - if full, do sync write to avoid blocking HTTP server
if (xQueueSend(writeQueue_, &writeReq, 0) != pdTRUE) {
ESP_LOGD(TAG, "Queue full, doing sync write for chunk %u", writeReq.chunkIndex);
processWriteRequest(writeReq);
free(writeReq.data);
}
}
void FileSystemHandler::finalizeUpload(uint32_t transferId, bool success, const std::string& error) {
xSemaphoreTake(uploadsMutex_, portMAX_DELAY);
auto it = uploads_.find(transferId);
if (it == uploads_.end()) {
xSemaphoreGive(uploadsMutex_);
return;
}
@@ -491,13 +670,22 @@ void FileSystemHandler::finalizeUpload(uint32_t transferId, bool success, const
if (state.file) {
fclose(state.file);
state.file = nullptr;
}
std::string path = state.path;
uint32_t bytesReceived = state.bytesReceived;
uint32_t chunksReceived = state.chunksReceived;
int clientId = state.clientId;
uploads_.erase(it);
xSemaphoreGive(uploadsMutex_);
if (!success) {
remove(state.path.c_str());
ESP_LOGW(TAG, "Upload failed, deleted partial file: %s", state.path.c_str());
remove(path.c_str());
ESP_LOGW(TAG, "Upload failed, deleted partial file: %s", path.c_str());
} else {
ESP_LOGI(TAG, "Upload completed: %s (%u bytes)", state.path.c_str(), state.bytesReceived);
ESP_LOGI(TAG, "Upload completed: %s (%u bytes)", path.c_str(), bytesReceived);
}
if (sendUploadCompleteCallback_) {
@@ -507,11 +695,9 @@ void FileSystemHandler::finalizeUpload(uint32_t transferId, bool success, const
if (!error.empty()) {
strncpy(complete.error, error.c_str(), sizeof(complete.error) - 1);
}
complete.chunks_received = state.chunksReceived;
sendUploadCompleteCallback_(complete, state.clientId);
complete.chunks_received = chunksReceived;
sendUploadCompleteCallback_(complete, clientId);
}
uploads_.erase(it);
}
socket_message_FSCancelTransferResponse FileSystemHandler::handleCancelTransfer(
@@ -531,17 +717,22 @@ socket_message_FSCancelTransferResponse FileSystemHandler::handleCancelTransfer(
return response;
}
xSemaphoreTake(uploadsMutex_, portMAX_DELAY);
auto ulIt = uploads_.find(transferId);
if (ulIt != uploads_.end()) {
if (ulIt->second.file) {
fclose(ulIt->second.file);
ulIt->second.file = nullptr;
}
remove(ulIt->second.path.c_str());
std::string path = ulIt->second.path;
uploads_.erase(ulIt);
xSemaphoreGive(uploadsMutex_);
remove(path.c_str());
response.success = true;
ESP_LOGI(TAG, "Upload cancelled: %u", transferId);
return response;
}
xSemaphoreGive(uploadsMutex_);
response.success = false;
return response;
+1 -1
View File
@@ -157,7 +157,7 @@ void setupEventSocket() {
});
wsSocket.on<socket_message_FSUploadData>(
[&](const socket_message_FSUploadData &data, int clientId) { FileSystemWS::fsHandler.handleUploadData(data); });
[&](const socket_message_FSUploadData &data, int clientId) { TIME_IT(FileSystemWS::fsHandler.handleUploadData(data), handle_upload) });
using CorrelationHandler =
std::function<void(const socket_message_CorrelationRequest &, socket_message_CorrelationResponse &, int)>;