Skip to content

Commit

Permalink
Cherry picks for 4.1.0-preview (#519)
Browse files Browse the repository at this point in the history
* Fix Docker Compose Failure in Build Pipeline (#516)

* Adding docker compose commands for kafka containers

* Using powershell to run docker compose commands

* Installing docker compose in pipeline using bash script

* Trying run docker compose from the root folder

* Trying sudo to install docker compose

* Changing directory for docker compose

* Cleaning up public build pipeline

* Adding support for Flex Consumption (#515)

* Adding support for flex consumption and refactoring

* Supporting certificate PEMs and parsing for Keyvault

* Improved Null handling for regex matching

* Adding changes for Kafka Output binding

* Upgraded extension version to 4.1.0-preview (#518)
  • Loading branch information
jainharsh98 authored Oct 14, 2024
1 parent 4d40e1f commit 4dcf67b
Show file tree
Hide file tree
Showing 17 changed files with 496 additions and 75 deletions.
2 changes: 1 addition & 1 deletion build/common.props
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<Project>
<PropertyGroup>
<!-- Extensions can have independent versions and only increment when released -->
<Version>4.0.0$(VersionSuffix)</Version>
<Version>4.1.0-preview$(VersionSuffix)</Version>
<TargetFramework>netstandard2.0</TargetFramework>
<Authors>Microsoft</Authors>
<Company>Microsoft</Company>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Copyright (c) .NET Foundation. All rights reserved.
// Licensed under the MIT License. See License.txt in the project root for license information.

using Microsoft.Azure.WebJobs.Host.Scale;
using Microsoft.Extensions.DependencyInjection;
using System;

Expand All @@ -9,12 +10,12 @@ namespace Microsoft.Azure.WebJobs.Extensions.Kafka
public static class KafkaWebJobsBuilderExtensions
{
/// <summary>
/// Adds the Kafka extensions to the provider <see cref="IWebJobsBuilder"/>
/// Adds the Kafka extensions to the provider <see cref="IWebJobsBuilder"/>.
/// </summary>
public static IWebJobsBuilder AddKafka(this IWebJobsBuilder builder) => AddKafka(builder, o => { });

/// <summary>
/// Adds the Kafka extensions to the provider <see cref="IWebJobsBuilder"/>
/// Adds the Kafka extensions to the provider <see cref="IWebJobsBuilder"/>.
/// </summary>
public static IWebJobsBuilder AddKafka(this IWebJobsBuilder builder, Action<KafkaOptions> configure)
{
Expand All @@ -40,5 +41,22 @@ public static IWebJobsBuilder AddKafka(this IWebJobsBuilder builder, Action<Kafk

return builder;
}

/// <summary>
/// Registers runtime-scaling support for a single Kafka trigger. One
/// <see cref="KafkaScalerProvider"/> instance is created lazily and exposed
/// both as an <see cref="IScaleMonitorProvider"/> and an
/// <see cref="ITargetScalerProvider"/>, so monitor-based and target-based
/// scaling share the same underlying scaler.
/// </summary>
internal static IWebJobsBuilder AddKafkaScaleForTrigger(this IWebJobsBuilder builder, TriggerMetadata triggerMetadata)
{
    // Captured on first resolution so the lazily built provider can pull
    // its dependencies from the container that resolved it.
    IServiceProvider capturedProvider = null;
    var lazyScalerProvider = new Lazy<KafkaScalerProvider>(
        () => new KafkaScalerProvider(capturedProvider, triggerMetadata));

    builder.Services.AddSingleton<IScaleMonitorProvider>(resolved =>
    {
        capturedProvider = capturedProvider ?? resolved;
        return lazyScalerProvider.Value;
    });

    builder.Services.AddSingleton<ITargetScalerProvider>(resolved =>
    {
        capturedProvider = capturedProvider ?? resolved;
        return lazyScalerProvider.Value;
    });

    return builder;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
using Microsoft.Azure.WebJobs.Extensions.Kafka;
using Microsoft.Azure.WebJobs.Host.Executors;
using Microsoft.Azure.WebJobs.Host.Listeners;
using Microsoft.Azure.WebJobs.Host.Protocols;
Expand Down Expand Up @@ -48,8 +49,8 @@ internal class KafkaListener<TKey, TValue> : IListener, IScaleMonitorProvider, I
private readonly string functionId;
protected Lazy<KafkaMetricsProvider<TKey, TValue>> metricsProvider;
//protected for the unit test
protected Lazy<KafkaTopicScaler<TKey, TValue>> topicScaler;
protected Lazy<KafkaTargetScaler<TKey, TValue>> targetScaler;
protected Lazy<KafkaGenericTopicScaler<TKey, TValue>> topicScaler;
protected Lazy<KafkaGenericTargetScaler<TKey, TValue>> targetScaler;

/// <summary>
/// Gets the value deserializer.
Expand Down Expand Up @@ -80,8 +81,8 @@ public KafkaListener(
this.functionId = functionId;
this.consumer = new Lazy<IConsumer<TKey, TValue>>(() => CreateConsumer());
this.metricsProvider = new Lazy<KafkaMetricsProvider<TKey, TValue>>(CreateMetricsProvider);
this.topicScaler = new Lazy<KafkaTopicScaler<TKey, TValue>>(CreateTopicScaler);
this.targetScaler = new Lazy<KafkaTargetScaler<TKey, TValue>>(CreateTargetScaler);
this.topicScaler = new Lazy<KafkaGenericTopicScaler<TKey, TValue>>(CreateTopicScaler);
this.targetScaler = new Lazy<KafkaGenericTargetScaler<TKey, TValue>>(CreateTargetScaler);
}

private IConsumer<TKey, TValue> CreateConsumer()
Expand Down Expand Up @@ -121,14 +122,14 @@ private KafkaMetricsProvider<TKey, TValue> CreateMetricsProvider()
return new KafkaMetricsProvider<TKey, TValue>(this.topicName, new AdminClientConfig(GetConsumerConfiguration()), consumer.Value, this.logger);
}

private KafkaTopicScaler<TKey, TValue> CreateTopicScaler()
private KafkaGenericTopicScaler<TKey, TValue> CreateTopicScaler()
{
return new KafkaTopicScaler<TKey, TValue>(this.listenerConfiguration.Topic, this.consumerGroup, this.functionId, this.consumer.Value, metricsProvider.Value, this.listenerConfiguration.LagThreshold, this.logger);
return new KafkaGenericTopicScaler<TKey, TValue>(this.listenerConfiguration.Topic, this.consumerGroup, this.functionId, this.consumer.Value, metricsProvider.Value, this.listenerConfiguration.LagThreshold, this.logger);
}

private KafkaTargetScaler<TKey, TValue> CreateTargetScaler()
private KafkaGenericTargetScaler<TKey, TValue> CreateTargetScaler()
{
return new KafkaTargetScaler<TKey, TValue>(this.listenerConfiguration.Topic, this.consumerGroup, this.functionId, this.consumer.Value, metricsProvider.Value, this.listenerConfiguration.LagThreshold, this.logger);
return new KafkaGenericTargetScaler<TKey, TValue>(this.listenerConfiguration.Topic, this.consumerGroup, this.functionId, this.consumer.Value, metricsProvider.Value, this.listenerConfiguration.LagThreshold, this.logger);
}

public void Cancel()
Expand Down Expand Up @@ -189,6 +190,9 @@ private ConsumerConfig GetConsumerConfiguration()
SslCertificateLocation = this.listenerConfiguration.SslCertificateLocation,
SslKeyLocation = this.listenerConfiguration.SslKeyLocation,
SslKeyPassword = this.listenerConfiguration.SslKeyPassword,
SslCaPem = this.listenerConfiguration.SslCaPEM,
SslCertificatePem = this.listenerConfiguration.SslCertificatePEM,
SslKeyPem = this.listenerConfiguration.SslKeyPEM,

// OAuthBearer config
SaslOauthbearerMethod = this.listenerConfiguration.SaslOAuthBearerMethod,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

namespace Microsoft.Azure.WebJobs.Extensions.Kafka
{
internal class KafkaTargetScaler<Tkey, TValue> : ITargetScaler
internal class KafkaGenericTargetScaler<Tkey, TValue> : ITargetScaler
{
private readonly string topicName;
private readonly string consumerGroup;
Expand All @@ -21,28 +21,28 @@ internal class KafkaTargetScaler<Tkey, TValue> : ITargetScaler
protected TargetScalerResult lastTargetScalerResult;

public TargetScalerDescriptor TargetScalerDescriptor { get; }
internal KafkaTargetScaler(string topic, string consumerGroup, string functionID, IConsumer<Tkey, TValue> consumer, KafkaMetricsProvider<Tkey, TValue> metricsProvider, long lagThreshold, ILogger logger)

internal KafkaGenericTargetScaler(string topic, string consumerGroup, string functionID, IConsumer<Tkey, TValue> consumer, KafkaMetricsProvider<Tkey, TValue> metricsProvider, long lagThreshold, ILogger logger)
{
if (string.IsNullOrWhiteSpace(topic))
{
throw new ArgumentException("Invalid topic: ", nameof(topic));
}
}

if (string.IsNullOrWhiteSpace(consumerGroup))
{
throw new ArgumentException("Invalid consumer group: ", nameof(consumerGroup));
}

this.topicName = topic;
topicName = topic;
this.consumerGroup = consumerGroup;
this.TargetScalerDescriptor = new TargetScalerDescriptor(functionID);
TargetScalerDescriptor = new TargetScalerDescriptor(functionID);
this.lagThreshold = lagThreshold;
this.logger = logger;
this.metricsProvider = metricsProvider;

this.lastScaleUpTime = DateTime.MinValue;
this.lastTargetScalerResult = null;
lastScaleUpTime = DateTime.MinValue;
lastTargetScalerResult = null;

this.logger.LogInformation($"Started Target Scaler - topic name: {topicName}, consumerGroup: {consumerGroup}, functionID: {functionID}, lagThreshold: {lagThreshold}.");
}
Expand All @@ -51,20 +51,20 @@ public async Task<TargetScalerResult> GetScaleResultAsync(TargetScalerContext co
{
var metrics = await ValidateAndGetMetrics();
TargetScalerResult targetScalerResult = GetScaleResultInternal(context, metrics);
this.lastTargetScalerResult = targetScalerResult;
lastTargetScalerResult = targetScalerResult;
return targetScalerResult;
}

internal async Task<KafkaTriggerMetrics> ValidateAndGetMetrics()
{
// if the metrics don't exist or the last calculated metrics
// are older than 2 minutes, recalculate the metrics.
var metrics = this.metricsProvider.LastCalculatedMetrics;
var metrics = metricsProvider.LastCalculatedMetrics;
TimeSpan metricsTimeOut = TimeSpan.FromMinutes(1);
if (metrics == null || DateTime.UtcNow - metrics.Timestamp > metricsTimeOut)
{
metrics = await this.metricsProvider.GetMetricsAsync();
this.logger.LogInformation($"Calculating metrics as last calculated don't exist or were stored 1 minute ago.");
metrics = await metricsProvider.GetMetricsAsync();
logger.LogInformation($"Calculating metrics as last calculated don't exist or were stored 1 minute ago.");
}
return metrics;
}
Expand All @@ -85,18 +85,18 @@ internal TargetScalerResult GetScaleResultInternal(TargetScalerContext context,
};
}

var targetConcurrency = GetConcurrency(context, this.lagThreshold);
int targetWorkerCount = (int) Math.Ceiling(totalLag / (decimal) targetConcurrency);
var targetConcurrency = GetConcurrency(context, lagThreshold);

int targetWorkerCount = (int)Math.Ceiling(totalLag / (decimal)targetConcurrency);

targetWorkerCount = ValidateWithPartitionCount(targetWorkerCount, partitionCount);
targetWorkerCount = ThrottleResultIfNecessary(targetWorkerCount);
if (GetChangeInWorkerCount(targetWorkerCount) > 0)
{
this.lastScaleUpTime = DateTime.UtcNow;
lastScaleUpTime = DateTime.UtcNow;
}

this.logger.LogInformation($"Total Lag: {totalLag}, concurrency: {targetConcurrency} TargetWorkerCount: {targetWorkerCount}. For the topic {this.topicName}, consumer group {consumerGroup}.");
logger.LogInformation($"Total Lag: {totalLag}, concurrency: {targetConcurrency} TargetWorkerCount: {targetWorkerCount}. For the topic {topicName}, consumer group {consumerGroup}.");

return new TargetScalerResult
{
Expand All @@ -109,7 +109,7 @@ internal int GetConcurrency(TargetScalerContext context, long lagThreshold)
// If dynamicConcurrencyEnabled is set to true, target concurrency is
// set to instanceConcurrency value, else it is set to lagThreshold
// (default value = 1000).
int targetConcurrency = context.InstanceConcurrency ?? (int) lagThreshold;
int targetConcurrency = context.InstanceConcurrency ?? (int)lagThreshold;
if (targetConcurrency < 1)
{
throw new ArgumentException("Target concurrency must be larger than 0.");
Expand All @@ -122,7 +122,7 @@ internal int ValidateWithPartitionCount(int targetWorkerCount, long partitionCou
// Limit targetWorkerCount to number of partitions.
if (targetWorkerCount > partitionCount)
{
targetWorkerCount = (int) partitionCount;
targetWorkerCount = (int)partitionCount;
}

return targetWorkerCount;
Expand All @@ -134,12 +134,12 @@ internal int ThrottleResultIfNecessary(int targetWorkerCount)
if (GetChangeInWorkerCount(targetWorkerCount) < 0)
{
var scaleDownThrottleTime = TimeSpan.FromMinutes(1);
if (lastScaleUpTime != DateTime.MinValue && DateTime.UtcNow - this.lastScaleUpTime < scaleDownThrottleTime)
if (lastScaleUpTime != DateTime.MinValue && DateTime.UtcNow - lastScaleUpTime < scaleDownThrottleTime)
{
if (this.lastTargetScalerResult != null)
if (lastTargetScalerResult != null)
{
targetWorkerCount = this.lastTargetScalerResult.TargetWorkerCount;
this.logger.LogInformation($"Throttling scale down as last scale up was less than 1 minute ago. Returning last target worker count: {lastTargetScalerResult.TargetWorkerCount}");
targetWorkerCount = lastTargetScalerResult.TargetWorkerCount;
logger.LogInformation($"Throttling scale down as last scale up was less than 1 minute ago. Returning last target worker count: {lastTargetScalerResult.TargetWorkerCount}");
}
}
}
Expand All @@ -148,11 +148,11 @@ internal int ThrottleResultIfNecessary(int targetWorkerCount)

internal int GetChangeInWorkerCount(int targetWorkerCount)
{
if (this.lastTargetScalerResult == null)
if (lastTargetScalerResult == null)
{
return targetWorkerCount;
}
return targetWorkerCount - this.lastTargetScalerResult.TargetWorkerCount;
return targetWorkerCount - lastTargetScalerResult.TargetWorkerCount;
}
}
}
Loading

0 comments on commit 4dcf67b

Please sign in to comment.