Remaking the upload and download scheme to rapid streaming (WIP)
This commit is contained in:
committed by
Rune Harlyk
parent
50ef91ab22
commit
f0c4f0f929
@@ -4,15 +4,16 @@ import type {
|
||||
FSDeleteRequest,
|
||||
FSMkdirRequest,
|
||||
FSListRequest,
|
||||
FSDownloadStartRequest,
|
||||
FSDownloadChunkRequest,
|
||||
FSUploadStartRequest,
|
||||
FSUploadChunkRequest,
|
||||
FSCancelTransferRequest,
|
||||
CorrelationResponse
|
||||
FSDownloadRequest,
|
||||
FSDownloadData,
|
||||
FSDownloadComplete,
|
||||
FSUploadStart,
|
||||
FSUploadData,
|
||||
FSUploadComplete,
|
||||
FSCancelTransfer
|
||||
} from '$lib/platform_shared/message'
|
||||
|
||||
const MAX_CHUNK_SIZE = 2**14 // ~= 16 kb
|
||||
const MAX_CHUNK_SIZE = 2 ** 14 // ~= 16 kb
|
||||
|
||||
export interface FileInfo {
|
||||
name: string
|
||||
@@ -41,7 +42,143 @@ export interface TransferProgress {
|
||||
|
||||
export type ProgressCallback = (progress: TransferProgress) => void
|
||||
|
||||
// Active transfer tracking
|
||||
interface ActiveDownload {
|
||||
path: string
|
||||
buffer: Uint8Array
|
||||
fileSize: number
|
||||
totalChunks: number
|
||||
chunksReceived: number
|
||||
bytesReceived: number
|
||||
resolve: (result: { success: boolean; data?: Uint8Array; error?: string }) => void
|
||||
reject: (error: Error) => void
|
||||
onProgress?: ProgressCallback
|
||||
timeoutId: ReturnType<typeof setTimeout>
|
||||
}
|
||||
|
||||
interface ActiveUpload {
|
||||
path: string
|
||||
transferId: string
|
||||
totalChunks: number
|
||||
chunksSent: number
|
||||
resolve: (result: { success: boolean; error?: string }) => void
|
||||
reject: (error: Error) => void
|
||||
onProgress?: ProgressCallback
|
||||
timeoutId: ReturnType<typeof setTimeout>
|
||||
}
|
||||
|
||||
export class FileSystemClient {
|
||||
private activeDownloads = new Map<string, ActiveDownload>()
|
||||
private activeUploads = new Map<string, ActiveUpload>()
|
||||
private downloadListenerCleanup: (() => void) | null = null
|
||||
private completeListenerCleanup: (() => void) | null = null
|
||||
private uploadCompleteListenerCleanup: (() => void) | null = null
|
||||
private transferTimeout = 60000 // 60 seconds timeout for transfers
|
||||
|
||||
constructor() {
|
||||
this.setupListeners()
|
||||
}
|
||||
|
||||
private setupListeners() {
|
||||
// Listen for download data chunks
|
||||
this.downloadListenerCleanup = socket.on(
|
||||
Messages.FSDownloadData,
|
||||
(data: FSDownloadData) => {
|
||||
this.handleDownloadData(data)
|
||||
}
|
||||
)
|
||||
|
||||
// Listen for download completion
|
||||
this.completeListenerCleanup = socket.on(
|
||||
Messages.FSDownloadComplete,
|
||||
(complete: FSDownloadComplete) => {
|
||||
this.handleDownloadComplete(complete)
|
||||
}
|
||||
)
|
||||
|
||||
// Listen for upload completion
|
||||
this.uploadCompleteListenerCleanup = socket.on(
|
||||
Messages.FSUploadComplete,
|
||||
(complete: FSUploadComplete) => {
|
||||
this.handleUploadComplete(complete)
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
private handleDownloadData(data: FSDownloadData) {
|
||||
const download = this.activeDownloads.get(data.transferId)
|
||||
if (!download) {
|
||||
console.warn(`Received download data for unknown transfer: ${data.transferId}`)
|
||||
return
|
||||
}
|
||||
|
||||
// Reset timeout
|
||||
clearTimeout(download.timeoutId)
|
||||
download.timeoutId = setTimeout(() => {
|
||||
this.activeDownloads.delete(data.transferId)
|
||||
download.reject(new Error('Download timeout'))
|
||||
}, this.transferTimeout)
|
||||
|
||||
// Copy chunk data to buffer
|
||||
if (data.data && data.data.length > 0) {
|
||||
const offset = data.chunkIndex * MAX_CHUNK_SIZE
|
||||
download.buffer.set(data.data, offset)
|
||||
download.bytesReceived += data.data.length
|
||||
download.chunksReceived++
|
||||
}
|
||||
|
||||
// Report progress
|
||||
if (download.onProgress) {
|
||||
download.onProgress({
|
||||
transferId: data.transferId,
|
||||
bytesTransferred: download.bytesReceived,
|
||||
totalBytes: download.fileSize,
|
||||
chunksCompleted: download.chunksReceived,
|
||||
totalChunks: download.totalChunks,
|
||||
percentage: (download.chunksReceived / download.totalChunks) * 100
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
private handleDownloadComplete(complete: FSDownloadComplete) {
|
||||
const download = this.activeDownloads.get(complete.transferId)
|
||||
if (!download) {
|
||||
// This is normal for error cases where transferId wasn't set
|
||||
if (complete.error) {
|
||||
console.warn(`Download failed: ${complete.error}`)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
clearTimeout(download.timeoutId)
|
||||
this.activeDownloads.delete(complete.transferId)
|
||||
|
||||
if (complete.success) {
|
||||
// Trim buffer to actual file size
|
||||
const finalData = download.buffer.slice(0, complete.fileSize)
|
||||
download.resolve({ success: true, data: finalData })
|
||||
} else {
|
||||
download.resolve({ success: false, error: complete.error || 'Download failed' })
|
||||
}
|
||||
}
|
||||
|
||||
private handleUploadComplete(complete: FSUploadComplete) {
|
||||
const upload = this.activeUploads.get(complete.transferId)
|
||||
if (!upload) {
|
||||
console.warn(`Received upload complete for unknown transfer: ${complete.transferId}`)
|
||||
return
|
||||
}
|
||||
|
||||
clearTimeout(upload.timeoutId)
|
||||
this.activeUploads.delete(complete.transferId)
|
||||
|
||||
if (complete.success) {
|
||||
upload.resolve({ success: true })
|
||||
} else {
|
||||
upload.resolve({ success: false, error: complete.error || 'Upload failed' })
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a file or directory on the ESP32
|
||||
*/
|
||||
@@ -106,84 +243,89 @@ export class FileSystemClient {
|
||||
}
|
||||
|
||||
/**
|
||||
* Download a file from the ESP32
|
||||
* Download a file from the ESP32 using streaming transfer
|
||||
* Server streams all chunks without waiting for ACKs
|
||||
*/
|
||||
async downloadFile(
|
||||
path: string,
|
||||
onProgress?: ProgressCallback
|
||||
): Promise<{ success: boolean; data?: Uint8Array; error?: string }> {
|
||||
// Start download
|
||||
const startRequest: FSDownloadStartRequest = { path }
|
||||
return new Promise((resolve, reject) => {
|
||||
// Send download request - server will stream chunks back
|
||||
const request: FSDownloadRequest = { path }
|
||||
|
||||
const startResponse = await socket.request({
|
||||
fsDownloadStartRequest: startRequest
|
||||
})
|
||||
// We need to set up tracking before sending the request
|
||||
// The server will generate a transfer ID and include it in all responses
|
||||
// We'll capture the first chunk to get the transfer ID
|
||||
|
||||
if (!startResponse.fsDownloadStartResponse) {
|
||||
return { success: false, error: 'Failed to start download' }
|
||||
}
|
||||
// Set up timeout for initial response
|
||||
const initialTimeout = setTimeout(() => {
|
||||
reject(new Error('Download request timeout - no data received'))
|
||||
}, this.transferTimeout)
|
||||
|
||||
const startResp = startResponse.fsDownloadStartResponse
|
||||
// One-time listener for the first chunk to get transfer details
|
||||
const firstChunkHandler = (data: FSDownloadData) => {
|
||||
clearTimeout(initialTimeout)
|
||||
|
||||
if (!startResp.success) {
|
||||
return { success: false, error: startResp.error || 'Failed to start download' }
|
||||
}
|
||||
// Now we have the real transfer ID
|
||||
const transferId = data.transferId
|
||||
|
||||
const transferId = startResp.transferId
|
||||
const totalChunks = startResp.totalChunks
|
||||
const fileSize = startResp.fileSize
|
||||
// Estimate total size from first chunk (server sends file_size in complete message)
|
||||
// For now, allocate a large buffer and resize later
|
||||
const estimatedSize = 10 * 1024 * 1024 // 10MB max
|
||||
const buffer = new Uint8Array(estimatedSize)
|
||||
|
||||
// Allocate buffer for entire file
|
||||
const buffer = new Uint8Array(fileSize)
|
||||
let offset = 0
|
||||
const download: ActiveDownload = {
|
||||
path,
|
||||
buffer,
|
||||
fileSize: estimatedSize, // Will be updated on completion
|
||||
totalChunks: Math.ceil(estimatedSize / MAX_CHUNK_SIZE),
|
||||
chunksReceived: 0,
|
||||
bytesReceived: 0,
|
||||
resolve,
|
||||
reject,
|
||||
onProgress,
|
||||
timeoutId: setTimeout(() => {
|
||||
this.activeDownloads.delete(transferId)
|
||||
reject(new Error('Download timeout'))
|
||||
}, this.transferTimeout)
|
||||
}
|
||||
|
||||
// Download chunks sequentially
|
||||
for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) {
|
||||
const chunkRequest: FSDownloadChunkRequest = {
|
||||
transferId,
|
||||
chunkIndex
|
||||
this.activeDownloads.set(transferId, download)
|
||||
|
||||
// Process this first chunk
|
||||
this.handleDownloadData(data)
|
||||
|
||||
// Remove the first chunk handler - subsequent chunks go through normal listener
|
||||
firstChunkCleanup()
|
||||
}
|
||||
|
||||
const chunkResponse = await socket.request({
|
||||
fsDownloadChunkRequest: chunkRequest
|
||||
// Error handler for if download fails immediately
|
||||
const errorHandler = (complete: FSDownloadComplete) => {
|
||||
if (!complete.success && !complete.transferId) {
|
||||
clearTimeout(initialTimeout)
|
||||
firstChunkCleanup()
|
||||
errorCleanup()
|
||||
resolve({ success: false, error: complete.error || 'Download failed' })
|
||||
}
|
||||
}
|
||||
|
||||
const firstChunkCleanup = socket.on(Messages.FSDownloadData, firstChunkHandler)
|
||||
const errorCleanup = socket.on(Messages.FSDownloadComplete, errorHandler)
|
||||
|
||||
// Send the download request (no response expected, server streams data)
|
||||
socket.request({ fsDownloadRequest: request }).catch((err) => {
|
||||
clearTimeout(initialTimeout)
|
||||
firstChunkCleanup()
|
||||
errorCleanup()
|
||||
reject(err)
|
||||
})
|
||||
|
||||
if (!chunkResponse.fsDownloadChunkResponse) {
|
||||
await this.cancelTransfer(transferId)
|
||||
return { success: false, error: `Failed to download chunk ${chunkIndex}` }
|
||||
}
|
||||
|
||||
const chunkResp = chunkResponse.fsDownloadChunkResponse
|
||||
|
||||
if (chunkResp.error) {
|
||||
await this.cancelTransfer(transferId)
|
||||
return { success: false, error: chunkResp.error }
|
||||
}
|
||||
|
||||
// Copy chunk data to buffer
|
||||
if (chunkResp.data) {
|
||||
buffer.set(chunkResp.data, offset)
|
||||
offset += chunkResp.data.length
|
||||
}
|
||||
|
||||
// Report progress
|
||||
if (onProgress) {
|
||||
onProgress({
|
||||
transferId,
|
||||
bytesTransferred: offset,
|
||||
totalBytes: fileSize,
|
||||
chunksCompleted: chunkIndex + 1,
|
||||
totalChunks,
|
||||
percentage: ((chunkIndex + 1) / totalChunks) * 100
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
return { success: true, data: buffer }
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Upload a file to the ESP32
|
||||
* Upload a file to the ESP32 using streaming transfer
|
||||
* Client sends all chunks without waiting for ACKs
|
||||
*/
|
||||
async uploadFile(
|
||||
path: string,
|
||||
@@ -192,17 +334,17 @@ export class FileSystemClient {
|
||||
): Promise<{ success: boolean; error?: string }> {
|
||||
const fileSize = data.length
|
||||
const chunkSize = MAX_CHUNK_SIZE
|
||||
const totalChunks = Math.ceil(fileSize / chunkSize)
|
||||
const totalChunks = Math.ceil(fileSize / chunkSize) || 1
|
||||
|
||||
// Start upload
|
||||
const startRequest: FSUploadStartRequest = {
|
||||
// Start upload - get transfer ID
|
||||
const startRequest: FSUploadStart = {
|
||||
path,
|
||||
fileSize,
|
||||
chunkSize
|
||||
totalChunks
|
||||
}
|
||||
|
||||
const startResponse = await socket.request({
|
||||
fsUploadStartRequest: startRequest
|
||||
fsUploadStart: startRequest
|
||||
})
|
||||
|
||||
if (!startResponse.fsUploadStartResponse) {
|
||||
@@ -216,62 +358,83 @@ export class FileSystemClient {
|
||||
}
|
||||
|
||||
const transferId = startResp.transferId
|
||||
const maxChunkSize = startResp.maxChunkSize || MAX_CHUNK_SIZE
|
||||
|
||||
// Upload chunks sequentially
|
||||
for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) {
|
||||
const offset = chunkIndex * chunkSize
|
||||
const end = Math.min(offset + chunkSize, fileSize)
|
||||
const chunkData = data.slice(offset, end)
|
||||
const isLast = chunkIndex === totalChunks - 1
|
||||
|
||||
const chunkRequest: FSUploadChunkRequest = {
|
||||
return new Promise((resolve, reject) => {
|
||||
// Set up upload tracking
|
||||
const upload: ActiveUpload = {
|
||||
path,
|
||||
transferId,
|
||||
chunkIndex,
|
||||
data: chunkData,
|
||||
isLast
|
||||
totalChunks,
|
||||
chunksSent: 0,
|
||||
resolve,
|
||||
reject,
|
||||
onProgress,
|
||||
timeoutId: setTimeout(() => {
|
||||
this.activeUploads.delete(transferId)
|
||||
reject(new Error('Upload timeout - no completion received'))
|
||||
}, this.transferTimeout)
|
||||
}
|
||||
|
||||
const chunkResponse = await socket.request({
|
||||
fsUploadChunkRequest: chunkRequest
|
||||
})
|
||||
this.activeUploads.set(transferId, upload)
|
||||
|
||||
if (!chunkResponse.fsUploadChunkResponse) {
|
||||
await this.cancelTransfer(transferId)
|
||||
return { success: false, error: `Failed to upload chunk ${chunkIndex}` }
|
||||
}
|
||||
// Stream all chunks without waiting for ACKs
|
||||
for (let chunkIndex = 0; chunkIndex < totalChunks; chunkIndex++) {
|
||||
const offset = chunkIndex * chunkSize
|
||||
const end = Math.min(offset + chunkSize, fileSize)
|
||||
const chunkData = data.slice(offset, end)
|
||||
|
||||
const chunkResp = chunkResponse.fsUploadChunkResponse
|
||||
|
||||
if (!chunkResp.success) {
|
||||
await this.cancelTransfer(transferId)
|
||||
return { success: false, error: chunkResp.error || 'Failed to upload chunk' }
|
||||
}
|
||||
|
||||
// Report progress
|
||||
if (onProgress) {
|
||||
onProgress({
|
||||
const uploadData: FSUploadData = {
|
||||
transferId,
|
||||
bytesTransferred: end,
|
||||
totalBytes: fileSize,
|
||||
chunksCompleted: chunkIndex + 1,
|
||||
totalChunks,
|
||||
percentage: ((chunkIndex + 1) / totalChunks) * 100
|
||||
})
|
||||
}
|
||||
}
|
||||
chunkIndex,
|
||||
data: chunkData
|
||||
}
|
||||
|
||||
return { success: true }
|
||||
// Send chunk as fire-and-forget message
|
||||
socket.emit(Messages.FSUploadData, uploadData)
|
||||
|
||||
upload.chunksSent++
|
||||
|
||||
// Report progress
|
||||
if (onProgress) {
|
||||
onProgress({
|
||||
transferId,
|
||||
bytesTransferred: end,
|
||||
totalBytes: fileSize,
|
||||
chunksCompleted: chunkIndex + 1,
|
||||
totalChunks,
|
||||
percentage: ((chunkIndex + 1) / totalChunks) * 100
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// All chunks sent - now wait for completion message from server
|
||||
// The timeout will handle if server doesn't respond
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancel an ongoing transfer
|
||||
*/
|
||||
async cancelTransfer(transferId: string): Promise<{ success: boolean }> {
|
||||
const request: FSCancelTransferRequest = { transferId }
|
||||
const request: FSCancelTransfer = { transferId }
|
||||
|
||||
// Clean up local state
|
||||
const download = this.activeDownloads.get(transferId)
|
||||
if (download) {
|
||||
clearTimeout(download.timeoutId)
|
||||
this.activeDownloads.delete(transferId)
|
||||
download.resolve({ success: false, error: 'Transfer cancelled' })
|
||||
}
|
||||
|
||||
const upload = this.activeUploads.get(transferId)
|
||||
if (upload) {
|
||||
clearTimeout(upload.timeoutId)
|
||||
this.activeUploads.delete(transferId)
|
||||
upload.resolve({ success: false, error: 'Transfer cancelled' })
|
||||
}
|
||||
|
||||
const response = await socket.request({
|
||||
fsCancelTransferRequest: request
|
||||
fsCancelTransfer: request
|
||||
})
|
||||
|
||||
if (response.fsCancelTransferResponse) {
|
||||
@@ -309,7 +472,7 @@ export class FileSystemClient {
|
||||
}
|
||||
|
||||
// Create blob and trigger download
|
||||
const blob = new Blob([result.data])
|
||||
const blob = new Blob([result.data.buffer as ArrayBuffer])
|
||||
const url = URL.createObjectURL(blob)
|
||||
const a = document.createElement('a')
|
||||
a.href = url
|
||||
@@ -321,6 +484,28 @@ export class FileSystemClient {
|
||||
|
||||
return { success: true }
|
||||
}
|
||||
|
||||
/**
|
||||
* Cleanup listeners when no longer needed
|
||||
*/
|
||||
destroy() {
|
||||
this.downloadListenerCleanup?.()
|
||||
this.completeListenerCleanup?.()
|
||||
this.uploadCompleteListenerCleanup?.()
|
||||
|
||||
// Cancel all active transfers
|
||||
for (const [, download] of this.activeDownloads) {
|
||||
clearTimeout(download.timeoutId)
|
||||
download.reject(new Error('FileSystemClient destroyed'))
|
||||
}
|
||||
this.activeDownloads.clear()
|
||||
|
||||
for (const [, upload] of this.activeUploads) {
|
||||
clearTimeout(upload.timeoutId)
|
||||
upload.reject(new Error('FileSystemClient destroyed'))
|
||||
}
|
||||
this.activeUploads.clear()
|
||||
}
|
||||
}
|
||||
|
||||
export const fileSystemClient = new FileSystemClient()
|
||||
|
||||
Reference in New Issue
Block a user