Added metadata message for sending fs transfer info
This commit is contained in:
committed by
Rune Harlyk
parent
f0c4f0f929
commit
a799af360f
@@ -5,6 +5,7 @@ import type {
|
|||||||
FSMkdirRequest,
|
FSMkdirRequest,
|
||||||
FSListRequest,
|
FSListRequest,
|
||||||
FSDownloadRequest,
|
FSDownloadRequest,
|
||||||
|
FSDownloadMetadata,
|
||||||
FSDownloadData,
|
FSDownloadData,
|
||||||
FSDownloadComplete,
|
FSDownloadComplete,
|
||||||
FSUploadStart,
|
FSUploadStart,
|
||||||
@@ -70,6 +71,16 @@ interface ActiveUpload {
|
|||||||
export class FileSystemClient {
|
export class FileSystemClient {
|
||||||
private activeDownloads = new Map<string, ActiveDownload>()
|
private activeDownloads = new Map<string, ActiveDownload>()
|
||||||
private activeUploads = new Map<string, ActiveUpload>()
|
private activeUploads = new Map<string, ActiveUpload>()
|
||||||
|
private pendingDownloads = new Map<
|
||||||
|
string,
|
||||||
|
{
|
||||||
|
resolve: (result: { success: boolean; data?: Uint8Array; error?: string }) => void
|
||||||
|
reject: (error: Error) => void
|
||||||
|
onProgress?: ProgressCallback
|
||||||
|
timeoutId: ReturnType<typeof setTimeout>
|
||||||
|
}
|
||||||
|
>()
|
||||||
|
private metadataListenerCleanup: (() => void) | null = null
|
||||||
private downloadListenerCleanup: (() => void) | null = null
|
private downloadListenerCleanup: (() => void) | null = null
|
||||||
private completeListenerCleanup: (() => void) | null = null
|
private completeListenerCleanup: (() => void) | null = null
|
||||||
private uploadCompleteListenerCleanup: (() => void) | null = null
|
private uploadCompleteListenerCleanup: (() => void) | null = null
|
||||||
@@ -80,6 +91,14 @@ export class FileSystemClient {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private setupListeners() {
|
private setupListeners() {
|
||||||
|
// Listen for download metadata (sent first with file size)
|
||||||
|
this.metadataListenerCleanup = socket.on(
|
||||||
|
Messages.FSDownloadMetadata,
|
||||||
|
(metadata: FSDownloadMetadata) => {
|
||||||
|
this.handleDownloadMetadata(metadata)
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
// Listen for download data chunks
|
// Listen for download data chunks
|
||||||
this.downloadListenerCleanup = socket.on(
|
this.downloadListenerCleanup = socket.on(
|
||||||
Messages.FSDownloadData,
|
Messages.FSDownloadData,
|
||||||
@@ -105,6 +124,51 @@ export class FileSystemClient {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private handleDownloadMetadata(metadata: FSDownloadMetadata) {
|
||||||
|
// Find the pending download by path (we don't have transferId yet)
|
||||||
|
// The metadata arrives in response to a download request
|
||||||
|
const pending = this.pendingDownloads.values().next().value
|
||||||
|
if (!pending) {
|
||||||
|
console.warn(`Received download metadata but no pending download`)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clear initial timeout
|
||||||
|
clearTimeout(pending.timeoutId)
|
||||||
|
|
||||||
|
// Get the path from the pending downloads (first one)
|
||||||
|
const [path] = this.pendingDownloads.keys()
|
||||||
|
this.pendingDownloads.delete(path)
|
||||||
|
|
||||||
|
if (!metadata.success) {
|
||||||
|
pending.resolve({ success: false, error: metadata.error || 'Download failed' })
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
const transferId = metadata.transferId
|
||||||
|
|
||||||
|
// Now we know the exact file size - allocate properly sized buffer
|
||||||
|
const buffer = new Uint8Array(metadata.fileSize)
|
||||||
|
|
||||||
|
const download: ActiveDownload = {
|
||||||
|
path,
|
||||||
|
buffer,
|
||||||
|
fileSize: metadata.fileSize,
|
||||||
|
totalChunks: metadata.totalChunks,
|
||||||
|
chunksReceived: 0,
|
||||||
|
bytesReceived: 0,
|
||||||
|
resolve: pending.resolve,
|
||||||
|
reject: pending.reject,
|
||||||
|
onProgress: pending.onProgress,
|
||||||
|
timeoutId: setTimeout(() => {
|
||||||
|
this.activeDownloads.delete(transferId)
|
||||||
|
pending.reject(new Error('Download timeout'))
|
||||||
|
}, this.transferTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
this.activeDownloads.set(transferId, download)
|
||||||
|
}
|
||||||
|
|
||||||
private handleDownloadData(data: FSDownloadData) {
|
private handleDownloadData(data: FSDownloadData) {
|
||||||
const download = this.activeDownloads.get(data.transferId)
|
const download = this.activeDownloads.get(data.transferId)
|
||||||
if (!download) {
|
if (!download) {
|
||||||
@@ -244,80 +308,34 @@ export class FileSystemClient {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Download a file from the ESP32 using streaming transfer
|
* Download a file from the ESP32 using streaming transfer
|
||||||
* Server streams all chunks without waiting for ACKs
|
* Server sends metadata first (with file size), then streams all chunks
|
||||||
*/
|
*/
|
||||||
async downloadFile(
|
async downloadFile(
|
||||||
path: string,
|
path: string,
|
||||||
onProgress?: ProgressCallback
|
onProgress?: ProgressCallback
|
||||||
): Promise<{ success: boolean; data?: Uint8Array; error?: string }> {
|
): Promise<{ success: boolean; data?: Uint8Array; error?: string }> {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
// Send download request - server will stream chunks back
|
// Send download request - server will send metadata first, then stream chunks
|
||||||
const request: FSDownloadRequest = { path }
|
const request: FSDownloadRequest = { path }
|
||||||
|
|
||||||
// We need to set up tracking before sending the request
|
// Set up timeout for initial metadata response
|
||||||
// The server will generate a transfer ID and include it in all responses
|
|
||||||
// We'll capture the first chunk to get the transfer ID
|
|
||||||
|
|
||||||
// Set up timeout for initial response
|
|
||||||
const initialTimeout = setTimeout(() => {
|
const initialTimeout = setTimeout(() => {
|
||||||
reject(new Error('Download request timeout - no data received'))
|
this.pendingDownloads.delete(path)
|
||||||
|
reject(new Error('Download request timeout - no metadata received'))
|
||||||
}, this.transferTimeout)
|
}, this.transferTimeout)
|
||||||
|
|
||||||
// One-time listener for the first chunk to get transfer details
|
// Track this pending download - will be converted to active when metadata arrives
|
||||||
const firstChunkHandler = (data: FSDownloadData) => {
|
this.pendingDownloads.set(path, {
|
||||||
clearTimeout(initialTimeout)
|
|
||||||
|
|
||||||
// Now we have the real transfer ID
|
|
||||||
const transferId = data.transferId
|
|
||||||
|
|
||||||
// Estimate total size from first chunk (server sends file_size in complete message)
|
|
||||||
// For now, allocate a large buffer and resize later
|
|
||||||
const estimatedSize = 10 * 1024 * 1024 // 10MB max
|
|
||||||
const buffer = new Uint8Array(estimatedSize)
|
|
||||||
|
|
||||||
const download: ActiveDownload = {
|
|
||||||
path,
|
|
||||||
buffer,
|
|
||||||
fileSize: estimatedSize, // Will be updated on completion
|
|
||||||
totalChunks: Math.ceil(estimatedSize / MAX_CHUNK_SIZE),
|
|
||||||
chunksReceived: 0,
|
|
||||||
bytesReceived: 0,
|
|
||||||
resolve,
|
resolve,
|
||||||
reject,
|
reject,
|
||||||
onProgress,
|
onProgress,
|
||||||
timeoutId: setTimeout(() => {
|
timeoutId: initialTimeout
|
||||||
this.activeDownloads.delete(transferId)
|
})
|
||||||
reject(new Error('Download timeout'))
|
|
||||||
}, this.transferTimeout)
|
|
||||||
}
|
|
||||||
|
|
||||||
this.activeDownloads.set(transferId, download)
|
// Send the download request (server will respond with metadata, then stream data)
|
||||||
|
|
||||||
// Process this first chunk
|
|
||||||
this.handleDownloadData(data)
|
|
||||||
|
|
||||||
// Remove the first chunk handler - subsequent chunks go through normal listener
|
|
||||||
firstChunkCleanup()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Error handler for if download fails immediately
|
|
||||||
const errorHandler = (complete: FSDownloadComplete) => {
|
|
||||||
if (!complete.success && !complete.transferId) {
|
|
||||||
clearTimeout(initialTimeout)
|
|
||||||
firstChunkCleanup()
|
|
||||||
errorCleanup()
|
|
||||||
resolve({ success: false, error: complete.error || 'Download failed' })
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const firstChunkCleanup = socket.on(Messages.FSDownloadData, firstChunkHandler)
|
|
||||||
const errorCleanup = socket.on(Messages.FSDownloadComplete, errorHandler)
|
|
||||||
|
|
||||||
// Send the download request (no response expected, server streams data)
|
|
||||||
socket.request({ fsDownloadRequest: request }).catch((err) => {
|
socket.request({ fsDownloadRequest: request }).catch((err) => {
|
||||||
clearTimeout(initialTimeout)
|
clearTimeout(initialTimeout)
|
||||||
firstChunkCleanup()
|
this.pendingDownloads.delete(path)
|
||||||
errorCleanup()
|
|
||||||
reject(err)
|
reject(err)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|||||||
@@ -43,6 +43,7 @@ DEFINE_MESSAGE_TRAITS(CorrelationRequest, correlation_request)
|
|||||||
DEFINE_MESSAGE_TRAITS(CorrelationResponse, correlation_response)
|
DEFINE_MESSAGE_TRAITS(CorrelationResponse, correlation_response)
|
||||||
|
|
||||||
// Streaming file transfer messages
|
// Streaming file transfer messages
|
||||||
|
DEFINE_MESSAGE_TRAITS(FSDownloadMetadata, fs_download_metadata)
|
||||||
DEFINE_MESSAGE_TRAITS(FSDownloadData, fs_download_data)
|
DEFINE_MESSAGE_TRAITS(FSDownloadData, fs_download_data)
|
||||||
DEFINE_MESSAGE_TRAITS(FSDownloadComplete, fs_download_complete)
|
DEFINE_MESSAGE_TRAITS(FSDownloadComplete, fs_download_complete)
|
||||||
DEFINE_MESSAGE_TRAITS(FSUploadData, fs_upload_data)
|
DEFINE_MESSAGE_TRAITS(FSUploadData, fs_upload_data)
|
||||||
|
|||||||
@@ -37,6 +37,7 @@ struct UploadState {
|
|||||||
};
|
};
|
||||||
|
|
||||||
// Callback type for sending messages to clients
|
// Callback type for sending messages to clients
|
||||||
|
using SendMetadataCallback = std::function<void(const socket_message_FSDownloadMetadata&, int clientId)>;
|
||||||
using SendCallback = std::function<void(const socket_message_FSDownloadData&, int clientId)>;
|
using SendCallback = std::function<void(const socket_message_FSDownloadData&, int clientId)>;
|
||||||
using SendCompleteCallback = std::function<void(const socket_message_FSDownloadComplete&, int clientId)>;
|
using SendCompleteCallback = std::function<void(const socket_message_FSDownloadComplete&, int clientId)>;
|
||||||
using SendUploadCompleteCallback = std::function<void(const socket_message_FSUploadComplete&, int clientId)>;
|
using SendUploadCompleteCallback = std::function<void(const socket_message_FSUploadComplete&, int clientId)>;
|
||||||
@@ -47,6 +48,7 @@ class FileSystemHandler {
|
|||||||
|
|
||||||
// Set callbacks for sending streaming data
|
// Set callbacks for sending streaming data
|
||||||
void setSendCallbacks(
|
void setSendCallbacks(
|
||||||
|
SendMetadataCallback sendMetadata,
|
||||||
SendCallback sendData,
|
SendCallback sendData,
|
||||||
SendCompleteCallback sendComplete,
|
SendCompleteCallback sendComplete,
|
||||||
SendUploadCompleteCallback sendUploadComplete
|
SendUploadCompleteCallback sendUploadComplete
|
||||||
@@ -84,6 +86,7 @@ class FileSystemHandler {
|
|||||||
std::map<std::string, UploadState> uploads_;
|
std::map<std::string, UploadState> uploads_;
|
||||||
uint32_t transferIdCounter_;
|
uint32_t transferIdCounter_;
|
||||||
|
|
||||||
|
SendMetadataCallback sendMetadataCallback_;
|
||||||
SendCallback sendDataCallback_;
|
SendCallback sendDataCallback_;
|
||||||
SendCompleteCallback sendCompleteCallback_;
|
SendCompleteCallback sendCompleteCallback_;
|
||||||
SendUploadCompleteCallback sendUploadCompleteCallback_;
|
SendUploadCompleteCallback sendUploadCompleteCallback_;
|
||||||
|
|||||||
+22
-10
@@ -13,10 +13,12 @@ FileSystemHandler fsHandler;
|
|||||||
FileSystemHandler::FileSystemHandler() : transferIdCounter_(0) {}
|
FileSystemHandler::FileSystemHandler() : transferIdCounter_(0) {}
|
||||||
|
|
||||||
void FileSystemHandler::setSendCallbacks(
|
void FileSystemHandler::setSendCallbacks(
|
||||||
|
SendMetadataCallback sendMetadata,
|
||||||
SendCallback sendData,
|
SendCallback sendData,
|
||||||
SendCompleteCallback sendComplete,
|
SendCompleteCallback sendComplete,
|
||||||
SendUploadCompleteCallback sendUploadComplete
|
SendUploadCompleteCallback sendUploadComplete
|
||||||
) {
|
) {
|
||||||
|
sendMetadataCallback_ = sendMetadata;
|
||||||
sendDataCallback_ = sendData;
|
sendDataCallback_ = sendData;
|
||||||
sendCompleteCallback_ = sendComplete;
|
sendCompleteCallback_ = sendComplete;
|
||||||
sendUploadCompleteCallback_ = sendUploadComplete;
|
sendUploadCompleteCallback_ = sendUploadComplete;
|
||||||
@@ -208,22 +210,22 @@ void FileSystemHandler::handleDownloadRequest(const socket_message_FSDownloadReq
|
|||||||
|
|
||||||
// Validate file exists
|
// Validate file exists
|
||||||
if (!ESP_FS.exists(path.c_str())) {
|
if (!ESP_FS.exists(path.c_str())) {
|
||||||
if (sendCompleteCallback_) {
|
if (sendMetadataCallback_) {
|
||||||
socket_message_FSDownloadComplete complete = socket_message_FSDownloadComplete_init_zero;
|
socket_message_FSDownloadMetadata metadata = socket_message_FSDownloadMetadata_init_zero;
|
||||||
complete.success = false;
|
metadata.success = false;
|
||||||
strncpy(complete.error, "File not found", sizeof(complete.error) - 1);
|
strncpy(metadata.error, "File not found", sizeof(metadata.error) - 1);
|
||||||
sendCompleteCallback_(complete, clientId);
|
sendMetadataCallback_(metadata, clientId);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
File file = ESP_FS.open(path.c_str(), "r");
|
File file = ESP_FS.open(path.c_str(), "r");
|
||||||
if (!file || file.isDirectory()) {
|
if (!file || file.isDirectory()) {
|
||||||
if (sendCompleteCallback_) {
|
if (sendMetadataCallback_) {
|
||||||
socket_message_FSDownloadComplete complete = socket_message_FSDownloadComplete_init_zero;
|
socket_message_FSDownloadMetadata metadata = socket_message_FSDownloadMetadata_init_zero;
|
||||||
complete.success = false;
|
metadata.success = false;
|
||||||
strncpy(complete.error, "Cannot open file for reading", sizeof(complete.error) - 1);
|
strncpy(metadata.error, "Cannot open file for reading", sizeof(metadata.error) - 1);
|
||||||
sendCompleteCallback_(complete, clientId);
|
sendMetadataCallback_(metadata, clientId);
|
||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -235,6 +237,16 @@ void FileSystemHandler::handleDownloadRequest(const socket_message_FSDownloadReq
|
|||||||
|
|
||||||
std::string transferId = generateTransferId();
|
std::string transferId = generateTransferId();
|
||||||
|
|
||||||
|
// Send metadata first so client knows exact file size and can allocate buffer
|
||||||
|
if (sendMetadataCallback_) {
|
||||||
|
socket_message_FSDownloadMetadata metadata = socket_message_FSDownloadMetadata_init_zero;
|
||||||
|
strncpy(metadata.transfer_id, transferId.c_str(), sizeof(metadata.transfer_id) - 1);
|
||||||
|
metadata.success = true;
|
||||||
|
metadata.file_size = fileSize;
|
||||||
|
metadata.total_chunks = totalChunks;
|
||||||
|
sendMetadataCallback_(metadata, clientId);
|
||||||
|
}
|
||||||
|
|
||||||
DownloadState state;
|
DownloadState state;
|
||||||
state.path = path;
|
state.path = path;
|
||||||
state.file = file;
|
state.file = file;
|
||||||
|
|||||||
@@ -137,6 +137,10 @@ void setupServer() {
|
|||||||
void setupEventSocket() {
|
void setupEventSocket() {
|
||||||
// Set up filesystem handler callbacks for streaming transfers
|
// Set up filesystem handler callbacks for streaming transfers
|
||||||
FileSystemWS::fsHandler.setSendCallbacks(
|
FileSystemWS::fsHandler.setSendCallbacks(
|
||||||
|
// Send download metadata (file size, total chunks)
|
||||||
|
[](const socket_message_FSDownloadMetadata& metadata, int clientId) {
|
||||||
|
socket.emit(metadata, clientId);
|
||||||
|
},
|
||||||
// Send download data chunk
|
// Send download data chunk
|
||||||
[](const socket_message_FSDownloadData& data, int clientId) {
|
[](const socket_message_FSDownloadData& data, int clientId) {
|
||||||
socket.emit(data, clientId);
|
socket.emit(data, clientId);
|
||||||
|
|||||||
@@ -56,6 +56,8 @@ socket_message.FSListResponse.directories max_count:20
|
|||||||
|
|
||||||
# Streaming download messages
|
# Streaming download messages
|
||||||
socket_message.FSDownloadRequest.path max_size:256
|
socket_message.FSDownloadRequest.path max_size:256
|
||||||
|
socket_message.FSDownloadMetadata.transfer_id max_size:64
|
||||||
|
socket_message.FSDownloadMetadata.error max_size:128
|
||||||
socket_message.FSDownloadData.transfer_id max_size:64
|
socket_message.FSDownloadData.transfer_id max_size:64
|
||||||
socket_message.FSDownloadData.data max_size:16384
|
socket_message.FSDownloadData.data max_size:16384
|
||||||
socket_message.FSDownloadComplete.transfer_id max_size:64
|
socket_message.FSDownloadComplete.transfer_id max_size:64
|
||||||
|
|||||||
@@ -54,12 +54,20 @@ message FSListResponse {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// ===== STREAMING DOWNLOAD (ESP -> Client) =====
|
// ===== STREAMING DOWNLOAD (ESP -> Client) =====
|
||||||
// Flow: Client sends FSDownloadRequest -> Server streams FSDownloadData chunks -> Server sends FSDownloadComplete
|
// Flow: Client sends FSDownloadRequest -> Server sends FSDownloadMetadata -> Server streams FSDownloadData chunks -> Server sends FSDownloadComplete
|
||||||
|
|
||||||
message FSDownloadRequest {
|
message FSDownloadRequest {
|
||||||
string path = 1; // File path on ESP to download
|
string path = 1; // File path on ESP to download
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message FSDownloadMetadata {
|
||||||
|
string transfer_id = 1; // Transfer identifier
|
||||||
|
bool success = 2; // True if file exists and is readable
|
||||||
|
string error = 3; // Error message if failed
|
||||||
|
uint32 file_size = 4; // Total file size in bytes
|
||||||
|
uint32 total_chunks = 5; // Total number of chunks to expect
|
||||||
|
}
|
||||||
|
|
||||||
message FSDownloadData {
|
message FSDownloadData {
|
||||||
string transfer_id = 1; // Transfer identifier
|
string transfer_id = 1; // Transfer identifier
|
||||||
uint32 chunk_index = 2; // Which chunk this is (0-based)
|
uint32 chunk_index = 2; // Which chunk this is (0-based)
|
||||||
@@ -312,6 +320,7 @@ message Message {
|
|||||||
PingMsg pingmsg = 30;
|
PingMsg pingmsg = 30;
|
||||||
PongMsg pongmsg = 31;
|
PongMsg pongmsg = 31;
|
||||||
// Streaming file transfer messages (fire-and-forget, no correlation)
|
// Streaming file transfer messages (fire-and-forget, no correlation)
|
||||||
|
FSDownloadMetadata fs_download_metadata = 39;
|
||||||
FSDownloadData fs_download_data = 40;
|
FSDownloadData fs_download_data = 40;
|
||||||
FSDownloadComplete fs_download_complete = 41;
|
FSDownloadComplete fs_download_complete = 41;
|
||||||
FSUploadData fs_upload_data = 42;
|
FSUploadData fs_upload_data = 42;
|
||||||
|
|||||||
Reference in New Issue
Block a user