Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry storage list operations using retried pagination. #8431

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion app/lib/package/api_export/exported_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,8 @@ final class ExportedApi {
Future<void> _gcOldPrefixes() async {
// List all top-level prefixes, and delete the ones we don't need
final topLevelprefixes = await _pool.withResource(
() async => await _bucket.list(prefix: '', delimiter: '/').toList(),
() async =>
await _bucket.listAllItemsWithRetry(prefix: '', delimiter: '/'),
);
await Future.wait(topLevelprefixes.map((entry) async {
if (entry.isObject) {
Expand Down
29 changes: 14 additions & 15 deletions app/lib/package/tarball_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,10 @@ class TarballStorage {
String package,
) async {
final prefix = _tarballObjectNamePackagePrefix(package);
final items = await _canonicalBucket
.list(
prefix: prefix,
delimiter: '',
)
.toList();
final items = await _canonicalBucket.listAllItemsWithRetry(
prefix: prefix,
delimiter: '',
);
return Map.fromEntries(items.whereType<BucketObjectEntry>().map((item) {
final version = item.name.without(prefix: prefix, suffix: '.tar.gz');
return MapEntry(
Expand Down Expand Up @@ -255,32 +253,33 @@ class TarballStorage {
final filterForNamePrefix = package == null
? 'packages/'
: _tarballObjectNamePackagePrefix(package);
await for (final entry in _publicBucket.list(prefix: filterForNamePrefix)) {
await _publicBucket.listWithRetry(prefix: filterForNamePrefix,
(entry) async {
// Skip non-objects.
if (!entry.isObject) {
continue;
return;
}
// Skip objects that were matched in the previous step.
if (objectNamesInPublicBucket.contains(entry.name)) {
continue;
return;
}
if (deleteObjects.contains(entry.name)) {
continue;
return;
}

final publicInfo = await _publicBucket.tryInfo(entry.name);
if (publicInfo == null) {
_logger.warning(
'Failed to get info for public bucket object "${entry.name}".');
continue;
return;
}

await updateContentDispositionToAttachment(publicInfo, _publicBucket);

// Skip recently updated objects.
if (publicInfo.age < ageCheckThreshold) {
// Ignore recent files.
continue;
return;
}

final canonicalInfo = await _canonicalBucket.tryInfo(entry.name);
Expand All @@ -289,11 +288,11 @@ class TarballStorage {
// but it wasn't matched through the [PackageVersion] query above.
if (canonicalInfo.age < ageCheckThreshold) {
// Ignore recent files.
continue;
return;
}
_logger.severe(
'Object without matching PackageVersion in canonical and public buckets: "${entry.name}".');
continue;
return;
} else {
// The object in the public bucket has no matching file in the canonical bucket.
// We can assume it is stale and can delete it.
Expand All @@ -305,7 +304,7 @@ class TarballStorage {
deleteObjects.add(entry.name);
}
}
}
});

for (final objectName in deleteObjects) {
_logger.shout('Deleting object from public bucket: "$objectName".');
Expand Down
3 changes: 2 additions & 1 deletion app/lib/service/download_counts/sync_download_counts.dart
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ Future<Set<String>> processDownloadCounts(DateTime date) async {

final failedFiles = <String>{};

final bucketEntries = await bucket.list(prefix: fileNamePrefix).toList();
final bucketEntries =
await bucket.listAllItemsWithRetry(prefix: fileNamePrefix);

if (bucketEntries.isEmpty) {
_logger.info('Failed to read any files with prefix "$fileNamePrefix"./n');
Expand Down
48 changes: 39 additions & 9 deletions app/lib/shared/storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,36 @@ extension BucketExt on Bucket {
return await _retry(() async => fn(read(objectName)));
}

/// List objects in the bucket with default retry with pagination.
Future<void> listWithRetry(
FutureOr<void> Function(BucketEntry input) fn, {
String? prefix,
String? delimiter,
}) async {
for (;;) {
var p = await pageWithRetry(prefix: prefix, delimiter: delimiter);
for (final item in p.items) {
await fn(item);
}
if (p.isLast) break;
p = await p.nextWithRetry();
}
}

/// Lists all entries with default retry pagination, returns them as List.
Future<List<BucketEntry>> listAllItemsWithRetry({
String? prefix,
String? delimiter,
}) async {
final entries = <BucketEntry>[];
await listWithRetry(
prefix: prefix,
delimiter: delimiter,
entries.add,
);
return entries;
}

/// The HTTP URL of a publicly accessable GCS object.
String objectUrl(String objectName) {
return '${activeConfiguration.storageBaseUrl}/$bucketName/$objectName';
Expand Down Expand Up @@ -324,8 +354,9 @@ Future<int> deleteBucketFolderRecursively(
page = await retry(
() async {
return page == null
? await bucket.page(prefix: folder, delimiter: '', pageSize: 100)
: await page.next(pageSize: 100);
? await bucket.pageWithRetry(
prefix: folder, delimiter: '', pageSize: 100)
: await page.nextWithRetry(pageSize: 100);
},
delayFactor: Duration(seconds: 10),
maxAttempts: 3,
Expand Down Expand Up @@ -430,8 +461,7 @@ class VersionedJsonStorage {
}
// fallback to earlier runtimes
final currentPath = _objectName();
final list = await _bucket
.list(prefix: _prefix)
final list = (await _bucket.listAllItemsWithRetry(prefix: _prefix))
.map((entry) => entry.name)
.where((name) => name.endsWith(_extension))
.where((name) => name.compareTo(currentPath) <= 0)
Expand All @@ -456,19 +486,19 @@ class VersionedJsonStorage {
Future<DeleteCounts> deleteOldData({Duration? minAgeThreshold}) async {
var found = 0;
var deleted = 0;
await for (final entry in _bucket.list(prefix: _prefix)) {
await _bucket.listWithRetry(prefix: _prefix, (entry) async {
if (entry.isDirectory) {
continue;
return;
}
final name = p.basename(entry.name);
if (!name.endsWith(_extension)) {
continue;
return;
}
final version = name.substring(0, name.length - _extension.length);
final matchesPattern = version.length == 10 &&
versions.runtimeVersionPattern.hasMatch(version);
if (!matchesPattern) {
continue;
return;
}
found++;
if (versions.shouldGCVersion(version)) {
Expand All @@ -479,7 +509,7 @@ class VersionedJsonStorage {
await deleteFromBucket(_bucket, entry.name);
}
}
}
});
return DeleteCounts(found, deleted);
}

Expand Down
10 changes: 5 additions & 5 deletions app/lib/task/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -516,10 +516,10 @@ class TaskBackend {
// Objects in the bucket are stored under the following pattern:
// `<runtimeVersion>/<package>/<version>/...`
// Thus, we list with `/` as delimiter and get a list of runtimeVersions
await for (final d in _bucket.list(prefix: '', delimiter: '/')) {
await _bucket.listWithRetry(prefix: '', delimiter: '/', (d) async {
if (!d.isDirectory) {
_log.warning('bucket should not contain any top-level object');
continue;
return;
}

// Remove trailing slash from object prefix, to get a runtimeVersion
Expand All @@ -529,7 +529,7 @@ class TaskBackend {
// Check if the runtimeVersion should be GC'ed
if (shouldGCVersion(rtVersion)) {
// List all objects under the `<rtVersion>/`
await for (final obj in _bucket.list(prefix: d.name, delimiter: '')) {
await _bucket.listWithRetry(prefix: d.name, delimiter: '', (obj) async {
// Limit concurrency
final r = await pool.request();

Expand All @@ -545,9 +545,9 @@ class TaskBackend {
r.release(); // always release to avoid deadlock
}
});
}
});
}
}
});

// Close the pool, and wait for all pending deletion request to complete.
await pool.close();
Expand Down
1 change: 1 addition & 0 deletions pkg/fake_gcloud/lib/retry_enforcer_storage.dart
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ class _RetryEnforcerBucket implements Bucket {
@override
Stream<BucketEntry> list({String? prefix, String? delimiter}) {
// TODO: verify retry wrapper here
_verifyRetryOnStack();
return _bucket.list(
prefix: prefix,
delimiter: delimiter,
Expand Down
Loading