From 4315c5c145e9a2e2bc3e8408d3b3f726c96998c4 Mon Sep 17 00:00:00 2001 From: Paul Hebble Date: Tue, 18 Jun 2019 22:41:26 -0500 Subject: [PATCH] Refactor Netkan for SQS mode --- Netkan/CKAN-netkan.csproj | 10 +- Netkan/CmdLineOptions.cs | 3 + Netkan/Model/Metadata.cs | 8 + Netkan/Processors/Inflator.cs | 119 ++++++++++ Netkan/Processors/QueueHandler.cs | 205 ++++++++++++++++++ Netkan/Program.cs | 81 ++----- Netkan/Services/CachingHttpService.cs | 4 + Netkan/Services/IHttpService.cs | 1 + Netkan/Validators/CkanValidator.cs | 9 +- Netkan/Validators/NetkanValidator.cs | 8 +- Netkan/packages.config | 2 + Tests/NetKAN/Validators/CkanValidatorTests.cs | 8 +- 12 files changed, 387 insertions(+), 71 deletions(-) create mode 100644 Netkan/Processors/Inflator.cs create mode 100644 Netkan/Processors/QueueHandler.cs diff --git a/Netkan/CKAN-netkan.csproj b/Netkan/CKAN-netkan.csproj index 0b5ce137f1..e99e171e1b 100644 --- a/Netkan/CKAN-netkan.csproj +++ b/Netkan/CKAN-netkan.csproj @@ -58,6 +58,12 @@ ..\_build\lib\nuget\Namotion.Reflection.1.0.5\lib\net45\Namotion.Reflection.dll + + ..\_build\lib\nuget\AWSSDK.Core.3.3.103\lib\net45\AWSSDK.Core.dll + + + ..\_build\lib\nuget\AWSSDK.SQS.3.3.100.34\lib\net45\AWSSDK.SQS.dll + @@ -76,6 +82,8 @@ + + @@ -171,4 +179,4 @@ - \ No newline at end of file + diff --git a/Netkan/CmdLineOptions.cs b/Netkan/CmdLineOptions.cs index a48b300fcc..53515dce4d 100644 --- a/Netkan/CmdLineOptions.cs +++ b/Netkan/CmdLineOptions.cs @@ -37,6 +37,9 @@ internal class CmdLineOptions [Option("overwrite-cache", HelpText = "Overwrite cached files")] public bool OverwriteCache { get; set; } + [Option("sqs", HelpText = "Run in Queue Inflator mode")] + public bool Sqs { get; set; } + [Option("version", HelpText = "Display the netkan version number and exit")] public bool Version { get; set; } diff --git a/Netkan/Model/Metadata.cs b/Netkan/Model/Metadata.cs index 8e2ef10806..c5f17a9965 100644 --- a/Netkan/Model/Metadata.cs +++ b/Netkan/Model/Metadata.cs @@ -12,6 +12,7 @@ internal sealed class Metadata private const string VersionPropertyName = "version"; private const string DownloadPropertyName = "download"; public const string UpdatedPropertyName = "x_netkan_asset_updated"; + private const string StagedPropertyName = "x_netkan_staging"; private readonly JObject _json; @@ -22,6 +23,7 @@ internal sealed class Metadata public ModuleVersion Version { get; private set; } public Uri Download { get; private set; } public DateTime? RemoteTimestamp { get; private set; } + public bool Staged { get; private set; } public Metadata(JObject json) { @@ -92,6 +94,12 @@ public Metadata(JObject json) Download = new Uri((string)downloadToken); } + JToken stagedToken; + if (json.TryGetValue(StagedPropertyName, out stagedToken)) + { + Staged = (bool)stagedToken; + } + JToken updatedToken; DateTime t; if (json.TryGetValue(UpdatedPropertyName, out updatedToken) diff --git a/Netkan/Processors/Inflator.cs b/Netkan/Processors/Inflator.cs new file mode 100644 index 0000000000..75dd3e0fe0 --- /dev/null +++ b/Netkan/Processors/Inflator.cs @@ -0,0 +1,119 @@ +using System; +using System.IO; +using System.Collections.Generic; +using log4net; +using CKAN.NetKAN.Model; +using CKAN.NetKAN.Services; +using CKAN.NetKAN.Transformers; +using CKAN.NetKAN.Validators; + +namespace CKAN.NetKAN.Processors +{ + public class Inflator + { + public Inflator(string cacheDir, bool overwriteCache, string githubToken, bool prerelease) + { + log.Debug("Initializing inflator"); + this.githubToken = githubToken; + this.prerelease = prerelease; + cache = FindCache( + new KSPManager(new ConsoleUser(false)), + new Win32Registry(), + cacheDir + ); + http = new CachingHttpService(cache, overwriteCache); + ckanValidator = new CkanValidator(http, moduleService); + } + + internal IEnumerable Inflate(string filename, Metadata netkan, int? releases) + { + log.DebugFormat("Inflating {0}", filename); + try + { + // Tell the downloader that we're starting a new request + http.ClearRequestedURLs(); + + netkanValidator.ValidateNetkan(netkan, filename); + log.Info("Input successfully passed pre-validation"); + + // TODO: Make this re-usable by refactoring releases somehow + transformer = new NetkanTransformer( + http, + fileService, + moduleService, + githubToken, + prerelease, + releases + ); + IEnumerable ckans = transformer.Transform(netkan); + log.Info("Finished transformation"); + + foreach (Metadata ckan in ckans) + { + ckanValidator.ValidateCkan(ckan, netkan); + } + log.Info("Output successfully passed post-validation"); + return ckans; + } + catch (Exception e) + { + // Purge anything we download for a failed indexing attempt from the cache to allow re-downloads + PurgeDownloads(http, cache); + throw e; + } + } + + private static NetFileCache FindCache(KSPManager kspManager, IWin32Registry reg, string cacheDir) + { + if (cacheDir != null) + { + log.InfoFormat("Using user-supplied cache at {0}", cacheDir); + return new NetFileCache(cacheDir); + } + + try + { + log.InfoFormat("Using main CKAN meta-cache at {0}", reg.DownloadCacheDir); + /// Create a new file cache in the same location so NetKAN can download pure URLs not sourced from CkanModules + return new NetFileCache(kspManager, reg.DownloadCacheDir); + } + catch + { + // Meh, can't find KSP. 'Scool, bro. + } + + var tempdir = Path.GetTempPath(); + log.InfoFormat("Using tempdir for cache: {0}", tempdir); + + return new NetFileCache(tempdir); + } + + private static void PurgeDownloads(IHttpService http, NetFileCache cache) + { + log.Debug("Deleting downloads for failed inflation"); + if (http != null && cache != null) + { + foreach (Uri url in http.RequestedURLs) + { + cache.Remove(url); + } + } + } + + private string githubToken; + private bool prerelease; + + private NetFileCache cache; + private IHttpService http; + + private IModuleService moduleService = new ModuleService(); + private IFileService fileService = new FileService(); + + private NetkanTransformer transformer; + + private NetkanValidator netkanValidator = new NetkanValidator(); + private CkanValidator ckanValidator; + + private static readonly ILog log = LogManager.GetLogger(typeof(Inflator)); + } +} diff --git a/Netkan/Processors/QueueHandler.cs b/Netkan/Processors/QueueHandler.cs new file mode 100644 index 0000000000..cc6e80d088 --- /dev/null +++ b/Netkan/Processors/QueueHandler.cs @@ -0,0 +1,205 @@ +using System; +using System.IO; +using System.Text; +using System.Linq; +using System.Collections.Generic; +using System.Globalization; +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using Amazon; +using Amazon.SQS; +using Amazon.SQS.Model; +using log4net; +using CKAN.NetKAN.Model; + +namespace CKAN.NetKAN.Processors +{ + public class QueueHandler + { + public QueueHandler(string cacheDir, bool overwriteCache, string githubToken, bool prerelease) + { + log.Debug("Initializing SQS queue handler"); + // TODO: Shouldn't hard code the region + client = new AmazonSQSClient(RegionEndpoint.USWest2); + inflator = new Inflator(cacheDir, overwriteCache, githubToken, prerelease); + + inputQueueURL = getQueueUrl(inputQueueName); + outputQueueURL = getQueueUrl(outputQueueName); + log.DebugFormat("Queue URLs: {0}, {1}", inputQueueURL, outputQueueURL); + } + + public void Process() + { + while (true) + { + int releases; + Metadata netkan = getNetkan(out releases); + if (netkan == null) + { + continue; + } + log.InfoFormat("Inflating {0}", netkan.Identifier); + IEnumerable ckans = null; + try + { + ckans = inflator.Inflate($"{netkan.Identifier}.netkan", netkan, releases); + } + catch (Exception e) + { + e = e.GetBaseException() ?? e; + log.InfoFormat("Sending failure: {0}", e.Message); + sendCkan(null, netkan, false, e.Message); + } + if (ckans != null) + { + foreach (Metadata ckan in ckans) + { + log.InfoFormat("Sending {0}-{1}", ckan.Identifier, ckan.Version); + sendCkan(ckan, netkan, true); + } + } + } + } + + private string getQueueUrl(string name) + { + log.DebugFormat("Looking up URL for queue {0}", name); + return client.GetQueueUrl(new GetQueueUrlRequest() { QueueName = name }).QueueUrl; + } + + private Metadata getNetkan(out int releases) + { + log.Debug("Retrieving metadata from queue"); + // TODO: Get this from an attribute + releases = 1; + var msg = getFromQueue(inputQueueURL); + if (msg != null) + { + log.DebugFormat("Metadata returned: {0}", msg.Body); + return new Metadata(JObject.Parse(msg.Body)); + } + log.Debug("No metadata in queue"); + return null; + } + + private Message getFromQueue(string url) + { + log.DebugFormat("Looking for message from {0}", url); + var resp = client.ReceiveMessage(new ReceiveMessageRequest() + { + QueueUrl = url, + MaxNumberOfMessages = 1, + VisibilityTimeout = (int)TimeSpan.FromMinutes(15).TotalSeconds, + }); + var msg = resp.Messages.FirstOrDefault(); + if (msg != null) + { + log.Debug("Message received"); + client.DeleteMessage(new DeleteMessageRequest() + { + QueueUrl = url, + ReceiptHandle = msg.ReceiptHandle, + }); + } + return msg; + } + + private void sendCkan(Metadata ckan, Metadata netkan, bool success, string err = null) + { + var attribs = new Dictionary() + { + { + "ModIdentifier", + new MessageAttributeValue() + { + DataType = "String", + StringValue = netkan.Identifier + } + }, + { + "Staged", + new MessageAttributeValue() + { + DataType = "String", + StringValue = netkan.Staged.ToString() + } + }, + { + "Success", + new MessageAttributeValue() + { + DataType = "String", + StringValue = success.ToString() + } + }, + { + "CheckTime", + new MessageAttributeValue() + { + DataType = "String", + StringValue = DateTime.UtcNow.ToString("s", CultureInfo.InvariantCulture) + } + }, + }; + if (!string.IsNullOrEmpty(err)) + { + attribs.Add( + "ErrorMessage", + new MessageAttributeValue() + { + DataType = "String", + StringValue = err + } + ); + } + + SendMessageRequest msg = new SendMessageRequest() + { + QueueUrl = outputQueueURL, + MessageGroupId = "1", + MessageDeduplicationId = Path.GetRandomFileName(), + MessageBody = serializeCkan(ckan), + MessageAttributes = attribs, + }; + try + { + var resp = client.SendMessage(msg); + } + catch (Exception e) + { + log.ErrorFormat("Send failed: {0}\r\n{1}", e.Message, e.StackTrace); + } + } + + private string serializeCkan(Metadata ckan) + { + if (ckan == null) + { + return ""; + } + var sw = new StringWriter(new StringBuilder()); + using (var writer = new JsonTextWriter(sw) + { + Formatting = Formatting.Indented, + Indentation = 4, + IndentChar = ' ', + }) + { + var serializer = new JsonSerializer(); + serializer.Serialize(writer, ckan.Json()); + } + return sw + Environment.NewLine; + } + + private Inflator inflator; + private AmazonSQSClient client; + + private const string inputQueueName = "InboundDev.fifo"; + private const string outputQueueName = "OutboundDev.fifo"; + + private readonly string inputQueueURL; + private readonly string outputQueueURL; + + private static readonly ILog log = LogManager.GetLogger(typeof(QueueHandler)); + } +} diff --git a/Netkan/Program.cs b/Netkan/Program.cs index ebbb76ba3c..9b6690047a 100644 --- a/Netkan/Program.cs +++ b/Netkan/Program.cs @@ -14,6 +14,7 @@ using CKAN.NetKAN.Services; using CKAN.NetKAN.Transformers; using CKAN.NetKAN.Validators; +using CKAN.NetKAN.Processors; namespace CKAN.NetKAN { @@ -49,40 +50,34 @@ public static int Main(string[] args) return ExitOk; } + if (Options.Sqs) + { + var qh = new QueueHandler( + Options.CacheDir, + Options.OverwriteCache, + Options.GitHubToken, + Options.PreRelease + ); + qh.Process(); + return ExitOk; + } + if (Options.File != null) { Log.InfoFormat("Transforming {0}", Options.File); - var moduleService = new ModuleService(); - var fileService = new FileService(); - cache = FindCache( - new KSPManager(new ConsoleUser(false)), - new Win32Registry() - ); - http = new CachingHttpService(cache, Options.OverwriteCache); - var netkan = ReadNetkan(); Log.Info("Finished reading input"); - new NetkanValidator(Options.File).Validate(netkan); - Log.Info("Input successfully passed pre-validation"); - - var transformer = new NetkanTransformer( - http, - fileService, - moduleService, + var inf = new Inflator( + Options.CacheDir, + Options.OverwriteCache, Options.GitHubToken, - Options.PreRelease, - ParseReleases(Options.Releases) + Options.PreRelease ); - - IEnumerable ckans = transformer.Transform(netkan); - Log.Info("Finished transformation"); + var ckans = inf.Inflate(Options.File, netkan, ParseReleases(Options.Releases)); foreach (Metadata ckan in ckans) { - new CkanValidator(netkan, http, moduleService).Validate(ckan); - Log.Info("Output successfully passed post-validation"); - WriteCkan(ckan); } } @@ -91,7 +86,6 @@ public static int Main(string[] args) Log.Fatal( "Usage: netkan [--verbose|--debug] [--debugger] [--prerelease] [--outputdir=...] " ); - return ExitBadOpt; } } @@ -99,9 +93,6 @@ public static int Main(string[] args) { e = e.GetBaseException() ?? e; - // Purge anything we download for a failed indexing attempt from the cache to allow re-downloads - PurgeDownloads(http, cache); - Log.Fatal(e.Message); if (Options == null || Options.Debug) @@ -148,31 +139,6 @@ private static void ProcessArgs(string[] args) } } - private static NetFileCache FindCache(KSPManager kspManager, IWin32Registry reg) - { - if (Options.CacheDir != null) - { - Log.InfoFormat("Using user-supplied cache at {0}", Options.CacheDir); - return new NetFileCache(Options.CacheDir); - } - - try - { - Log.InfoFormat("Using main CKAN meta-cache at {0}", reg.DownloadCacheDir); - /// Create a new file cache in the same location so NetKAN can download pure URLs not sourced from CkanModules - return new NetFileCache(kspManager, reg.DownloadCacheDir); - } - catch - { - // Meh, can't find KSP. 'Scool, bro. - } - - var tempdir = Path.GetTempPath(); - Log.InfoFormat("Using tempdir for cache: {0}", tempdir); - - return new NetFileCache(tempdir); - } - private static Metadata ReadNetkan() { if (!Options.File.EndsWith(".netkan")) @@ -209,16 +175,5 @@ private static void WriteCkan(Metadata metadata) Log.InfoFormat("Transformation written to {0}", finalPath); } - private static void PurgeDownloads(IHttpService http, NetFileCache cache) - { - if (http != null && cache != null) - { - foreach (Uri url in http.RequestedURLs) - { - cache.Remove(url); - } - } - } - } } diff --git a/Netkan/Services/CachingHttpService.cs b/Netkan/Services/CachingHttpService.cs index 8f5010e981..0c3a4b6285 100644 --- a/Netkan/Services/CachingHttpService.cs +++ b/Netkan/Services/CachingHttpService.cs @@ -84,6 +84,10 @@ public string DownloadText(Uri url, string authToken) } public IEnumerable RequestedURLs { get { return _requestedURLs; } } + public void ClearRequestedURLs() + { + _requestedURLs?.Clear(); + } } } diff --git a/Netkan/Services/IHttpService.cs b/Netkan/Services/IHttpService.cs index 1d881046c8..5963d7fa4a 100644 --- a/Netkan/Services/IHttpService.cs +++ b/Netkan/Services/IHttpService.cs @@ -10,5 +10,6 @@ internal interface IHttpService string DownloadText(Uri url, string authToken); IEnumerable RequestedURLs { get; } + void ClearRequestedURLs(); } } diff --git a/Netkan/Validators/CkanValidator.cs b/Netkan/Validators/CkanValidator.cs index 8fd3f5a78e..bb8db1ce75 100644 --- a/Netkan/Validators/CkanValidator.cs +++ b/Netkan/Validators/CkanValidator.cs @@ -8,12 +8,11 @@ internal sealed class CkanValidator : IValidator { private readonly List _validators; - public CkanValidator(Metadata netkan, IHttpService downloader, IModuleService moduleService) + public CkanValidator(IHttpService downloader, IModuleService moduleService) { _validators = new List { new IsCkanModuleValidator(), - new MatchingIdentifiersValidator(netkan.Identifier), new InstallsFilesValidator(downloader, moduleService), new MatchesKnownGameVersionsValidator(), new ObeysCKANSchemaValidator() @@ -27,5 +26,11 @@ public void Validate(Metadata metadata) validator.Validate(metadata); } } + + public void ValidateCkan(Metadata metadata, Metadata netkan) + { + Validate(metadata); + new MatchingIdentifiersValidator(netkan.Identifier).Validate(metadata); + } } } diff --git a/Netkan/Validators/NetkanValidator.cs b/Netkan/Validators/NetkanValidator.cs index 73be2d988d..030f10496b 100644 --- a/Netkan/Validators/NetkanValidator.cs +++ b/Netkan/Validators/NetkanValidator.cs @@ -15,7 +15,6 @@ public NetkanValidator(string filename) new SpecVersionFormatValidator(), new HasIdentifierValidator(), new KrefValidator(), - new MatchingIdentifiersValidator(Path.GetFileNameWithoutExtension(filename)), new AlphaNumericIdentifierValidator(), new RelationshipsValidator(), new LicensesValidator(), @@ -35,5 +34,12 @@ public void Validate(Metadata metadata) validator.Validate(metadata); } } + + public void ValidateNetkan(Metadata metadata, string filename) + { + Validate(metadata); + new MatchingIdentifiersValidator(Path.GetFileNameWithoutExtension(filename)).Validate(metadata); + + } } } diff --git a/Netkan/packages.config b/Netkan/packages.config index a89dcc8725..8b59d97ad6 100644 --- a/Netkan/packages.config +++ b/Netkan/packages.config @@ -6,4 +6,6 @@ + + diff --git a/Tests/NetKAN/Validators/CkanValidatorTests.cs b/Tests/NetKAN/Validators/CkanValidatorTests.cs index e4389fdb69..653109c30a 100644 --- a/Tests/NetKAN/Validators/CkanValidatorTests.cs +++ b/Tests/NetKAN/Validators/CkanValidatorTests.cs @@ -42,7 +42,7 @@ public void DoesNotThrowOnValidCkan() ckan["abstract"] = "A great mod"; ckan["license"] = "GPL-3.0"; - var sut = new CkanValidator(new Metadata(ckan), mHttp.Object, mModuleService.Object); + var sut = new CkanValidator(mHttp.Object, mModuleService.Object); var json = (JObject)ValidCkan.DeepClone(); // Act @@ -67,7 +67,7 @@ public void DoesThrowWhenMissingProperty(string propertyName) mModuleService.Setup(i => i.HasInstallableFiles(It.IsAny(), It.IsAny())) .Returns(true); - var sut = new CkanValidator(new Metadata(ValidCkan), mHttp.Object, mModuleService.Object); + var sut = new CkanValidator(mHttp.Object, mModuleService.Object); var json = (JObject)ValidCkan.DeepClone(); json.Remove(propertyName); @@ -90,7 +90,7 @@ public void DoesThrowWhenIdentifiersDoNotMatch() mModuleService.Setup(i => i.HasInstallableFiles(It.IsAny(), It.IsAny())) .Returns(true); - var sut = new CkanValidator(new Metadata(ValidCkan), mHttp.Object, mModuleService.Object); + var sut = new CkanValidator(mHttp.Object, mModuleService.Object); var json = new JObject(); json["spec_version"] = 1; json["identifier"] = "AmazingMod"; @@ -118,7 +118,7 @@ public void DoesThrowWhenNoInstallableFiles() netkan["spec_version"] = 1; netkan["identifier"] = "AwesomeMod"; - var sut = new CkanValidator(new Metadata(netkan), mHttp.Object, mModuleService.Object); + var sut = new CkanValidator(mHttp.Object, mModuleService.Object); var json = (JObject)ValidCkan.DeepClone(); // Act