Skip to content

Commit

Permalink
Implement the management client (#12484)
Browse files Browse the repository at this point in the history
ATOM based management operations
  • Loading branch information
ShivangiReja authored Jun 5, 2020
1 parent a6b6aee commit 9bd1293
Show file tree
Hide file tree
Showing 69 changed files with 6,066 additions and 488 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,11 @@ public static Exception ToMessagingContractException(string condition, string me
return new ServiceBusException(message, ServiceBusException.FailureReason.MessageLockLost);
}

if (string.Equals(condition, AmqpClientConstants.EntityAlreadyExistsError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.MessagingEntityAlreadyExists);
}

if (string.Equals(condition, AmqpClientConstants.SessionLockLostError.Value, StringComparison.InvariantCultureIgnoreCase))
{
return new ServiceBusException(message, ServiceBusException.FailureReason.SessionLockLost);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,12 @@
using System.Runtime.Serialization;
using Azure.Core;
using Azure.Messaging.ServiceBus.Amqp.Framing;
using Azure.Messaging.ServiceBus.Filters;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Encoding;
using Microsoft.Azure.Amqp.Framing;
using Azure.Messaging.ServiceBus.Primitives;
using SBMessage = Azure.Messaging.ServiceBus.ServiceBusMessage;
using Azure.Messaging.ServiceBus.Management;

namespace Azure.Messaging.ServiceBus.Amqp
{
Expand Down Expand Up @@ -404,20 +404,20 @@ public static AmqpMap GetRuleDescriptionMap(RuleDescription description)

switch (description.Filter)
{
case SqlFilter sqlFilter:
var filterMap = GetSqlFilterMap(sqlFilter);
ruleDescriptionMap[ManagementConstants.Properties.SqlFilter] = filterMap;
case SqlRuleFilter sqlRuleFilter:
var filterMap = GetSqlRuleFilterMap(sqlRuleFilter);
ruleDescriptionMap[ManagementConstants.Properties.SqlRuleFilter] = filterMap;
break;
case CorrelationFilter correlationFilter:
var correlationFilterMap = GetCorrelationFilterMap(correlationFilter);
ruleDescriptionMap[ManagementConstants.Properties.CorrelationFilter] = correlationFilterMap;
case CorrelationRuleFilter correlationFilter:
var correlationFilterMap = GetCorrelationRuleFilterMap(correlationFilter);
ruleDescriptionMap[ManagementConstants.Properties.CorrelationRuleFilter] = correlationFilterMap;
break;
default:
throw new NotSupportedException(
Resources.RuleFilterNotSupported.FormatForUser(
description.Filter.GetType(),
nameof(SqlFilter),
nameof(CorrelationFilter)));
nameof(SqlRuleFilter),
nameof(CorrelationRuleFilter)));
}

var amqpAction = GetRuleActionMap(description.Action as SqlRuleAction);
Expand All @@ -440,28 +440,28 @@ public static RuleDescription GetRuleDescription(AmqpRuleDescriptionCodec amqpDe
return ruleDescription;
}

public static Filter GetFilter(AmqpFilterCodec amqpFilter)
public static RuleFilter GetFilter(AmqpRuleFilterCodec amqpFilter)
{
Filter filter;
RuleFilter filter;

switch (amqpFilter.DescriptorCode)
{
case AmqpSqlFilterCodec.Code:
var amqpSqlFilter = (AmqpSqlFilterCodec)amqpFilter;
filter = new SqlFilter(amqpSqlFilter.Expression);
case AmqpSqlRuleFilterCodec.Code:
var amqpSqlFilter = (AmqpSqlRuleFilterCodec)amqpFilter;
filter = new SqlRuleFilter(amqpSqlFilter.Expression);
break;

case AmqpTrueFilterCodec.Code:
filter = new TrueFilter();
case AmqpTrueRuleFilterCodec.Code:
filter = new TrueRuleFilter();
break;

case AmqpFalseFilterCodec.Code:
filter = new FalseFilter();
case AmqpFalseRuleFilterCodec.Code:
filter = new FalseRuleFilter();
break;

case AmqpCorrelationFilterCodec.Code:
var amqpCorrelationFilter = (AmqpCorrelationFilterCodec)amqpFilter;
var correlationFilter = new CorrelationFilter
case AmqpCorrelationRuleFilterCodec.Code:
var amqpCorrelationFilter = (AmqpCorrelationRuleFilterCodec)amqpFilter;
var correlationFilter = new CorrelationRuleFilter
{
CorrelationId = amqpCorrelationFilter.CorrelationId,
MessageId = amqpCorrelationFilter.MessageId,
Expand Down Expand Up @@ -498,10 +498,10 @@ private static RuleAction GetRuleAction(AmqpRuleActionCodec amqpAction)
}
else if (amqpAction.DescriptorCode == AmqpSqlRuleActionCodec.Code)
{
var amqpSqlAction = (AmqpSqlRuleActionCodec)amqpAction;
var sqlAction = new SqlRuleAction(amqpSqlAction.SqlExpression);
var amqpSqlRuleAction = (AmqpSqlRuleActionCodec)amqpAction;
var sqlRuleAction = new SqlRuleAction(amqpSqlRuleAction.SqlExpression);

action = sqlAction;
action = sqlRuleAction;
}
else
{
Expand Down Expand Up @@ -702,38 +702,38 @@ private static Data ToData(AmqpMessage message)
return new Data { Value = value };
}

internal static AmqpMap GetSqlFilterMap(SqlFilter sqlFilter)
internal static AmqpMap GetSqlRuleFilterMap(SqlRuleFilter sqlRuleFilter)
{
var amqpFilterMap = new AmqpMap
{
[ManagementConstants.Properties.Expression] = sqlFilter.SqlExpression
[ManagementConstants.Properties.Expression] = sqlRuleFilter.SqlExpression
};
return amqpFilterMap;
}

internal static AmqpMap GetCorrelationFilterMap(CorrelationFilter correlationFilter)
internal static AmqpMap GetCorrelationRuleFilterMap(CorrelationRuleFilter correlationRuleFilter)
{
var correlationFilterMap = new AmqpMap
{
[ManagementConstants.Properties.CorrelationId] = correlationFilter.CorrelationId,
[ManagementConstants.Properties.MessageId] = correlationFilter.MessageId,
[ManagementConstants.Properties.To] = correlationFilter.To,
[ManagementConstants.Properties.ReplyTo] = correlationFilter.ReplyTo,
[ManagementConstants.Properties.Label] = correlationFilter.Label,
[ManagementConstants.Properties.SessionId] = correlationFilter.SessionId,
[ManagementConstants.Properties.ReplyToSessionId] = correlationFilter.ReplyToSessionId,
[ManagementConstants.Properties.ContentType] = correlationFilter.ContentType
var correlationRuleFilterMap = new AmqpMap
{
[ManagementConstants.Properties.CorrelationId] = correlationRuleFilter.CorrelationId,
[ManagementConstants.Properties.MessageId] = correlationRuleFilter.MessageId,
[ManagementConstants.Properties.To] = correlationRuleFilter.To,
[ManagementConstants.Properties.ReplyTo] = correlationRuleFilter.ReplyTo,
[ManagementConstants.Properties.Label] = correlationRuleFilter.Label,
[ManagementConstants.Properties.SessionId] = correlationRuleFilter.SessionId,
[ManagementConstants.Properties.ReplyToSessionId] = correlationRuleFilter.ReplyToSessionId,
[ManagementConstants.Properties.ContentType] = correlationRuleFilter.ContentType
};

var propertiesMap = new AmqpMap();
foreach (var property in correlationFilter.Properties)
foreach (var property in correlationRuleFilter.Properties)
{
propertiesMap[new MapKey(property.Key)] = property.Value;
}

correlationFilterMap[ManagementConstants.Properties.CorrelationFilterProperties] = propertiesMap;
correlationRuleFilterMap[ManagementConstants.Properties.CorrelationRuleFilterProperties] = propertiesMap;

return correlationFilterMap;
return correlationRuleFilterMap;
}

internal static AmqpMap GetRuleActionMap(SqlRuleAction sqlRuleAction)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
using Azure.Core;
using Azure.Messaging.ServiceBus.Amqp.Framing;
using Azure.Messaging.ServiceBus.Core;
using Azure.Messaging.ServiceBus.Filters;
using Azure.Messaging.ServiceBus.Management;
using Microsoft.Azure.Amqp;
using Microsoft.Azure.Amqp.Encoding;

Expand Down Expand Up @@ -58,10 +58,10 @@ internal class AmqpRuleManager : TransportRuleManager

static AmqpRuleManager()
{
AmqpCodec.RegisterKnownTypes(AmqpTrueFilterCodec.Name, AmqpTrueFilterCodec.Code, () => new AmqpTrueFilterCodec());
AmqpCodec.RegisterKnownTypes(AmqpFalseFilterCodec.Name, AmqpFalseFilterCodec.Code, () => new AmqpFalseFilterCodec());
AmqpCodec.RegisterKnownTypes(AmqpCorrelationFilterCodec.Name, AmqpCorrelationFilterCodec.Code, () => new AmqpCorrelationFilterCodec());
AmqpCodec.RegisterKnownTypes(AmqpSqlFilterCodec.Name, AmqpSqlFilterCodec.Code, () => new AmqpSqlFilterCodec());
AmqpCodec.RegisterKnownTypes(AmqpTrueRuleFilterCodec.Name, AmqpTrueRuleFilterCodec.Code, () => new AmqpTrueRuleFilterCodec());
AmqpCodec.RegisterKnownTypes(AmqpFalseRuleFilterCodec.Name, AmqpFalseRuleFilterCodec.Code, () => new AmqpFalseRuleFilterCodec());
AmqpCodec.RegisterKnownTypes(AmqpCorrelationRuleFilterCodec.Name, AmqpCorrelationRuleFilterCodec.Code, () => new AmqpCorrelationRuleFilterCodec());
AmqpCodec.RegisterKnownTypes(AmqpSqlRuleFilterCodec.Name, AmqpSqlRuleFilterCodec.Code, () => new AmqpSqlRuleFilterCodec());
AmqpCodec.RegisterKnownTypes(AmqpEmptyRuleActionCodec.Name, AmqpEmptyRuleActionCodec.Code, () => new AmqpEmptyRuleActionCodec());
AmqpCodec.RegisterKnownTypes(AmqpSqlRuleActionCodec.Name, AmqpSqlRuleActionCodec.Code, () => new AmqpSqlRuleActionCodec());
AmqpCodec.RegisterKnownTypes(AmqpRuleDescriptionCodec.Name, AmqpRuleDescriptionCodec.Code, () => new AmqpRuleDescriptionCodec());
Expand Down Expand Up @@ -121,7 +121,7 @@ public AmqpRuleManager(
///
/// <remarks>
/// You can add rules to the subscription that decides which messages from the topic should reach the subscription.
/// A default <see cref="TrueFilter"/> rule named <see cref="RuleDescription.DefaultRuleName"/> is always added while creation of the Subscription.
/// A default <see cref="TrueRuleFilter"/> rule named <see cref="RuleDescription.DefaultRuleName"/> is always added while creation of the Subscription.
/// You can add multiple rules with distinct names to the same subscription.
/// Multiple filters combine with each other using logical OR condition. i.e., If any filter succeeds, the message is passed on to the subscription.
/// </remarks>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@

namespace Azure.Messaging.ServiceBus.Amqp.Framing
{
internal sealed class AmqpCorrelationFilterCodec : AmqpFilterCodec
internal sealed class AmqpCorrelationRuleFilterCodec : AmqpRuleFilterCodec
{
public const string Name = AmqpConstants.Vendor + ":correlation-filter:list";
public const ulong Code = 0x000001370000009;
private const int Fields = 9;
private AmqpMap properties;

public AmqpCorrelationFilterCodec() : base(Name, Code)
public AmqpCorrelationRuleFilterCodec() : base(Name, Code)
{
properties = new AmqpMap();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@

namespace Azure.Messaging.ServiceBus.Amqp.Framing
{
internal sealed class AmqpFalseFilterCodec : AmqpFilterCodec
internal sealed class AmqpFalseRuleFilterCodec : AmqpRuleFilterCodec
{
public const string Name = AmqpConstants.Vendor + ":false-filter:list";
public const ulong Code = 0x000001370000008;

public AmqpFalseFilterCodec() : base(Name, Code) { }
public AmqpFalseRuleFilterCodec() : base(Name, Code) { }

public override string ToString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ internal sealed class AmqpRuleDescriptionCodec : DescribedList

public AmqpRuleDescriptionCodec() : base(Name, Code) { }

public AmqpFilterCodec Filter { get; set; }
public AmqpRuleFilterCodec Filter { get; set; }

public AmqpRuleActionCodec Action { get; set; }

Expand All @@ -37,7 +37,7 @@ protected override void OnDecode(ByteBuffer buffer, int count)
{
if (count-- > 0)
{
Filter = (AmqpFilterCodec)AmqpCodec.DecodeAmqpDescribed(buffer);
Filter = (AmqpRuleFilterCodec)AmqpCodec.DecodeAmqpDescribed(buffer);
}

if (count-- > 0)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@

namespace Azure.Messaging.ServiceBus.Amqp.Framing
{
internal abstract class AmqpFilterCodec : DescribedList
internal abstract class AmqpRuleFilterCodec : DescribedList
{
protected AmqpFilterCodec(string name, ulong code)
protected AmqpRuleFilterCodec(string name, ulong code)
: base(name, code)
{
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,13 @@

namespace Azure.Messaging.ServiceBus.Amqp.Framing
{
internal sealed class AmqpSqlFilterCodec : AmqpFilterCodec
internal sealed class AmqpSqlRuleFilterCodec : AmqpRuleFilterCodec
{
public const string Name = AmqpConstants.Vendor + ":sql-filter:list";
public const ulong Code = 0x000001370000006;
private const int Fields = 2;

public AmqpSqlFilterCodec() : base(Name, Code) { }
public AmqpSqlRuleFilterCodec() : base(Name, Code) { }

public string Expression { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@

namespace Azure.Messaging.ServiceBus.Amqp.Framing
{
internal sealed class AmqpTrueFilterCodec : AmqpFilterCodec
internal sealed class AmqpTrueRuleFilterCodec : AmqpRuleFilterCodec
{
public const string Name = AmqpConstants.Vendor + ":true-filter:list";
public const ulong Code = 0x000001370000007;

public AmqpTrueFilterCodec() : base(Name, Code) { }
public AmqpTrueRuleFilterCodec() : base(Name, Code) { }

public override string ToString()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,17 +71,17 @@ public static class Properties
public static readonly MapKey RuleName = new MapKey("rule-name");
public static readonly MapKey RuleDescription = new MapKey("rule-description");
public static readonly MapKey RuleCreatedAt = new MapKey("rule-created-at");
public static readonly MapKey SqlFilter = new MapKey("sql-filter");
public static readonly MapKey SqlRuleFilter = new MapKey("sql-filter");
public static readonly MapKey SqlRuleAction = new MapKey("sql-rule-action");
public static readonly MapKey CorrelationFilter = new MapKey("correlation-filter");
public static readonly MapKey CorrelationRuleFilter = new MapKey("correlation-filter");
public static readonly MapKey Expression = new MapKey("expression");
public static readonly MapKey CorrelationId = new MapKey("correlation-id");
public static readonly MapKey To = new MapKey("to");
public static readonly MapKey ReplyTo = new MapKey("reply-to");
public static readonly MapKey Label = new MapKey("label");
public static readonly MapKey ReplyToSessionId = new MapKey("reply-to-session-id");
public static readonly MapKey ContentType = new MapKey("content-type");
public static readonly MapKey CorrelationFilterProperties = new MapKey("properties");
public static readonly MapKey CorrelationRuleFilterProperties = new MapKey("properties");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@ namespace Azure.Messaging.ServiceBus.Authorization
///
internal class ServiceBusTokenCredential : TokenCredential
{
/// <summary>The default scope used for token authentication with EventHubs.</summary>
private const string DefaultScope = "https://servicebus.azure.net/.default";

/// <summary>
/// The Service Bus resource to which the token is intended to serve as authorization.
/// </summary>
Expand Down Expand Up @@ -103,6 +100,6 @@ public override ValueTask<AccessToken> GetTokenAsync(
/// <returns>The token representing the shared access signature for this credential.</returns>
///
public ValueTask<AccessToken> GetTokenUsingDefaultScopeAsync(CancellationToken cancellationToken) =>
GetTokenAsync(new TokenRequestContext(new string[] { DefaultScope }), cancellationToken);
GetTokenAsync(new TokenRequestContext(new string[] { Constants.DefaultScope }), cancellationToken);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ namespace Azure.Messaging.ServiceBus.Authorization
internal class SharedAccessSignature
{
/// <summary>The maximum allowed length of the SAS key name.</summary>
private const int MaximumKeyNameLength = 256;
internal const int MaximumKeyNameLength = 256;

/// <summary>The maximum allowed length of the SAS key.</summary>
private const int MaximumKeyLength = 256;
Expand Down Expand Up @@ -276,7 +276,9 @@ private static (string KeyName, string Resource, DateTimeOffset ExpirationTime)

if (string.IsNullOrEmpty(value))
{
throw new ArgumentException(Resources.InvalidSharedAccessSignature, nameof(sharedAccessSignature));
throw new ArgumentException(
Resources.InvalidSharedAccessSignature,
nameof(sharedAccessSignature));
}

// Compare the token against the known signature properties and capture the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
<Compile Include="$(AzureCoreSharedSources)ArrayBufferWriter.cs" Link="SharedSource\Azure.Core\ArrayBufferWriter.cs" />
<Compile Include="$(AzureCoreSharedSources)TaskExtensions.cs" Link="SharedSource\Azure.Core\TaskExtensions.cs" />
<Compile Include="$(AzureCoreSharedSources)ValueStopwatch.cs" Link="SharedSource\Azure.Core\ValueStopwatch.cs" />
<Compile Include="$(AzureCoreSharedSources)PageResponseEnumerator.cs" Link="SharedSource\Azure.Core\PageResponseEnumerator.cs" />

</ItemGroup>
<ItemGroup>
Expand Down
4 changes: 3 additions & 1 deletion sdk/servicebus/Azure.Messaging.ServiceBus/src/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ internal static class Constants

public const int RuleNameMaximumLength = 50;

public const int MaximumSqlFilterStatementLength = 1024;
public const int MaximumSqlRuleFilterStatementLength = 1024;

public const int MaximumSqlRuleActionStatementLength = 1024;

Expand Down Expand Up @@ -47,5 +47,7 @@ internal static class Constants
public static readonly DateTime EpochTime = new DateTime(1970, 1, 1, 0, 0, 0, 0, DateTimeKind.Utc);

public const int WellKnownPublicPortsLimit = 1023;

public const string DefaultScope = "https://servicebus.azure.net/.default";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,9 +110,9 @@ public static void AssertPositive(TimeSpan argumentValue, string argumentName)
///
/// <exception cref="ArgumentOutOfRangeException"><paramref name="argumentValue"/> is less than <paramref name="minimumValue"/>.</exception>
///
public static void AssertAtLeast(long argumentValue, long minimumValue, string argumentName)
public static void AssertAtLeast<T>(T argumentValue, T minimumValue, string argumentName)where T : notnull, IComparable<T>
{
if (argumentValue < minimumValue)
if (minimumValue.CompareTo(argumentValue) > 0)
{
throw new ArgumentOutOfRangeException(argumentName, $"The value supplied must be greater than or equal to {minimumValue}.");
}
Expand Down
Loading

0 comments on commit 9bd1293

Please sign in to comment.