Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support KEDA queue scale trigger for Logic App workflow app #196

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Common/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ public static TimeSpan MaxAllowedExecutionTime
public const string DurableTaskStorageProvider = "storageProvider";
public const string DurableTaskMicrosoftSqlProviderType = "MicrosoftSQL";
public const string MicrosoftSqlScaler = "mssql";
public const string AzureQueueScaler = "azure-queue";
public const string DurableTask = "durableTask";
public const string WorkflowAppKind = "workflowApp";
public const string WorkflowExtensionName = "workflow";
public const string WorkflowSettingsName = "Settings";
public const string Extensions = "extensions";
public const string SitePackages = "SitePackages";
public const string PackageNameTxt = "packagename.txt";
Expand Down
1 change: 1 addition & 0 deletions Kudu.Contracts/IEnvironment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,5 +34,6 @@ public interface IEnvironment
bool IsOnLinuxConsumption { get; } // e.g. True on Linux Consumption. False on App Service.
bool IsK8SEApp { get; }
string K8SEAppName { get; }
string K8SEAppKind { get; }
}
}
17 changes: 15 additions & 2 deletions Kudu.Core/Environment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class Environment : IEnvironment
private readonly string _secondaryJobsBinariesPath;
private readonly string _k8seAppName;
private readonly string _k8seAppNamespace;
private readonly string _k8seAppKind;


// This ctor is used only in unit tests
Expand All @@ -65,7 +66,8 @@ public Environment(
string requestId,
IHttpContextAccessor httpContextAccessor,
string k8seAppName = null,
string k8seAppNamespace = null)
string k8seAppNamespace = null,
string k8seAppKind = null)
{
if (repositoryPath == null)
{
Expand Down Expand Up @@ -106,6 +108,7 @@ public Environment(
_httpContextAccessor = httpContextAccessor;
_k8seAppName = k8seAppName;
_k8seAppNamespace = k8seAppNamespace;
_k8seAppKind = k8seAppKind;
}

public Environment(
Expand All @@ -116,7 +119,8 @@ public Environment(
string kuduConsoleFullPath,
IHttpContextAccessor httpContextAccessor,
string k8seAppName = null,
string k8seAppNamespace = null)
string k8seAppNamespace = null,
string k8seAppKind = null)
{
RootPath = rootPath;

Expand All @@ -133,6 +137,7 @@ public Environment(
_locksPath = Path.Combine(SiteRootPath, Constants.LocksPath);
_k8seAppName = k8seAppName;
_k8seAppNamespace = k8seAppNamespace;
_k8seAppKind = k8seAppKind;

if (OSDetector.IsOnWindows())
{
Expand Down Expand Up @@ -499,6 +504,14 @@ public string K8SEAppNamespace
}
}

public string K8SEAppKind
{
get
{
return _k8seAppKind;
}
}

public static string GetFreeSpaceHtml(string path)
{
try
Expand Down
64 changes: 61 additions & 3 deletions Kudu.Core/Functions/KedaFunctionTriggerProvider.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using Newtonsoft.Json.Linq;
using Kudu.Core.Helpers;
using Newtonsoft.Json.Linq;
using System;
using System.Collections.Generic;
using System.IO;
Expand All @@ -9,7 +10,7 @@ namespace Kudu.Core.Functions
{
public static class KedaFunctionTriggerProvider
{
public static IEnumerable<ScaleTrigger> GetFunctionTriggers(string zipFilePath)
public static IEnumerable<ScaleTrigger> GetFunctionTriggers(string zipFilePath, string appName = null, string appKind = null)
{
if (!File.Exists(zipFilePath))
{
Expand Down Expand Up @@ -55,7 +56,18 @@ bool IsHostJson(string fullName)
return fullName.Equals(Constants.FunctionsHostConfigFile, StringComparison.OrdinalIgnoreCase);
}

return CreateScaleTriggers(triggerBindings, hostJsonText);
var triggers = CreateScaleTriggers(triggerBindings, hostJsonText).ToList();

if (appKind?.ToLowerInvariant() == Constants.WorkflowAppKind.ToLowerInvariant())
{
// NOTE(haassyad) Check if the host json has the workflow extension loaded. If so we will add a queue scale trigger for the job dispatcher queue.
if (TryGetWorkflowKedaTrigger(hostJsonText, appName, out ScaleTrigger workflowScaleTrigger))
{
triggers.Add(workflowScaleTrigger);
}
}

return triggers;
}

public static IEnumerable<ScaleTrigger> GetFunctionTriggers(IEnumerable<JObject> functionsJson, string hostJsonText)
Expand Down Expand Up @@ -222,6 +234,52 @@ internal static bool TryGetDurableKedaTrigger(string hostJsonText, out ScaleTrig
return scaleTrigger != null;
}

/// <summary>
/// Tries to add a scale trigger if the app is a workflow app.
/// </summary>
/// <param name="hostJsonText">The host.json text.</param>
/// <param name="appName">The app name.</param>
/// <param name="scaleTrigger">The scale trigger.</param>
/// <returns>true if a scale trigger was found</returns>
internal static bool TryGetWorkflowKedaTrigger(string hostJsonText, string appName, out ScaleTrigger scaleTrigger)
{
JObject hostJson = JObject.Parse(hostJsonText);

// Check the host.json file for workflow settings.
JObject workflowSettings = hostJson
.SelectToken(path: $"{Constants.Extensions}.{Constants.WorkflowExtensionName}.{Constants.WorkflowSettingsName}") as JObject;

// Get the queue length if specified, otherwise default to arbitrary value.
var queueLengthObject = workflowSettings?["Runtime.ScaleMonitor.KEDA.TargetQueueLength"];
var queueLength = queueLengthObject != null ? queueLengthObject.ToString() : "20";

// Get the host id if specified, otherwise default to app name.
var hostIdObject = workflowSettings?["Runtime.HostId"];
var hostId = hostIdObject != null ? hostIdObject.ToString() : appName;

// Hash the host id.
var hostSpecificStorageId = StringHelper
.EscapeAndTrimStorageKeyPrefix(HashHelper.MurmurHash64(hostId).ToString("X"), 32)
.ToLowerInvariant();

var queuePrefix = $"flow{hostSpecificStorageId}jobtriggers";

scaleTrigger = new ScaleTrigger
{
// Azure queue scaler reference: https://keda.sh/docs/2.2/scalers/azure-storage-queue/
Type = Constants.AzureQueueScaler,
Metadata = new Dictionary<string, string>
{
// NOTE(haassyad): We only have one queue partition in single tenant.
["queueName"] = StringHelper.GetWorkflowQueueNameInternal(queuePrefix, 1),
["queueLength"] = queueLength,
["connectionStringFromEnv"] = "AzureWebJobsStorage",
}
};

return true;
}

// match https://github.com/Azure/azure-functions-core-tools/blob/6bfab24b2743f8421475d996402c398d2fe4a9e0/src/Azure.Functions.Cli/Kubernetes/KEDA/V2/KedaV2Resource.cs#L91
internal static IDictionary<string, string> PopulateMetadataDictionary(JToken t, string functionName)
{
Expand Down
138 changes: 138 additions & 0 deletions Kudu.Core/Helpers/HashHelper.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
using System.Text;

namespace Kudu.Core.Helpers
{
public static class HashHelper
{
/// <summary>
/// Computes 64-bit Murmur hash.
/// </summary>
/// <param name="str">The input string.</param>
/// <param name="seed">The input seed.</param>
public static ulong MurmurHash64(string str, uint seed = 0)
{
return HashHelper.MurmurHash64(Encoding.UTF8.GetBytes(str), seed);
}

/// <summary>
/// Computes 64-bit Murmur hash.
/// </summary>
/// <param name="data">The input data.</param>
/// <param name="seed">The input seed.</param>
public static ulong MurmurHash64(byte[] data, uint seed = 0)
{
const uint C1 = 0x239b961b;
const uint C2 = 0xab0e9789;
const uint C3 = 0x561ccd1b;
const uint C4 = 0x0bcaa747;
const uint C5 = 0x85ebca6b;
const uint C6 = 0xc2b2ae35;

int length = data.Length;

unchecked
{
uint h1 = seed;
uint h2 = seed;

int index = 0;
while (index + 7 < length)
{
uint k1 = (uint)(data[index + 0] | data[index + 1] << 8 | data[index + 2] << 16 | data[index + 3] << 24);
uint k2 = (uint)(data[index + 4] | data[index + 5] << 8 | data[index + 6] << 16 | data[index + 7] << 24);

k1 *= C1;
k1 = k1.RotateLeft32(15);
k1 *= C2;
h1 ^= k1;
h1 = h1.RotateLeft32(19);
h1 += h2;
h1 = (h1 * 5) + C3;

k2 *= C2;
k2 = k2.RotateLeft32(17);
k2 *= C1;
h2 ^= k2;
h2 = h2.RotateLeft32(13);
h2 += h1;
h2 = (h2 * 5) + C4;

index += 8;
}

int tail = length - index;
if (tail > 0)
{
uint k1 = (tail >= 4) ? (uint)(data[index + 0] | data[index + 1] << 8 | data[index + 2] << 16 | data[index + 3] << 24) :
(tail == 3) ? (uint)(data[index + 0] | data[index + 1] << 8 | data[index + 2] << 16) :
(tail == 2) ? (uint)(data[index + 0] | data[index + 1] << 8) :
(uint)data[index + 0];

k1 *= C1;
k1 = k1.RotateLeft32(15);
k1 *= C2;
h1 ^= k1;

if (tail > 4)
{
uint k2 = (tail == 7) ? (uint)(data[index + 4] | data[index + 5] << 8 | data[index + 6] << 16) :
(tail == 6) ? (uint)(data[index + 4] | data[index + 5] << 8) :
(uint)data[index + 4];

k2 *= C2;
k2 = k2.RotateLeft32(17);
k2 *= C1;
h2 ^= k2;
}
}

h1 ^= (uint)length;
h2 ^= (uint)length;

h1 += h2;
h2 += h1;

h1 ^= h1 >> 16;
h1 *= C5;
h1 ^= h1 >> 13;
h1 *= C6;
h1 ^= h1 >> 16;

h2 ^= h2 >> 16;
h2 *= C5;
h2 ^= h2 >> 13;
h2 *= C6;
h2 ^= h2 >> 16;

h1 += h2;
h2 += h1;

return ((ulong)h2 << 32) | (ulong)h1;
}
}

#region RotateLeft

/// <summary>
/// Rotates the bits in the provided value to the left (where the number of bits is specified).
/// </summary>
/// <param name="value">The value to be rotated.</param>
/// <param name="count">The number of bits to rotate.</param>
private static uint RotateLeft32(this uint value, int count)
{
return (value << count) | (value >> (32 - count));
}

/// <summary>
/// Rotates the bits in the provided value to the right (where the number of bits is specified).
/// </summary>
/// <param name="value">The value to be rotated.</param>
/// <param name="count">The number of bits to rotate.</param>
private static ulong RotateLeft64(this ulong value, int count)
{
return (value << count) | (value >> (64 - count));
}

#endregion
}
}
Loading