From b6e12af6a4ad9232d65ac710b9f7f461d55bf443 Mon Sep 17 00:00:00 2001 From: hhalim Date: Thu, 24 Aug 2017 11:17:43 -0500 Subject: [PATCH] Add background job with async method. Refactor and improve serialization. --- Shift.DataLayer/DALHelpers.cs | 79 ++++- Shift.DataLayer/JobDALDocumentDB.cs | 71 ++--- Shift.DataLayer/JobDALMongo.cs | 72 ++--- Shift.DataLayer/JobDALRedis.cs | 69 ++-- Shift.DataLayer/JobDALSql.cs | 65 ++-- Shift.DataLayer/Properties/AssemblyInfo.cs | 2 +- Shift.DataLayer/Shift.DataLayer.csproj | 1 + Shift.DataLayer/TypeExtensions.cs | 182 +++++++++++ Shift.Entities/IJobDAL.cs | 4 + Shift.Entities/Properties/AssemblyInfo.cs | 2 +- .../JobDALDocumentDBAsyncTest.cs | 26 ++ .../JobDALDocumentDBTest.cs | 27 ++ .../JobDALMongoAsyncTest.cs | 26 ++ Shift.UnitTest.DataLayer/JobDALMongoTest.cs | 27 ++ .../JobDALRedisAsyncTest.cs | 26 ++ Shift.UnitTest.DataLayer/JobDALRedisTest.cs | 27 ++ .../JobDALSqlAsyncTest.cs | 26 ++ Shift.UnitTest.DataLayer/JobDALSqlTest.cs | 27 ++ .../Properties/AssemblyInfo.cs | 2 +- Shift.UnitTest/Properties/AssemblyInfo.cs | 2 +- Shift/Helpers.cs | 65 ---- Shift/JobClient.cs | 144 ++++++++- Shift/Properties/AssemblyInfo.cs | 2 +- Shift/Worker.cs | 297 ++++++++++-------- 24 files changed, 878 insertions(+), 393 deletions(-) create mode 100644 Shift.DataLayer/TypeExtensions.cs diff --git a/Shift.DataLayer/DALHelpers.cs b/Shift.DataLayer/DALHelpers.cs index 7e7d272..1ea29d5 100644 --- a/Shift.DataLayer/DALHelpers.cs +++ b/Shift.DataLayer/DALHelpers.cs @@ -12,6 +12,7 @@ using System.ComponentModel; using Shift.Entities; using System.Threading; +using System.Runtime.CompilerServices; namespace Shift.DataLayer { @@ -41,14 +42,14 @@ string argumentParameterName throw new NotSupportedException("Global methods are not supported. Use class methods instead."); } - if (!method.DeclaringType.IsAssignableFrom(type)) + if (!method.DeclaringType.GetTypeInfo().IsAssignableFrom(type.GetTypeInfo())) { throw new ArgumentException(String.Format("The type `{0}` must be derived from the `{1}` type.", method.DeclaringType, type), typeParameterName); } - if (typeof(Task).IsAssignableFrom(method.ReturnType)) + if (method.ReturnType == typeof(void) && method.GetCustomAttribute() != null) { - throw new NotSupportedException("Async methods (Task) are not supported . Please make them synchronous."); + throw new NotSupportedException("Async void methods are not supported. Use async Task instead."); } var parameters = method.GetParameters(); @@ -70,7 +71,14 @@ string argumentParameterName throw new NotSupportedException("Output parameters (out) are not supported: no guarantee that method will be invoked in the same process."); } + var parameterTypeInfo = parameter.ParameterType.GetTypeInfo(); + + if (parameterTypeInfo.IsSubclassOf(typeof(Delegate)) || parameterTypeInfo.IsSubclassOf(typeof(Expression))) + { + throw new NotSupportedException("Anonymous functions, delegates and lambda expressions aren't supported in job method parameters."); + } } + } public static object GetExpressionValue(Expression expression) @@ -95,6 +103,13 @@ internal static string[] SerializeArguments(IReadOnlyCollection argument { value = ((DateTime)argument).ToString("o", CultureInfo.InvariantCulture); } + else if (argument is CancellationToken + || argument is PauseToken + || argument is IProgress) + { + //These types will be replaced during invocation with the real objects + value = null; + } else { value = JsonConvert.SerializeObject(argument, SerializerSettings.Settings); @@ -166,16 +181,72 @@ public static object DeserializeArgument(string argument, Type type) try { var converter = TypeDescriptor.GetConverter(type); + + if (converter.GetType() == typeof(ReferenceConverter)) + { + throw; + } + value = converter.ConvertFromInvariantString(argument); } catch (Exception) { - throw jsonException; + throw; } } } return value; } + public static Job CreateJobFromExpression(string encryptionKey, string appID, string userID, string jobType, string jobName, LambdaExpression methodCall) + { + if (methodCall == null) + throw new ArgumentNullException("methodCall"); + + var callExpression = methodCall.Body as MethodCallExpression; + if (callExpression == null) + { + throw new ArgumentException("Expression body must be 'MethodCallExpression' type.", "methodCall"); + } + + var type = callExpression.Method.DeclaringType; + var methodInfo = callExpression.Method; + if (callExpression.Object != null) + { + var objectValue = GetExpressionValue(callExpression.Object); + if (objectValue == null) + { + throw new InvalidOperationException("Expression object should be not null."); + } + + type = objectValue.GetType(); + + methodInfo = type.GetNonOpenMatchingMethod(callExpression.Method.Name, callExpression.Method.GetParameters().Select(x => x.ParameterType).ToArray()); + } + + var args = callExpression.Arguments.Select(GetExpressionValue).ToArray(); + + if (type == null) throw new ArgumentNullException("type"); + if (methodInfo == null) throw new ArgumentNullException("method"); + if (args == null) throw new ArgumentNullException("args"); + + Validate(type, "type", methodInfo, "method", args.Length, "args"); + + var invokeMeta = new InvokeMeta(type, methodInfo); + + //Save InvokeMeta and args + var job = new Job(); + job.AppID = appID; + job.UserID = userID; + job.JobType = jobType; + job.JobName = string.IsNullOrWhiteSpace(jobName) ? type.Name + "." + methodInfo.Name : jobName; + job.InvokeMeta = JsonConvert.SerializeObject(invokeMeta, SerializerSettings.Settings); + job.Parameters = Helpers.Encrypt(JsonConvert.SerializeObject(SerializeArguments(args), SerializerSettings.Settings), encryptionKey); //ENCRYPT it!!! + job.Created = DateTime.Now; + + return job; + } + + } } diff --git a/Shift.DataLayer/JobDALDocumentDB.cs b/Shift.DataLayer/JobDALDocumentDB.cs index c744cd0..d528f43 100644 --- a/Shift.DataLayer/JobDALDocumentDB.cs +++ b/Shift.DataLayer/JobDALDocumentDB.cs @@ -196,53 +196,20 @@ public Task AddAsync(string appID, string userID, string jobType, string return AddAsync(appID, userID, jobType, jobName, methodCall, false); } - private async Task AddAsync(string appID, string userID, string jobType, string jobName, Expression methodCall, bool isSync) + public string Add(string appID, string userID, string jobType, string jobName, Expression> methodCall) { - if (methodCall == null) - throw new ArgumentNullException("methodCall"); - - var callExpression = methodCall.Body as MethodCallExpression; - if (callExpression == null) - { - throw new ArgumentException("Expression body must be 'System.Linq.Expressions.MethodCallExpression' type.", "methodCall"); - } - - Type type; - if (callExpression.Object != null) - { - var value = DALHelpers.GetExpressionValue(callExpression.Object); - if (value == null) - throw new InvalidOperationException("Expression object can not be null."); - - type = value.GetType(); - } - else - { - type = callExpression.Method.DeclaringType; - } - - var methodInfo = callExpression.Method; - var args = callExpression.Arguments.Select(DALHelpers.GetExpressionValue).ToArray(); - - if (type == null) throw new ArgumentNullException("type"); - if (methodInfo == null) throw new ArgumentNullException("method"); - if (args == null) throw new ArgumentNullException("args"); - - DALHelpers.Validate(type, "type", methodInfo, "method", args.Length, "args"); + return AddAsync(appID, userID, jobType, jobName, methodCall, true).GetAwaiter().GetResult(); + } - var invokeMeta = new InvokeMeta(type, methodInfo); + public Task AddAsync(string appID, string userID, string jobType, string jobName, Expression> methodCall) + { + return AddAsync(appID, userID, jobType, jobName, methodCall, false); + } - //Save InvokeMeta and args - var now = DateTime.Now; - var job = new Job(); - job.AppID = appID; - job.UserID = userID; - job.JobType = jobType; - job.JobName = string.IsNullOrWhiteSpace(jobName) ? type.Name + "." + methodInfo.Name : jobName; - job.InvokeMeta = JsonConvert.SerializeObject(invokeMeta, SerializerSettings.Settings); - job.Parameters = Helpers.Encrypt(JsonConvert.SerializeObject(DALHelpers.SerializeArguments(args), SerializerSettings.Settings), encryptionKey); //ENCRYPT it!!! - job.Created = now; - job.Score = (new DateTimeOffset(now)).ToUnixTimeSeconds(); + private async Task AddAsync(string appID, string userID, string jobType, string jobName, LambdaExpression methodCall, bool isSync) + { + var job = DALHelpers.CreateJobFromExpression(encryptionKey, appID, userID, jobType, jobName, methodCall); + job.Score = (new DateTimeOffset(job.Created.GetValueOrDefault())).ToUnixTimeSeconds(); ResourceResponse rsp; if (isSync) @@ -269,7 +236,17 @@ public Task UpdateAsync(string jobID, string appID, string userID, string j return UpdateAsync(jobID, appID, userID, jobType, jobName, methodCall, false); } - private async Task UpdateAsync(string jobID, string appID, string userID, string jobType, string jobName, Expression methodCall, bool isSync) + public int Update(string jobID, string appID, string userID, string jobType, string jobName, Expression> methodCall) + { + return UpdateAsync(jobID, appID, userID, jobType, jobName, methodCall, true).GetAwaiter().GetResult(); + } + + public Task UpdateAsync(string jobID, string appID, string userID, string jobType, string jobName, Expression> methodCall) + { + return UpdateAsync(jobID, appID, userID, jobType, jobName, methodCall, false); + } + + private async Task UpdateAsync(string jobID, string appID, string userID, string jobType, string jobName, LambdaExpression methodCall, bool isSync) { if (methodCall == null) throw new ArgumentNullException("methodCall"); @@ -378,11 +355,11 @@ private async Task SetCommandStopAsync(ICollection jobIDs, bool isS IEnumerable jobList; if (isSync) { - jobList = GetItemsAsync(j => jobIDs.Contains(j.ID) && (j.Status == null || j.Status == JobStatus.Running), isSync).GetAwaiter().GetResult(); + jobList = GetItemsAsync(j => jobIDs.Contains(j.ID) && (j.Status == null || j.Status == JobStatus.Running || j.Status == JobStatus.Paused), isSync).GetAwaiter().GetResult(); } else { - jobList = await GetItemsAsync(j => jobIDs.Contains(j.ID) && (j.Status == null || j.Status == JobStatus.Running), isSync); + jobList = await GetItemsAsync(j => jobIDs.Contains(j.ID) && (j.Status == null || j.Status == JobStatus.Running || j.Status == JobStatus.Paused), isSync); } foreach (var job in jobList) diff --git a/Shift.DataLayer/JobDALMongo.cs b/Shift.DataLayer/JobDALMongo.cs index 154caae..3073374 100644 --- a/Shift.DataLayer/JobDALMongo.cs +++ b/Shift.DataLayer/JobDALMongo.cs @@ -89,53 +89,20 @@ public Task AddAsync(string appID, string userID, string jobType, string return AddAsync(appID, userID, jobType, jobName, methodCall, false); } - private async Task AddAsync(string appID, string userID, string jobType, string jobName, Expression methodCall, bool isSync) + public string Add(string appID, string userID, string jobType, string jobName, Expression> methodCall) { - if (methodCall == null) - throw new ArgumentNullException("methodCall"); - - var callExpression = methodCall.Body as MethodCallExpression; - if (callExpression == null) - { - throw new ArgumentException("Expression body must be 'System.Linq.Expressions.MethodCallExpression' type.", "methodCall"); - } - - Type type; - if (callExpression.Object != null) - { - var value = DALHelpers.GetExpressionValue(callExpression.Object); - if (value == null) - throw new InvalidOperationException("Expression object can not be null."); - - type = value.GetType(); - } - else - { - type = callExpression.Method.DeclaringType; - } - - var methodInfo = callExpression.Method; - var args = callExpression.Arguments.Select(DALHelpers.GetExpressionValue).ToArray(); - - if (type == null) throw new ArgumentNullException("type"); - if (methodInfo == null) throw new ArgumentNullException("method"); - if (args == null) throw new ArgumentNullException("args"); - - DALHelpers.Validate(type, "type", methodInfo, "method", args.Length, "args"); + return AddAsync(appID, userID, jobType, jobName, methodCall, true).GetAwaiter().GetResult(); + } - var invokeMeta = new InvokeMeta(type, methodInfo); + public Task AddAsync(string appID, string userID, string jobType, string jobName, Expression> methodCall) + { + return AddAsync(appID, userID, jobType, jobName, methodCall, false); + } - //Save InvokeMeta and args - var now = DateTime.Now; - var job = new Job(); - job.AppID = appID; - job.UserID = userID; - job.JobType = jobType; - job.JobName = string.IsNullOrWhiteSpace(jobName) ? type.Name + "." + methodInfo.Name : jobName; - job.InvokeMeta = JsonConvert.SerializeObject(invokeMeta, SerializerSettings.Settings); - job.Parameters = Helpers.Encrypt(JsonConvert.SerializeObject(DALHelpers.SerializeArguments(args), SerializerSettings.Settings), encryptionKey); //ENCRYPT it!!! - job.Created = now; - job.Score = (new DateTimeOffset(now)).ToUnixTimeSeconds(); + private async Task AddAsync(string appID, string userID, string jobType, string jobName, LambdaExpression methodCall, bool isSync) + { + var job = DALHelpers.CreateJobFromExpression(encryptionKey, appID, userID, jobType, jobName, methodCall); + job.Score = (new DateTimeOffset(job.Created.GetValueOrDefault())).ToUnixTimeSeconds(); var collection = database.GetCollection(JobCollectionName); if (isSync) @@ -159,7 +126,17 @@ public Task UpdateAsync(string jobID, string appID, string userID, string j return UpdateAsync(jobID, appID, userID, jobType, jobName, methodCall, false); } - private async Task UpdateAsync(string jobID, string appID, string userID, string jobType, string jobName, Expression methodCall, bool isSync) + public int Update(string jobID, string appID, string userID, string jobType, string jobName, Expression> methodCall) + { + return UpdateAsync(jobID, appID, userID, jobType, jobName, methodCall, true).GetAwaiter().GetResult(); + } + + public Task UpdateAsync(string jobID, string appID, string userID, string jobType, string jobName, Expression> methodCall) + { + return UpdateAsync(jobID, appID, userID, jobType, jobName, methodCall, false); + } + + private async Task UpdateAsync(string jobID, string appID, string userID, string jobType, string jobName, LambdaExpression methodCall, bool isSync) { if (methodCall == null) throw new ArgumentNullException("methodCall"); @@ -273,7 +250,10 @@ private async Task SetCommandStopAsync(ICollection jobIDs, bool isS var collection = database.GetCollection(JobCollectionName); var blFilter = Builders.Filter; - var filter = blFilter.In(j => j.JobID, jobIDs) & (blFilter.Eq(j => j.Status, null) | blFilter.Eq(j => j.Status, JobStatus.Running)); + var filter = blFilter.In(j => j.JobID, jobIDs) + & (blFilter.Eq(j => j.Status, null) + | blFilter.Eq(j => j.Status, JobStatus.Running) + | blFilter.Eq(j => j.Status, JobStatus.Paused)); var update = Builders.Update.Set("Command", JobCommand.Stop); var result = isSync ? collection.UpdateMany(filter, update) : await collection.UpdateManyAsync(filter, update); diff --git a/Shift.DataLayer/JobDALRedis.cs b/Shift.DataLayer/JobDALRedis.cs index 7322f61..ac50be2 100644 --- a/Shift.DataLayer/JobDALRedis.cs +++ b/Shift.DataLayer/JobDALRedis.cs @@ -95,52 +95,19 @@ public Task AddAsync(string appID, string userID, string jobType, string return AddAsync(appID, userID, jobType, jobName, methodCall, false); } - private async Task AddAsync(string appID, string userID, string jobType, string jobName, Expression methodCall, bool isSync) + public string Add(string appID, string userID, string jobType, string jobName, Expression> methodCall) { - if (methodCall == null) - throw new ArgumentNullException("methodCall"); - - var callExpression = methodCall.Body as MethodCallExpression; - if (callExpression == null) - { - throw new ArgumentException("Expression body must be 'System.Linq.Expressions.MethodCallExpression' type.", "methodCall"); - } - - Type type; - if (callExpression.Object != null) - { - var value = DALHelpers.GetExpressionValue(callExpression.Object); - if (value == null) - throw new InvalidOperationException("Expression object can not be null."); - - type = value.GetType(); - } - else - { - type = callExpression.Method.DeclaringType; - } - - var methodInfo = callExpression.Method; - var args = callExpression.Arguments.Select(DALHelpers.GetExpressionValue).ToArray(); - - if (type == null) throw new ArgumentNullException("type"); - if (methodInfo == null) throw new ArgumentNullException("method"); - if (args == null) throw new ArgumentNullException("args"); - - DALHelpers.Validate(type, "type", methodInfo, "method", args.Length, "args"); + return AddAsync(appID, userID, jobType, jobName, methodCall, true).GetAwaiter().GetResult(); + } - var invokeMeta = new InvokeMeta(type, methodInfo); + public Task AddAsync(string appID, string userID, string jobType, string jobName, Expression> methodCall) + { + return AddAsync(appID, userID, jobType, jobName, methodCall, false); + } - //Save InvokeMeta and args - var now = DateTime.Now; - var job = new JobView(); - job.AppID = appID; - job.UserID = userID; - job.JobType = jobType; - job.JobName = string.IsNullOrWhiteSpace(jobName) ? type.Name + "." + methodInfo.Name : jobName; - job.InvokeMeta = JsonConvert.SerializeObject(invokeMeta, SerializerSettings.Settings); - job.Parameters = Helpers.Encrypt(JsonConvert.SerializeObject(DALHelpers.SerializeArguments(args), SerializerSettings.Settings), encryptionKey); //ENCRYPT it!!! - job.Created = now; + private async Task AddAsync(string appID, string userID, string jobType, string jobName, LambdaExpression methodCall, bool isSync) + { + var job = DALHelpers.CreateJobFromExpression(encryptionKey, appID, userID, jobType, jobName, methodCall); var jobID = IncrementJobID(); job.JobID = jobID; @@ -158,7 +125,7 @@ private async Task AddAsync(string appID, string userID, string jobType, var index2 = trn.SortedSetAddAsync(JobQueue, key, Convert.ToDouble(job.JobID)); //Add to created - var createdTS = ((DateTimeOffset)now).ToUnixTimeSeconds(); + var createdTS = ((DateTimeOffset)job.Created).ToUnixTimeSeconds(); var index3 = trn.SortedSetAddAsync(JobCreated, key, createdTS); if (isSync) @@ -182,7 +149,17 @@ public Task UpdateAsync(string jobID, string appID, string userID, string j return UpdateAsync(jobID, appID, userID, jobType, jobName, methodCall, false); } - private async Task UpdateAsync(string jobID, string appID, string userID, string jobType, string jobName, Expression methodCall, bool isSync) + public int Update(string jobID, string appID, string userID, string jobType, string jobName, Expression> methodCall) + { + return UpdateAsync(jobID, appID, userID, jobType, jobName, methodCall, true).GetAwaiter().GetResult(); + } + + public Task UpdateAsync(string jobID, string appID, string userID, string jobType, string jobName, Expression> methodCall) + { + return UpdateAsync(jobID, appID, userID, jobType, jobName, methodCall, false); + } + + private async Task UpdateAsync(string jobID, string appID, string userID, string jobType, string jobName, LambdaExpression methodCall, bool isSync) { if (methodCall == null) throw new ArgumentNullException("methodCall"); @@ -347,7 +324,7 @@ private async Task SetCommandStopAsync(ICollection jobIDs, bool isS //Check status is null or status = running var job = isSync ? GetJob(jobID) : await GetJobAsync(jobID); - if (job != null && (job.Status == null || job.Status == JobStatus.Running)) + if (job != null && (job.Status == null || job.Status == JobStatus.Running || job.Status == JobStatus.Paused)) { var trn = RedisDatabase.CreateTransaction(); if (string.IsNullOrWhiteSpace(job.ProcessID)) diff --git a/Shift.DataLayer/JobDALSql.cs b/Shift.DataLayer/JobDALSql.cs index 5bd4632..1cf1a05 100644 --- a/Shift.DataLayer/JobDALSql.cs +++ b/Shift.DataLayer/JobDALSql.cs @@ -50,52 +50,19 @@ public Task AddAsync(string appID, string userID, string jobType, string return AddAsync(appID, userID, jobType, jobName, methodCall, false); } - private async Task AddAsync(string appID, string userID, string jobType, string jobName, Expression methodCall, bool isSync) + public string Add(string appID, string userID, string jobType, string jobName, Expression> methodCall) { - if (methodCall == null) - throw new ArgumentNullException("methodCall"); - - var callExpression = methodCall.Body as MethodCallExpression; - if (callExpression == null) - { - throw new ArgumentException("Expression body must be 'System.Linq.Expressions.MethodCallExpression' type.", "methodCall"); - } - - Type type; - if (callExpression.Object != null) - { - var value = DALHelpers.GetExpressionValue(callExpression.Object); - if (value == null) - throw new InvalidOperationException("Expression object can not be null."); - - type = value.GetType(); - } - else - { - type = callExpression.Method.DeclaringType; - } - - var methodInfo = callExpression.Method; - var args = callExpression.Arguments.Select(DALHelpers.GetExpressionValue).ToArray(); - - if (type == null) throw new ArgumentNullException("type"); - if (methodInfo == null) throw new ArgumentNullException("method"); - if (args == null) throw new ArgumentNullException("args"); - - DALHelpers.Validate(type, "type", methodInfo, "method", args.Length, "args"); + return AddAsync(appID, userID, jobType, jobName, methodCall, true).GetAwaiter().GetResult(); + } - var invokeMeta = new InvokeMeta(type, methodInfo); + public Task AddAsync(string appID, string userID, string jobType, string jobName, Expression> methodCall) + { + return AddAsync(appID, userID, jobType, jobName, methodCall, false); + } - //Save InvokeMeta and args - var now = DateTime.Now; - var job = new Job(); - job.AppID = appID; - job.UserID = userID; - job.JobType = jobType; - job.JobName = string.IsNullOrWhiteSpace(jobName) ? type.Name + "." + methodInfo.Name : jobName; - job.InvokeMeta = JsonConvert.SerializeObject(invokeMeta, SerializerSettings.Settings); - job.Parameters = Helpers.Encrypt(JsonConvert.SerializeObject(DALHelpers.SerializeArguments(args), SerializerSettings.Settings), encryptionKey); //ENCRYPT it!!! - job.Created = now; + private async Task AddAsync(string appID, string userID, string jobType, string jobName, LambdaExpression methodCall, bool isSync) + { + var job = DALHelpers.CreateJobFromExpression(encryptionKey, appID, userID, jobType, jobName, methodCall); string jobID = null; using (var connection = new SqlConnection(connectionString)) @@ -130,7 +97,17 @@ public Task UpdateAsync(string jobID, string appID, string userID, string j return UpdateAsync(jobID, appID, userID, jobType, jobName, methodCall, false); } - private async Task UpdateAsync(string jobID, string appID, string userID, string jobType, string jobName, Expression methodCall, bool isSync) + public int Update(string jobID, string appID, string userID, string jobType, string jobName, Expression> methodCall) + { + return UpdateAsync(jobID, appID, userID, jobType, jobName, methodCall, true).GetAwaiter().GetResult(); + } + + public Task UpdateAsync(string jobID, string appID, string userID, string jobType, string jobName, Expression> methodCall) + { + return UpdateAsync(jobID, appID, userID, jobType, jobName, methodCall, false); + } + + private async Task UpdateAsync(string jobID, string appID, string userID, string jobType, string jobName, LambdaExpression methodCall, bool isSync) { if (methodCall == null) throw new ArgumentNullException("methodCall"); diff --git a/Shift.DataLayer/Properties/AssemblyInfo.cs b/Shift.DataLayer/Properties/AssemblyInfo.cs index 6c94d41..2e6c0d5 100644 --- a/Shift.DataLayer/Properties/AssemblyInfo.cs +++ b/Shift.DataLayer/Properties/AssemblyInfo.cs @@ -32,4 +32,4 @@ // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.8.1")] +[assembly: AssemblyVersion("1.0.8.2")] diff --git a/Shift.DataLayer/Shift.DataLayer.csproj b/Shift.DataLayer/Shift.DataLayer.csproj index 0bbc876..dddc6ef 100644 --- a/Shift.DataLayer/Shift.DataLayer.csproj +++ b/Shift.DataLayer/Shift.DataLayer.csproj @@ -104,6 +104,7 @@ + diff --git a/Shift.DataLayer/TypeExtensions.cs b/Shift.DataLayer/TypeExtensions.cs new file mode 100644 index 0000000..0e64c03 --- /dev/null +++ b/Shift.DataLayer/TypeExtensions.cs @@ -0,0 +1,182 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Reflection; +using System.Text.RegularExpressions; + +namespace Shift +{ + public static class TypeExtensions + { + public static string ToGenericTypeString(this Type type) + { + if (!type.GetTypeInfo().IsGenericType) + { + return type.GetFullNameWithoutNamespace() + .ReplacePlusWithDotInNestedTypeName(); + } + + return type.GetGenericTypeDefinition() + .GetFullNameWithoutNamespace() + .ReplacePlusWithDotInNestedTypeName() + .ReplaceGenericParametersInGenericTypeName(type); + } + + public static MethodInfo GetNonOpenMatchingMethod( + this Type type, + string name, + Type[] parameterTypes) + { + if (type == null) throw new ArgumentNullException(nameof(type)); + if (name == null) throw new ArgumentNullException(nameof(name)); + + parameterTypes = parameterTypes ?? new Type[0]; + + var methodCandidates = new List(type.GetRuntimeMethods()); + + if (type.GetTypeInfo().IsInterface) + { + methodCandidates.AddRange(type.GetTypeInfo() + .ImplementedInterfaces.SelectMany(x => x.GetRuntimeMethods())); + } + + foreach (var methodCandidate in methodCandidates) + { + if (!methodCandidate.Name.Equals(name, StringComparison.Ordinal)) + { + continue; + } + + var parameters = methodCandidate.GetParameters(); + if (parameters.Length != parameterTypes.Length) + { + continue; + } + + var parameterTypesMatched = true; + + var genericArguments = methodCandidate.ContainsGenericParameters + ? new Type[methodCandidate.GetGenericArguments().Length] + : null; + + // Determining whether we can use this method candidate with + // current parameter types. + for (var i = 0; i < parameters.Length; i++) + { + var parameterType = parameters[i].ParameterType.GetTypeInfo(); + var actualType = parameterTypes[i].GetTypeInfo(); + + if (!TypesMatchRecursive(parameterType, actualType, genericArguments)) + { + parameterTypesMatched = false; + break; + } + } + + if (parameterTypesMatched) + { + // Return first found method candidate with matching parameters. + return genericArguments != null + ? methodCandidate.MakeGenericMethod(genericArguments) + : methodCandidate; + } + } + + return null; + } + + public static Type[] GetAllGenericArguments(this TypeInfo type) + { + return type.GenericTypeArguments.Length > 0 ? type.GenericTypeArguments : type.GenericTypeParameters; + } + + private static bool TypesMatchRecursive(TypeInfo parameterType, TypeInfo actualType, IList genericArguments) + { + if (parameterType.IsGenericParameter) + { + var position = parameterType.GenericParameterPosition; + + // Return false if this generic parameter has been identified and it's not the same as actual type + if (genericArguments[position] != null && genericArguments[position].GetTypeInfo() != actualType) + { + return false; + } + + genericArguments[position] = actualType.AsType(); + return true; + } + + if (parameterType.ContainsGenericParameters) + { + if (parameterType.IsArray) + { + // Return false if parameterType is array whereas actualType isn't + if (!actualType.IsArray) return false; + + var parameterElementType = parameterType.GetElementType(); + var actualElementType = actualType.GetElementType(); + + return TypesMatchRecursive(parameterElementType.GetTypeInfo(), actualElementType.GetTypeInfo(), genericArguments); + } + + if (!actualType.IsGenericType || parameterType.GetGenericTypeDefinition() != actualType.GetGenericTypeDefinition()) + { + return false; + } + + for (var i = 0; i < parameterType.GenericTypeArguments.Length; i++) + { + var parameterGenericArgument = parameterType.GenericTypeArguments[i]; + var actualGenericArgument = actualType.GenericTypeArguments[i]; + + if (!TypesMatchRecursive(parameterGenericArgument.GetTypeInfo(), actualGenericArgument.GetTypeInfo(), genericArguments)) + { + return false; + } + } + + return true; + } + + return parameterType == actualType; + } + + private static string GetFullNameWithoutNamespace(this Type type) + { + if (type.IsGenericParameter) + { + return type.Name; + } + + const int dotLength = 1; + // ReSharper disable once PossibleNullReferenceException + return !String.IsNullOrEmpty(type.Namespace) + ? type.FullName.Substring(type.Namespace.Length + dotLength) + : type.FullName; + } + + private static string ReplacePlusWithDotInNestedTypeName(this string typeName) + { + return typeName.Replace('+', '.'); + } + + private static string ReplaceGenericParametersInGenericTypeName(this string typeName, Type type) + { + var genericArguments = type.GetTypeInfo().GetAllGenericArguments(); + + const string regexForGenericArguments = @"`[1-9]\d*"; + + var rgx = new Regex(regexForGenericArguments); + + typeName = rgx.Replace(typeName, match => + { + var currentGenericArgumentNumbers = int.Parse(match.Value.Substring(1)); + var currentArguments = string.Join(",", genericArguments.Take(currentGenericArgumentNumbers).Select(ToGenericTypeString)); + genericArguments = genericArguments.Skip(currentGenericArgumentNumbers).ToArray(); + return string.Concat("<", currentArguments, ">"); + }); + + return typeName; + } + } +} diff --git a/Shift.Entities/IJobDAL.cs b/Shift.Entities/IJobDAL.cs index 08854e5..648f893 100644 --- a/Shift.Entities/IJobDAL.cs +++ b/Shift.Entities/IJobDAL.cs @@ -12,8 +12,12 @@ public interface IJobDAL #region insert/update job string Add(string appID, string userID, string jobType, string jobName, Expression methodCall); Task AddAsync(string appID, string userID, string jobType, string jobName, Expression methodCall); + string Add(string appID, string userID, string jobType, string jobName, Expression> methodCall); + Task AddAsync(string appID, string userID, string jobType, string jobName, Expression> methodCall); int Update(string jobID, string appID, string userID, string jobType, string jobName, Expression methodCall); Task UpdateAsync(string jobID, string appID, string userID, string jobType, string jobName, Expression methodCall); + int Update(string jobID, string appID, string userID, string jobType, string jobName, Expression> methodCall); + Task UpdateAsync(string jobID, string appID, string userID, string jobType, string jobName, Expression> methodCall); #endregion #region Set Command field diff --git a/Shift.Entities/Properties/AssemblyInfo.cs b/Shift.Entities/Properties/AssemblyInfo.cs index 805e355..27bc058 100644 --- a/Shift.Entities/Properties/AssemblyInfo.cs +++ b/Shift.Entities/Properties/AssemblyInfo.cs @@ -35,4 +35,4 @@ // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.8.1")] +[assembly: AssemblyVersion("1.0.8.2")] diff --git a/Shift.UnitTest.DataLayer/JobDALDocumentDBAsyncTest.cs b/Shift.UnitTest.DataLayer/JobDALDocumentDBAsyncTest.cs index b1e0ef6..c88b8f6 100644 --- a/Shift.UnitTest.DataLayer/JobDALDocumentDBAsyncTest.cs +++ b/Shift.UnitTest.DataLayer/JobDALDocumentDBAsyncTest.cs @@ -81,6 +81,32 @@ public async Task UpdateAsyncTest() Assert.Equal("JobNameUpdated", job.JobName); } + public async Task StartAsyncJob(string message) + { + Console.WriteLine(message); + await Task.Delay(1000); + } + + [Fact] + public async Task AddAsyncJobAsyncTest() + { + var jobID = await jobDAL.AddAsync(AppID, "", "", "", () => StartAsyncJob("Hello World Test!")); + await jobDAL.DeleteAsync(new List { jobID }); + Assert.True(!string.IsNullOrWhiteSpace(jobID)); + } + + [Fact] + public async Task UpdateAsyncJobAsyncTest() + { + var jobID = await jobDAL.AddAsync(AppID, "", "", "", () => StartAsyncJob("Hello World Test!")); + var count = await jobDAL.UpdateAsync(jobID, AppID, "", "", "JobNameUpdated", () => StartAsyncJob("Updated Hello World Test!")); + + var job = await jobDAL.GetJobAsync(jobID); + await jobDAL.DeleteAsync(new List { jobID }); + Assert.True(count > 0); + Assert.Equal("JobNameUpdated", job.JobName); + } + //Test auto delete older than 24 hours and Null(not started) status [Fact] public async Task DeleteAsyncOldJobs_NotStarted() diff --git a/Shift.UnitTest.DataLayer/JobDALDocumentDBTest.cs b/Shift.UnitTest.DataLayer/JobDALDocumentDBTest.cs index 7163cfd..71083f3 100644 --- a/Shift.UnitTest.DataLayer/JobDALDocumentDBTest.cs +++ b/Shift.UnitTest.DataLayer/JobDALDocumentDBTest.cs @@ -5,6 +5,7 @@ using Shift.DataLayer; using System.Configuration; using System.Linq; +using System.Threading.Tasks; namespace Shift.UnitTest.DataLayer { @@ -79,6 +80,32 @@ public void UpdateTest() Assert.Equal("JobNameUpdated", job.JobName); } + public async Task StartAsyncJob(string message) + { + Console.WriteLine(message); + await Task.Delay(1000); + } + + [Fact] + public void AddAsyncJobTest() + { + var jobID = jobDAL.Add(AppID, "", "", "", () => StartAsyncJob("Hello World Test!")); + jobDAL.Delete(new List { jobID }); + Assert.True(!string.IsNullOrWhiteSpace(jobID)); + } + + [Fact] + public void UpdateAsyncJobTest() + { + var jobID = jobDAL.Add(AppID, "", "", "", () => StartAsyncJob("Hello World Test!")); + var count = jobDAL.Update(jobID, AppID, "", "", "JobNameUpdated", () => StartAsyncJob("Updated Hello World Test!")); + + var job = jobDAL.GetJob(jobID); + jobDAL.Delete(new List { jobID }); + Assert.True(count > 0); + Assert.Equal("JobNameUpdated", job.JobName); + } + //Test auto delete older than 24 hours and Null(not started) status [Fact] public void DeleteOldJobs_NotStarted() diff --git a/Shift.UnitTest.DataLayer/JobDALMongoAsyncTest.cs b/Shift.UnitTest.DataLayer/JobDALMongoAsyncTest.cs index f542613..ddb7707 100644 --- a/Shift.UnitTest.DataLayer/JobDALMongoAsyncTest.cs +++ b/Shift.UnitTest.DataLayer/JobDALMongoAsyncTest.cs @@ -80,6 +80,32 @@ public async Task UpdateAsyncTest() Assert.Equal("JobNameUpdated", job.JobName); } + public async Task StartAsyncJob(string message) + { + Console.WriteLine(message); + await Task.Delay(1000); + } + + [Fact] + public async Task AddAsyncJobAsyncTest() + { + var jobID = await jobDAL.AddAsync(AppID, "", "", "", () => StartAsyncJob("Hello World Test!")); + await jobDAL.DeleteAsync(new List { jobID }); + Assert.True(!string.IsNullOrWhiteSpace(jobID)); + } + + [Fact] + public async Task UpdateAsyncJobAsyncTest() + { + var jobID = await jobDAL.AddAsync(AppID, "", "", "", () => StartAsyncJob("Hello World Test!")); + var count = await jobDAL.UpdateAsync(jobID, AppID, "", "", "JobNameUpdated", () => StartAsyncJob("Updated Hello World Test!")); + + var job = await jobDAL.GetJobAsync(jobID); + await jobDAL.DeleteAsync(new List { jobID }); + Assert.True(count > 0); + Assert.Equal("JobNameUpdated", job.JobName); + } + //Test auto delete older than 24 hours and Null(not started) status [Fact] public async Task DeleteAsyncOldJobs_NotStarted() diff --git a/Shift.UnitTest.DataLayer/JobDALMongoTest.cs b/Shift.UnitTest.DataLayer/JobDALMongoTest.cs index 2c03311..108d9e0 100644 --- a/Shift.UnitTest.DataLayer/JobDALMongoTest.cs +++ b/Shift.UnitTest.DataLayer/JobDALMongoTest.cs @@ -5,6 +5,7 @@ using Shift.DataLayer; using System.Configuration; using System.Linq; +using System.Threading.Tasks; namespace Shift.UnitTest.DataLayer { @@ -78,6 +79,32 @@ public void UpdateTest() Assert.Equal("JobNameUpdated", job.JobName); } + public async Task StartAsyncJob(string message) + { + Console.WriteLine(message); + await Task.Delay(1000); + } + + [Fact] + public void AddAsyncJobTest() + { + var jobID = jobDAL.Add(AppID, "", "", "", () => StartAsyncJob("Hello World Test!")); + jobDAL.Delete(new List { jobID }); + Assert.True(!string.IsNullOrWhiteSpace(jobID)); + } + + [Fact] + public void UpdateAsyncJobTest() + { + var jobID = jobDAL.Add(AppID, "", "", "", () => StartAsyncJob("Hello World Test!")); + var count = jobDAL.Update(jobID, AppID, "", "", "JobNameUpdated", () => StartAsyncJob("Updated Hello World Test!")); + + var job = jobDAL.GetJob(jobID); + jobDAL.Delete(new List { jobID }); + Assert.True(count > 0); + Assert.Equal("JobNameUpdated", job.JobName); + } + //Test auto delete older than 24 hours and Null(not started) status [Fact] public void DeleteOldJobs_NotStarted() diff --git a/Shift.UnitTest.DataLayer/JobDALRedisAsyncTest.cs b/Shift.UnitTest.DataLayer/JobDALRedisAsyncTest.cs index 877452c..fc80942 100644 --- a/Shift.UnitTest.DataLayer/JobDALRedisAsyncTest.cs +++ b/Shift.UnitTest.DataLayer/JobDALRedisAsyncTest.cs @@ -80,6 +80,32 @@ public async Task UpdateAsyncTest() Assert.Equal("JobNameUpdated", job.JobName); } + public async Task StartAsyncJob(string message) + { + Console.WriteLine(message); + await Task.Delay(1000); + } + + [Fact] + public async Task AddAsyncJobAsyncTest() + { + var jobID = await jobDAL.AddAsync(AppID, "", "", "", () => StartAsyncJob("Hello World Test!")); + await jobDAL.DeleteAsync(new List { jobID }); + Assert.True(!string.IsNullOrWhiteSpace(jobID)); + } + + [Fact] + public async Task UpdateAsyncJobAsyncTest() + { + var jobID = await jobDAL.AddAsync(AppID, "", "", "", () => StartAsyncJob("Hello World Test!")); + var count = await jobDAL.UpdateAsync(jobID, AppID, "", "", "JobNameUpdated", () => StartAsyncJob("Updated Hello World Test!")); + + var job = await jobDAL.GetJobAsync(jobID); + await jobDAL.DeleteAsync(new List { jobID }); + Assert.True(count > 0); + Assert.Equal("JobNameUpdated", job.JobName); + } + //Test auto delete older than 24 hours and Null(not started) status [Fact] public async Task DeleteAsyncOldJobs_NotStarted() diff --git a/Shift.UnitTest.DataLayer/JobDALRedisTest.cs b/Shift.UnitTest.DataLayer/JobDALRedisTest.cs index aba1ac5..ca096dc 100644 --- a/Shift.UnitTest.DataLayer/JobDALRedisTest.cs +++ b/Shift.UnitTest.DataLayer/JobDALRedisTest.cs @@ -5,6 +5,7 @@ using Shift.DataLayer; using System.Configuration; using System.Linq; +using System.Threading.Tasks; namespace Shift.UnitTest.DataLayer { @@ -79,6 +80,32 @@ public void UpdateTest() Assert.Equal("JobNameUpdated", job.JobName); } + public async Task StartAsyncJob(string message) + { + Console.WriteLine(message); + await Task.Delay(1000); + } + + [Fact] + public void AddAsyncJobTest() + { + var jobID = jobDAL.Add(AppID, "", "", "", () => StartAsyncJob("Hello World Test!")); + jobDAL.Delete(new List { jobID }); + Assert.True(!string.IsNullOrWhiteSpace(jobID)); + } + + [Fact] + public void UpdateAsyncJobTest() + { + var jobID = jobDAL.Add(AppID, "", "", "", () => StartAsyncJob("Hello World Test!")); + var count = jobDAL.Update(jobID, AppID, "", "", "JobNameUpdated", () => StartAsyncJob("Updated Hello World Test!")); + + var job = jobDAL.GetJob(jobID); + jobDAL.Delete(new List { jobID }); + Assert.True(count > 0); + Assert.Equal("JobNameUpdated", job.JobName); + } + //Test auto delete older than 24 hours and Null(not started) status [Fact] public void DeleteOldJobs_NotStarted() diff --git a/Shift.UnitTest.DataLayer/JobDALSqlAsyncTest.cs b/Shift.UnitTest.DataLayer/JobDALSqlAsyncTest.cs index 4a2af00..b72fd2f 100644 --- a/Shift.UnitTest.DataLayer/JobDALSqlAsyncTest.cs +++ b/Shift.UnitTest.DataLayer/JobDALSqlAsyncTest.cs @@ -80,6 +80,32 @@ public async Task UpdateAsyncTest() Assert.Equal("JobNameUpdated", job.JobName); } + public async Task StartAsyncJob(string message) + { + Console.WriteLine(message); + await Task.Delay(1000); + } + + [Fact] + public async Task AddAsyncJobAsyncTest() + { + var jobID = await jobDAL.AddAsync(AppID, "", "", "", () => StartAsyncJob("Hello World Test!")); + await jobDAL.DeleteAsync(new List { jobID }); + Assert.True(!string.IsNullOrWhiteSpace(jobID)); + } + + [Fact] + public async Task UpdateAsyncJobAsyncTest() + { + var jobID = await jobDAL.AddAsync(AppID, "", "", "", () => StartAsyncJob("Hello World Test!")); + var count = await jobDAL.UpdateAsync(jobID, AppID, "", "", "JobNameUpdated", () => StartAsyncJob("Updated Hello World Test!")); + + var job = await jobDAL.GetJobAsync(jobID); + await jobDAL.DeleteAsync(new List { jobID }); + Assert.True(count > 0); + Assert.Equal("JobNameUpdated", job.JobName); + } + //Test auto delete older than 24 hours and Null(not started) status [Fact] public async Task DeleteAsyncOldJobs_NotStarted() diff --git a/Shift.UnitTest.DataLayer/JobDALSqlTest.cs b/Shift.UnitTest.DataLayer/JobDALSqlTest.cs index 60003d8..b567963 100644 --- a/Shift.UnitTest.DataLayer/JobDALSqlTest.cs +++ b/Shift.UnitTest.DataLayer/JobDALSqlTest.cs @@ -5,6 +5,7 @@ using Shift.DataLayer; using System.Configuration; using System.Linq; +using System.Threading.Tasks; namespace Shift.UnitTest.DataLayer { @@ -78,6 +79,32 @@ public void UpdateTest() Assert.Equal("JobNameUpdated", job.JobName); } + public async Task StartAsyncJob(string message) + { + Console.WriteLine(message); + await Task.Delay(1000); + } + + [Fact] + public void AddAsyncJobTest() + { + var jobID = jobDAL.Add(AppID, "", "", "", () => StartAsyncJob("Hello World Test!")); + jobDAL.Delete(new List { jobID }); + Assert.True(!string.IsNullOrWhiteSpace(jobID)); + } + + [Fact] + public void UpdateAsyncJobTest() + { + var jobID = jobDAL.Add(AppID, "", "", "", () => StartAsyncJob("Hello World Test!")); + var count = jobDAL.Update(jobID, AppID, "", "", "JobNameUpdated", () => StartAsyncJob("Updated Hello World Test!")); + + var job = jobDAL.GetJob(jobID); + jobDAL.Delete(new List { jobID }); + Assert.True(count > 0); + Assert.Equal("JobNameUpdated", job.JobName); + } + //Test auto delete older than 24 hours and Null(not started) status [Fact] public void DeleteOldJobs_NotStarted() diff --git a/Shift.UnitTest.DataLayer/Properties/AssemblyInfo.cs b/Shift.UnitTest.DataLayer/Properties/AssemblyInfo.cs index 2fb0108..69901fc 100644 --- a/Shift.UnitTest.DataLayer/Properties/AssemblyInfo.cs +++ b/Shift.UnitTest.DataLayer/Properties/AssemblyInfo.cs @@ -32,4 +32,4 @@ // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.8.1")] +[assembly: AssemblyVersion("1.0.8.2")] diff --git a/Shift.UnitTest/Properties/AssemblyInfo.cs b/Shift.UnitTest/Properties/AssemblyInfo.cs index 87fdec2..cb91f78 100644 --- a/Shift.UnitTest/Properties/AssemblyInfo.cs +++ b/Shift.UnitTest/Properties/AssemblyInfo.cs @@ -32,4 +32,4 @@ // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.8.1")] +[assembly: AssemblyVersion("1.0.8.2")] diff --git a/Shift/Helpers.cs b/Shift/Helpers.cs index 4d587d4..c17b681 100644 --- a/Shift/Helpers.cs +++ b/Shift/Helpers.cs @@ -15,71 +15,6 @@ namespace Shift public static class Helpers { - private static IEnumerable GetAllMethods(Type type) - { - var methods = new List(type.GetMethods()); - - if (type.IsInterface) - { - methods.AddRange(type.GetInterfaces().SelectMany(x => x.GetMethods())); - } - - return methods; - } - - public static MethodInfo GetNonOpenMatchingMethod(Type type, string name, Type[] parameterTypes) - { - var methodCandidates = GetAllMethods(type); - - foreach (var methodCandidate in methodCandidates) - { - if (!methodCandidate.Name.Equals(name, StringComparison.Ordinal)) - { - continue; - } - - var parameters = methodCandidate.GetParameters(); - if (parameters.Length != parameterTypes.Length) - { - continue; - } - - var parameterTypesMatched = true; - var genericArguments = new List(); - - // Determining whether we can use this method candidate with - // current parameter types. - for (var i = 0; i < parameters.Length; i++) - { - var parameter = parameters[i]; - var parameterType = parameter.ParameterType; - var actualType = parameterTypes[i]; - - // Skipping generic parameters as we can use actual type. - if (parameterType.IsGenericParameter) - { - genericArguments.Add(actualType); - continue; - } - - // Skipping non-generic parameters of assignable types. - if (parameterType.IsAssignableFrom(actualType)) continue; - - parameterTypesMatched = false; - break; - } - - if (!parameterTypesMatched) continue; - - // Return first found method candidate with matching parameters. - return methodCandidate.ContainsGenericParameters - ? methodCandidate.MakeGenericMethod(genericArguments.ToArray()) - : methodCandidate; - } - - return null; - } - /* * Instead of Activator.CreateInstance * http://stackoverflow.com/questions/6582259/fast-creation-of-objects-instead-of-activator-createinstancetype/16162809#16162809 diff --git a/Shift/JobClient.cs b/Shift/JobClient.cs index f5ae986..a1f9c5b 100644 --- a/Shift/JobClient.cs +++ b/Shift/JobClient.cs @@ -62,7 +62,7 @@ public JobClient(IJobDAL jobDAL) this.jobDAL = jobDAL; } - #region Clients access + #region Add Expression //Provides the clients to submit jobs or commands for jobs /// @@ -137,7 +137,88 @@ public Task AddAsync(string appID, string userID, string jobType, string { return jobDAL.AddAsync(appID, userID, jobType, jobName, methodCall); } + #endregion + #region Add Func + //Provides the clients to submit jobs or commands for jobs + + /// + /// Add a method and parameters into the job table. + /// Ref and out parameters are not supported. + /// + /// Expression body for method call + /// JobID of the added job. + public string Add(Expression> methodCall) + { + return jobDAL.Add(null, null, null, null, methodCall); + } + + public Task AddAsync(Expression> methodCall) + { + return jobDAL.AddAsync(null, null, null, null, methodCall); + } + + /// + /// Add a method and parameters into the job table. + /// Ref and out parameters are not supported. + /// + /// Client application ID + /// Expression body for method call + /// JobID of the added job. + public string Add(string appID, Expression> methodCall) + { + return jobDAL.Add(appID, null, null, null, methodCall); + } + + public Task AddAsync(string appID, Expression> methodCall) + { + return jobDAL.AddAsync(appID, null, null, null, methodCall); + } + + /// + /// Add a method and parameters into the job table. + /// Job name defaults to class.method name. + /// Ref and out parameters are not supported. + /// + /// Client application ID + /// User ID + /// Job type category/group + /// Expression body for method call + /// JobID of the added job. + public string Add(string appID, string userID, string jobType, Expression> methodCall) + { + return jobDAL.Add(appID, userID, jobType, null, methodCall); + } + + public Task AddAsync(string appID, string userID, string jobType, Expression> methodCall) + { + return jobDAL.AddAsync(appID, userID, jobType, null, methodCall); + } + + /// + /// Add a method and parameters into the job table with a custom name. + /// Ref and out parameters are not supported. + /// + /// Client application ID + /// User ID + /// Job type category/group + /// Name for this job + /// Expression body for method call + /// JobID of the added job. + public string Add(string appID, string userID, string jobType, string jobName, Expression> methodCall) + { + return jobDAL.Add(appID, userID, jobType, jobName, methodCall); + } + + public Task AddAsync(string appID, string userID, string jobType, string jobName, Expression> methodCall) + { + return jobDAL.AddAsync(appID, userID, jobType, jobName, methodCall); + } + + + #endregion + + #region Update Expression /// /// Update a job's method and parameters. /// Ref and out parameters are not supported. @@ -193,6 +274,65 @@ public Task UpdateAsync(string jobID, string appID, string userID, string j { return jobDAL.UpdateAsync(jobID, appID, userID, jobType, jobName, methodCall); } + #endregion + + #region Update Expression> + /// + /// Update a job's method and parameters. + /// Ref and out parameters are not supported. + /// + /// Existing job ID + /// Expression body for method call + /// Number of successfully updated job + public int Update(string jobID, Expression> methodCall) + { + return jobDAL.Update(jobID, null, null, null, null, methodCall); + } + + public Task UpdateAsync(string jobID, Expression> methodCall) + { + return jobDAL.UpdateAsync(jobID, null, null, null, null, methodCall); + } + + /// + /// Update a job's method and parameters. + /// Ref and out parameters are not supported. + /// + /// Existing job ID + /// Client application ID + /// Expression body for method call + /// Number of successfully updated job + public int Update(string jobID, string appID, Expression> methodCall) + { + return jobDAL.Update(jobID, appID, null, null, null, methodCall); + } + + public Task UpdateAsync(string jobID, string appID, Expression> methodCall) + { + return jobDAL.UpdateAsync(jobID, appID, null, null, null, methodCall); + } + + /// + /// Update a job's method and parameters. + /// Ref and out parameters are not supported. + /// + /// Existing job ID + /// Client application ID + /// User ID + /// Job type category/group + /// Name for this job + /// Expression body for method call + /// Number of successfully updated job + public int Update(string jobID, string appID, string userID, string jobType, string jobName, Expression> methodCall) + { + return jobDAL.Update(jobID, appID, userID, jobType, jobName, methodCall); + } + + public Task UpdateAsync(string jobID, string appID, string userID, string jobType, string jobName, Expression> methodCall) + { + return jobDAL.UpdateAsync(jobID, appID, userID, jobType, jobName, methodCall); + } + #endregion /// /// Set "stop" command to already running or not running jobs. @@ -395,8 +535,6 @@ public Task GetProgressAsync(string jobID) return jobDAL.GetProgressAsync(jobID); } - #endregion - } diff --git a/Shift/Properties/AssemblyInfo.cs b/Shift/Properties/AssemblyInfo.cs index 610bdef..fb4c71d 100644 --- a/Shift/Properties/AssemblyInfo.cs +++ b/Shift/Properties/AssemblyInfo.cs @@ -32,4 +32,4 @@ // You can specify all the values or you can default the Build and Revision Numbers // by using the '*' as shown below: // [assembly: AssemblyVersion("1.0.*")] -[assembly: AssemblyVersion("1.0.8.1")] +[assembly: AssemblyVersion("1.0.8.2")] diff --git a/Shift/Worker.cs b/Shift/Worker.cs index 09b3539..28408d4 100644 --- a/Shift/Worker.cs +++ b/Shift/Worker.cs @@ -1,16 +1,16 @@ -using Autofac; -using Shift.Entities; -using System; +using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; using System.Reflection; +using System.Collections.ObjectModel; using Newtonsoft.Json; +using Shift; using Shift.DataLayer; -using System.Collections.ObjectModel; +using Shift.Entities; namespace Shift { @@ -98,7 +98,7 @@ private async Task RunClaimedJobsAsync(IEnumerable jobList, bool isSync) private static Type GetTypeFromAllAssemblies(string typeName) { //try this domain first - var type = Type.GetType(typeName); + var type = Type.GetType(typeName, throwOnError: false, ignoreCase: true); if (type != null) return type; @@ -108,7 +108,7 @@ private static Type GetTypeFromAllAssemblies(string typeName) foreach (var assembly in assemblies) { - Type t = assembly.GetType(typeName, false); + Type t = assembly.GetType(typeName, throwOnError:false, ignoreCase: true); if (t != null) return t; } @@ -122,7 +122,7 @@ private void CreateTask(string processID, string jobID, string invokeMeta, strin var type = GetTypeFromAllAssemblies(invokeMetaObj.Type); var parameterTypes = JsonConvert.DeserializeObject(invokeMetaObj.ParameterTypes, SerializerSettings.Settings); - var methodInfo = Helpers.GetNonOpenMatchingMethod(type, invokeMetaObj.Method, parameterTypes); + var methodInfo = type.GetNonOpenMatchingMethod(invokeMetaObj.Method, parameterTypes); if (methodInfo == null) { throw new InvalidOperationException(string.Format("The type '{0}' has no method with signature '{1}({2})'", type.FullName, invokeMetaObj.Method, string.Join(", ", parameterTypes.Select(x => x.Name)))); @@ -220,13 +220,39 @@ private async Task ExecuteJobAsync(string processID, string jobID, MethodInfo me var cancelSource = new CancellationTokenSource(); cancelToken = cancelSource.Token; } + if (pauseToken == null) { var pauseSource = new PauseTokenSource(); pauseToken = pauseSource.Token; } - var args = DALHelpers.DeserializeArguments(cancelToken.Value, pauseToken.Value, progress, methodInfo, parameters); - methodInfo.Invoke(instance, args); + + var arguments = DALHelpers.DeserializeArguments(cancelToken.Value, pauseToken.Value, progress, methodInfo, parameters); + + var result = methodInfo.Invoke(instance, arguments); + + //handle async method invocation + var task = result as Task; + if (task != null) + { + if (isSync) + task.GetAwaiter().GetResult(); + else + await task; + } + } + catch (OperationCanceledException exc) + { + if (isSync) + { + SetToStoppedAsync(new List { jobID }, isSync).GetAwaiter().GetResult(); + } + else + { + await SetToStoppedAsync(new List { jobID }, isSync); + } + + throw exc; } catch (TargetInvocationException exc) { @@ -331,6 +357,17 @@ public async Task StopJobsAsync(bool isSync) await StopJobsAsync(jobIDs, isSync); } + private async Task StopJobsAsync(TaskInfo taskInfo) + { + //Check if paused? Then un-paused/continue first, or the cancel will not be hit + if (taskInfo.PauseSource != null && taskInfo.PauseSource.Token.IsPaused) + Task.Run(() => taskInfo.PauseSource.Continue()).ConfigureAwait(false); //continue running task + + taskInfo.CancelSource.Cancel(); //attempt to cancel task + + await taskInfo.JobTask.ConfigureAwait(false); + } + private async Task StopJobsAsync(IReadOnlyCollection jobIDs, bool isSync) { var nonWaitJobIDs = new List(); @@ -345,14 +382,8 @@ private async Task StopJobsAsync(IReadOnlyCollection jobIDs, bool isSync { if (!taskInfo.CancelSource.Token.IsCancellationRequested) { - //Check if paused? Then un-paused/continue first, or the cancel will not be hit - if (taskInfo.PauseSource != null && taskInfo.PauseSource.Token.IsPaused) - taskInfo.PauseSource.Continue(); - - taskInfo.CancelSource.Cancel(); //attempt to cancel task - - //Don't hold the process, just run another task to wait for cancellable task - Task.Run(async () => await taskInfo.JobTask.ConfigureAwait(false)) + //Don't hold the process, just run another task to cancel and wait for cancellable task + Task.Run(async () => await StopJobsAsync(taskInfo)) .ContinueWith(result => { taskList.Remove(jobID); @@ -408,120 +439,6 @@ private async Task SetToStoppedAsync(IReadOnlyCollection jobIDs, bool is } #endregion - #region Clean Up - /// - /// Cleanup and synchronize running jobs and jobs table. - /// * Job is deleted based on AutoDeletePeriod and AutoDeleteStatus settings. - /// * Mark job as an error, when job status is "RUNNING" in DB table, but there is no actual running thread in the related server process (Zombie Jobs). - /// * Mark job as an error, when job status is "PAUSED" in DB table, but there is no actual running thread in the related server process (Zombie Jobs), can't run it again. - /// * Remove thread references in memory, when job is deleted or status in DB is: stopped, error, or completed. - /// - public async Task CleanUpAsync(bool isSync) - { - if(isSync) - { - StopJobsAsync(isSync).GetAwaiter().GetResult(); - - //Delete past completed jobs from storage - if (this.autoDeletePeriod != null) - { - jobDAL.Delete(this.autoDeletePeriod.Value, this.autoDeleteStatus); - } - } - else - { - await StopJobsAsync(isSync); - - //Delete past completed jobs from storage - if (this.autoDeletePeriod != null) - { - await jobDAL.DeleteAsync(this.autoDeletePeriod.Value, this.autoDeleteStatus); - } - } - - //Get all RUNNING process from this worker ProcessID - var jobList = isSync ? jobDAL.GetJobsByProcessAndStatus(workerProcessID, JobStatus.Running) : await jobDAL.GetJobsByProcessAndStatusAsync(workerProcessID, JobStatus.Running); - foreach (var job in jobList) - { - if (!taskList.ContainsKey(job.JobID)) - { - //Doesn't exist anymore? - var error = "Error: No actual running job process found. Try reset and run again."; - var processID = string.IsNullOrWhiteSpace(job.ProcessID) ? workerProcessID : job.ProcessID; - var count = isSync ? SetErrorAsync(processID, job.JobID, error, isSync).GetAwaiter().GetResult() - : await SetErrorAsync(processID, job.JobID, error, isSync); - } - } - - //Get all PAUSED process from this worker ProcessID - jobList = isSync ? jobDAL.GetJobsByProcessAndStatus(workerProcessID, JobStatus.Paused) : await jobDAL.GetJobsByProcessAndStatusAsync(workerProcessID, JobStatus.Paused); - foreach (var job in jobList) - { - if (!taskList.ContainsKey(job.JobID)) - { - //Doesn't exist anymore? - var error = "Error: No actual running job process found, unable to continue paused job. Try reset and run again."; - var processID = string.IsNullOrWhiteSpace(job.ProcessID) ? workerProcessID : job.ProcessID; - var count = isSync ? SetErrorAsync(processID, job.JobID, error, isSync).GetAwaiter().GetResult() - : await SetErrorAsync(processID, job.JobID, error, isSync); - } - } - - //Synchronize what's in taskList status and in DB - //Remove all non-running jobs from taskList - if (taskList.Count > 0) - { - var inDBjobIDs = new List(); - jobList = isSync ? jobDAL.GetJobs(taskList.Keys.ToList()) : await jobDAL.GetJobsAsync(taskList.Keys.ToList()); //get all jobs in taskList - - // If jobs doesn't even exists in storage (deleted manually?), remove from taskList. - inDBjobIDs = jobList.Select(j => j.JobID).ToList(); - var removalList = new List(); - foreach (var jobID in taskList.Keys) - { - //jobID is not in DB? - if (!inDBjobIDs.Contains(jobID)) - { - removalList.Add(jobID); - } - } - - //Try to Stop/Cancel jobs and remove from taskList - if (isSync) - { - StopJobsAsync(new ReadOnlyCollection(removalList), isSync).GetAwaiter().GetResult(); - } - else - { - await StopJobsAsync(new ReadOnlyCollection(removalList), isSync); - } - - // For job status that is stopped, error, completed => Remove from thread list, no need to keep track of them anymore. - var statuses = new List - { - (int)JobStatus.Stopped, - (int)JobStatus.Error, - (int)JobStatus.Completed - }; - - foreach (var job in jobList) - { - if (job.Status != null - && statuses.Contains((int)job.Status) - && taskList.ContainsKey(job.JobID)) - { - var taskInfo = taskList[job.JobID]; - if (taskInfo.CancelSource != null) - taskInfo.CancelSource.Dispose(); - taskList.Remove(job.JobID); - } - } - - } - - } - #endregion - #region Pause /// /// Pause jobs. @@ -551,7 +468,7 @@ private async Task PauseJobsAsync(IReadOnlyCollection jobIDs, bool isSyn { if (!taskInfo.PauseSource.Token.IsPaused) { - taskInfo.PauseSource.Pause(); //pause task + Task.Run(() => taskInfo.PauseSource.Pause()).ConfigureAwait(false); //pause task if (isSync) { SetToPausedAsync(new List() { jobID }, isSync).GetAwaiter().GetResult(); @@ -613,7 +530,7 @@ private async Task ContinueJobsAsync(IReadOnlyCollection jobIDs, bool is var taskInfo = taskList.ContainsKey(jobID) ? taskList[jobID] : null; if (taskInfo != null && taskInfo.PauseSource != null && taskInfo.PauseSource.Token.IsPaused) { - taskInfo.PauseSource.Continue(); //continue running task + Task.Run(() => taskInfo.PauseSource.Continue()).ConfigureAwait(false); //continue running task if (isSync) { SetToRunningAsync(new List() { jobID }, isSync).GetAwaiter().GetResult(); @@ -640,6 +557,120 @@ private async Task SetToRunningAsync(IReadOnlyCollection jobIDs, bool is } #endregion + + #region Clean Up + /// + /// Cleanup and synchronize running jobs and jobs table. + /// * Job is deleted based on AutoDeletePeriod and AutoDeleteStatus settings. + /// * Mark job as an error, when job status is "RUNNING" in DB table, but there is no actual running thread in the related server process (Zombie Jobs). + /// * Mark job as an error, when job status is "PAUSED" in DB table, but there is no actual running thread in the related server process (Zombie Jobs), can't run it again. + /// * Remove thread references in memory, when job is deleted or status in DB is: stopped, error, or completed. + /// + public async Task CleanUpAsync(bool isSync) + { + if (isSync) + { + StopJobsAsync(isSync).GetAwaiter().GetResult(); + + //Delete past completed jobs from storage + if (this.autoDeletePeriod != null) + { + jobDAL.Delete(this.autoDeletePeriod.Value, this.autoDeleteStatus); + } + } + else + { + await StopJobsAsync(isSync); + + //Delete past completed jobs from storage + if (this.autoDeletePeriod != null) + { + await jobDAL.DeleteAsync(this.autoDeletePeriod.Value, this.autoDeleteStatus); + } + } + + //Get all RUNNING process from this worker ProcessID + var jobList = isSync ? jobDAL.GetJobsByProcessAndStatus(workerProcessID, JobStatus.Running) : await jobDAL.GetJobsByProcessAndStatusAsync(workerProcessID, JobStatus.Running); + foreach (var job in jobList) + { + if (!taskList.ContainsKey(job.JobID)) + { + //Doesn't exist anymore? + var error = "Error: No actual running job process found. Try reset and run again."; + var processID = string.IsNullOrWhiteSpace(job.ProcessID) ? workerProcessID : job.ProcessID; + var count = isSync ? SetErrorAsync(processID, job.JobID, error, isSync).GetAwaiter().GetResult() + : await SetErrorAsync(processID, job.JobID, error, isSync); + } + } + + //Get all PAUSED process from this worker ProcessID + jobList = isSync ? jobDAL.GetJobsByProcessAndStatus(workerProcessID, JobStatus.Paused) : await jobDAL.GetJobsByProcessAndStatusAsync(workerProcessID, JobStatus.Paused); + foreach (var job in jobList) + { + if (!taskList.ContainsKey(job.JobID)) + { + //Doesn't exist anymore? + var error = "Error: No actual running job process found, unable to continue paused job. Try reset and run again."; + var processID = string.IsNullOrWhiteSpace(job.ProcessID) ? workerProcessID : job.ProcessID; + var count = isSync ? SetErrorAsync(processID, job.JobID, error, isSync).GetAwaiter().GetResult() + : await SetErrorAsync(processID, job.JobID, error, isSync); + } + } + + //Synchronize what's in taskList status and in DB + //Remove all non-running jobs from taskList + if (taskList.Count > 0) + { + var inDBjobIDs = new List(); + jobList = isSync ? jobDAL.GetJobs(taskList.Keys.ToList()) : await jobDAL.GetJobsAsync(taskList.Keys.ToList()); //get all jobs in taskList + + // If jobs doesn't even exists in storage (deleted manually?), remove from taskList. + inDBjobIDs = jobList.Select(j => j.JobID).ToList(); + var removalList = new List(); + foreach (var jobID in taskList.Keys) + { + //jobID is not in DB? + if (!inDBjobIDs.Contains(jobID)) + { + removalList.Add(jobID); + } + } + + //Try to Stop/Cancel jobs and remove from taskList + if (isSync) + { + StopJobsAsync(new ReadOnlyCollection(removalList), isSync).GetAwaiter().GetResult(); + } + else + { + await StopJobsAsync(new ReadOnlyCollection(removalList), isSync); + } + + // For job status that is stopped, error, completed => Remove from thread list, no need to keep track of them anymore. + var statuses = new List + { + (int)JobStatus.Stopped, + (int)JobStatus.Error, + (int)JobStatus.Completed + }; + + foreach (var job in jobList) + { + if (job.Status != null + && statuses.Contains((int)job.Status) + && taskList.ContainsKey(job.JobID)) + { + var taskInfo = taskList[job.JobID]; + if (taskInfo.CancelSource != null) + taskInfo.CancelSource.Dispose(); + taskList.Remove(job.JobID); + } + } + + } + + } + #endregion } }