Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue in counting artifacts on retries, improve parallelization #198488

Merged
merged 5 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 83 additions & 30 deletions build/azure-pipelines/common/publish.js

Large diffs are not rendered by default.

124 changes: 88 additions & 36 deletions build/azure-pipelines/common/publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,46 @@ class Temp {
}
}

class Sequencer {
export class Limiter {

private current: Promise<unknown> = Promise.resolve(null);
private _size = 0;
private runningPromises: number;
private readonly maxDegreeOfParalellism: number;
private readonly outstandingPromises: { factory: () => Promise<any>; c: (v: any) => void; e: (err: Error) => void }[];

queue<T>(promiseTask: () => Promise<T>): Promise<T> {
return this.current = this.current.then(() => promiseTask(), () => promiseTask());
constructor(maxDegreeOfParalellism: number) {
this.maxDegreeOfParalellism = maxDegreeOfParalellism;
this.outstandingPromises = [];
this.runningPromises = 0;
}

queue<T>(factory: () => Promise<T>): Promise<T> {
this._size++;

return new Promise<T>((c, e) => {
this.outstandingPromises.push({ factory, c, e });
this.consume();
});
}

private consume(): void {
while (this.outstandingPromises.length && this.runningPromises < this.maxDegreeOfParalellism) {
const iLimitedTask = this.outstandingPromises.shift()!;
this.runningPromises++;

const promise = iLimitedTask.factory();
promise.then(iLimitedTask.c, iLimitedTask.e);
promise.then(() => this.consumed(), () => this.consumed());
}
}

private consumed(): void {
this._size--;
this.runningPromises--;

if (this.outstandingPromises.length > 0) {
this.consume();
}
}
}

Expand Down Expand Up @@ -162,7 +196,7 @@ interface ReleaseDetailsResult {

class ESRPClient {

private static Sequencer = new Sequencer();
private static Limiter = new Limiter(1);

private readonly authPath: string;

Expand Down Expand Up @@ -198,7 +232,7 @@ class ESRPClient {
version: string,
filePath: string
): Promise<Release> {
const submitReleaseResult = await ESRPClient.Sequencer.queue(async () => {
const submitReleaseResult = await ESRPClient.Limiter.queue(async () => {
this.log(`Submitting release for ${version}: ${filePath}`);
return await this.SubmitRelease(version, filePath);
});
Expand Down Expand Up @@ -392,7 +426,7 @@ class State {
const stageAttempt = e('SYSTEM_STAGEATTEMPT');
this.statePath = path.join(pipelineWorkspacePath, `artifacts_processed_${stageAttempt}`, `artifacts_processed_${stageAttempt}.txt`);
fs.mkdirSync(path.dirname(this.statePath), { recursive: true });
fs.writeFileSync(this.statePath, [...this.set.values()].join('\n'));
fs.writeFileSync(this.statePath, [...this.set.values()].map(name => `${name}\n`).join(''));
}

get size(): number {
Expand All @@ -413,7 +447,17 @@ class State {
}
}

const azdoFetchOptions = { headers: { Authorization: `Bearer ${e('SYSTEM_ACCESSTOKEN')}` } };
const azdoFetchOptions = {
headers: {
// Pretend we're a web browser to avoid download rate limits
'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36 Edg/119.0.0.0',
'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
'Accept-Encoding': 'gzip, deflate, br',
'Accept-Language': 'en-US,en;q=0.9',
'Referer': 'https://dev.azure.com',
Authorization: `Bearer ${e('SYSTEM_ACCESSTOKEN')}`
}
};

async function requestAZDOAPI<T>(path: string): Promise<T> {
const abortController = new AbortController();
Expand Down Expand Up @@ -461,7 +505,7 @@ async function getPipelineTimeline(): Promise<Timeline> {

async function downloadArtifact(artifact: Artifact, downloadPath: string): Promise<void> {
const abortController = new AbortController();
const timeout = setTimeout(() => abortController.abort(), 6 * 60 * 1000);
const timeout = setTimeout(() => abortController.abort(), 4 * 60 * 1000);

try {
const res = await fetch(artifact.resource.downloadUrl, { ...azdoFetchOptions, signal: abortController.signal });
Expand Down Expand Up @@ -630,8 +674,8 @@ function getRealType(type: string) {
}
}

const azureSequencer = new Sequencer();
const mooncakeSequencer = new Sequencer();
const azureLimiter = new Limiter(1);
const mooncakeLimiter = new Limiter(1);

async function uploadAssetLegacy(log: (...args: any[]) => void, quality: string, commit: string, filePath: string): Promise<{ assetUrl: string; mooncakeUrl: string }> {
const fileName = path.basename(filePath);
Expand Down Expand Up @@ -660,7 +704,7 @@ async function uploadAssetLegacy(log: (...args: any[]) => void, quality: string,
if (await retry(() => blobClient.exists())) {
throw new Error(`Blob ${quality}, ${blobName} already exists, not publishing again.`);
} else {
await retry(attempt => azureSequencer.queue(async () => {
await retry(attempt => azureLimiter.queue(async () => {
log(`Uploading blobs to Azure storage (attempt ${attempt})...`);
await blobClient.uploadFile(filePath, blobOptions);
log('Blob successfully uploaded to Azure storage.');
Expand All @@ -682,7 +726,7 @@ async function uploadAssetLegacy(log: (...args: any[]) => void, quality: string,
if (await retry(() => mooncakeBlobClient.exists())) {
throw new Error(`Mooncake Blob ${quality}, ${blobName} already exists, not publishing again.`);
} else {
await retry(attempt => mooncakeSequencer.queue(async () => {
await retry(attempt => mooncakeLimiter.queue(async () => {
log(`Uploading blobs to Mooncake Azure storage (attempt ${attempt})...`);
await mooncakeBlobClient.uploadFile(filePath, blobOptions);
log('Blob successfully uploaded to Mooncake Azure storage.');
Expand Down Expand Up @@ -711,8 +755,8 @@ async function uploadAssetLegacy(log: (...args: any[]) => void, quality: string,
return { assetUrl, mooncakeUrl };
}

const downloadSequencer = new Sequencer();
const cosmosSequencer = new Sequencer();
const downloadLimiter = new Limiter(5);
const cosmosLimiter = new Limiter(1);

async function processArtifact(artifact: Artifact): Promise<void> {
const match = /^vscode_(?<product>[^_]+)_(?<os>[^_]+)_(?<arch>[^_]+)_(?<unprocessedType>[^_]+)$/.exec(artifact.name);
Expand All @@ -723,25 +767,39 @@ async function processArtifact(artifact: Artifact): Promise<void> {

const { product, os, arch, unprocessedType } = match.groups!;
const log = (...args: any[]) => console.log(`[${product} ${os} ${arch} ${unprocessedType}]`, ...args);
const start = Date.now();

const filePath = await retry(async attempt => {
const artifactZipPath = path.join(e('AGENT_TEMPDIRECTORY'), `${artifact.name}.zip`);
await downloadSequencer.queue(async () => {
log(`Downloading ${artifact.resource.downloadUrl} (attempt ${attempt})...`);
await downloadArtifact(artifact, artifactZipPath);
});

log(`Extracting (attempt ${attempt}) ...`);
const start = Date.now();
log(`Downloading ${artifact.resource.downloadUrl} (attempt ${attempt})...`);

try {
await downloadLimiter.queue(() => downloadArtifact(artifact, artifactZipPath));
} catch (err) {
log(`Download failed: ${err.message}`);
throw err;
}

const archiveSize = fs.statSync(artifactZipPath).size;
const downloadDurationS = (Date.now() - start) / 1000;
const downloadSpeedKBS = Math.round((archiveSize / 1024) / downloadDurationS);
log(`Successfully downloaded ${artifact.resource.downloadUrl} after ${Math.floor(downloadDurationS)} seconds (${downloadSpeedKBS} KB/s).`);

const filePath = await unzip(artifactZipPath, e('AGENT_TEMPDIRECTORY'));
const artifactSize = fs.statSync(filePath).size;

if (artifactSize !== Number(artifact.resource.properties.artifactsize)) {
throw new Error(`Artifact size mismatch. Expected ${artifact.resource.properties.artifactsize}. Actual ${artifactSize}`);
log(`Artifact size mismatch. Expected ${artifact.resource.properties.artifactsize}. Actual ${artifactSize}`);
throw new Error(`Artifact size mismatch.`);
}

return filePath;
});

log(`Successfully downloaded and extracted after ${(Date.now() - start) / 1000} seconds.`);

// getPlatform needs the unprocessedType
const quality = e('VSCODE_QUALITY');
const commit = e('BUILD_SOURCEVERSION');
Expand All @@ -751,8 +809,6 @@ async function processArtifact(artifact: Artifact): Promise<void> {
const stream = fs.createReadStream(filePath);
const [sha1hash, sha256hash] = await Promise.all([hashStream('sha1', stream), hashStream('sha256', stream)]);

log(`Publishing (size = ${size}, SHA1 = ${sha1hash}, SHA256 = ${sha256hash})...`);

const [{ assetUrl, mooncakeUrl }, prssUrl] = await Promise.all([
uploadAssetLegacy(log, quality, commit, filePath),
releaseAndProvision(
Expand All @@ -774,7 +830,7 @@ async function processArtifact(artifact: Artifact): Promise<void> {
log('Creating asset...', JSON.stringify(asset));

await retry(async (attempt) => {
await cosmosSequencer.queue(async () => {
await cosmosLimiter.queue(async () => {
log(`Creating asset in Cosmos DB (attempt ${attempt})...`);
const aadCredentials = new ClientSecretCredential(e('AZURE_TENANT_ID'), e('AZURE_CLIENT_ID'), e('AZURE_CLIENT_SECRET'));
const client = new CosmosClient({ endpoint: e('AZURE_DOCUMENTDB_ENDPOINT'), aadCredentials });
Expand All @@ -794,7 +850,7 @@ async function main() {
console.log(`\u2705 ${name}`);
}

const stages = new Set<string>();
const stages = new Set<string>(['Compile', 'CompileCLI']);
if (e('VSCODE_BUILD_STAGE_WINDOWS') === 'True') { stages.add('Windows'); }
if (e('VSCODE_BUILD_STAGE_LINUX') === 'True') { stages.add('Linux'); }
if (e('VSCODE_BUILD_STAGE_ALPINE') === 'True') { stages.add('Alpine'); }
Expand All @@ -806,21 +862,17 @@ async function main() {
while (true) {
const [timeline, artifacts] = await Promise.all([retry(() => getPipelineTimeline()), retry(() => getPipelineArtifacts())]);
const stagesCompleted = new Set<string>(timeline.records.filter(r => r.type === 'Stage' && r.state === 'completed' && stages.has(r.name)).map(r => r.name));

const stagesInProgress = [...stages].filter(s => !stagesCompleted.has(s));

if (stagesInProgress.length > 0) {
console.log('Stages in progress:', stagesInProgress.join(', '));
}

const artifactsInProgress = artifacts.filter(a => processing.has(a.name));

if (artifactsInProgress.length > 0) {
console.log('Artifacts in progress:', artifactsInProgress.map(a => a.name).join(', '));
}

if (stagesCompleted.size === stages.size && artifacts.length === done.size + processing.size) {
if (stagesInProgress.length === 0 && artifacts.length === done.size + processing.size) {
break;
} else if (stagesInProgress.length > 0) {
console.log('Stages in progress:', stagesInProgress.join(', '));
} else if (artifactsInProgress.length > 0) {
console.log('Artifacts in progress:', artifactsInProgress.map(a => a.name).join(', '));
} else {
console.log(`Waiting for a total of ${artifacts.length}, ${done.size} done, ${processing.size} in progress...`);
}

for (const artifact of artifacts) {
Expand Down
3 changes: 1 addition & 2 deletions build/azure-pipelines/product-build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -633,8 +633,7 @@ stages:

- ${{ if eq(variables['VSCODE_PUBLISH'], 'true') }}:
- stage: Publish
dependsOn:
- Compile
dependsOn: []
pool: 1es-windows-2019-x64
variables:
- name: BUILDS_API_URL
Expand Down
Loading