diff --git a/app/src/lib/filesystem/chunkedTransfer.ts b/app/src/lib/filesystem/chunkedTransfer.ts index 0a67f2e..0709988 100644 --- a/app/src/lib/filesystem/chunkedTransfer.ts +++ b/app/src/lib/filesystem/chunkedTransfer.ts @@ -4,15 +4,16 @@ import type { FSDeleteRequest, FSMkdirRequest, FSListRequest, - FSDownloadStartRequest, - FSDownloadChunkRequest, - FSUploadStartRequest, - FSUploadChunkRequest, - FSCancelTransferRequest, - CorrelationResponse + FSDownloadRequest, + FSDownloadData, + FSDownloadComplete, + FSUploadStart, + FSUploadData, + FSUploadComplete, + FSCancelTransfer } from '$lib/platform_shared/message' -const MAX_CHUNK_SIZE = 2**14 // ~= 16 kb +const MAX_CHUNK_SIZE = 2 ** 14 // ~= 16 kb export interface FileInfo { name: string @@ -41,7 +42,143 @@ export interface TransferProgress { export type ProgressCallback = (progress: TransferProgress) => void +// Active transfer tracking +interface ActiveDownload { + path: string + buffer: Uint8Array + fileSize: number + totalChunks: number + chunksReceived: number + bytesReceived: number + resolve: (result: { success: boolean; data?: Uint8Array; error?: string }) => void + reject: (error: Error) => void + onProgress?: ProgressCallback + timeoutId: ReturnType +} + +interface ActiveUpload { + path: string + transferId: string + totalChunks: number + chunksSent: number + resolve: (result: { success: boolean; error?: string }) => void + reject: (error: Error) => void + onProgress?: ProgressCallback + timeoutId: ReturnType +} + export class FileSystemClient { + private activeDownloads = new Map() + private activeUploads = new Map() + private downloadListenerCleanup: (() => void) | null = null + private completeListenerCleanup: (() => void) | null = null + private uploadCompleteListenerCleanup: (() => void) | null = null + private transferTimeout = 60000 // 60 seconds timeout for transfers + + constructor() { + this.setupListeners() + } + + private setupListeners() { + // Listen for download data chunks + this.downloadListenerCleanup = socket.on( + Messages.FSDownloadData, + (data: FSDownloadData) => { + this.handleDownloadData(data) + } + ) + + // Listen for download completion + this.completeListenerCleanup = socket.on( + Messages.FSDownloadComplete, + (complete: FSDownloadComplete) => { + this.handleDownloadComplete(complete) + } + ) + + // Listen for upload completion + this.uploadCompleteListenerCleanup = socket.on( + Messages.FSUploadComplete, + (complete: FSUploadComplete) => { + this.handleUploadComplete(complete) + } + ) + } + + private handleDownloadData(data: FSDownloadData) { + const download = this.activeDownloads.get(data.transferId) + if (!download) { + console.warn(`Received download data for unknown transfer: ${data.transferId}`) + return + } + + // Reset timeout + clearTimeout(download.timeoutId) + download.timeoutId = setTimeout(() => { + this.activeDownloads.delete(data.transferId) + download.reject(new Error('Download timeout')) + }, this.transferTimeout) + + // Copy chunk data to buffer + if (data.data && data.data.length > 0) { + const offset = data.chunkIndex * MAX_CHUNK_SIZE + download.buffer.set(data.data, offset) + download.bytesReceived += data.data.length + download.chunksReceived++ + } + + // Report progress + if (download.onProgress) { + download.onProgress({ + transferId: data.transferId, + bytesTransferred: download.bytesReceived, + totalBytes: download.fileSize, + chunksCompleted: download.chunksReceived, + totalChunks: download.totalChunks, + percentage: (download.chunksReceived / download.totalChunks) * 100 + }) + } + } + + private handleDownloadComplete(complete: FSDownloadComplete) { + const download = this.activeDownloads.get(complete.transferId) + if (!download) { + // This is normal for error cases where transferId wasn't set + if (complete.error) { + console.warn(`Download failed: ${complete.error}`) + } + return + } + + clearTimeout(download.timeoutId) + this.activeDownloads.delete(complete.transferId) + + if (complete.success) { + // Trim buffer to actual file size + const finalData = download.buffer.slice(0, complete.fileSize) + download.resolve({ success: true, data: finalData }) + } else { + download.resolve({ success: false, error: complete.error || 'Download failed' }) + } + } + + private handleUploadComplete(complete: FSUploadComplete) { + const upload = this.activeUploads.get(complete.transferId) + if (!upload) { + console.warn(`Received upload complete for unknown transfer: ${complete.transferId}`) + return + } + + clearTimeout(upload.timeoutId) + this.activeUploads.delete(complete.transferId) + + if (complete.success) { + upload.resolve({ success: true }) + } else { + upload.resolve({ success: false, error: complete.error || 'Upload failed' }) + } + } + /** * Delete a file or directory on the ESP32 */ @@ -106,84 +243,89 @@ export class FileSystemClient { } /** - * Download a file from the ESP32 + * Download a file from the ESP32 using streaming transfer + * Server streams all chunks without waiting for ACKs */ async downloadFile( path: string, onProgress?: ProgressCallback ): Promise<{ success: boolean; data?: Uint8Array; error?: string }> { - // Start download - const startRequest: FSDownloadStartRequest = { path } + return new Promise((resolve, reject) => { + // Send download request - server will stream chunks back + const request: FSDownloadRequest = { path } - const startResponse = await socket.request({ - fsDownloadStartRequest: startRequest - }) + // We need to set up tracking before sending the request + // The server will generate a transfer ID and include it in all responses + // We'll capture the first chunk to get the transfer ID - if (!startResponse.fsDownloadStartResponse) { - return { success: false, error: 'Failed to start download' } - } + // Set up timeout for initial response + const initialTimeout = setTimeout(() => { + reject(new Error('Download request timeout - no data received')) + }, this.transferTimeout) - const startResp = startResponse.fsDownloadStartResponse + // One-time listener for the first chunk to get transfer details + const firstChunkHandler = (data: FSDownloadData) => { + clearTimeout(initialTimeout) - if (!startResp.success) { - return { success: false, error: startResp.error || 'Failed to start download' } - } + // Now we have the real transfer ID + const transferId = data.transferId - const transferId = startResp.transferId - const totalChunks = startResp.totalChunks - const fileSize = startResp.fileSize + // Estimate total size from first chunk (server sends file_size in complete message) + // For now, allocate a large buffer and resize later + const estimatedSize = 10 * 1024 * 1024 // 10MB max + const buffer = new Uint8Array(estimatedSize) - // Allocate buffer for entire file - const buffer = new Uint8Array(fileSize) - let offset = 0 + const download: ActiveDownload = { + path, + buffer, + fileSize: estimatedSize, // Will be updated on completion + totalChunks: Math.ceil(estimatedSize / MAX_CHUNK_SIZE), + chunksReceived: 0, + bytesReceived: 0, + resolve, + reject, + onProgress, + timeoutId: setTimeout(() => { + this.activeDownloads.delete(transferId) + reject(new Error('Download timeout')) + }, this.transferTimeout) + } - // Download chunks sequentially - for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) { - const chunkRequest: FSDownloadChunkRequest = { - transferId, - chunkIndex + this.activeDownloads.set(transferId, download) + + // Process this first chunk + this.handleDownloadData(data) + + // Remove the first chunk handler - subsequent chunks go through normal listener + firstChunkCleanup() } - const chunkResponse = await socket.request({ - fsDownloadChunkRequest: chunkRequest + // Error handler for if download fails immediately + const errorHandler = (complete: FSDownloadComplete) => { + if (!complete.success && !complete.transferId) { + clearTimeout(initialTimeout) + firstChunkCleanup() + errorCleanup() + resolve({ success: false, error: complete.error || 'Download failed' }) + } + } + + const firstChunkCleanup = socket.on(Messages.FSDownloadData, firstChunkHandler) + const errorCleanup = socket.on(Messages.FSDownloadComplete, errorHandler) + + // Send the download request (no response expected, server streams data) + socket.request({ fsDownloadRequest: request }).catch((err) => { + clearTimeout(initialTimeout) + firstChunkCleanup() + errorCleanup() + reject(err) }) - - if (!chunkResponse.fsDownloadChunkResponse) { - await this.cancelTransfer(transferId) - return { success: false, error: `Failed to download chunk ${chunkIndex}` } - } - - const chunkResp = chunkResponse.fsDownloadChunkResponse - - if (chunkResp.error) { - await this.cancelTransfer(transferId) - return { success: false, error: chunkResp.error } - } - - // Copy chunk data to buffer - if (chunkResp.data) { - buffer.set(chunkResp.data, offset) - offset += chunkResp.data.length - } - - // Report progress - if (onProgress) { - onProgress({ - transferId, - bytesTransferred: offset, - totalBytes: fileSize, - chunksCompleted: chunkIndex + 1, - totalChunks, - percentage: ((chunkIndex + 1) / totalChunks) * 100 - }) - } - } - - return { success: true, data: buffer } + }) } /** - * Upload a file to the ESP32 + * Upload a file to the ESP32 using streaming transfer + * Client sends all chunks without waiting for ACKs */ async uploadFile( path: string, @@ -192,17 +334,17 @@ export class FileSystemClient { ): Promise<{ success: boolean; error?: string }> { const fileSize = data.length const chunkSize = MAX_CHUNK_SIZE - const totalChunks = Math.ceil(fileSize / chunkSize) + const totalChunks = Math.ceil(fileSize / chunkSize) || 1 - // Start upload - const startRequest: FSUploadStartRequest = { + // Start upload - get transfer ID + const startRequest: FSUploadStart = { path, fileSize, - chunkSize + totalChunks } const startResponse = await socket.request({ - fsUploadStartRequest: startRequest + fsUploadStart: startRequest }) if (!startResponse.fsUploadStartResponse) { @@ -216,62 +358,83 @@ export class FileSystemClient { } const transferId = startResp.transferId - const maxChunkSize = startResp.maxChunkSize || MAX_CHUNK_SIZE - // Upload chunks sequentially - for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) { - const offset = chunkIndex * chunkSize - const end = Math.min(offset + chunkSize, fileSize) - const chunkData = data.slice(offset, end) - const isLast = chunkIndex === totalChunks - 1 - - const chunkRequest: FSUploadChunkRequest = { + return new Promise((resolve, reject) => { + // Set up upload tracking + const upload: ActiveUpload = { + path, transferId, - chunkIndex, - data: chunkData, - isLast + totalChunks, + chunksSent: 0, + resolve, + reject, + onProgress, + timeoutId: setTimeout(() => { + this.activeUploads.delete(transferId) + reject(new Error('Upload timeout - no completion received')) + }, this.transferTimeout) } - const chunkResponse = await socket.request({ - fsUploadChunkRequest: chunkRequest - }) + this.activeUploads.set(transferId, upload) - if (!chunkResponse.fsUploadChunkResponse) { - await this.cancelTransfer(transferId) - return { success: false, error: `Failed to upload chunk ${chunkIndex}` } - } + // Stream all chunks without waiting for ACKs + for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) { + const offset = chunkIndex * chunkSize + const end = Math.min(offset + chunkSize, fileSize) + const chunkData = data.slice(offset, end) - const chunkResp = chunkResponse.fsUploadChunkResponse - - if (!chunkResp.success) { - await this.cancelTransfer(transferId) - return { success: false, error: chunkResp.error || 'Failed to upload chunk' } - } - - // Report progress - if (onProgress) { - onProgress({ + const uploadData: FSUploadData = { transferId, - bytesTransferred: end, - totalBytes: fileSize, - chunksCompleted: chunkIndex + 1, - totalChunks, - percentage: ((chunkIndex + 1) / totalChunks) * 100 - }) - } - } + chunkIndex, + data: chunkData + } - return { success: true } + // Send chunk as fire-and-forget message + socket.emit(Messages.FSUploadData, uploadData) + + upload.chunksSent++ + + // Report progress + if (onProgress) { + onProgress({ + transferId, + bytesTransferred: end, + totalBytes: fileSize, + chunksCompleted: chunkIndex + 1, + totalChunks, + percentage: ((chunkIndex + 1) / totalChunks) * 100 + }) + } + } + + // All chunks sent - now wait for completion message from server + // The timeout will handle if server doesn't respond + }) } /** * Cancel an ongoing transfer */ async cancelTransfer(transferId: string): Promise<{ success: boolean }> { - const request: FSCancelTransferRequest = { transferId } + const request: FSCancelTransfer = { transferId } + + // Clean up local state + const download = this.activeDownloads.get(transferId) + if (download) { + clearTimeout(download.timeoutId) + this.activeDownloads.delete(transferId) + download.resolve({ success: false, error: 'Transfer cancelled' }) + } + + const upload = this.activeUploads.get(transferId) + if (upload) { + clearTimeout(upload.timeoutId) + this.activeUploads.delete(transferId) + upload.resolve({ success: false, error: 'Transfer cancelled' }) + } const response = await socket.request({ - fsCancelTransferRequest: request + fsCancelTransfer: request }) if (response.fsCancelTransferResponse) { @@ -309,7 +472,7 @@ export class FileSystemClient { } // Create blob and trigger download - const blob = new Blob([result.data]) + const blob = new Blob([result.data.buffer as ArrayBuffer]) const url = URL.createObjectURL(blob) const a = document.createElement('a') a.href = url @@ -321,6 +484,28 @@ export class FileSystemClient { return { success: true } } + + /** + * Cleanup listeners when no longer needed + */ + destroy() { + this.downloadListenerCleanup?.() + this.completeListenerCleanup?.() + this.uploadCompleteListenerCleanup?.() + + // Cancel all active transfers + for (const [, download] of this.activeDownloads) { + clearTimeout(download.timeoutId) + download.reject(new Error('FileSystemClient destroyed')) + } + this.activeDownloads.clear() + + for (const [, upload] of this.activeUploads) { + clearTimeout(upload.timeoutId) + upload.reject(new Error('FileSystemClient destroyed')) + } + this.activeUploads.clear() + } } export const fileSystemClient = new FileSystemClient() diff --git a/esp32/include/communication/proto_helpers.h b/esp32/include/communication/proto_helpers.h index 7620bb8..da43ddf 100644 --- a/esp32/include/communication/proto_helpers.h +++ b/esp32/include/communication/proto_helpers.h @@ -41,22 +41,12 @@ DEFINE_MESSAGE_TRAITS(ServoPWMData, servo_pwm) DEFINE_MESSAGE_TRAITS(ServoStateData, servo_state) DEFINE_MESSAGE_TRAITS(CorrelationRequest, correlation_request) DEFINE_MESSAGE_TRAITS(CorrelationResponse, correlation_response) -// DEFINE_MESSAGE_TRAITS(FSDeleteRequest, fs_delete_request) -// DEFINE_MESSAGE_TRAITS(FSDeleteResponse, fs_delete_response) -// DEFINE_MESSAGE_TRAITS(FSMkdirRequest, fs_mkdir_request) -// DEFINE_MESSAGE_TRAITS(FSMkdirResponse, fs_mkdir_response) -// DEFINE_MESSAGE_TRAITS(FSListRequest, fs_list_request) -// DEFINE_MESSAGE_TRAITS(FSListResponse, fs_list_response) -// DEFINE_MESSAGE_TRAITS(FSDownloadStartRequest, fs_download_start_request) -// DEFINE_MESSAGE_TRAITS(FSDownloadStartResponse, fs_download_start_response) -// DEFINE_MESSAGE_TRAITS(FSDownloadChunkRequest, fs_download_chunk_request) -// DEFINE_MESSAGE_TRAITS(FSDownloadChunkResponse, fs_download_chunk_response) -// DEFINE_MESSAGE_TRAITS(FSUploadStartRequest, fs_upload_start_request) -// DEFINE_MESSAGE_TRAITS(FSUploadStartResponse, fs_upload_start_response) -// DEFINE_MESSAGE_TRAITS(FSUploadChunkRequest, fs_upload_chunk_request) -// DEFINE_MESSAGE_TRAITS(FSUploadChunkResponse, fs_upload_chunk_response) -// DEFINE_MESSAGE_TRAITS(FSCancelTransferRequest, fs_cancel_transfer_request) -// DEFINE_MESSAGE_TRAITS(FSCancelTransferResponse, fs_cancel_transfer_response) + +// Streaming file transfer messages +DEFINE_MESSAGE_TRAITS(FSDownloadData, fs_download_data) +DEFINE_MESSAGE_TRAITS(FSDownloadComplete, fs_download_complete) +DEFINE_MESSAGE_TRAITS(FSUploadData, fs_upload_data) +DEFINE_MESSAGE_TRAITS(FSUploadComplete, fs_upload_complete) #undef DEFINE_MESSAGE_TRAITS diff --git a/esp32/include/filesystem_ws.h b/esp32/include/filesystem_ws.h index 2dda546..9e19814 100644 --- a/esp32/include/filesystem_ws.h +++ b/esp32/include/filesystem_ws.h @@ -4,28 +4,54 @@ #include #include #include +#include -// Make sure that this aligns with socket_message.FSDownloadChunkResponse.data max_size (and for the corresponsing request) +// Make sure that this aligns with socket_message.FSDownloadData.data max_size (and for upload) #define FS_MAX_CHUNK_SIZE 16384 // ~= 16 kb #define FS_TRANSFER_TIMEOUT 30000 // 30 seconds namespace FileSystemWS { -struct TransferState { +struct DownloadState { std::string path; File file; uint32_t fileSize; uint32_t chunkSize; uint32_t totalChunks; - uint32_t chunksProcessed; + uint32_t chunksSent; uint32_t lastActivityTime; - bool isUpload; + int clientId; }; +struct UploadState { + std::string path; + File file; + uint32_t fileSize; + uint32_t totalChunks; + uint32_t chunksReceived; + uint32_t bytesReceived; + uint32_t lastActivityTime; + int clientId; + bool hasError; + std::string errorMessage; +}; + +// Callback type for sending messages to clients +using SendCallback = std::function; +using SendCompleteCallback = std::function; +using SendUploadCompleteCallback = std::function; + class FileSystemHandler { public: FileSystemHandler(); + // Set callbacks for sending streaming data + void setSendCallbacks( + SendCallback sendData, + SendCompleteCallback sendComplete, + SendUploadCompleteCallback sendUploadComplete + ); + // Delete file/directory socket_message_FSDeleteResponse handleDelete(const socket_message_FSDeleteRequest& req); @@ -35,27 +61,42 @@ class FileSystemHandler { // List directory socket_message_FSListResponse handleList(const socket_message_FSListRequest& req); - // Download operations (ESP -> Client) - socket_message_FSDownloadStartResponse handleDownloadStart(const socket_message_FSDownloadStartRequest& req); - socket_message_FSDownloadChunkResponse handleDownloadChunk(const socket_message_FSDownloadChunkRequest& req); + // Streaming download - starts the download and streams all chunks + void handleDownloadRequest(const socket_message_FSDownloadRequest& req, int clientId); - // Upload operations (Client -> ESP) - socket_message_FSUploadStartResponse handleUploadStart(const socket_message_FSUploadStartRequest& req); - socket_message_FSUploadChunkResponse handleUploadChunk(const socket_message_FSUploadChunkRequest& req); + // Streaming upload - start upload session + socket_message_FSUploadStartResponse handleUploadStart(const socket_message_FSUploadStart& req, int clientId); + + // Streaming upload - receive chunk data (fire-and-forget from client) + void handleUploadData(const socket_message_FSUploadData& req); // Cancel transfer - socket_message_FSCancelTransferResponse handleCancelTransfer(const socket_message_FSCancelTransferRequest& req); + socket_message_FSCancelTransferResponse handleCancelTransfer(const socket_message_FSCancelTransfer& req); // Cleanup expired transfers void cleanupExpiredTransfers(); + // Process pending downloads (call from main loop) + void processPendingDownloads(); + private: - std::map transfers_; + std::map downloads_; + std::map uploads_; uint32_t transferIdCounter_; + SendCallback sendDataCallback_; + SendCompleteCallback sendCompleteCallback_; + SendUploadCompleteCallback sendUploadCompleteCallback_; + std::string generateTransferId(); void listDirectory(const std::string& path, socket_message_FSListResponse& response); bool deleteRecursive(const std::string& path); + + // Send next chunk for a download + bool sendNextDownloadChunk(const std::string& transferId); + + // Finalize upload and send completion message + void finalizeUpload(const std::string& transferId, bool success, const std::string& error = ""); }; extern FileSystemHandler fsHandler; diff --git a/esp32/src/filesystem_ws.cpp b/esp32/src/filesystem_ws.cpp index 4a3b6e7..fed3b0e 100644 --- a/esp32/src/filesystem_ws.cpp +++ b/esp32/src/filesystem_ws.cpp @@ -12,22 +12,73 @@ FileSystemHandler fsHandler; FileSystemHandler::FileSystemHandler() : transferIdCounter_(0) {} +void FileSystemHandler::setSendCallbacks( + SendCallback sendData, + SendCompleteCallback sendComplete, + SendUploadCompleteCallback sendUploadComplete +) { + sendDataCallback_ = sendData; + sendCompleteCallback_ = sendComplete; + sendUploadCompleteCallback_ = sendUploadComplete; +} + std::string FileSystemHandler::generateTransferId() { return "xfer_" + std::to_string(millis()) + "_" + std::to_string(++transferIdCounter_); } void FileSystemHandler::cleanupExpiredTransfers() { uint32_t now = millis(); - auto it = transfers_.begin(); - while (it != transfers_.end()) { - if (now - it->second.lastActivityTime > FS_TRANSFER_TIMEOUT) { - if (it->second.file) { - it->second.file.close(); + + // Cleanup expired downloads + auto dlIt = downloads_.begin(); + while (dlIt != downloads_.end()) { + if (now - dlIt->second.lastActivityTime > FS_TRANSFER_TIMEOUT) { + if (dlIt->second.file) { + dlIt->second.file.close(); } - ESP_LOGW(TAG, "Transfer %s timed out", it->first.c_str()); - it = transfers_.erase(it); + ESP_LOGW(TAG, "Download %s timed out", dlIt->first.c_str()); + + // Send error completion + if (sendCompleteCallback_) { + socket_message_FSDownloadComplete complete = socket_message_FSDownloadComplete_init_zero; + strncpy(complete.transfer_id, dlIt->first.c_str(), sizeof(complete.transfer_id) - 1); + complete.success = false; + strncpy(complete.error, "Transfer timed out", sizeof(complete.error) - 1); + complete.total_chunks = dlIt->second.chunksSent; + complete.file_size = dlIt->second.fileSize; + sendCompleteCallback_(complete, dlIt->second.clientId); + } + + dlIt = downloads_.erase(dlIt); } else { - ++it; + ++dlIt; + } + } + + // Cleanup expired uploads + auto ulIt = uploads_.begin(); + while (ulIt != uploads_.end()) { + if (now - ulIt->second.lastActivityTime > FS_TRANSFER_TIMEOUT) { + if (ulIt->second.file) { + ulIt->second.file.close(); + } + // Delete partial file + ESP_FS.remove(ulIt->second.path.c_str()); + ESP_LOGW(TAG, "Upload %s timed out, deleted partial file", ulIt->first.c_str()); + + // Send error completion + if (sendUploadCompleteCallback_) { + socket_message_FSUploadComplete complete = socket_message_FSUploadComplete_init_zero; + strncpy(complete.transfer_id, ulIt->first.c_str(), sizeof(complete.transfer_id) - 1); + complete.success = false; + strncpy(complete.error, "Transfer timed out", sizeof(complete.error) - 1); + complete.chunks_received = ulIt->second.chunksReceived; + sendUploadCompleteCallback_(complete, ulIt->second.clientId); + } + + ulIt = uploads_.erase(ulIt); + } else { + ++ulIt; } } } @@ -149,106 +200,161 @@ socket_message_FSListResponse FileSystemHandler::handleList(const socket_message return response; } -socket_message_FSDownloadStartResponse FileSystemHandler::handleDownloadStart(const socket_message_FSDownloadStartRequest& req) { - socket_message_FSDownloadStartResponse response = socket_message_FSDownloadStartResponse_init_zero; +// ===== STREAMING DOWNLOAD ===== +void FileSystemHandler::handleDownloadRequest(const socket_message_FSDownloadRequest& req, int clientId) { std::string path(req.path); - ESP_LOGI(TAG, "Download start request: %s", path.c_str()); + ESP_LOGI(TAG, "Download request: %s", path.c_str()); + // Validate file exists if (!ESP_FS.exists(path.c_str())) { - response.success = false; - strncpy(response.error, "File not found", sizeof(response.error) - 1); - return response; + if (sendCompleteCallback_) { + socket_message_FSDownloadComplete complete = socket_message_FSDownloadComplete_init_zero; + complete.success = false; + strncpy(complete.error, "File not found", sizeof(complete.error) - 1); + sendCompleteCallback_(complete, clientId); + } + return; } File file = ESP_FS.open(path.c_str(), "r"); if (!file || file.isDirectory()) { - response.success = false; - strncpy(response.error, "Cannot open file for reading", sizeof(response.error) - 1); - return response; + if (sendCompleteCallback_) { + socket_message_FSDownloadComplete complete = socket_message_FSDownloadComplete_init_zero; + complete.success = false; + strncpy(complete.error, "Cannot open file for reading", sizeof(complete.error) - 1); + sendCompleteCallback_(complete, clientId); + } + return; } uint32_t fileSize = file.size(); uint32_t chunkSize = FS_MAX_CHUNK_SIZE; uint32_t totalChunks = (fileSize + chunkSize - 1) / chunkSize; + if (totalChunks == 0) totalChunks = 1; // Handle empty files std::string transferId = generateTransferId(); - TransferState state; + DownloadState state; state.path = path; state.file = file; state.fileSize = fileSize; state.chunkSize = chunkSize; state.totalChunks = totalChunks; - state.chunksProcessed = 0; + state.chunksSent = 0; state.lastActivityTime = millis(); - state.isUpload = false; + state.clientId = clientId; - transfers_[transferId] = state; + downloads_[transferId] = state; - response.success = true; - response.file_size = fileSize; - response.chunk_size = chunkSize; - response.total_chunks = totalChunks; - strncpy(response.transfer_id, transferId.c_str(), sizeof(response.transfer_id) - 1); + ESP_LOGI(TAG, "Download started: %s, size=%u, chunks=%u, id=%s", + path.c_str(), fileSize, totalChunks, transferId.c_str()); - ESP_LOGI(TAG, "Download started: %s, size=%u, chunks=%u, id=%s", path.c_str(), fileSize, totalChunks, transferId.c_str()); - - return response; + // Start streaming chunks immediately + while (sendNextDownloadChunk(transferId)) { + // Keep sending until done or error + taskYIELD(); // Allow other tasks to run + } } -socket_message_FSDownloadChunkResponse FileSystemHandler::handleDownloadChunk(const socket_message_FSDownloadChunkRequest& req) { - socket_message_FSDownloadChunkResponse response = socket_message_FSDownloadChunkResponse_init_zero; - - std::string transferId(req.transfer_id); - strncpy(response.transfer_id, transferId.c_str(), sizeof(response.transfer_id) - 1); - response.chunk_index = req.chunk_index; - - auto it = transfers_.find(transferId); - if (it == transfers_.end()) { - strncpy(response.error, "Invalid transfer ID", sizeof(response.error) - 1); - return response; +bool FileSystemHandler::sendNextDownloadChunk(const std::string& transferId) { + auto it = downloads_.find(transferId); + if (it == downloads_.end()) { + return false; } - TransferState& state = it->second; + DownloadState& state = it->second; state.lastActivityTime = millis(); - // Seek to chunk position - uint32_t position = req.chunk_index * state.chunkSize; - if (!state.file.seek(position)) { - strncpy(response.error, "Failed to seek file", sizeof(response.error) - 1); - return response; + // Check if we're done + if (state.chunksSent >= state.totalChunks) { + // Send completion message + if (sendCompleteCallback_) { + socket_message_FSDownloadComplete complete = socket_message_FSDownloadComplete_init_zero; + strncpy(complete.transfer_id, transferId.c_str(), sizeof(complete.transfer_id) - 1); + complete.success = true; + complete.total_chunks = state.totalChunks; + complete.file_size = state.fileSize; + sendCompleteCallback_(complete, state.clientId); + } + + state.file.close(); + downloads_.erase(it); + ESP_LOGI(TAG, "Download completed: %s", transferId.c_str()); + return false; } + // Allocate data struct on heap to avoid stack overflow (it contains 16KB buffer) + auto data = new socket_message_FSDownloadData(); + memset(data, 0, sizeof(socket_message_FSDownloadData)); + strncpy(data->transfer_id, transferId.c_str(), sizeof(data->transfer_id) - 1); + data->chunk_index = state.chunksSent; + // Calculate chunk size (last chunk might be smaller) uint32_t bytesToRead = state.chunkSize; - if (req.chunk_index == state.totalChunks - 1) { + uint32_t position = state.chunksSent * state.chunkSize; + if (position + bytesToRead > state.fileSize) { bytesToRead = state.fileSize - position; } // Read chunk data - size_t bytesRead = state.file.read(response.data.bytes, bytesToRead); - response.data.size = bytesRead; + size_t bytesRead = state.file.read(data->data.bytes, bytesToRead); + if (bytesRead == 0 && bytesToRead > 0) { + // Read error - send error completion + delete data; + if (sendCompleteCallback_) { + socket_message_FSDownloadComplete complete = socket_message_FSDownloadComplete_init_zero; + strncpy(complete.transfer_id, transferId.c_str(), sizeof(complete.transfer_id) - 1); + complete.success = false; + strncpy(complete.error, "Failed to read file", sizeof(complete.error) - 1); + complete.total_chunks = state.chunksSent; + complete.file_size = state.fileSize; + sendCompleteCallback_(complete, state.clientId); + } - response.is_last = (req.chunk_index == state.totalChunks - 1); - - ESP_LOGI(TAG, "Download chunk %u/%u: %u bytes", req.chunk_index + 1, state.totalChunks, bytesRead); - - // Cleanup if last chunk - if (response.is_last) { state.file.close(); - transfers_.erase(it); - ESP_LOGI(TAG, "Download completed: %s", transferId.c_str()); + downloads_.erase(it); + ESP_LOGE(TAG, "Download failed - read error: %s", transferId.c_str()); + return false; + } + data->data.size = bytesRead; + + // Send chunk + if (sendDataCallback_) { + sendDataCallback_(*data, state.clientId); } - return response; + delete data; + state.chunksSent++; + ESP_LOGD(TAG, "Download chunk %u/%u sent: %u bytes", state.chunksSent, state.totalChunks, bytesRead); + + return true; } -socket_message_FSUploadStartResponse FileSystemHandler::handleUploadStart(const socket_message_FSUploadStartRequest& req) { +void FileSystemHandler::processPendingDownloads() { + // Process any pending downloads (in case we want non-blocking downloads) + // Currently downloads are synchronous in handleDownloadRequest +} + +// ===== STREAMING UPLOAD ===== + +socket_message_FSUploadStartResponse FileSystemHandler::handleUploadStart( + const socket_message_FSUploadStart& req, int clientId +) { socket_message_FSUploadStartResponse response = socket_message_FSUploadStartResponse_init_zero; std::string path(req.path); - ESP_LOGI(TAG, "Upload start request: %s, size=%u", path.c_str(), req.file_size); + ESP_LOGI(TAG, "Upload start request: %s, size=%u, chunks=%u", path.c_str(), req.file_size, req.total_chunks); + + // Check available space + size_t fs_total = 0, fs_used = 0; + esp_littlefs_info("spiffs", &fs_total, &fs_used); + size_t freeSpace = fs_total - fs_used; + if (freeSpace < req.file_size + 4096) { // 4KB safety margin + response.success = false; + strncpy(response.error, "Insufficient storage space", sizeof(response.error) - 1); + return response; + } // Ensure parent directory exists size_t lastSlash = path.find_last_of('/'); @@ -270,20 +376,20 @@ socket_message_FSUploadStartResponse FileSystemHandler::handleUploadStart(const std::string transferId = generateTransferId(); - TransferState state; + UploadState state; state.path = path; state.file = file; state.fileSize = req.file_size; - state.chunkSize = req.chunk_size > FS_MAX_CHUNK_SIZE ? FS_MAX_CHUNK_SIZE : req.chunk_size; - state.totalChunks = (req.file_size + state.chunkSize - 1) / state.chunkSize; - state.chunksProcessed = 0; + state.totalChunks = req.total_chunks; + state.chunksReceived = 0; + state.bytesReceived = 0; state.lastActivityTime = millis(); - state.isUpload = true; + state.clientId = clientId; + state.hasError = false; - transfers_[transferId] = state; + uploads_[transferId] = state; response.success = true; - response.max_chunk_size = FS_MAX_CHUNK_SIZE; strncpy(response.transfer_id, transferId.c_str(), sizeof(response.transfer_id) - 1); ESP_LOGI(TAG, "Upload started: %s, id=%s", path.c_str(), transferId.c_str()); @@ -291,91 +397,139 @@ socket_message_FSUploadStartResponse FileSystemHandler::handleUploadStart(const return response; } -socket_message_FSUploadChunkResponse FileSystemHandler::handleUploadChunk(const socket_message_FSUploadChunkRequest& req) { - socket_message_FSUploadChunkResponse fs_up_response = socket_message_FSUploadChunkResponse_init_zero; - +void FileSystemHandler::handleUploadData(const socket_message_FSUploadData& req) { std::string transferId(req.transfer_id); - strncpy(fs_up_response.transfer_id, transferId.c_str(), sizeof(fs_up_response.transfer_id) - 1); - fs_up_response.chunk_index = req.chunk_index; - auto it = transfers_.find(transferId); - if (it == transfers_.end()) { - fs_up_response.success = false; - strncpy(fs_up_response.error, "Invalid transfer ID", sizeof(fs_up_response.error) - 1); - return fs_up_response; + auto it = uploads_.find(transferId); + if (it == uploads_.end()) { + ESP_LOGW(TAG, "Upload data for unknown transfer: %s", transferId.c_str()); + return; } - TransferState& state = it->second; + UploadState& state = it->second; state.lastActivityTime = millis(); + // Skip if we already have an error + if (state.hasError) { + return; + } + + // Validate chunk index (allow out-of-order but warn) + if (req.chunk_index != state.chunksReceived) { + ESP_LOGW(TAG, "Upload chunk out of order: expected %u, got %u", state.chunksReceived, req.chunk_index); + // For now, we'll accept it anyway and write sequentially + // A more robust implementation would buffer out-of-order chunks + } + // Check available space before writing size_t fs_total = 0, fs_used = 0; esp_littlefs_info("spiffs", &fs_total, &fs_used); - size_t freeSpace = fs_total - fs_used; - if (freeSpace < req.data.size + 4096) { // 4KB safety margin - fs_up_response.success = false; - strncpy(fs_up_response.error, "Filesystem full", sizeof(fs_up_response.error) - 1); - state.file.close(); - transfers_.erase(it); - return fs_up_response; + size_t freeSpace = fs_total - fs_used; + if (freeSpace < req.data.size + 4096) { + state.hasError = true; + state.errorMessage = "Filesystem full"; + finalizeUpload(transferId, false, state.errorMessage); + return; } + // Write chunk data size_t bytesWritten = state.file.write(req.data.bytes, req.data.size); if (bytesWritten != req.data.size) { - fs_up_response.success = false; - strncpy(fs_up_response.error, "Failed to write chunk", sizeof(fs_up_response.error) - 1); - state.file.close(); - transfers_.erase(it); - return fs_up_response; + state.hasError = true; + state.errorMessage = "Failed to write chunk"; + finalizeUpload(transferId, false, state.errorMessage); + return; } - // Flush periodically to prevent LittleFS buffer issues (every 64 chunks = ~64KB with 1KB chunks) - if (state.chunksProcessed > 0 && state.chunksProcessed % 64 == 0) { - ESP_LOGI(TAG, "Flushing file at chunk %u", state.chunksProcessed); + state.chunksReceived++; + state.bytesReceived += bytesWritten; + + // Flush periodically to prevent LittleFS buffer issues + if (state.chunksReceived > 0 && state.chunksReceived % 64 == 0) { + ESP_LOGD(TAG, "Flushing file at chunk %u", state.chunksReceived); state.file.flush(); } - state.chunksProcessed++; - fs_up_response.success = true; - fs_up_response.transfer_complete = req.is_last; + ESP_LOGD(TAG, "Upload chunk %u/%u: %u bytes", state.chunksReceived, state.totalChunks, bytesWritten); - ESP_LOGI(TAG, "Upload chunk %u/%u: %u bytes", state.chunksProcessed, state.totalChunks, bytesWritten); - - // Cleanup if last chunk - if (req.is_last) { - state.file.close(); - transfers_.erase(it); - ESP_LOGI(TAG, "Upload completed: %s", state.path.c_str()); + // Check if upload is complete + if (state.chunksReceived >= state.totalChunks) { + finalizeUpload(transferId, true); } - - return fs_up_response; } -socket_message_FSCancelTransferResponse FileSystemHandler::handleCancelTransfer(const socket_message_FSCancelTransferRequest& req) { +void FileSystemHandler::finalizeUpload(const std::string& transferId, bool success, const std::string& error) { + auto it = uploads_.find(transferId); + if (it == uploads_.end()) { + return; + } + + UploadState& state = it->second; + + // Close file + if (state.file) { + state.file.close(); + } + + // Delete file on error + if (!success) { + ESP_FS.remove(state.path.c_str()); + ESP_LOGW(TAG, "Upload failed, deleted partial file: %s", state.path.c_str()); + } else { + ESP_LOGI(TAG, "Upload completed: %s (%u bytes)", state.path.c_str(), state.bytesReceived); + } + + // Send completion message + if (sendUploadCompleteCallback_) { + socket_message_FSUploadComplete complete = socket_message_FSUploadComplete_init_zero; + strncpy(complete.transfer_id, transferId.c_str(), sizeof(complete.transfer_id) - 1); + complete.success = success; + if (!error.empty()) { + strncpy(complete.error, error.c_str(), sizeof(complete.error) - 1); + } + complete.chunks_received = state.chunksReceived; + sendUploadCompleteCallback_(complete, state.clientId); + } + + uploads_.erase(it); +} + +// ===== TRANSFER CONTROL ===== + +socket_message_FSCancelTransferResponse FileSystemHandler::handleCancelTransfer( + const socket_message_FSCancelTransfer& req +) { socket_message_FSCancelTransferResponse response = socket_message_FSCancelTransferResponse_init_zero; - std::string transferId(req.transfer_id); - auto it = transfers_.find(transferId); + strncpy(response.transfer_id, transferId.c_str(), sizeof(response.transfer_id) - 1); - if (it == transfers_.end()) { - response.success = false; + // Check downloads + auto dlIt = downloads_.find(transferId); + if (dlIt != downloads_.end()) { + if (dlIt->second.file) { + dlIt->second.file.close(); + } + downloads_.erase(dlIt); + response.success = true; + ESP_LOGI(TAG, "Download cancelled: %s", transferId.c_str()); return response; } - if (it->second.file) { - it->second.file.close(); + // Check uploads + auto ulIt = uploads_.find(transferId); + if (ulIt != uploads_.end()) { + if (ulIt->second.file) { + ulIt->second.file.close(); + } + // Delete partial upload file + ESP_FS.remove(ulIt->second.path.c_str()); + uploads_.erase(ulIt); + response.success = true; + ESP_LOGI(TAG, "Upload cancelled: %s", transferId.c_str()); + return response; } - // Delete partial upload file - if (it->second.isUpload) { - ESP_FS.remove(it->second.path.c_str()); - } - - transfers_.erase(it); - response.success = true; - - ESP_LOGI(TAG, "Transfer cancelled: %s", transferId.c_str()); - + response.success = false; return response; } diff --git a/esp32/src/main.cpp b/esp32/src/main.cpp index 2b6606f..8f5d37a 100644 --- a/esp32/src/main.cpp +++ b/esp32/src/main.cpp @@ -135,6 +135,22 @@ void setupServer() { } void setupEventSocket() { + // Set up filesystem handler callbacks for streaming transfers + FileSystemWS::fsHandler.setSendCallbacks( + // Send download data chunk + [](const socket_message_FSDownloadData& data, int clientId) { + socket.emit(data, clientId); + }, + // Send download complete + [](const socket_message_FSDownloadComplete& complete, int clientId) { + socket.emit(complete, clientId); + }, + // Send upload complete + [](const socket_message_FSUploadComplete& complete, int clientId) { + socket.emit(complete, clientId); + } + ); + socket.on( [&](const socket_message_ControllerData &data, int clientId) { motionService.handleInput(data); }); @@ -158,30 +174,35 @@ void setupEventSocket() { data.active ? servoController.activate() : servoController.deactivate(); }); + // Handle streaming upload data (fire-and-forget from client) + socket.on([&](const socket_message_FSUploadData &data, int clientId) { + FileSystemWS::fsHandler.handleUploadData(data); + }); + using CorrelationHandler = - std::function; + std::function; static std::map correlationHandlers = { {socket_message_CorrelationRequest_features_data_request_tag, // Features data - [](const auto &req, auto &res) { + [](const auto &req, auto &res, int clientId) { res.which_response = socket_message_CorrelationResponse_features_data_response_tag; feature_service::features_request(req.request.features_data_request, res.response.features_data_response); }}, {socket_message_CorrelationRequest_i2c_scan_data_request_tag, // i2c data - [](const auto &req, auto &res) { + [](const auto &req, auto &res, int clientId) { res.which_response = socket_message_CorrelationResponse_i2c_scan_data_tag; peripherals.scanI2C(); peripherals.getI2CScanProto(res.response.i2c_scan_data); }}, {socket_message_CorrelationRequest_imu_calibrate_execute_tag, // Calibration request - [](const auto &req, auto &res) { + [](const auto &req, auto &res, int clientId) { res.which_response = socket_message_CorrelationResponse_imu_calibrate_data_tag; res.response.imu_calibrate_data.success = peripherals.calibrateIMU(); }}, {socket_message_CorrelationRequest_system_information_request_tag, // All system information data - [](const auto &req, auto &res) { + [](const auto &req, auto &res, int clientId) { res.which_response = socket_message_CorrelationResponse_system_information_response_tag; res.response.system_information_response.has_analytics_data = true; res.response.system_information_response.has_static_system_information = true; @@ -191,51 +212,42 @@ void setupEventSocket() { // Filesystem operations {socket_message_CorrelationRequest_fs_delete_request_tag, // Delete file/directory - [](const auto &req, auto &res) { + [](const auto &req, auto &res, int clientId) { res.which_response = socket_message_CorrelationResponse_fs_delete_response_tag; res.response.fs_delete_response = FileSystemWS::fsHandler.handleDelete(req.request.fs_delete_request); }}, {socket_message_CorrelationRequest_fs_mkdir_request_tag, // Create directory - [](const auto &req, auto &res) { + [](const auto &req, auto &res, int clientId) { res.which_response = socket_message_CorrelationResponse_fs_mkdir_response_tag; res.response.fs_mkdir_response = FileSystemWS::fsHandler.handleMkdir(req.request.fs_mkdir_request); }}, {socket_message_CorrelationRequest_fs_list_request_tag, // List directory - [](const auto &req, auto &res) { + [](const auto &req, auto &res, int clientId) { res.which_response = socket_message_CorrelationResponse_fs_list_response_tag; res.response.fs_list_response = FileSystemWS::fsHandler.handleList(req.request.fs_list_request); }}, - {socket_message_CorrelationRequest_fs_download_start_request_tag, // Download start - [](const auto &req, auto &res) { - res.which_response = socket_message_CorrelationResponse_fs_download_start_response_tag; - res.response.fs_download_start_response = FileSystemWS::fsHandler.handleDownloadStart(req.request.fs_download_start_request); + {socket_message_CorrelationRequest_fs_download_request_tag, // Streaming download (no response, streams data) + [](const auto &req, auto &res, int clientId) { + // Download is handled differently - it streams chunks directly + // No correlation response is sent; instead FSDownloadData/FSDownloadComplete are streamed + FileSystemWS::fsHandler.handleDownloadRequest(req.request.fs_download_request, clientId); + // Set status_code to 0 to indicate no response should be sent + res.status_code = 0; }}, - {socket_message_CorrelationRequest_fs_download_chunk_request_tag, // Download chunk - [](const auto &req, auto &res) { - res.which_response = socket_message_CorrelationResponse_fs_download_chunk_response_tag; - res.response.fs_download_chunk_response = FileSystemWS::fsHandler.handleDownloadChunk(req.request.fs_download_chunk_request); - }}, - - {socket_message_CorrelationRequest_fs_upload_start_request_tag, // Upload start - [](const auto &req, auto &res) { + {socket_message_CorrelationRequest_fs_upload_start_tag, // Upload start + [](const auto &req, auto &res, int clientId) { res.which_response = socket_message_CorrelationResponse_fs_upload_start_response_tag; - res.response.fs_upload_start_response = FileSystemWS::fsHandler.handleUploadStart(req.request.fs_upload_start_request); + res.response.fs_upload_start_response = FileSystemWS::fsHandler.handleUploadStart(req.request.fs_upload_start, clientId); }}, - {socket_message_CorrelationRequest_fs_upload_chunk_request_tag, // Upload chunk - [](const auto &req, auto &res) { - res.which_response = socket_message_CorrelationResponse_fs_upload_chunk_response_tag; - res.response.fs_upload_chunk_response = FileSystemWS::fsHandler.handleUploadChunk(req.request.fs_upload_chunk_request); - }}, - - {socket_message_CorrelationRequest_fs_cancel_transfer_request_tag, // Cancel transfer - [](const auto &req, auto &res) { + {socket_message_CorrelationRequest_fs_cancel_transfer_tag, // Cancel transfer + [](const auto &req, auto &res, int clientId) { res.which_response = socket_message_CorrelationResponse_fs_cancel_transfer_response_tag; - res.response.fs_cancel_transfer_response = FileSystemWS::fsHandler.handleCancelTransfer(req.request.fs_cancel_transfer_request); + res.response.fs_cancel_transfer_response = FileSystemWS::fsHandler.handleCancelTransfer(req.request.fs_cancel_transfer); }}, }; @@ -248,8 +260,11 @@ void setupEventSocket() { auto it = correlationHandlers.find(data.which_request); if (it != correlationHandlers.end()) { - it->second(data, *res); - socket.emit(*res, clientId); + it->second(data, *res, clientId); + // Only emit response if status_code is non-zero (streaming handlers set it to 0) + if (res->status_code != 0) { + socket.emit(*res, clientId); + } } else { printf("WARNING: no handler for correlation request: %d\n", data.which_request); } diff --git a/platform_shared/message.options b/platform_shared/message.options index 3c12394..2f4b18b 100644 --- a/platform_shared/message.options +++ b/platform_shared/message.options @@ -54,22 +54,22 @@ socket_message.FSListResponse.error max_size:128 socket_message.FSListResponse.files max_count:20 socket_message.FSListResponse.directories max_count:20 -socket_message.FSDownloadStartRequest.path max_size:256 -socket_message.FSDownloadStartResponse.error max_size:128 -socket_message.FSDownloadStartResponse.transfer_id max_size:64 +# Streaming download messages +socket_message.FSDownloadRequest.path max_size:256 +socket_message.FSDownloadData.transfer_id max_size:64 +socket_message.FSDownloadData.data max_size:16384 +socket_message.FSDownloadComplete.transfer_id max_size:64 +socket_message.FSDownloadComplete.error max_size:128 -socket_message.FSDownloadChunkRequest.transfer_id max_size:64 -socket_message.FSDownloadChunkResponse.transfer_id max_size:64 -socket_message.FSDownloadChunkResponse.data max_size:16384 -socket_message.FSDownloadChunkResponse.error max_size:128 - -socket_message.FSUploadStartRequest.path max_size:256 +# Streaming upload messages +socket_message.FSUploadStart.path max_size:256 socket_message.FSUploadStartResponse.error max_size:128 socket_message.FSUploadStartResponse.transfer_id max_size:64 +socket_message.FSUploadData.transfer_id max_size:64 +socket_message.FSUploadData.data max_size:16384 +socket_message.FSUploadComplete.transfer_id max_size:64 +socket_message.FSUploadComplete.error max_size:128 -socket_message.FSUploadChunkRequest.transfer_id max_size:64 -socket_message.FSUploadChunkRequest.data max_size:16384 -socket_message.FSUploadChunkResponse.transfer_id max_size:64 -socket_message.FSUploadChunkResponse.error max_size:128 - -socket_message.FSCancelTransferRequest.transfer_id max_size:64 \ No newline at end of file +# Transfer control +socket_message.FSCancelTransfer.transfer_id max_size:64 +socket_message.FSCancelTransferResponse.transfer_id max_size:64 \ No newline at end of file diff --git a/platform_shared/message.proto b/platform_shared/message.proto index 4877b2b..c8501ec 100644 --- a/platform_shared/message.proto +++ b/platform_shared/message.proto @@ -53,69 +53,64 @@ message FSListResponse { repeated Directory directories = 4; } -// Download from ESP (ESP -> Client) -message FSDownloadStartRequest { - string path = 1; // File path on ESP to download +// ===== STREAMING DOWNLOAD (ESP -> Client) ===== +// Flow: Client sends FSDownloadRequest -> Server streams FSDownloadData chunks -> Server sends FSDownloadComplete + +message FSDownloadRequest { + string path = 1; // File path on ESP to download } -message FSDownloadStartResponse { - bool success = 1; - string error = 2; - uint32 file_size = 3; // Total file size in bytes - uint32 chunk_size = 4; // Size of each chunk (max 1024) - uint32 total_chunks = 5; // Total number of chunks - string transfer_id = 6; // Unique ID for this transfer +message FSDownloadData { + string transfer_id = 1; // Transfer identifier + uint32 chunk_index = 2; // Which chunk this is (0-based) + bytes data = 3; // Chunk data (up to 16KB) } -message FSDownloadChunkRequest { +message FSDownloadComplete { string transfer_id = 1; - uint32 chunk_index = 2; // Which chunk to request (0-based) + bool success = 2; + string error = 3; // Error message if failed + uint32 total_chunks = 4; // Total chunks that were sent + uint32 file_size = 5; // Total file size in bytes } -message FSDownloadChunkResponse { - string transfer_id = 1; - uint32 chunk_index = 2; - bytes data = 3; // Chunk data (max 1024 bytes) - bool is_last = 4; // True if this is the last chunk - string error = 5; -} +// ===== STREAMING UPLOAD (Client -> ESP) ===== +// Flow: Client sends FSUploadStart -> Server responds with transfer_id -> Client streams FSUploadData chunks -> Server sends FSUploadComplete -// Upload to ESP (Client -> ESP) -message FSUploadStartRequest { +message FSUploadStart { string path = 1; // Destination path on ESP uint32 file_size = 2; // Total file size in bytes - uint32 chunk_size = 3; // Size of each chunk (max 1024) + uint32 total_chunks = 3; // Total number of chunks to expect } message FSUploadStartResponse { bool success = 1; string error = 2; string transfer_id = 3; // Unique ID for this transfer - uint32 max_chunk_size = 4; // Server's max chunk size (1024) } -message FSUploadChunkRequest { - string transfer_id = 1; +message FSUploadData { + string transfer_id = 1; // Transfer identifier uint32 chunk_index = 2; // Which chunk this is (0-based) - bytes data = 3; // Chunk data (max 1024 bytes) - bool is_last = 4; // True if this is the last chunk + bytes data = 3; // Chunk data (up to 16KB) } -message FSUploadChunkResponse { +message FSUploadComplete { string transfer_id = 1; - uint32 chunk_index = 2; - bool success = 3; - string error = 4; - bool transfer_complete = 5; // True when all chunks received + bool success = 2; + string error = 3; // Error message if failed + uint32 chunks_received = 4; // Number of chunks actually received } -// Cancel transfer -message FSCancelTransferRequest { +// ===== TRANSFER CONTROL ===== + +message FSCancelTransfer { string transfer_id = 1; } message FSCancelTransferResponse { - bool success = 1; + string transfer_id = 1; + bool success = 2; } // ----- FILESYSTEM ----- @@ -169,11 +164,9 @@ message CorrelationRequest { FSDeleteRequest fs_delete_request = 50; FSMkdirRequest fs_mkdir_request = 60; FSListRequest fs_list_request = 70; - FSDownloadStartRequest fs_download_start_request = 80; - FSDownloadChunkRequest fs_download_chunk_request = 90; - FSUploadStartRequest fs_upload_start_request = 100; - FSUploadChunkRequest fs_upload_chunk_request = 110; - FSCancelTransferRequest fs_cancel_transfer_request = 120; + FSDownloadRequest fs_download_request = 80; + FSUploadStart fs_upload_start = 100; + FSCancelTransfer fs_cancel_transfer = 120; } } @@ -188,10 +181,7 @@ message CorrelationResponse { FSDeleteResponse fs_delete_response = 50; FSMkdirResponse fs_mkdir_response = 60; FSListResponse fs_list_response = 70; - FSDownloadStartResponse fs_download_start_response = 80; - FSDownloadChunkResponse fs_download_chunk_response = 90; FSUploadStartResponse fs_upload_start_response = 100; - FSUploadChunkResponse fs_upload_chunk_response = 110; FSCancelTransferResponse fs_cancel_transfer_response = 120; } } @@ -321,6 +311,11 @@ message Message { UnsubscribeNotification unsub_notif = 21; PingMsg pingmsg = 30; PongMsg pongmsg = 31; + // Streaming file transfer messages (fire-and-forget, no correlation) + FSDownloadData fs_download_data = 40; + FSDownloadComplete fs_download_complete = 41; + FSUploadData fs_upload_data = 42; + FSUploadComplete fs_upload_complete = 43; IMUData imu = 110; IMUCalibrateData imu_calibrate = 120; IMUCalibrateExecute imu_calibrate_execute = 121;