Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 54 additions & 4 deletions src/deploy/functions/release/fabricator.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@
it("handles topics that already exist", async () => {
pubsub.createTopic.callsFake(() => {
const err = new Error("Already exists");
(err as any).status = 409;

Check warning on line 450 in src/deploy/functions/release/fabricator.spec.ts

View workflow job for this annotation

GitHub Actions / lint (20)

Unexpected any. Specify a different type

Check warning on line 450 in src/deploy/functions/release/fabricator.spec.ts

View workflow job for this annotation

GitHub Actions / lint (20)

Unsafe member access .status on an `any` value
return Promise.reject(err);
});
gcfv2.createFunction.resolves({ name: "op", done: false });
Expand Down Expand Up @@ -522,7 +522,7 @@
eventarc.createChannel.callsFake(({ name }) => {
expect(name).to.equal("channel");
const err = new Error("Already exists");
(err as any).status = 409;

Check warning on line 525 in src/deploy/functions/release/fabricator.spec.ts

View workflow job for this annotation

GitHub Actions / lint (20)

Unexpected any. Specify a different type

Check warning on line 525 in src/deploy/functions/release/fabricator.spec.ts

View workflow job for this annotation

GitHub Actions / lint (20)

Unsafe member access .status on an `any` value
return Promise.reject(err);
});
gcfv2.createFunction.resolves({ name: "op", done: false });
Expand Down Expand Up @@ -588,7 +588,7 @@
eventarc.getChannel.resolves(undefined);
eventarc.createChannel.callsFake(() => {
const err = new Error("🤷‍♂️");
(err as any).status = 400;

Check warning on line 591 in src/deploy/functions/release/fabricator.spec.ts

View workflow job for this annotation

GitHub Actions / lint (20)

Unexpected any. Specify a different type

Check warning on line 591 in src/deploy/functions/release/fabricator.spec.ts

View workflow job for this annotation

GitHub Actions / lint (20)

Unsafe member access .status on an `any` value
return Promise.reject(err);
});

Expand Down Expand Up @@ -1580,7 +1580,7 @@
const updateEndpoint = sinon.stub(fab, "updateEndpoint");
updateEndpoint.callsFake(fakeUpsert);

await fab.applyChangeset(changes);
await fab.applyPlan({ "us-central1": changes });
});

it("handles errors and wraps them in results", async () => {
Expand All @@ -1593,7 +1593,8 @@
endpointsToSkip: [],
};

const results = await fab.applyChangeset(changes);
const summary = await fab.applyPlan({ "us-central1": changes });
const results = summary.results;
expect(results[0].error).to.be.instanceOf(reporter.DeploymentError);
expect(results[0].error?.message).to.match(/create function/);
});
Expand Down Expand Up @@ -1645,7 +1646,8 @@
endpointsToSkip: [],
};

const results = await fab.applyChangeset(changes);
const summary = await fab.applyPlan({ "us-central1": changes });
const results = summary.results;
const result = results.find((r) => r.endpoint.id === deleteEP.id);
expect(result?.error).to.be.instanceOf(reporter.AbortedDeploymentError);
expect(result?.durationMs).to.equal(0);
Expand All @@ -1671,7 +1673,8 @@
const deleteEndpoint = sinon.stub(fab, "deleteEndpoint");
deleteEndpoint.resolves();

const results = await fab.applyChangeset(changes);
const summary = await fab.applyPlan({ "us-central1": changes });
const results = summary.results;
expect(createEndpoint).to.have.been.calledWithMatch(createEP);
expect(updateEndpoint).to.have.been.calledWithMatch(update);
expect(deleteEndpoint).to.have.been.calledWith(deleteEP);
Expand Down Expand Up @@ -1712,11 +1715,58 @@
expect(ep2Result?.error).to.be.instanceOf(reporter.DeploymentError);
expect(ep2Result?.error?.message).to.match(/delete function/);
});

it("waits for all creates/updates to complete before doing deletes", async () => {
const ep1 = endpoint({ httpsTrigger: {} }, { region: "us-central1", id: "A" });
const ep2 = endpoint({ httpsTrigger: {} }, { region: "us-west1", id: "B" });
const plan: planner.DeploymentPlan = {
"us-central1": {
endpointsToCreate: [ep1],
endpointsToUpdate: [],
endpointsToDelete: [],
endpointsToSkip: [],
},
"us-west1": {
endpointsToCreate: [],
endpointsToUpdate: [],
endpointsToDelete: [ep2],
endpointsToSkip: [],
},
};

let resolveCreate: () => void;
const createPromise = new Promise<void>((resolve) => {
resolveCreate = resolve;
});

let createFinished = false;
const createEndpoint = sinon.stub(fab, "createEndpoint").callsFake(async () => {
await createPromise;
createFinished = true;
});

const deleteEndpoint = sinon.stub(fab, "deleteEndpoint").callsFake(async () => {
expect(createFinished).to.be.true;
});

const applyPlanPromise = fab.applyPlan(plan);

// At this point, create should be pending, and delete should NOT have run yet.
expect(deleteEndpoint).to.not.have.been.called;

// Resolve the create operation
resolveCreate!();

Check warning on line 1758 in src/deploy/functions/release/fabricator.spec.ts

View workflow job for this annotation

GitHub Actions / lint (20)

Forbidden non-null assertion

await applyPlanPromise;

expect(createEndpoint).to.have.been.calledOnce;
expect(deleteEndpoint).to.have.been.calledOnce;
});
});

describe("createRunFunction", () => {
it("creates a Cloud Run service with correct configuration", async () => {
runv2.createService.resolves({ uri: "https://service", name: "service" } as any);

Check warning on line 1769 in src/deploy/functions/release/fabricator.spec.ts

View workflow job for this annotation

GitHub Actions / lint (20)

Unexpected any. Specify a different type

Check warning on line 1769 in src/deploy/functions/release/fabricator.spec.ts

View workflow job for this annotation

GitHub Actions / lint (20)

Unsafe argument of type `any` assigned to a parameter of type `Service | undefined`
run.setInvokerUpdate.resolves();

const ep = endpoint(
Expand Down Expand Up @@ -1759,7 +1809,7 @@

describe("updateRunFunction", () => {
it("updates a Cloud Run service with correct configuration", async () => {
runv2.updateService.resolves({ uri: "https://service", name: "service" } as any);

Check warning on line 1812 in src/deploy/functions/release/fabricator.spec.ts

View workflow job for this annotation

GitHub Actions / lint (20)

Unsafe argument of type `any` assigned to a parameter of type `Service | undefined`
run.setInvokerUpdate.resolves();

const ep = endpoint(
Expand Down
148 changes: 95 additions & 53 deletions src/deploy/functions/release/fabricator.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,87 +95,129 @@ export class Fabricator {
totalTime: 0,
results: [],
};
const deployChangesets = Object.values(plan).map(async (changes): Promise<void> => {
const results = await this.applyChangeset(changes);
summary.results.push(...results);
return;

const changesets = Object.values(plan);

// Phase 1: Creates and Updates
const scraperV1 = new SourceTokenScraper();
const scraperV2 = new SourceTokenScraper();
const createAndUpdatePromises = changesets.map((changes) => {
return this.applyUpserts(changes, scraperV1, scraperV2);
});
const promiseResults = await utils.allSettled(deployChangesets);
const createAndUpdateResultsArray = await Promise.allSettled(createAndUpdatePromises);

const errs = promiseResults
.filter((r) => r.status === "rejected")
.map((r) => (r as utils.PromiseRejectedResult).reason);
if (errs.length) {
// Process results of Phase 1
summary.results = createAndUpdateResultsArray.reduce<reporter.DeployResult[]>((acc, r) => {
if (r.status === "fulfilled") {
return [...acc, ...r.value];
}
// Handle rejection
logger.debug(
"Fabricator.applyRegionalChanges returned an unhandled exception. This should never happen",
JSON.stringify(errs, null, 2),
"Fabricator.applyUpserts returned an unhandled exception.",
JSON.stringify(r.reason, null, 2),
);
return acc;
}, []);

// Simplify failure check (remove redundant check on createAndUpdateResultsArray)
const hasFailures = summary.results.some((r) => r.error);

if (hasFailures) {
utils.logLabeledWarning("functions", "Deploys failed. Skipping deletes.");

summary.results = changesets.reduce<reporter.DeployResult[]>((accum, changes) => {
const currentAborts = changes.endpointsToDelete.map((endpoint) => ({
endpoint,
durationMs: 0,
error: new reporter.AbortedDeploymentError(endpoint),
}));
return [...accum, ...currentAborts];
}, summary.results);

summary.totalTime = timer.stop();
return summary;
}

// Phase 2: Deletes
const deleteResultsArray = await Promise.allSettled(
changesets.map((changes) => this.applyDeletes(changes)),
);

const deleteResults = deleteResultsArray.reduce<reporter.DeployResult[]>((acc, r) => {
if (r.status === "fulfilled") {
return [...acc, ...r.value];
}
logger.debug(
"Fabricator.applyDeletes returned an unhandled exception. This should never happen",
JSON.stringify(r.reason, null, 2),
);
return acc;
Comment thread
shettyvarun268 marked this conversation as resolved.
}, []);

summary.results.push(...deleteResults);

summary.totalTime = timer.stop();
return summary;
}

async applyChangeset(changes: planner.Changeset): Promise<Array<reporter.DeployResult>> {
const deployResults: reporter.DeployResult[] = [];
const handle = async (
op: reporter.OperationType,
endpoint: backend.Endpoint,
fn: () => Promise<void>,
): Promise<void> => {
const timer = new Timer();
const result: Partial<reporter.DeployResult> = { endpoint };
try {
await fn();
this.logOpSuccess(op, endpoint);
} catch (err: any) {
result.error = err as Error;
}
result.durationMs = timer.stop();
deployResults.push(result as reporter.DeployResult);
};
async applyUpserts(
changes: planner.Changeset,
scraperV1: SourceTokenScraper,
scraperV2: SourceTokenScraper,
): Promise<Array<reporter.DeployResult>> {
const ops: Array<Promise<reporter.DeployResult>> = [];

const upserts: Array<Promise<void>> = [];
const scraperV1 = new SourceTokenScraper();
const scraperV2 = new SourceTokenScraper();
for (const endpoint of changes.endpointsToCreate) {
this.logOpStart("creating", endpoint);
upserts.push(
handle("create", endpoint, () => this.createEndpoint(endpoint, scraperV1, scraperV2)),
ops.push(
this.wrapOperation("create", endpoint, () =>
this.createEndpoint(endpoint, scraperV1, scraperV2),
),
);
}

for (const endpoint of changes.endpointsToSkip) {
utils.logSuccess(this.getLogSuccessMessage("skip", endpoint));
}

for (const update of changes.endpointsToUpdate) {
this.logOpStart("updating", update.endpoint);
upserts.push(
handle("update", update.endpoint, () => this.updateEndpoint(update, scraperV1, scraperV2)),
ops.push(
this.wrapOperation("update", update.endpoint, () =>
this.updateEndpoint(update, scraperV1, scraperV2),
),
);
}
await utils.allSettled(upserts);

// Note: every promise is generated by handle which records error in results.
// We've used hasErrors as a cheater here instead of viewing the results of allSettled
if (deployResults.find((r) => r.error)) {
for (const endpoint of changes.endpointsToDelete) {
deployResults.push({
endpoint,
durationMs: 0,
error: new reporter.AbortedDeploymentError(endpoint),
});
}
return deployResults;
}
return Promise.all(ops);
}

async applyDeletes(changes: planner.Changeset): Promise<Array<reporter.DeployResult>> {
const ops: Array<Promise<reporter.DeployResult>> = [];

const deletes: Array<Promise<void>> = [];
for (const endpoint of changes.endpointsToDelete) {
this.logOpStart("deleting", endpoint);
deletes.push(handle("delete", endpoint, () => this.deleteEndpoint(endpoint)));
ops.push(this.wrapOperation("delete", endpoint, () => this.deleteEndpoint(endpoint)));
}
await utils.allSettled(deletes);

return deployResults;
return Promise.all(ops);
}

private async wrapOperation(
op: reporter.OperationType,
endpoint: backend.Endpoint,
fn: () => Promise<void>,
): Promise<reporter.DeployResult> {
const timer = new Timer();
const result: Partial<reporter.DeployResult> = { endpoint };
try {
await fn();
this.logOpSuccess(op, endpoint);
} catch (err: any) {
result.error = err as Error;
}
result.durationMs = timer.stop();
return result as reporter.DeployResult;
}

async createEndpoint(
Expand Down
Loading