Skip to content

Commit

Permalink
Remove dependency tracker and replace analysis triggers (#6958)
Browse files Browse the repository at this point in the history
* Remove old dependency tracker.

* Remove most of Job backend + remove entries unconditionally.
  • Loading branch information
isoos authored Aug 23, 2023
1 parent b71e2c4 commit 71244f1
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 405 deletions.
5 changes: 2 additions & 3 deletions app/lib/admin/tools/package_discontinued.dart
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@
import 'package:args/args.dart';
import 'package:clock/clock.dart';

import 'package:pub_dev/job/backend.dart';
import 'package:pub_dev/package/backend.dart';
import 'package:pub_dev/package/models.dart';
import 'package:pub_dev/shared/datastore.dart';
import 'package:pub_dev/task/backend.dart';

final _argParser = ArgParser()
..addOption('package', help: 'The package to update.')
Expand Down Expand Up @@ -50,7 +50,6 @@ Future<String> executeSetPackageDiscontinued(List<String> args) async {
tx.insert(pkg);
});
await purgePackageCache(packageName);
await jobBackend.trigger(JobService.analyzer, packageName,
version: package.latestVersion);
await taskBackend.trackPackage(packageName);
return 'Done.';
}
160 changes: 2 additions & 158 deletions app/lib/job/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,14 @@

import 'dart:async';

import 'package:clock/clock.dart';
import 'package:gcloud/service_scope.dart' as ss;
import 'package:logging/logging.dart';

import '../package/backend.dart';
import '../package/models.dart';
import '../shared/datastore.dart' as db;
import '../shared/popularity_storage.dart';
import '../shared/versions.dart' as versions;

import 'model.dart';

export 'model.dart';

const _shortExtendDuration = Duration(hours: 12);

final _logger = Logger('pub.job.backend');

typedef ShouldProcess = Future<bool> Function(
String package, String version, DateTime updated);

Expand All @@ -37,154 +27,8 @@ class JobBackend {

JobBackend(this._db);

String _id(JobService service, String package, String version) => Uri(
pathSegments: [
versions.runtimeVersion,
service.toString().split('.').last,
package,
version,
],
).toString();

/// Triggers analysis job.
Future<void> triggerAnalysis(
String package,
String? version, {
bool isHighPriority = false,
}) async {
await jobBackend.trigger(
JobService.analyzer,
package,
version: version,
isHighPriority: isHighPriority,
);
}

/// Triggers analysis/dartdoc for [package]/[version] if older than [updated].
Future<void> trigger(
JobService service,
String package, {
String? version,
DateTime? updated,
bool? shouldProcess,
bool isHighPriority = false,
}) async {
final pKey = _db.emptyKey.append(Package, id: package);
final p = await _db.lookupOrNull<Package>(pKey);
if (p == null || p.isNotVisible) {
_logger.info("Couldn't trigger $service job: $package not found.");
return;
}
final latestReleases = await packageBackend.latestReleases(p);

version ??= latestReleases.stable.version;
final pvKey = pKey.append(PackageVersion, id: version);
final pv = await _db.lookupOrNull<PackageVersion>(pvKey);
if (pv == null) {
_logger
.info("Couldn't trigger $service job: $package $version not found.");
return;
}

final isLatestStable = p.latestVersion == version;
final isLatestPrerelease = latestReleases.showPrerelease &&
latestReleases.prerelease!.version == version;
final isLatestPreview = latestReleases.showPreview &&
latestReleases.preview!.version == version;
shouldProcess ??= updated == null || updated.isAfter(pv.created!);
shouldProcess |= isHighPriority;
await createOrUpdate(
service: service,
package: package,
version: version,
isLatestStable: isLatestStable,
isLatestPrerelease: isLatestPrerelease,
isLatestPreview: isLatestPreview,
packageVersionUpdated: pv.created,
shouldProcess: shouldProcess,
priority: isHighPriority ? 0 : null,
);
}

Future<void> createOrUpdate({
required JobService service,
required String package,
required String version,
required bool isLatestStable,
required bool isLatestPrerelease,
required bool isLatestPreview,
required DateTime? packageVersionUpdated,
required bool shouldProcess,
int? priority,
}) async {
packageVersionUpdated ??= clock.now().toUtc();
final id = _id(service, package, version);
final state = shouldProcess ? JobState.available : JobState.idle;
final lockedUntil =
shouldProcess ? null : clock.now().add(_shortExtendDuration);
await db.withRetryTransaction(_db, (tx) async {
final current =
await tx.lookupOrNull<Job>(_db.emptyKey.append(Job, id: id));
if (current != null) {
final hasNotChanged = current.isLatestStable == isLatestStable &&
current.isLatestPrerelease == isLatestPrerelease &&
current.isLatestPreview == isLatestPreview &&
!current.packageVersionUpdated!.isBefore(packageVersionUpdated!) &&
(priority == null || current.priority <= priority);
if (hasNotChanged && !shouldProcess) {
// no reason to re-schedule the job
return;
}
if (current.state == JobState.available &&
current.lockedUntil == null) {
// already scheduled for processing
return;
}
_logger.info('Updating job: $id ($state, $lockedUntil)');
current
..isLatestStable = isLatestStable
..isLatestPrerelease = isLatestPrerelease
..isLatestPreview = isLatestPreview
..packageVersionUpdated = packageVersionUpdated
..state = state
..lockedUntil = lockedUntil
..processingKey = null // drops ongoing processing
..updatePriority(
popularityStorage.lookup(package),
fixPriority: priority,
);
tx.insert(current);
return;
} else {
_logger.info('Creating job: $id');
final job = Job()
..id = id
..service = service
..packageName = package
..packageVersion = version
..isLatestStable = isLatestStable
..isLatestPrerelease = isLatestPrerelease
..isLatestPreview = isLatestPreview
..packageVersionUpdated = packageVersionUpdated
..state = JobState.available
..lockedUntil = null
..lastStatus = JobStatus.none
..runtimeVersion = versions.runtimeVersion
..errorCount = 0
..updatePriority(
popularityStorage.lookup(package),
fixPriority: priority,
);
tx.insert(job);
return;
}
});
}

/// Deletes the old entries that predate [versions.gcBeforeRuntimeVersion].
/// Deletes the old entries.
Future<void> deleteOldEntries() async {
final query = _db.query<Job>()
..filter('runtimeVersion <', versions.gcBeforeRuntimeVersion);
await _db.deleteWithQuery<Job>(query);
await _db.deleteWithQuery<Job>(_db.query<Job>());
}
}
6 changes: 1 addition & 5 deletions app/lib/package/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import '../account/backend.dart';
import '../account/consent_backend.dart';
import '../account/models.dart' show User;
import '../audit/models.dart';
import '../job/backend.dart';
import '../publisher/backend.dart';
import '../service/email/backend.dart';
import '../service/email/models.dart';
Expand Down Expand Up @@ -378,10 +377,8 @@ class PackageBackend {
}

final pkg = await _requirePackageAdmin(package, user.userId);
String? latestVersion;
await withRetryTransaction(db, (tx) async {
final p = await tx.lookupValue<Package>(pkg.key);
latestVersion = p.latestVersion;

final optionsChanges = <String>[];
if (options.isDiscontinued != null &&
Expand Down Expand Up @@ -419,8 +416,7 @@ class PackageBackend {
));
});
await purgePackageCache(package);
await jobBackend.trigger(JobService.analyzer, package,
version: latestVersion);
await taskBackend.trackPackage(package);
}

/// Updates [options] on [package]/[version], assuming the current user
Expand Down
Loading

0 comments on commit 71244f1

Please sign in to comment.