From 71244f169dd4948e95a060a4f36565c966c9a682 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Istv=C3=A1n=20So=C3=B3s?= Date: Wed, 23 Aug 2023 12:32:35 +0200 Subject: [PATCH] Remove dependency tracker and replace analysis triggers (#6958) * Remove old dependency tracker. * Remove most of Job backend + remove entries unconditionally. --- app/lib/admin/tools/package_discontinued.dart | 5 +- app/lib/job/backend.dart | 160 +------------- app/lib/package/backend.dart | 6 +- app/lib/package/deps_graph.dart | 208 ------------------ app/lib/service/entrypoint/analyzer.dart | 31 --- 5 files changed, 5 insertions(+), 405 deletions(-) delete mode 100644 app/lib/package/deps_graph.dart diff --git a/app/lib/admin/tools/package_discontinued.dart b/app/lib/admin/tools/package_discontinued.dart index 1cb5602eaf..d8a862304d 100644 --- a/app/lib/admin/tools/package_discontinued.dart +++ b/app/lib/admin/tools/package_discontinued.dart @@ -5,10 +5,10 @@ import 'package:args/args.dart'; import 'package:clock/clock.dart'; -import 'package:pub_dev/job/backend.dart'; import 'package:pub_dev/package/backend.dart'; import 'package:pub_dev/package/models.dart'; import 'package:pub_dev/shared/datastore.dart'; +import 'package:pub_dev/task/backend.dart'; final _argParser = ArgParser() ..addOption('package', help: 'The package to update.') @@ -50,7 +50,6 @@ Future executeSetPackageDiscontinued(List args) async { tx.insert(pkg); }); await purgePackageCache(packageName); - await jobBackend.trigger(JobService.analyzer, packageName, - version: package.latestVersion); + await taskBackend.trackPackage(packageName); return 'Done.'; } diff --git a/app/lib/job/backend.dart b/app/lib/job/backend.dart index 2d4d061d8f..657e6affb7 100644 --- a/app/lib/job/backend.dart +++ b/app/lib/job/backend.dart @@ -4,24 +4,14 @@ import 'dart:async'; -import 'package:clock/clock.dart'; import 'package:gcloud/service_scope.dart' as ss; -import 'package:logging/logging.dart'; -import '../package/backend.dart'; -import '../package/models.dart'; import '../shared/datastore.dart' as db; -import '../shared/popularity_storage.dart'; -import '../shared/versions.dart' as versions; import 'model.dart'; export 'model.dart'; -const _shortExtendDuration = Duration(hours: 12); - -final _logger = Logger('pub.job.backend'); - typedef ShouldProcess = Future Function( String package, String version, DateTime updated); @@ -37,154 +27,8 @@ class JobBackend { JobBackend(this._db); - String _id(JobService service, String package, String version) => Uri( - pathSegments: [ - versions.runtimeVersion, - service.toString().split('.').last, - package, - version, - ], - ).toString(); - - /// Triggers analysis job. - Future triggerAnalysis( - String package, - String? version, { - bool isHighPriority = false, - }) async { - await jobBackend.trigger( - JobService.analyzer, - package, - version: version, - isHighPriority: isHighPriority, - ); - } - - /// Triggers analysis/dartdoc for [package]/[version] if older than [updated]. - Future trigger( - JobService service, - String package, { - String? version, - DateTime? updated, - bool? shouldProcess, - bool isHighPriority = false, - }) async { - final pKey = _db.emptyKey.append(Package, id: package); - final p = await _db.lookupOrNull(pKey); - if (p == null || p.isNotVisible) { - _logger.info("Couldn't trigger $service job: $package not found."); - return; - } - final latestReleases = await packageBackend.latestReleases(p); - - version ??= latestReleases.stable.version; - final pvKey = pKey.append(PackageVersion, id: version); - final pv = await _db.lookupOrNull(pvKey); - if (pv == null) { - _logger - .info("Couldn't trigger $service job: $package $version not found."); - return; - } - - final isLatestStable = p.latestVersion == version; - final isLatestPrerelease = latestReleases.showPrerelease && - latestReleases.prerelease!.version == version; - final isLatestPreview = latestReleases.showPreview && - latestReleases.preview!.version == version; - shouldProcess ??= updated == null || updated.isAfter(pv.created!); - shouldProcess |= isHighPriority; - await createOrUpdate( - service: service, - package: package, - version: version, - isLatestStable: isLatestStable, - isLatestPrerelease: isLatestPrerelease, - isLatestPreview: isLatestPreview, - packageVersionUpdated: pv.created, - shouldProcess: shouldProcess, - priority: isHighPriority ? 0 : null, - ); - } - - Future createOrUpdate({ - required JobService service, - required String package, - required String version, - required bool isLatestStable, - required bool isLatestPrerelease, - required bool isLatestPreview, - required DateTime? packageVersionUpdated, - required bool shouldProcess, - int? priority, - }) async { - packageVersionUpdated ??= clock.now().toUtc(); - final id = _id(service, package, version); - final state = shouldProcess ? JobState.available : JobState.idle; - final lockedUntil = - shouldProcess ? null : clock.now().add(_shortExtendDuration); - await db.withRetryTransaction(_db, (tx) async { - final current = - await tx.lookupOrNull(_db.emptyKey.append(Job, id: id)); - if (current != null) { - final hasNotChanged = current.isLatestStable == isLatestStable && - current.isLatestPrerelease == isLatestPrerelease && - current.isLatestPreview == isLatestPreview && - !current.packageVersionUpdated!.isBefore(packageVersionUpdated!) && - (priority == null || current.priority <= priority); - if (hasNotChanged && !shouldProcess) { - // no reason to re-schedule the job - return; - } - if (current.state == JobState.available && - current.lockedUntil == null) { - // already scheduled for processing - return; - } - _logger.info('Updating job: $id ($state, $lockedUntil)'); - current - ..isLatestStable = isLatestStable - ..isLatestPrerelease = isLatestPrerelease - ..isLatestPreview = isLatestPreview - ..packageVersionUpdated = packageVersionUpdated - ..state = state - ..lockedUntil = lockedUntil - ..processingKey = null // drops ongoing processing - ..updatePriority( - popularityStorage.lookup(package), - fixPriority: priority, - ); - tx.insert(current); - return; - } else { - _logger.info('Creating job: $id'); - final job = Job() - ..id = id - ..service = service - ..packageName = package - ..packageVersion = version - ..isLatestStable = isLatestStable - ..isLatestPrerelease = isLatestPrerelease - ..isLatestPreview = isLatestPreview - ..packageVersionUpdated = packageVersionUpdated - ..state = JobState.available - ..lockedUntil = null - ..lastStatus = JobStatus.none - ..runtimeVersion = versions.runtimeVersion - ..errorCount = 0 - ..updatePriority( - popularityStorage.lookup(package), - fixPriority: priority, - ); - tx.insert(job); - return; - } - }); - } - - /// Deletes the old entries that predate [versions.gcBeforeRuntimeVersion]. + /// Deletes the old entries. Future deleteOldEntries() async { - final query = _db.query() - ..filter('runtimeVersion <', versions.gcBeforeRuntimeVersion); - await _db.deleteWithQuery(query); + await _db.deleteWithQuery(_db.query()); } } diff --git a/app/lib/package/backend.dart b/app/lib/package/backend.dart index a23d7b33b4..13e02ce0f9 100644 --- a/app/lib/package/backend.dart +++ b/app/lib/package/backend.dart @@ -26,7 +26,6 @@ import '../account/backend.dart'; import '../account/consent_backend.dart'; import '../account/models.dart' show User; import '../audit/models.dart'; -import '../job/backend.dart'; import '../publisher/backend.dart'; import '../service/email/backend.dart'; import '../service/email/models.dart'; @@ -378,10 +377,8 @@ class PackageBackend { } final pkg = await _requirePackageAdmin(package, user.userId); - String? latestVersion; await withRetryTransaction(db, (tx) async { final p = await tx.lookupValue(pkg.key); - latestVersion = p.latestVersion; final optionsChanges = []; if (options.isDiscontinued != null && @@ -419,8 +416,7 @@ class PackageBackend { )); }); await purgePackageCache(package); - await jobBackend.trigger(JobService.analyzer, package, - version: latestVersion); + await taskBackend.trackPackage(package); } /// Updates [options] on [package]/[version], assuming the current user diff --git a/app/lib/package/deps_graph.dart b/app/lib/package/deps_graph.dart deleted file mode 100644 index 9552659b0e..0000000000 --- a/app/lib/package/deps_graph.dart +++ /dev/null @@ -1,208 +0,0 @@ -// Copyright (c) 2017, the Dart project authors. Please see the AUTHORS file -// for details. All rights reserved. Use of this source code is governed by a -// BSD-style license that can be found in the LICENSE file. - -import 'dart:async'; - -import 'package:logging/logging.dart'; - -import '../package/models.dart'; -import '../shared/datastore.dart'; - -final Logger _logger = Logger('pub.package_graph'); - -// TODO(kustermann): We could incorporate the pubspec constraints to make the -// sets smaller. -class TransitiveDependencyGraph { - final _deps = >{}; - final _internedKeys = {}; - - void _logStats() { - final stats = { - 'keys': _deps.length, - 'values': _deps.values.fold(0, (a, b) => a + b.length), - 'interned': _internedKeys.length, - }; - _logger.info('TransitiveDependencyGraph stats: $stats'); - } - - void addAll(String node, Set deps) { - final nodes = _deps.putIfAbsent(node, _newSet); - final int before = nodes.length; - nodes.addAll(deps); - if (before != nodes.length) { - _fixpointNode(node); - } - } - - void add(String node, String dep) { - node = _intern(node); - if (_deps.putIfAbsent(node, _newSet).add(_intern(dep))) { - _fixpointNode(node); - } - } - - Set transitiveNodes(String node) => _deps.putIfAbsent(node, _newSet); - - String _intern(String s) => _internedKeys.putIfAbsent(s, () => s); - - void _fixpointNode(String node) { - final directs = _deps[node]!; - final temp = >[]; - for (;;) { - final int before = directs.length; - temp.clear(); - for (final String direct in directs) { - if (node != direct) { - final transitives = _deps[direct]; - if (transitives?.isNotEmpty == true) { - temp.add(transitives!); - } - } - } - for (final Set transitives in temp) { - directs.addAll(transitives); - } - if (before == directs.length) break; - } - } - - static Set _newSet() => {}; -} - -/// Callback function to be called when a new [PackageVersion] should trigger the -/// re-analysis of a package that depends on it. -typedef OnAffected = Future Function(Set affected); - -class PackageDependencyBuilder { - static const String _devPrefix = 'dev/'; - - final DatastoreDB _db; - final OnAffected _onAffected; - final Duration _pollingInterval; - - final _reverseDeps = TransitiveDependencyGraph(); - - DateTime? _lastTs; - - /// The future will complete once the initial database has been scanned and a - /// graph has been built. - static Future loadInitialGraphFromDb( - DatastoreDB db, - OnAffected onAffected, { - required Duration pollingInterval, - }) async { - final sw = Stopwatch()..start(); - final builder = PackageDependencyBuilder._(db, onAffected, pollingInterval); - await builder._scanExistingPackageGraph(); - _logger.info('Scanned initial dependency graph in ${sw.elapsed}.'); - return builder; - } - - PackageDependencyBuilder._(this._db, this._onAffected, this._pollingInterval); - - Future _scanExistingPackageGraph() async { - final sw = Stopwatch()..start(); - for (;;) { - _logger.info('Scanning existing package graph'); - try { - // We scan from oldest to newest and therefore keep [_lastTs] always - // increasing. - final query = _db.query()..order('created'); - await for (PackageVersion pv in query.run()) { - addPackageVersion(pv); - _lastTs = pv.created; - } - } catch (e, s) { - _logger.severe(e, s); - continue; - } - _logger.info('Scanned package graph in ${sw.elapsed}'); - _reverseDeps._logStats(); - return; - } - } - - // Note, this method never returns. - Future monitorInBackground() async { - _logger.info('Monitoring new package uploads.'); - for (;;) { - try { - final query = _db.query() - ..filter('created >', _lastTs) - ..order('created'); - var updated = false; - final affected = {}; - await for (final pv in query.run()) { - addPackageVersion(pv); - updated = true; - affected.addAll(_affectedPackages(pv.package)); - _logger.info( - 'Found ${affected.length} dependent packages for ${pv.package}.'); - _lastTs = pv.created; - } - - if (affected.isNotEmpty) { - try { - await _onAffected(affected); - } catch (e, st) { - _logger.warning('Error triggering action', e, st); - } - } - - if (updated) { - _reverseDeps._logStats(); - } - } catch (e, s) { - _logger.severe(e, s); - } - await Future.delayed(_pollingInterval); - } - } - - void addPackageVersion(PackageVersion pv) { - final Set depsSet = Set.from(pv.pubspec!.dependencyNames); - final Set devDepsSet = - Set.from(pv.pubspec!.devDependencies); - - // First we add the [package] together with the dependencies / - // dev_dependencies to the graph. This will update the graph transitively, - // thereby calculating new users of the [package]. - _add(pv.package, depsSet, devDepsSet); - } - - Set _affectedPackages(String package) { - // Due to the constraints in the new [pubspec] it might cause a number of - // packages to get new transitive dependencies during a `dart pub get` and - // therefore might cause new analysis results. - // - // So we trigger all packages that depend (directly or indirectly) on - // [package]. - final Set transitiveUsers = _reverseDeps.transitiveNodes(package); - - // We filter out all dev package usages and trigger an analysis for them - // (the dev packages are a superset of the normal packages). - final Set all = transitiveUsers - .where((String p) => p.startsWith(_devPrefix)) - .map((String p) => p.substring(_devPrefix.length)) - .toSet(); - return all; - } - - void _add(String package, Set deps, Set devDeps) { - final devPackage = _devPackage(package); - - for (final String dep in deps) { - _reverseDeps.add(dep, package); - _reverseDeps.add(dep, devPackage); - } - - for (final String dep in devDeps) { - _reverseDeps.add(dep, devPackage); - } - } - - // We use a synthetic package to handle weak dependencies / dev - // dependencies (no package is depending on this one). - String _devPackage(String package) => '$_devPrefix$package'; -} diff --git a/app/lib/service/entrypoint/analyzer.dart b/app/lib/service/entrypoint/analyzer.dart index 7cea7d2992..941d18e2db 100644 --- a/app/lib/service/entrypoint/analyzer.dart +++ b/app/lib/service/entrypoint/analyzer.dart @@ -4,15 +4,11 @@ import 'dart:async'; import 'dart:isolate'; -import 'dart:math'; import 'package:args/command_runner.dart'; import 'package:logging/logging.dart'; -import 'package:pub_dev/package/deps_graph.dart'; import '../../analyzer/handlers.dart'; -import '../../job/backend.dart'; -import '../../shared/datastore.dart' as db; import '../../shared/env_config.dart'; import '../../shared/handler_helpers.dart'; import '../../shared/popularity_storage.dart'; @@ -58,34 +54,7 @@ Future _workerMain(EntryMessage message) async { setupAnalyzerPeriodicTasks(); await popularityStorage.start(); - unawaited(_setupDependencyTracker()); // wait indefinitely await Completer().future; } - -Future _setupDependencyTracker() async { - // Updates job entries for analyzer and dartdoc. - Future triggerDependentAnalysis(Set affected) async { - for (final p in affected) { - await jobBackend.triggerAnalysis(p, null); - } - // TODO: re-enable this after we have added some stop-gaps on the frequency - // await dartdocClient.triggerDartdoc(package, version, - // dependentPackages: affected); - } - - // random delay to reduce race conditions - final random = Random.secure(); - await Future.delayed(Duration(minutes: random.nextInt(60))); - - final pdb = await PackageDependencyBuilder.loadInitialGraphFromDb( - db.dbService, - triggerDependentAnalysis, - // We don't need to scan for updates frequenty. - // Furthermore, scanning runs on all analyzer instances, triggering - // changes more often than the expected value of 6 hours. - pollingInterval: Duration(hours: 5, minutes: random.nextInt(120)), - ); - await pdb.monitorInBackground(); -}