Skip to content

Commit

Permalink
Refactor Netkan for SQS mode
Browse files Browse the repository at this point in the history
  • Loading branch information
HebaruSan committed Jun 23, 2019
1 parent 1766c3c commit 4315c5c
Show file tree
Hide file tree
Showing 12 changed files with 387 additions and 71 deletions.
10 changes: 9 additions & 1 deletion Netkan/CKAN-netkan.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@
<Reference Include="Namotion.Reflection" Version="1.0.5">
<HintPath>..\_build\lib\nuget\Namotion.Reflection.1.0.5\lib\net45\Namotion.Reflection.dll</HintPath>
</Reference>
<Reference Include="AWSSDK.Core" Version="3.3.103">
<HintPath>..\_build\lib\nuget\AWSSDK.Core.3.3.103\lib\net45\AWSSDK.Core.dll</HintPath>
</Reference>
<Reference Include="AWSSDK.SQS" Version="3.3.100.34">
<HintPath>..\_build\lib\nuget\AWSSDK.SQS.3.3.100.34\lib\net45\AWSSDK.SQS.dll</HintPath>
</Reference>
<Reference Include="System" />
</ItemGroup>
<ItemGroup>
Expand All @@ -76,6 +82,8 @@
<Compile Include="Extensions\VersionExtensions.cs" />
<Compile Include="Model\Metadata.cs" />
<Compile Include="Model\RemoteRef.cs" />
<Compile Include="Processors\Inflator.cs" />
<Compile Include="Processors\QueueHandler.cs" />
<Compile Include="Program.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
<Compile Include="Services\CachingHttpService.cs" />
Expand Down Expand Up @@ -171,4 +179,4 @@
<Exec Command="powershell ../build.ps1 Generate-GlobalAssemblyVersionInfo" Condition="!Exists('../_build/meta/GlobalAssemblyVersionInfo.cs') And '$(OS)' == 'Windows_NT'" />
<Exec Command="sh ../build Generate-GlobalAssemblyVersionInfo" Condition="!Exists('../_build/meta/GlobalAssemblyVersionInfo.cs') And '$(OS)' == 'Unix'" />
</Target>
</Project>
</Project>
3 changes: 3 additions & 0 deletions Netkan/CmdLineOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ internal class CmdLineOptions
[Option("overwrite-cache", HelpText = "Overwrite cached files")]
public bool OverwriteCache { get; set; }

[Option("sqs", HelpText = "Run in Queue Inflator mode")]
public bool Sqs { get; set; }

[Option("version", HelpText = "Display the netkan version number and exit")]
public bool Version { get; set; }

Expand Down
8 changes: 8 additions & 0 deletions Netkan/Model/Metadata.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ internal sealed class Metadata
private const string VersionPropertyName = "version";
private const string DownloadPropertyName = "download";
public const string UpdatedPropertyName = "x_netkan_asset_updated";
private const string StagedPropertyName = "x_netkan_staging";

private readonly JObject _json;

Expand All @@ -22,6 +23,7 @@ internal sealed class Metadata
public ModuleVersion Version { get; private set; }
public Uri Download { get; private set; }
public DateTime? RemoteTimestamp { get; private set; }
public bool Staged { get; private set; }

public Metadata(JObject json)
{
Expand Down Expand Up @@ -92,6 +94,12 @@ public Metadata(JObject json)
Download = new Uri((string)downloadToken);
}

JToken stagedToken;
if (json.TryGetValue(StagedPropertyName, out stagedToken))
{
Staged = (bool)stagedToken;
}

JToken updatedToken;
DateTime t;
if (json.TryGetValue(UpdatedPropertyName, out updatedToken)
Expand Down
119 changes: 119 additions & 0 deletions Netkan/Processors/Inflator.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
using System;
using System.IO;
using System.Collections.Generic;
using log4net;
using CKAN.NetKAN.Model;
using CKAN.NetKAN.Services;
using CKAN.NetKAN.Transformers;
using CKAN.NetKAN.Validators;

namespace CKAN.NetKAN.Processors
{
public class Inflator
{
public Inflator(string cacheDir, bool overwriteCache, string githubToken, bool prerelease)
{
log.Debug("Initializing inflator");
this.githubToken = githubToken;
this.prerelease = prerelease;
cache = FindCache(
new KSPManager(new ConsoleUser(false)),
new Win32Registry(),
cacheDir
);
http = new CachingHttpService(cache, overwriteCache);
ckanValidator = new CkanValidator(http, moduleService);
}

internal IEnumerable<Metadata> Inflate(string filename, Metadata netkan, int? releases)
{
log.DebugFormat("Inflating {0}", filename);
try
{
// Tell the downloader that we're starting a new request
http.ClearRequestedURLs();

netkanValidator.ValidateNetkan(netkan, filename);
log.Info("Input successfully passed pre-validation");

// TODO: Make this re-usable by refactoring releases somehow
transformer = new NetkanTransformer(
http,
fileService,
moduleService,
githubToken,
prerelease,
releases
);
IEnumerable<Metadata> ckans = transformer.Transform(netkan);
log.Info("Finished transformation");

foreach (Metadata ckan in ckans)
{
ckanValidator.ValidateCkan(ckan, netkan);
}
log.Info("Output successfully passed post-validation");
return ckans;
}
catch (Exception e)
{
// Purge anything we download for a failed indexing attempt from the cache to allow re-downloads
PurgeDownloads(http, cache);
throw e;
}
}

private static NetFileCache FindCache(KSPManager kspManager, IWin32Registry reg, string cacheDir)
{
if (cacheDir != null)
{
log.InfoFormat("Using user-supplied cache at {0}", cacheDir);
return new NetFileCache(cacheDir);
}

try
{
log.InfoFormat("Using main CKAN meta-cache at {0}", reg.DownloadCacheDir);
/// Create a new file cache in the same location so NetKAN can download pure URLs not sourced from CkanModules
return new NetFileCache(kspManager, reg.DownloadCacheDir);
}
catch
{
// Meh, can't find KSP. 'Scool, bro.
}

var tempdir = Path.GetTempPath();
log.InfoFormat("Using tempdir for cache: {0}", tempdir);

return new NetFileCache(tempdir);
}

private static void PurgeDownloads(IHttpService http, NetFileCache cache)
{
log.Debug("Deleting downloads for failed inflation");
if (http != null && cache != null)
{
foreach (Uri url in http.RequestedURLs)
{
cache.Remove(url);
}
}
}

private string githubToken;
private bool prerelease;

private NetFileCache cache;
private IHttpService http;

private IModuleService moduleService = new ModuleService();
private IFileService fileService = new FileService();

private NetkanTransformer transformer;

private NetkanValidator netkanValidator = new NetkanValidator();
private CkanValidator ckanValidator;

private static readonly ILog log = LogManager.GetLogger(typeof(Inflator));
}
}
205 changes: 205 additions & 0 deletions Netkan/Processors/QueueHandler.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
using System;
using System.IO;
using System.Text;
using System.Linq;
using System.Collections.Generic;
using System.Globalization;
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using Amazon;
using Amazon.SQS;
using Amazon.SQS.Model;
using log4net;
using CKAN.NetKAN.Model;

namespace CKAN.NetKAN.Processors
{
public class QueueHandler
{
public QueueHandler(string cacheDir, bool overwriteCache, string githubToken, bool prerelease)
{
log.Debug("Initializing SQS queue handler");
// TODO: Shouldn't hard code the region
client = new AmazonSQSClient(RegionEndpoint.USWest2);
inflator = new Inflator(cacheDir, overwriteCache, githubToken, prerelease);

inputQueueURL = getQueueUrl(inputQueueName);
outputQueueURL = getQueueUrl(outputQueueName);
log.DebugFormat("Queue URLs: {0}, {1}", inputQueueURL, outputQueueURL);
}

public void Process()
{
while (true)
{
int releases;
Metadata netkan = getNetkan(out releases);
if (netkan == null)
{
continue;
}
log.InfoFormat("Inflating {0}", netkan.Identifier);
IEnumerable<Metadata> ckans = null;
try
{
ckans = inflator.Inflate($"{netkan.Identifier}.netkan", netkan, releases);
}
catch (Exception e)
{
e = e.GetBaseException() ?? e;
log.InfoFormat("Sending failure: {0}", e.Message);
sendCkan(null, netkan, false, e.Message);
}
if (ckans != null)
{
foreach (Metadata ckan in ckans)
{
log.InfoFormat("Sending {0}-{1}", ckan.Identifier, ckan.Version);
sendCkan(ckan, netkan, true);
}
}
}
}

private string getQueueUrl(string name)
{
log.DebugFormat("Looking up URL for queue {0}", name);
return client.GetQueueUrl(new GetQueueUrlRequest() { QueueName = name }).QueueUrl;
}

private Metadata getNetkan(out int releases)
{
log.Debug("Retrieving metadata from queue");
// TODO: Get this from an attribute
releases = 1;
var msg = getFromQueue(inputQueueURL);
if (msg != null)
{
log.DebugFormat("Metadata returned: {0}", msg.Body);
return new Metadata(JObject.Parse(msg.Body));
}
log.Debug("No metadata in queue");
return null;
}

private Message getFromQueue(string url)
{
log.DebugFormat("Looking for message from {0}", url);
var resp = client.ReceiveMessage(new ReceiveMessageRequest()
{
QueueUrl = url,
MaxNumberOfMessages = 1,
VisibilityTimeout = (int)TimeSpan.FromMinutes(15).TotalSeconds,
});
var msg = resp.Messages.FirstOrDefault();
if (msg != null)
{
log.Debug("Message received");
client.DeleteMessage(new DeleteMessageRequest()
{
QueueUrl = url,
ReceiptHandle = msg.ReceiptHandle,
});
}
return msg;
}

private void sendCkan(Metadata ckan, Metadata netkan, bool success, string err = null)
{
var attribs = new Dictionary<string, MessageAttributeValue>()
{
{
"ModIdentifier",
new MessageAttributeValue()
{
DataType = "String",
StringValue = netkan.Identifier
}
},
{
"Staged",
new MessageAttributeValue()
{
DataType = "String",
StringValue = netkan.Staged.ToString()
}
},
{
"Success",
new MessageAttributeValue()
{
DataType = "String",
StringValue = success.ToString()
}
},
{
"CheckTime",
new MessageAttributeValue()
{
DataType = "String",
StringValue = DateTime.UtcNow.ToString("s", CultureInfo.InvariantCulture)
}
},
};
if (!string.IsNullOrEmpty(err))
{
attribs.Add(
"ErrorMessage",
new MessageAttributeValue()
{
DataType = "String",
StringValue = err
}
);
}

SendMessageRequest msg = new SendMessageRequest()
{
QueueUrl = outputQueueURL,
MessageGroupId = "1",
MessageDeduplicationId = Path.GetRandomFileName(),
MessageBody = serializeCkan(ckan),
MessageAttributes = attribs,
};
try
{
var resp = client.SendMessage(msg);
}
catch (Exception e)
{
log.ErrorFormat("Send failed: {0}\r\n{1}", e.Message, e.StackTrace);
}
}

private string serializeCkan(Metadata ckan)
{
if (ckan == null)
{
return "";
}
var sw = new StringWriter(new StringBuilder());
using (var writer = new JsonTextWriter(sw)
{
Formatting = Formatting.Indented,
Indentation = 4,
IndentChar = ' ',
})
{
var serializer = new JsonSerializer();
serializer.Serialize(writer, ckan.Json());
}
return sw + Environment.NewLine;
}

private Inflator inflator;
private AmazonSQSClient client;

private const string inputQueueName = "InboundDev.fifo";
private const string outputQueueName = "OutboundDev.fifo";

private readonly string inputQueueURL;
private readonly string outputQueueURL;

private static readonly ILog log = LogManager.GetLogger(typeof(QueueHandler));
}
}
Loading

0 comments on commit 4315c5c

Please sign in to comment.