Skip to content

Commit

Permalink
Add background job with async method. Refactor and improve serializat…
Browse files Browse the repository at this point in the history
…ion.
  • Loading branch information
hhalim committed Aug 24, 2017
1 parent 8319abb commit b6e12af
Show file tree
Hide file tree
Showing 24 changed files with 878 additions and 393 deletions.
79 changes: 75 additions & 4 deletions Shift.DataLayer/DALHelpers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
using System.ComponentModel;
using Shift.Entities;
using System.Threading;
using System.Runtime.CompilerServices;

namespace Shift.DataLayer
{
Expand Down Expand Up @@ -41,14 +42,14 @@ string argumentParameterName
throw new NotSupportedException("Global methods are not supported. Use class methods instead.");
}

if (!method.DeclaringType.IsAssignableFrom(type))
if (!method.DeclaringType.GetTypeInfo().IsAssignableFrom(type.GetTypeInfo()))
{
throw new ArgumentException(String.Format("The type `{0}` must be derived from the `{1}` type.", method.DeclaringType, type), typeParameterName);
}

if (typeof(Task).IsAssignableFrom(method.ReturnType))
if (method.ReturnType == typeof(void) && method.GetCustomAttribute<AsyncStateMachineAttribute>() != null)
{
throw new NotSupportedException("Async methods (Task) are not supported . Please make them synchronous.");
throw new NotSupportedException("Async void methods are not supported. Use async Task instead.");
}

var parameters = method.GetParameters();
Expand All @@ -70,7 +71,14 @@ string argumentParameterName
throw new NotSupportedException("Output parameters (out) are not supported: no guarantee that method will be invoked in the same process.");
}

var parameterTypeInfo = parameter.ParameterType.GetTypeInfo();

if (parameterTypeInfo.IsSubclassOf(typeof(Delegate)) || parameterTypeInfo.IsSubclassOf(typeof(Expression)))
{
throw new NotSupportedException("Anonymous functions, delegates and lambda expressions aren't supported in job method parameters.");
}
}

}

public static object GetExpressionValue(Expression expression)
Expand All @@ -95,6 +103,13 @@ internal static string[] SerializeArguments(IReadOnlyCollection<object> argument
{
value = ((DateTime)argument).ToString("o", CultureInfo.InvariantCulture);
}
else if (argument is CancellationToken
|| argument is PauseToken
|| argument is IProgress<ProgressInfo>)
{
//These types will be replaced during invocation with the real objects
value = null;
}
else
{
value = JsonConvert.SerializeObject(argument, SerializerSettings.Settings);
Expand Down Expand Up @@ -166,16 +181,72 @@ public static object DeserializeArgument(string argument, Type type)
try
{
var converter = TypeDescriptor.GetConverter(type);

if (converter.GetType() == typeof(ReferenceConverter))
{
throw;
}

value = converter.ConvertFromInvariantString(argument);
}
catch (Exception)
{
throw jsonException;
throw;
}
}
}
return value;
}

public static Job CreateJobFromExpression(string encryptionKey, string appID, string userID, string jobType, string jobName, LambdaExpression methodCall)
{
if (methodCall == null)
throw new ArgumentNullException("methodCall");

var callExpression = methodCall.Body as MethodCallExpression;
if (callExpression == null)
{
throw new ArgumentException("Expression body must be 'MethodCallExpression' type.", "methodCall");
}

var type = callExpression.Method.DeclaringType;
var methodInfo = callExpression.Method;
if (callExpression.Object != null)
{
var objectValue = GetExpressionValue(callExpression.Object);
if (objectValue == null)
{
throw new InvalidOperationException("Expression object should be not null.");
}

type = objectValue.GetType();

methodInfo = type.GetNonOpenMatchingMethod(callExpression.Method.Name, callExpression.Method.GetParameters().Select(x => x.ParameterType).ToArray());
}

var args = callExpression.Arguments.Select(GetExpressionValue).ToArray();

if (type == null) throw new ArgumentNullException("type");
if (methodInfo == null) throw new ArgumentNullException("method");
if (args == null) throw new ArgumentNullException("args");

Validate(type, "type", methodInfo, "method", args.Length, "args");

var invokeMeta = new InvokeMeta(type, methodInfo);

//Save InvokeMeta and args
var job = new Job();
job.AppID = appID;
job.UserID = userID;
job.JobType = jobType;
job.JobName = string.IsNullOrWhiteSpace(jobName) ? type.Name + "." + methodInfo.Name : jobName;
job.InvokeMeta = JsonConvert.SerializeObject(invokeMeta, SerializerSettings.Settings);
job.Parameters = Helpers.Encrypt(JsonConvert.SerializeObject(SerializeArguments(args), SerializerSettings.Settings), encryptionKey); //ENCRYPT it!!!
job.Created = DateTime.Now;

return job;
}


}
}
71 changes: 24 additions & 47 deletions Shift.DataLayer/JobDALDocumentDB.cs
Original file line number Diff line number Diff line change
Expand Up @@ -196,53 +196,20 @@ public Task<string> AddAsync(string appID, string userID, string jobType, string
return AddAsync(appID, userID, jobType, jobName, methodCall, false);
}

private async Task<string> AddAsync(string appID, string userID, string jobType, string jobName, Expression<Action> methodCall, bool isSync)
public string Add(string appID, string userID, string jobType, string jobName, Expression<Func<Task>> methodCall)
{
if (methodCall == null)
throw new ArgumentNullException("methodCall");

var callExpression = methodCall.Body as MethodCallExpression;
if (callExpression == null)
{
throw new ArgumentException("Expression body must be 'System.Linq.Expressions.MethodCallExpression' type.", "methodCall");
}

Type type;
if (callExpression.Object != null)
{
var value = DALHelpers.GetExpressionValue(callExpression.Object);
if (value == null)
throw new InvalidOperationException("Expression object can not be null.");

type = value.GetType();
}
else
{
type = callExpression.Method.DeclaringType;
}

var methodInfo = callExpression.Method;
var args = callExpression.Arguments.Select(DALHelpers.GetExpressionValue).ToArray();

if (type == null) throw new ArgumentNullException("type");
if (methodInfo == null) throw new ArgumentNullException("method");
if (args == null) throw new ArgumentNullException("args");

DALHelpers.Validate(type, "type", methodInfo, "method", args.Length, "args");
return AddAsync(appID, userID, jobType, jobName, methodCall, true).GetAwaiter().GetResult();
}

var invokeMeta = new InvokeMeta(type, methodInfo);
public Task<string> AddAsync(string appID, string userID, string jobType, string jobName, Expression<Func<Task>> methodCall)
{
return AddAsync(appID, userID, jobType, jobName, methodCall, false);
}

//Save InvokeMeta and args
var now = DateTime.Now;
var job = new Job();
job.AppID = appID;
job.UserID = userID;
job.JobType = jobType;
job.JobName = string.IsNullOrWhiteSpace(jobName) ? type.Name + "." + methodInfo.Name : jobName;
job.InvokeMeta = JsonConvert.SerializeObject(invokeMeta, SerializerSettings.Settings);
job.Parameters = Helpers.Encrypt(JsonConvert.SerializeObject(DALHelpers.SerializeArguments(args), SerializerSettings.Settings), encryptionKey); //ENCRYPT it!!!
job.Created = now;
job.Score = (new DateTimeOffset(now)).ToUnixTimeSeconds();
private async Task<string> AddAsync(string appID, string userID, string jobType, string jobName, LambdaExpression methodCall, bool isSync)
{
var job = DALHelpers.CreateJobFromExpression(encryptionKey, appID, userID, jobType, jobName, methodCall);
job.Score = (new DateTimeOffset(job.Created.GetValueOrDefault())).ToUnixTimeSeconds();

ResourceResponse<Document> rsp;
if (isSync)
Expand All @@ -269,7 +236,17 @@ public Task<int> UpdateAsync(string jobID, string appID, string userID, string j
return UpdateAsync(jobID, appID, userID, jobType, jobName, methodCall, false);
}

private async Task<int> UpdateAsync(string jobID, string appID, string userID, string jobType, string jobName, Expression<Action> methodCall, bool isSync)
public int Update(string jobID, string appID, string userID, string jobType, string jobName, Expression<Func<Task>> methodCall)
{
return UpdateAsync(jobID, appID, userID, jobType, jobName, methodCall, true).GetAwaiter().GetResult();
}

public Task<int> UpdateAsync(string jobID, string appID, string userID, string jobType, string jobName, Expression<Func<Task>> methodCall)
{
return UpdateAsync(jobID, appID, userID, jobType, jobName, methodCall, false);
}

private async Task<int> UpdateAsync(string jobID, string appID, string userID, string jobType, string jobName, LambdaExpression methodCall, bool isSync)
{
if (methodCall == null)
throw new ArgumentNullException("methodCall");
Expand Down Expand Up @@ -378,11 +355,11 @@ private async Task<int> SetCommandStopAsync(ICollection<string> jobIDs, bool isS
IEnumerable<JobView> jobList;
if (isSync)
{
jobList = GetItemsAsync<JobView>(j => jobIDs.Contains(j.ID) && (j.Status == null || j.Status == JobStatus.Running), isSync).GetAwaiter().GetResult();
jobList = GetItemsAsync<JobView>(j => jobIDs.Contains(j.ID) && (j.Status == null || j.Status == JobStatus.Running || j.Status == JobStatus.Paused), isSync).GetAwaiter().GetResult();
}
else
{
jobList = await GetItemsAsync<JobView>(j => jobIDs.Contains(j.ID) && (j.Status == null || j.Status == JobStatus.Running), isSync);
jobList = await GetItemsAsync<JobView>(j => jobIDs.Contains(j.ID) && (j.Status == null || j.Status == JobStatus.Running || j.Status == JobStatus.Paused), isSync);
}

foreach (var job in jobList)
Expand Down
72 changes: 26 additions & 46 deletions Shift.DataLayer/JobDALMongo.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,53 +89,20 @@ public Task<string> AddAsync(string appID, string userID, string jobType, string
return AddAsync(appID, userID, jobType, jobName, methodCall, false);
}

private async Task<string> AddAsync(string appID, string userID, string jobType, string jobName, Expression<Action> methodCall, bool isSync)
public string Add(string appID, string userID, string jobType, string jobName, Expression<Func<Task>> methodCall)
{
if (methodCall == null)
throw new ArgumentNullException("methodCall");

var callExpression = methodCall.Body as MethodCallExpression;
if (callExpression == null)
{
throw new ArgumentException("Expression body must be 'System.Linq.Expressions.MethodCallExpression' type.", "methodCall");
}

Type type;
if (callExpression.Object != null)
{
var value = DALHelpers.GetExpressionValue(callExpression.Object);
if (value == null)
throw new InvalidOperationException("Expression object can not be null.");

type = value.GetType();
}
else
{
type = callExpression.Method.DeclaringType;
}

var methodInfo = callExpression.Method;
var args = callExpression.Arguments.Select(DALHelpers.GetExpressionValue).ToArray();

if (type == null) throw new ArgumentNullException("type");
if (methodInfo == null) throw new ArgumentNullException("method");
if (args == null) throw new ArgumentNullException("args");

DALHelpers.Validate(type, "type", methodInfo, "method", args.Length, "args");
return AddAsync(appID, userID, jobType, jobName, methodCall, true).GetAwaiter().GetResult();
}

var invokeMeta = new InvokeMeta(type, methodInfo);
public Task<string> AddAsync(string appID, string userID, string jobType, string jobName, Expression<Func<Task>> methodCall)
{
return AddAsync(appID, userID, jobType, jobName, methodCall, false);
}

//Save InvokeMeta and args
var now = DateTime.Now;
var job = new Job();
job.AppID = appID;
job.UserID = userID;
job.JobType = jobType;
job.JobName = string.IsNullOrWhiteSpace(jobName) ? type.Name + "." + methodInfo.Name : jobName;
job.InvokeMeta = JsonConvert.SerializeObject(invokeMeta, SerializerSettings.Settings);
job.Parameters = Helpers.Encrypt(JsonConvert.SerializeObject(DALHelpers.SerializeArguments(args), SerializerSettings.Settings), encryptionKey); //ENCRYPT it!!!
job.Created = now;
job.Score = (new DateTimeOffset(now)).ToUnixTimeSeconds();
private async Task<string> AddAsync(string appID, string userID, string jobType, string jobName, LambdaExpression methodCall, bool isSync)
{
var job = DALHelpers.CreateJobFromExpression(encryptionKey, appID, userID, jobType, jobName, methodCall);
job.Score = (new DateTimeOffset(job.Created.GetValueOrDefault())).ToUnixTimeSeconds();

var collection = database.GetCollection<Job>(JobCollectionName);
if (isSync)
Expand All @@ -159,7 +126,17 @@ public Task<int> UpdateAsync(string jobID, string appID, string userID, string j
return UpdateAsync(jobID, appID, userID, jobType, jobName, methodCall, false);
}

private async Task<int> UpdateAsync(string jobID, string appID, string userID, string jobType, string jobName, Expression<Action> methodCall, bool isSync)
public int Update(string jobID, string appID, string userID, string jobType, string jobName, Expression<Func<Task>> methodCall)
{
return UpdateAsync(jobID, appID, userID, jobType, jobName, methodCall, true).GetAwaiter().GetResult();
}

public Task<int> UpdateAsync(string jobID, string appID, string userID, string jobType, string jobName, Expression<Func<Task>> methodCall)
{
return UpdateAsync(jobID, appID, userID, jobType, jobName, methodCall, false);
}

private async Task<int> UpdateAsync(string jobID, string appID, string userID, string jobType, string jobName, LambdaExpression methodCall, bool isSync)
{
if (methodCall == null)
throw new ArgumentNullException("methodCall");
Expand Down Expand Up @@ -273,7 +250,10 @@ private async Task<int> SetCommandStopAsync(ICollection<string> jobIDs, bool isS

var collection = database.GetCollection<Job>(JobCollectionName);
var blFilter = Builders<Job>.Filter;
var filter = blFilter.In(j => j.JobID, jobIDs) & (blFilter.Eq(j => j.Status, null) | blFilter.Eq(j => j.Status, JobStatus.Running));
var filter = blFilter.In(j => j.JobID, jobIDs)
& (blFilter.Eq(j => j.Status, null)
| blFilter.Eq(j => j.Status, JobStatus.Running)
| blFilter.Eq(j => j.Status, JobStatus.Paused));
var update = Builders<Job>.Update.Set("Command", JobCommand.Stop);

var result = isSync ? collection.UpdateMany(filter, update) : await collection.UpdateManyAsync(filter, update);
Expand Down
Loading

0 comments on commit b6e12af

Please sign in to comment.