Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove dependency tracker and replace analysis triggers #6958

Merged
merged 2 commits into from
Aug 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions app/lib/admin/tools/package_discontinued.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import 'package:args/args.dart';
import 'package:clock/clock.dart';

import 'package:pub_dev/job/backend.dart';
import 'package:pub_dev/package/backend.dart';
import 'package:pub_dev/package/models.dart';
import 'package:pub_dev/shared/datastore.dart';
import 'package:pub_dev/task/backend.dart';

final _argParser = ArgParser()
..addOption('package', help: 'The package to update.')
Expand Down Expand Up @@ -50,7 +50,6 @@ Future<String> executeSetPackageDiscontinued(List<String> args) async {
tx.insert(pkg);
});
await purgePackageCache(packageName);
await jobBackend.trigger(JobService.analyzer, packageName,
version: package.latestVersion);
await taskBackend.trackPackage(packageName);
return 'Done.';
}
160 changes: 2 additions & 158 deletions app/lib/job/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,14 @@

import 'dart:async';

import 'package:clock/clock.dart';
import 'package:gcloud/service_scope.dart' as ss;
import 'package:logging/logging.dart';

import '../package/backend.dart';
import '../package/models.dart';
import '../shared/datastore.dart' as db;
import '../shared/popularity_storage.dart';
import '../shared/versions.dart' as versions;

import 'model.dart';

export 'model.dart';

const _shortExtendDuration = Duration(hours: 12);

final _logger = Logger('pub.job.backend');

typedef ShouldProcess = Future<bool> Function(
String package, String version, DateTime updated);

Expand All @@ -37,154 +27,8 @@ class JobBackend {

JobBackend(this._db);

String _id(JobService service, String package, String version) => Uri(
pathSegments: [
versions.runtimeVersion,
service.toString().split('.').last,
package,
version,
],
).toString();

/// Triggers analysis job.
Future<void> triggerAnalysis(
String package,
String? version, {
bool isHighPriority = false,
}) async {
await jobBackend.trigger(
JobService.analyzer,
package,
version: version,
isHighPriority: isHighPriority,
);
}

/// Triggers analysis/dartdoc for [package]/[version] if older than [updated].
Future<void> trigger(
JobService service,
String package, {
String? version,
DateTime? updated,
bool? shouldProcess,
bool isHighPriority = false,
}) async {
final pKey = _db.emptyKey.append(Package, id: package);
final p = await _db.lookupOrNull<Package>(pKey);
if (p == null || p.isNotVisible) {
_logger.info("Couldn't trigger $service job: $package not found.");
return;
}
final latestReleases = await packageBackend.latestReleases(p);

version ??= latestReleases.stable.version;
final pvKey = pKey.append(PackageVersion, id: version);
final pv = await _db.lookupOrNull<PackageVersion>(pvKey);
if (pv == null) {
_logger
.info("Couldn't trigger $service job: $package $version not found.");
return;
}

final isLatestStable = p.latestVersion == version;
final isLatestPrerelease = latestReleases.showPrerelease &&
latestReleases.prerelease!.version == version;
final isLatestPreview = latestReleases.showPreview &&
latestReleases.preview!.version == version;
shouldProcess ??= updated == null || updated.isAfter(pv.created!);
shouldProcess |= isHighPriority;
await createOrUpdate(
service: service,
package: package,
version: version,
isLatestStable: isLatestStable,
isLatestPrerelease: isLatestPrerelease,
isLatestPreview: isLatestPreview,
packageVersionUpdated: pv.created,
shouldProcess: shouldProcess,
priority: isHighPriority ? 0 : null,
);
}

Future<void> createOrUpdate({
required JobService service,
required String package,
required String version,
required bool isLatestStable,
required bool isLatestPrerelease,
required bool isLatestPreview,
required DateTime? packageVersionUpdated,
required bool shouldProcess,
int? priority,
}) async {
packageVersionUpdated ??= clock.now().toUtc();
final id = _id(service, package, version);
final state = shouldProcess ? JobState.available : JobState.idle;
final lockedUntil =
shouldProcess ? null : clock.now().add(_shortExtendDuration);
await db.withRetryTransaction(_db, (tx) async {
final current =
await tx.lookupOrNull<Job>(_db.emptyKey.append(Job, id: id));
if (current != null) {
final hasNotChanged = current.isLatestStable == isLatestStable &&
current.isLatestPrerelease == isLatestPrerelease &&
current.isLatestPreview == isLatestPreview &&
!current.packageVersionUpdated!.isBefore(packageVersionUpdated!) &&
(priority == null || current.priority <= priority);
if (hasNotChanged && !shouldProcess) {
// no reason to re-schedule the job
return;
}
if (current.state == JobState.available &&
current.lockedUntil == null) {
// already scheduled for processing
return;
}
_logger.info('Updating job: $id ($state, $lockedUntil)');
current
..isLatestStable = isLatestStable
..isLatestPrerelease = isLatestPrerelease
..isLatestPreview = isLatestPreview
..packageVersionUpdated = packageVersionUpdated
..state = state
..lockedUntil = lockedUntil
..processingKey = null // drops ongoing processing
..updatePriority(
popularityStorage.lookup(package),
fixPriority: priority,
);
tx.insert(current);
return;
} else {
_logger.info('Creating job: $id');
final job = Job()
..id = id
..service = service
..packageName = package
..packageVersion = version
..isLatestStable = isLatestStable
..isLatestPrerelease = isLatestPrerelease
..isLatestPreview = isLatestPreview
..packageVersionUpdated = packageVersionUpdated
..state = JobState.available
..lockedUntil = null
..lastStatus = JobStatus.none
..runtimeVersion = versions.runtimeVersion
..errorCount = 0
..updatePriority(
popularityStorage.lookup(package),
fixPriority: priority,
);
tx.insert(job);
return;
}
});
}

/// Deletes the old entries that predate [versions.gcBeforeRuntimeVersion].
/// Deletes the old entries.
Future<void> deleteOldEntries() async {
final query = _db.query<Job>()
..filter('runtimeVersion <', versions.gcBeforeRuntimeVersion);
await _db.deleteWithQuery<Job>(query);
await _db.deleteWithQuery<Job>(_db.query<Job>());
}
}
6 changes: 1 addition & 5 deletions app/lib/package/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import '../account/backend.dart';
import '../account/consent_backend.dart';
import '../account/models.dart' show User;
import '../audit/models.dart';
import '../job/backend.dart';
import '../publisher/backend.dart';
import '../service/email/backend.dart';
import '../service/email/models.dart';
Expand Down Expand Up @@ -378,10 +377,8 @@ class PackageBackend {
}

final pkg = await _requirePackageAdmin(package, user.userId);
String? latestVersion;
await withRetryTransaction(db, (tx) async {
final p = await tx.lookupValue<Package>(pkg.key);
latestVersion = p.latestVersion;

final optionsChanges = <String>[];
if (options.isDiscontinued != null &&
Expand Down Expand Up @@ -419,8 +416,7 @@ class PackageBackend {
));
});
await purgePackageCache(package);
await jobBackend.trigger(JobService.analyzer, package,
version: latestVersion);
await taskBackend.trackPackage(package);
}

/// Updates [options] on [package]/[version], assuming the current user
Expand Down
Loading