-
Notifications
You must be signed in to change notification settings - Fork 1.2k
feat(emulator): adds resumable upload API support for multiple chunk and cancelling #10325
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
5d4bf45
109e8f5
a0f9342
ff2d139
5709b19
f0b7b41
e52c8cf
edc5d64
f3f61ac
62e7ac4
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1 +1,3 @@ | ||
| - Fixed an issue where some MCP tools would error with "Invalid input: expected record, received array". (#10437) | ||
| - feat(emulator): resumable upload API now supports multiple chunk uploads | ||
| - feat(emulator): resumable upload API now supports cancelling an upload |
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -13,7 +13,7 @@ import { EmulatorLogger } from "../../emulatorLogger"; | |||||||||||||||||||||||||
| import { GetObjectResponse, ListObjectsResponse } from "../files"; | ||||||||||||||||||||||||||
| import type { Request, Response } from "express"; | ||||||||||||||||||||||||||
| import { parseObjectUploadMultipartRequest } from "../multipart"; | ||||||||||||||||||||||||||
| import { Upload, UploadNotActiveError } from "../upload"; | ||||||||||||||||||||||||||
| import { NotCancellableError, Upload, UploadNotActiveError, UploadStatus } from "../upload"; | ||||||||||||||||||||||||||
| import { ForbiddenError, NotFoundError } from "../errors"; | ||||||||||||||||||||||||||
| import { reqBodyToBuffer } from "../../shared/request"; | ||||||||||||||||||||||||||
| import type { Query } from "express-serve-static-core"; | ||||||||||||||||||||||||||
|
|
@@ -183,34 +183,172 @@ export function createCloudEndpoints(emulator: StorageEmulator): Router { | |||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| gcloudStorageAPI.put("/upload/storage/v1/b/:bucketId/o", async (req, res) => { | ||||||||||||||||||||||||||
| if (!req.query.upload_id) { | ||||||||||||||||||||||||||
| res.sendStatus(400); | ||||||||||||||||||||||||||
| return; | ||||||||||||||||||||||||||
| return res.sendStatus(404); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| async function handleStatusCheck(upload: Upload, contentRange: string) { | ||||||||||||||||||||||||||
| // Extract the numeric total from bytes */total or bytes start-end/total forms. | ||||||||||||||||||||||||||
| const totalMatch = contentRange && /\/(\d+)$/.exec(contentRange); | ||||||||||||||||||||||||||
| const declaredTotal = totalMatch ? parseInt(totalMatch[1], 10) : undefined; | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if (upload.status === UploadStatus.FINISHED) { | ||||||||||||||||||||||||||
| let getObjectResponse: GetObjectResponse; | ||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||
| getObjectResponse = await adminStorageLayer.getObject({ | ||||||||||||||||||||||||||
| bucketId: upload.bucketId, | ||||||||||||||||||||||||||
| decodedObjectId: upload.objectId, | ||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||
| } catch (err) { | ||||||||||||||||||||||||||
| if (err instanceof NotFoundError) { | ||||||||||||||||||||||||||
| return res.sendStatus(404); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| if (err instanceof ForbiddenError) { | ||||||||||||||||||||||||||
| return res.sendStatus(403); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| throw err; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| return res.status(200).json(new CloudStorageObjectMetadata(getObjectResponse.metadata)); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if (upload.status !== UploadStatus.ACTIVE) { | ||||||||||||||||||||||||||
| return res.sendStatus(400); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // if the declared total matches the bytes received so far, the client is signalling that | ||||||||||||||||||||||||||
| // the upload is complete (i.e. streaming upload where total size wasn't known upfront). | ||||||||||||||||||||||||||
| if (declaredTotal !== undefined && upload.size === declaredTotal) { | ||||||||||||||||||||||||||
| const metadata = await finalizeUpload(upload.id); | ||||||||||||||||||||||||||
| return res.status(200).json(new CloudStorageObjectMetadata(metadata)); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
Comment on lines
+219
to
+222
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If
Suggested change
|
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if (upload.size === 0) { | ||||||||||||||||||||||||||
| return res.status(308).send(); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| res.setHeader("Range", `bytes=0-${upload.size - 1}`); | ||||||||||||||||||||||||||
| return res.status(308).send(); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| async function handleChunkedUpload(upload: Upload, contentRange: string, data: Buffer) { | ||||||||||||||||||||||||||
| const rangeMatch = /^bytes (\d+)-(\d+)(?:\/(\d+|\*))?$/.exec(contentRange); | ||||||||||||||||||||||||||
| if (!rangeMatch) { | ||||||||||||||||||||||||||
| return res.status(400).send("Failed to parse Content-Range header."); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| const rangeStart = parseInt(rangeMatch[1], 10); | ||||||||||||||||||||||||||
| const rangeEnd = parseInt(rangeMatch[2], 10); | ||||||||||||||||||||||||||
| const rangeTotal = | ||||||||||||||||||||||||||
| rangeMatch[3] && rangeMatch[3] !== "*" ? parseInt(rangeMatch[3], 10) : undefined; | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if (rangeEnd < rangeStart || (rangeTotal !== undefined && rangeTotal < rangeEnd + 1)) { | ||||||||||||||||||||||||||
| return res.status(400).send("Failed to parse Content-Range header."); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if (rangeStart !== upload.size) { | ||||||||||||||||||||||||||
| return res | ||||||||||||||||||||||||||
| .status(400) | ||||||||||||||||||||||||||
| .send(`Invalid chunk position. The next chunk should start at offset ${upload.size}.`); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
Comment on lines
+247
to
+251
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The current implementation returns a 400 error if the if (rangeStart < upload.size) {
res.setHeader("Range", `bytes=0-${upload.size - 1}`);
return res.status(308).send();
}
if (rangeStart > upload.size) {
return res
.status(400)
.send(`Invalid chunk position. The next chunk should start at offset ${upload.size}.`);
} |
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| const expectedChunkSize = rangeEnd - rangeStart + 1; | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if (data.byteLength !== expectedChunkSize) { | ||||||||||||||||||||||||||
| const message = `Invalid request. There were ${data.byteLength} byte(s) in the request body. There should have been ${expectedChunkSize} byte(s) (starting at offset ${rangeStart} and ending at offset ${rangeEnd}) according to the Content-Range header.`; | ||||||||||||||||||||||||||
| return res.status(400).send(message); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| /** | ||||||||||||||||||||||||||
| * When the client uses an unknown total (`*` or omitted), we treat a chunk whose | ||||||||||||||||||||||||||
| * size is not a multiple of 256 KiB as the final chunk. Objects whose size is | ||||||||||||||||||||||||||
| * an exact multiple of 256 KiB must end with an explicit total in Content-Range. | ||||||||||||||||||||||||||
| * @see https://docs.cloud.google.com/storage/docs/performing-resumable-uploads#chunked-upload | ||||||||||||||||||||||||||
| */ | ||||||||||||||||||||||||||
| const isComplete = | ||||||||||||||||||||||||||
| typeof rangeTotal === "number" | ||||||||||||||||||||||||||
| ? rangeEnd + 1 === rangeTotal | ||||||||||||||||||||||||||
| : data.byteLength % (256 * 1024) !== 0; | ||||||||||||||||||||||||||
|
jakeisonline marked this conversation as resolved.
|
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if (!isComplete && data.byteLength % (256 * 1024) !== 0) { | ||||||||||||||||||||||||||
| return res | ||||||||||||||||||||||||||
| .status(400) | ||||||||||||||||||||||||||
| .send("Chunk size must be a multiple of 256 KiB unless it is the last chunk."); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| const updatedUpload = uploadService.continueResumableUpload(upload.id, data); | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if (isComplete) { | ||||||||||||||||||||||||||
| const metadata = await finalizeUpload(upload.id); | ||||||||||||||||||||||||||
| return res.status(200).json(new CloudStorageObjectMetadata(metadata)); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| res.setHeader("Range", `bytes=0-${updatedUpload.size - 1}`); | ||||||||||||||||||||||||||
| return res.status(308).send(); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| async function finalizeUpload(uploadId: string) { | ||||||||||||||||||||||||||
| const finalizedUpload = uploadService.finalizeResumableUpload(uploadId); | ||||||||||||||||||||||||||
| return await adminStorageLayer.uploadObject(finalizedUpload); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| const uploadId = req.query.upload_id.toString(); | ||||||||||||||||||||||||||
| let upload: Upload; | ||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||
| uploadService.continueResumableUpload(uploadId, await reqBodyToBuffer(req)); | ||||||||||||||||||||||||||
| upload = uploadService.finalizeResumableUpload(uploadId); | ||||||||||||||||||||||||||
| upload = uploadService.getResumableUpload(req.query.upload_id.toString()); | ||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Avoid using
Suggested change
|
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if (upload.status === UploadStatus.CANCELLED) { | ||||||||||||||||||||||||||
| return res.sendStatus(499); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| const contentLength = req.headers["content-length"]; | ||||||||||||||||||||||||||
| const contentRange = req.headers["content-range"] ?? ""; | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // Status check request | ||||||||||||||||||||||||||
| // @see https://docs.cloud.google.com/storage/docs/performing-resumable-uploads#status-check | ||||||||||||||||||||||||||
| if (contentLength === "0") { | ||||||||||||||||||||||||||
| return await handleStatusCheck(upload, contentRange); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| if (contentLength && contentLength !== "0") { | ||||||||||||||||||||||||||
| const data = await reqBodyToBuffer(req); | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // Multiple chunk upload | ||||||||||||||||||||||||||
| if (contentRange) { | ||||||||||||||||||||||||||
| return await handleChunkedUpload(upload, contentRange, data); | ||||||||||||||||||||||||||
| } else { | ||||||||||||||||||||||||||
| // Single chunk upload | ||||||||||||||||||||||||||
| uploadService.continueResumableUpload(upload.id, data); | ||||||||||||||||||||||||||
| const metadata = await finalizeUpload(upload.id); | ||||||||||||||||||||||||||
| return res.status(200).json(new CloudStorageObjectMetadata(metadata)); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| return res.status(400).send(); | ||||||||||||||||||||||||||
| } catch (err) { | ||||||||||||||||||||||||||
| if (err instanceof NotFoundError) { | ||||||||||||||||||||||||||
| return res.sendStatus(404); | ||||||||||||||||||||||||||
| } else if (err instanceof ForbiddenError) { | ||||||||||||||||||||||||||
| return res.sendStatus(403); | ||||||||||||||||||||||||||
| } else if (err instanceof UploadNotActiveError) { | ||||||||||||||||||||||||||
| return res.sendStatus(400); | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
Comment on lines
329
to
331
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The
Suggested change
|
||||||||||||||||||||||||||
| throw err; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||
|
jakeisonline marked this conversation as resolved.
|
||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| gcloudStorageAPI.delete("/upload/storage/v1/b/:bucketId/o", async (req, res) => { | ||||||||||||||||||||||||||
| if (!req.query.upload_id) { | ||||||||||||||||||||||||||
| return res.sendStatus(405); | ||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Returning a A
Suggested change
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree with this comment, but the live service will respond with a Happy to be challenged and adjust though. |
||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| let metadata: StoredFileMetadata; | ||||||||||||||||||||||||||
| try { | ||||||||||||||||||||||||||
| metadata = await adminStorageLayer.uploadObject(upload); | ||||||||||||||||||||||||||
| uploadService.cancelResumableUpload(req.query.upload_id.toString()); | ||||||||||||||||||||||||||
| return res.sendStatus(499); | ||||||||||||||||||||||||||
| } catch (err) { | ||||||||||||||||||||||||||
| if (err instanceof ForbiddenError) { | ||||||||||||||||||||||||||
| return res.sendStatus(403); | ||||||||||||||||||||||||||
| if (err instanceof NotFoundError) { | ||||||||||||||||||||||||||
| return res.sendStatus(404); | ||||||||||||||||||||||||||
| } else if (err instanceof NotCancellableError) { | ||||||||||||||||||||||||||
| return res.sendStatus(405); | ||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| throw err; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
| return res.json(new CloudStorageObjectMetadata(metadata)); | ||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| gcloudStorageAPI.post("/b/:bucketId/o/:objectId/acl", async (req, res) => { | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
UploadPreviouslyFinalizedErrorshould be imported to ensure it can be correctly handled in the catch block of the resumable upload endpoint.