From 33208a4c9e7b05c1a6992f174069b35a6dffe72c Mon Sep 17 00:00:00 2001 From: JoshLove-msft <54595583+JoshLove-msft@users.noreply.github.com> Date: Tue, 28 Sep 2021 13:25:39 -0700 Subject: [PATCH 1/2] Initial set of API changes for Schema Registry --- .../Azure.Data.SchemaRegistry/README.md | 10 +- ...zure.Data.SchemaRegistry.netstandard2.0.cs | 53 +++++---- .../src/Generated/Models/SchemaFormat.cs | 48 ++++++++ .../src/Generated/Models/SerializationType.cs | 48 -------- .../src/Generated/SchemaRestClient.cs | 12 +- .../src/SchemaProperties.cs | 10 +- .../src/SchemaRegistryClient.cs | 111 ++++++++---------- .../src/SerializationType.cs | 2 +- .../tests/Samples/Sample01_ReadmeSnippets.cs | 10 +- .../tests/SchemaRegistryClientLiveTests.cs | 43 ++----- .../README.md | 2 +- .../src/SchemaRegistryAvroObjectSerializer.cs | 57 ++++++--- .../tests/Samples/Sample01_ReadmeSnippets.cs | 4 +- 13 files changed, 196 insertions(+), 214 deletions(-) create mode 100644 sdk/schemaregistry/Azure.Data.SchemaRegistry/src/Generated/Models/SchemaFormat.cs delete mode 100644 sdk/schemaregistry/Azure.Data.SchemaRegistry/src/Generated/Models/SerializationType.cs diff --git a/sdk/schemaregistry/Azure.Data.SchemaRegistry/README.md b/sdk/schemaregistry/Azure.Data.SchemaRegistry/README.md index 29be37c806c0e..0ba50d0297e78 100644 --- a/sdk/schemaregistry/Azure.Data.SchemaRegistry/README.md +++ b/sdk/schemaregistry/Azure.Data.SchemaRegistry/README.md @@ -45,7 +45,7 @@ Once you have the Azure resource credentials and the Event Hubs namespace hostna // Create a new SchemaRegistry client using the default credential from Azure.Identity using environment variables previously set, // including AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID. // For more information on Azure.Identity usage, see: https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/identity/Azure.Identity/README.md -var client = new SchemaRegistryClient(endpoint: endpoint, credential: new DefaultAzureCredential()); +var client = new SchemaRegistryClient(fullyQualifiedNamespace: endpoint, credential: new DefaultAzureCredential()); ``` ## Key concepts @@ -90,7 +90,7 @@ Register a schema to be stored in the Azure Schema Registry. When registering a ```C# Snippet:SchemaRegistryRegisterSchema string name = "employeeSample"; -SerializationType type = SerializationType.Avro; +SchemaFormat format = SchemaFormat.Avro; // Example schema's content string content = @" { @@ -103,7 +103,7 @@ string content = @" ] }"; -Response schemaProperties = client.RegisterSchema(groupName, name, content, type); +Response schemaProperties = client.RegisterSchema(groupName, name, content, format); ``` ### Retrieve a schema ID @@ -112,7 +112,7 @@ Retrieve a previously registered schema ID from the Azure Schema Registry. When ```C# Snippet:SchemaRegistryRetrieveSchemaId string name = "employeeSample"; -SerializationType type = SerializationType.Avro; +SchemaFormat format = SchemaFormat.Avro; // Example schema's content string content = @" { @@ -125,7 +125,7 @@ string content = @" ] }"; -SchemaProperties schemaProperties = client.GetSchemaProperties(groupName, name, content, type); +SchemaProperties schemaProperties = client.GetSchemaProperties(groupName, name, content, format); string schemaId = schemaProperties.Id; ``` diff --git a/sdk/schemaregistry/Azure.Data.SchemaRegistry/api/Azure.Data.SchemaRegistry.netstandard2.0.cs b/sdk/schemaregistry/Azure.Data.SchemaRegistry/api/Azure.Data.SchemaRegistry.netstandard2.0.cs index 3c9ab1871f4c7..210a1e419d61d 100644 --- a/sdk/schemaregistry/Azure.Data.SchemaRegistry/api/Azure.Data.SchemaRegistry.netstandard2.0.cs +++ b/sdk/schemaregistry/Azure.Data.SchemaRegistry/api/Azure.Data.SchemaRegistry.netstandard2.0.cs @@ -1,22 +1,40 @@ namespace Azure.Data.SchemaRegistry { + [System.Runtime.InteropServices.StructLayoutAttribute(System.Runtime.InteropServices.LayoutKind.Sequential)] + public readonly partial struct SchemaFormat : System.IEquatable + { + private readonly object _dummy; + private readonly int _dummyPrimitive; + public SchemaFormat(string value) { throw null; } + public static Azure.Data.SchemaRegistry.SchemaFormat Avro { get { throw null; } } + public bool Equals(Azure.Data.SchemaRegistry.SchemaFormat other) { throw null; } + [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] + public override bool Equals(object obj) { throw null; } + [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] + public override int GetHashCode() { throw null; } + public static bool operator ==(Azure.Data.SchemaRegistry.SchemaFormat left, Azure.Data.SchemaRegistry.SchemaFormat right) { throw null; } + public static implicit operator Azure.Data.SchemaRegistry.SchemaFormat (string value) { throw null; } + public static bool operator !=(Azure.Data.SchemaRegistry.SchemaFormat left, Azure.Data.SchemaRegistry.SchemaFormat right) { throw null; } + public override string ToString() { throw null; } + } public partial class SchemaProperties { internal SchemaProperties() { } + public Azure.Data.SchemaRegistry.SchemaFormat Format { get { throw null; } } public string Id { get { throw null; } } - public Azure.Data.SchemaRegistry.SerializationType Type { get { throw null; } } } public partial class SchemaRegistryClient { protected SchemaRegistryClient() { } - public SchemaRegistryClient(string endpoint, Azure.Core.TokenCredential credential) { } - public SchemaRegistryClient(string endpoint, Azure.Core.TokenCredential credential, Azure.Data.SchemaRegistry.SchemaRegistryClientOptions options) { } - public virtual Azure.Data.SchemaRegistry.SchemaRegistrySchema GetSchema(string schemaId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } - public virtual System.Threading.Tasks.ValueTask GetSchemaAsync(string schemaId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } - public virtual Azure.Data.SchemaRegistry.SchemaProperties GetSchemaProperties(string groupName, string name, string content, Azure.Data.SchemaRegistry.SerializationType serializationType, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } - public virtual System.Threading.Tasks.ValueTask GetSchemaPropertiesAsync(string groupName, string name, string content, Azure.Data.SchemaRegistry.SerializationType serializationType, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } - public virtual Azure.Response RegisterSchema(string groupName, string name, string content, Azure.Data.SchemaRegistry.SerializationType serializationType, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } - public virtual System.Threading.Tasks.Task> RegisterSchemaAsync(string groupName, string name, string content, Azure.Data.SchemaRegistry.SerializationType serializationType, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public SchemaRegistryClient(string fullyQualifiedNamespace, Azure.Core.TokenCredential credential) { } + public SchemaRegistryClient(string fullyQualifiedNamespace, Azure.Core.TokenCredential credential, Azure.Data.SchemaRegistry.SchemaRegistryClientOptions options) { } + public string FullyQualifiedNamespace { get { throw null; } } + public virtual Azure.Response GetSchema(string schemaId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public virtual System.Threading.Tasks.Task> GetSchemaAsync(string schemaId, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public virtual Azure.Response GetSchemaProperties(string groupName, string name, string schemaDefinition, Azure.Data.SchemaRegistry.SchemaFormat format, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public virtual System.Threading.Tasks.Task> GetSchemaPropertiesAsync(string groupName, string name, string schemaDefinition, Azure.Data.SchemaRegistry.SchemaFormat format, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public virtual Azure.Response RegisterSchema(string groupName, string name, string schemaDefinition, Azure.Data.SchemaRegistry.SchemaFormat format, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } + public virtual System.Threading.Tasks.Task> RegisterSchemaAsync(string groupName, string name, string schemaDefinition, Azure.Data.SchemaRegistry.SchemaFormat format, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; } } public partial class SchemaRegistryClientOptions : Azure.Core.ClientOptions { @@ -33,21 +51,4 @@ internal SchemaRegistrySchema() { } public string Content { get { throw null; } } public Azure.Data.SchemaRegistry.SchemaProperties Properties { get { throw null; } } } - [System.Runtime.InteropServices.StructLayoutAttribute(System.Runtime.InteropServices.LayoutKind.Sequential)] - public readonly partial struct SerializationType : System.IEquatable - { - private readonly object _dummy; - private readonly int _dummyPrimitive; - public SerializationType(string value) { throw null; } - public static Azure.Data.SchemaRegistry.SerializationType Avro { get { throw null; } } - public bool Equals(Azure.Data.SchemaRegistry.SerializationType other) { throw null; } - [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] - public override bool Equals(object obj) { throw null; } - [System.ComponentModel.EditorBrowsableAttribute(System.ComponentModel.EditorBrowsableState.Never)] - public override int GetHashCode() { throw null; } - public static bool operator ==(Azure.Data.SchemaRegistry.SerializationType left, Azure.Data.SchemaRegistry.SerializationType right) { throw null; } - public static implicit operator Azure.Data.SchemaRegistry.SerializationType (string value) { throw null; } - public static bool operator !=(Azure.Data.SchemaRegistry.SerializationType left, Azure.Data.SchemaRegistry.SerializationType right) { throw null; } - public override string ToString() { throw null; } - } } diff --git a/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/Generated/Models/SchemaFormat.cs b/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/Generated/Models/SchemaFormat.cs new file mode 100644 index 0000000000000..950a71a258259 --- /dev/null +++ b/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/Generated/Models/SchemaFormat.cs @@ -0,0 +1,48 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +// + +#nullable disable + +using System; +using System.ComponentModel; + +namespace Azure.Data.SchemaRegistry +{ + /// The SerializationType. + public readonly partial struct SchemaFormat : IEquatable + { + private readonly string _value; + + /// Initializes a new instance of . + /// is null. + public SchemaFormat(string value) + { + _value = value ?? throw new ArgumentNullException(nameof(value)); + } + + private const string AvroValue = "avro"; + + /// Avro Serialization schema type. + public static SchemaFormat Avro { get; } = new SchemaFormat(AvroValue); + /// Determines if two values are the same. + public static bool operator ==(SchemaFormat left, SchemaFormat right) => left.Equals(right); + /// Determines if two values are not the same. + public static bool operator !=(SchemaFormat left, SchemaFormat right) => !left.Equals(right); + /// Converts a string to a . + public static implicit operator SchemaFormat(string value) => new SchemaFormat(value); + + /// + [EditorBrowsable(EditorBrowsableState.Never)] + public override bool Equals(object obj) => obj is SchemaFormat other && Equals(other); + /// + public bool Equals(SchemaFormat other) => string.Equals(_value, other._value, StringComparison.InvariantCultureIgnoreCase); + + /// + [EditorBrowsable(EditorBrowsableState.Never)] + public override int GetHashCode() => _value?.GetHashCode() ?? 0; + /// + public override string ToString() => _value; + } +} diff --git a/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/Generated/Models/SerializationType.cs b/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/Generated/Models/SerializationType.cs deleted file mode 100644 index 4258c4021e852..0000000000000 --- a/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/Generated/Models/SerializationType.cs +++ /dev/null @@ -1,48 +0,0 @@ -// Copyright (c) Microsoft Corporation. All rights reserved. -// Licensed under the MIT License. - -// - -#nullable disable - -using System; -using System.ComponentModel; - -namespace Azure.Data.SchemaRegistry -{ - /// The SerializationType. - public readonly partial struct SerializationType : IEquatable - { - private readonly string _value; - - /// Initializes a new instance of . - /// is null. - public SerializationType(string value) - { - _value = value ?? throw new ArgumentNullException(nameof(value)); - } - - private const string AvroValue = "avro"; - - /// Avro Serialization schema type. - public static SerializationType Avro { get; } = new SerializationType(AvroValue); - /// Determines if two values are the same. - public static bool operator ==(SerializationType left, SerializationType right) => left.Equals(right); - /// Determines if two values are not the same. - public static bool operator !=(SerializationType left, SerializationType right) => !left.Equals(right); - /// Converts a string to a . - public static implicit operator SerializationType(string value) => new SerializationType(value); - - /// - [EditorBrowsable(EditorBrowsableState.Never)] - public override bool Equals(object obj) => obj is SerializationType other && Equals(other); - /// - public bool Equals(SerializationType other) => string.Equals(_value, other._value, StringComparison.InvariantCultureIgnoreCase); - - /// - [EditorBrowsable(EditorBrowsableState.Never)] - public override int GetHashCode() => _value?.GetHashCode() ?? 0; - /// - public override string ToString() => _value; - } -} diff --git a/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/Generated/SchemaRestClient.cs b/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/Generated/SchemaRestClient.cs index 64cde37bd4933..88896bf97534d 100644 --- a/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/Generated/SchemaRestClient.cs +++ b/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/Generated/SchemaRestClient.cs @@ -107,7 +107,7 @@ public ResponseWithHeaders GetById(string schemaId } } - internal HttpMessage CreateQueryIdByContentRequest(string groupName, string schemaName, SerializationType serializationType, string schemaContent) + internal HttpMessage CreateQueryIdByContentRequest(string groupName, string schemaName, SchemaFormat serializationType, string schemaContent) { var message = _pipeline.CreateMessage(); var request = message.Request; @@ -135,7 +135,7 @@ internal HttpMessage CreateQueryIdByContentRequest(string groupName, string sche /// String representation (UTF-8) of the registered schema. /// The cancellation token to use. /// , , or is null. - public async Task> QueryIdByContentAsync(string groupName, string schemaName, SerializationType serializationType, string schemaContent, CancellationToken cancellationToken = default) + public async Task> QueryIdByContentAsync(string groupName, string schemaName, SchemaFormat serializationType, string schemaContent, CancellationToken cancellationToken = default) { if (groupName == null) { @@ -174,7 +174,7 @@ public async Task> /// String representation (UTF-8) of the registered schema. /// The cancellation token to use. /// , , or is null. - public ResponseWithHeaders QueryIdByContent(string groupName, string schemaName, SerializationType serializationType, string schemaContent, CancellationToken cancellationToken = default) + public ResponseWithHeaders QueryIdByContent(string groupName, string schemaName, SchemaFormat serializationType, string schemaContent, CancellationToken cancellationToken = default) { if (groupName == null) { @@ -206,7 +206,7 @@ public ResponseWithHeaders QueryIdByCon } } - internal HttpMessage CreateRegisterRequest(string groupName, string schemaName, SerializationType serializationType, string schemaContent) + internal HttpMessage CreateRegisterRequest(string groupName, string schemaName, SchemaFormat serializationType, string schemaContent) { var message = _pipeline.CreateMessage(); var request = message.Request; @@ -237,7 +237,7 @@ internal HttpMessage CreateRegisterRequest(string groupName, string schemaName, /// String representation (UTF-8) of the schema being registered. /// The cancellation token to use. /// , , or is null. - public async Task> RegisterAsync(string groupName, string schemaName, SerializationType serializationType, string schemaContent, CancellationToken cancellationToken = default) + public async Task> RegisterAsync(string groupName, string schemaName, SchemaFormat serializationType, string schemaContent, CancellationToken cancellationToken = default) { if (groupName == null) { @@ -279,7 +279,7 @@ public async Task> Register /// String representation (UTF-8) of the schema being registered. /// The cancellation token to use. /// , , or is null. - public ResponseWithHeaders Register(string groupName, string schemaName, SerializationType serializationType, string schemaContent, CancellationToken cancellationToken = default) + public ResponseWithHeaders Register(string groupName, string schemaName, SchemaFormat serializationType, string schemaContent, CancellationToken cancellationToken = default) { if (groupName == null) { diff --git a/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/SchemaProperties.cs b/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/SchemaProperties.cs index 17dbbdface8e4..95e25ec0996e9 100644 --- a/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/SchemaProperties.cs +++ b/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/SchemaProperties.cs @@ -8,12 +8,12 @@ namespace Azure.Data.SchemaRegistry /// public class SchemaProperties { - internal SchemaProperties(string location, SerializationType xSchemaType, string xSchemaId, int? xSchemaVersion) + internal SchemaProperties(string location, SchemaFormat format, string schemaId, int? schemaVersion) { - Id = xSchemaId; + Id = schemaId; Location = location; - Type = xSchemaType; - Version = xSchemaVersion ?? 0; + Format = format; + Version = schemaVersion ?? 0; } /// @@ -29,7 +29,7 @@ internal SchemaProperties(string location, SerializationType xSchemaType, string /// /// Serialization type for the schema being stored. /// - public SerializationType Type { get; } + public SchemaFormat Format { get; } /// /// Version of the schema. diff --git a/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/SchemaRegistryClient.cs b/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/SchemaRegistryClient.cs index bd422167dd6db..92987cce386e8 100644 --- a/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/SchemaRegistryClient.cs +++ b/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/SchemaRegistryClient.cs @@ -21,23 +21,20 @@ public class SchemaRegistryClient internal SchemaRestClient RestClient { get; } private const string CredentialScope = "https://eventhubs.azure.net/.default"; - private readonly ConcurrentDictionary _schemaIdToSchemaMap = new(); - private readonly ConcurrentDictionary<(string, string, string, SerializationType), SchemaProperties> _contentToPropertiesMap = new(); - /// /// Initializes a new instance of the . /// - public SchemaRegistryClient(string endpoint, TokenCredential credential) : this(endpoint, credential, new SchemaRegistryClientOptions()) + public SchemaRegistryClient(string fullyQualifiedNamespace, TokenCredential credential) : this(fullyQualifiedNamespace, credential, new SchemaRegistryClientOptions()) { } /// /// Initializes a new instance of the . /// - public SchemaRegistryClient(string endpoint, TokenCredential credential, SchemaRegistryClientOptions options) : this( + public SchemaRegistryClient(string fullyQualifiedNamespace, TokenCredential credential, SchemaRegistryClientOptions options) : this( new ClientDiagnostics(options), HttpPipelineBuilder.Build(options, new BearerTokenAuthenticationPolicy(credential, CredentialScope)), - endpoint, + fullyQualifiedNamespace, options.Version) { } @@ -50,14 +47,20 @@ protected SchemaRegistryClient() /// Initializes a new instance of . /// The handler for diagnostic messaging in the client. /// The HTTP pipeline for sending and receiving REST requests and responses. - /// The endpoint URI. For example, myschemaregistry.servicebus.windows.net. + /// The fully qualified namespace. For example, myschemaregistry.servicebus.windows.net. /// The API version of the service. - internal SchemaRegistryClient(ClientDiagnostics clientDiagnostics, HttpPipeline pipeline, string endpoint, string apiVersion) + internal SchemaRegistryClient(ClientDiagnostics clientDiagnostics, HttpPipeline pipeline, string fullyQualifiedNamespace, string apiVersion) { - RestClient = new SchemaRestClient(clientDiagnostics, pipeline, endpoint, apiVersion); + RestClient = new SchemaRestClient(clientDiagnostics, pipeline, fullyQualifiedNamespace, apiVersion); _clientDiagnostics = clientDiagnostics; + FullyQualifiedNamespace = fullyQualifiedNamespace; } + /// + /// Gets the fully qualified namespace that the client is connecting to. + /// + public string FullyQualifiedNamespace { get; } + private const string RegisterSchemaScopeName = "SchemaRegistryClient.RegisterSchema"; private const string GetSchemaIdScopeName = "SchemaRegistryClient.GetSchemaId"; private const string GetSchemaScopeName = "SchemaRegistryClient.GetSchema"; @@ -67,17 +70,17 @@ internal SchemaRegistryClient(ClientDiagnostics clientDiagnostics, HttpPipeline /// /// The name of the SchemaRegistry group. /// The name of the schema. - /// The string representation of the schema's content. - /// The serialization format of the schema. + /// The string representation of the schema's content. + /// The serialization format of the schema. /// The cancellation token for the operation. /// The properties of the schema. public virtual async Task> RegisterSchemaAsync( string groupName, string name, - string content, - SerializationType serializationType, + string schemaDefinition, + SchemaFormat format, CancellationToken cancellationToken = default) => - await RegisterSchemaInternalAsync(groupName, name, content, serializationType, true, cancellationToken) + await RegisterSchemaInternalAsync(groupName, name, schemaDefinition, format, true, cancellationToken) .ConfigureAwait(false); /// @@ -87,24 +90,24 @@ await RegisterSchemaInternalAsync(groupName, name, content, serializationType, t /// /// The name of the SchemaRegistry group. /// The name of the schema. - /// The string representation of the schema's content. - /// The serialization format of the schema. + /// The string representation of the schema's content. + /// The serialization format of the schema. /// The cancellation token for the operation. /// The properties of the schema. public virtual Response RegisterSchema( string groupName, string name, - string content, - SerializationType serializationType, + string schemaDefinition, + SchemaFormat format, CancellationToken cancellationToken = default) => - RegisterSchemaInternalAsync(groupName, name, content, serializationType, false, cancellationToken) + RegisterSchemaInternalAsync(groupName, name, schemaDefinition, format, false, cancellationToken) .EnsureCompleted(); private async Task> RegisterSchemaInternalAsync( string groupName, string name, - string content, - SerializationType serializationType, + string schemaDefinition, + SchemaFormat format, bool async, CancellationToken cancellationToken = default) { @@ -115,15 +118,14 @@ private async Task> RegisterSchemaInternalAsync( ResponseWithHeaders response; if (async) { - response = await RestClient.RegisterAsync(groupName, name, serializationType, content, cancellationToken).ConfigureAwait(false); + response = await RestClient.RegisterAsync(groupName, name, format, schemaDefinition, cancellationToken).ConfigureAwait(false); } else { - response = RestClient.Register(groupName, name, serializationType, content, cancellationToken); + response = RestClient.Register(groupName, name, format, schemaDefinition, cancellationToken); } var properties = new SchemaProperties(response.Headers.Location, response.Headers.SerializationType, response.Headers.SchemaId, response.Headers.SchemaVersion); - _contentToPropertiesMap[(groupName, name, content, serializationType)] = properties; return Response.FromValue(properties, response); } @@ -139,19 +141,19 @@ private async Task> RegisterSchemaInternalAsync( /// /// The name of the SchemaRegistry group. /// The name of the schema. - /// The string representation of the schema's content. - /// The serialization format of the schema. + /// The string representation of the schema's content. + /// The serialization format of the schema. /// The cancellation token for the operation. /// The properties of the schema, including the schema ID provided by the service. #pragma warning disable AZC0015 // Unexpected client method return type. - public virtual async ValueTask GetSchemaPropertiesAsync( + public virtual async Task> GetSchemaPropertiesAsync( string groupName, string name, - string content, - SerializationType serializationType, + string schemaDefinition, + SchemaFormat format, CancellationToken cancellationToken = default) => #pragma warning restore AZC0015 // Unexpected client method return type. - await GetSchemaPropertiesInternalAsync(groupName, name, content, serializationType, true, cancellationToken) + await GetSchemaPropertiesInternalAsync(groupName, name, schemaDefinition, format, true, cancellationToken) .ConfigureAwait(false); /// @@ -159,35 +161,28 @@ await GetSchemaPropertiesInternalAsync(groupName, name, content, serializationTy /// /// The name of the SchemaRegistry group. /// The name of the schema. - /// The string representation of the schema's content. - /// The serialization format of the schema. + /// The string representation of the schema's content. + /// The serialization format of the schema. /// The cancellation token for the operation. /// The properties of the schema, including the schema ID provided by the service. #pragma warning disable AZC0015 // Unexpected client method return type. - public virtual SchemaProperties GetSchemaProperties( + public virtual Response GetSchemaProperties( string groupName, string name, - string content, - SerializationType serializationType, + string schemaDefinition, + SchemaFormat format, CancellationToken cancellationToken = default) => #pragma warning restore AZC0015 // Unexpected client method return type. - GetSchemaPropertiesInternalAsync(groupName, name, content, serializationType, false, cancellationToken).EnsureCompleted(); + GetSchemaPropertiesInternalAsync(groupName, name, schemaDefinition, format, false, cancellationToken).EnsureCompleted(); - private async ValueTask GetSchemaPropertiesInternalAsync( + private async Task> GetSchemaPropertiesInternalAsync( string groupName, string name, - string content, - SerializationType serializationType, + string schemaDefinition, + SchemaFormat format, bool async, CancellationToken cancellationToken) { - if (_contentToPropertiesMap.TryGetValue( - (groupName, name, content, serializationType), - out SchemaProperties schemaProperties)) - { - return schemaProperties; - } - using DiagnosticScope scope = _clientDiagnostics.CreateScope(GetSchemaIdScopeName); scope.Start(); try @@ -195,19 +190,16 @@ private async ValueTask GetSchemaPropertiesInternalAsync( ResponseWithHeaders response; if (async) { - response = await RestClient.QueryIdByContentAsync(groupName, name, serializationType, content, cancellationToken).ConfigureAwait(false); + response = await RestClient.QueryIdByContentAsync(groupName, name, format, schemaDefinition, cancellationToken).ConfigureAwait(false); } else { - response = RestClient.QueryIdByContent(groupName, name, serializationType, content, cancellationToken); + response = RestClient.QueryIdByContent(groupName, name, format, schemaDefinition, cancellationToken); } var properties = new SchemaProperties(response.Headers.Location, response.Headers.SerializationType, response.Headers.SchemaId, response.Headers.SchemaVersion); - var schema = new SchemaRegistrySchema(properties, content); - _schemaIdToSchemaMap[properties.Id] = schema; - _contentToPropertiesMap[(groupName, name, content, serializationType)] = properties; - return properties; + return Response.FromValue(properties, response); } catch (Exception e) { @@ -223,7 +215,7 @@ private async ValueTask GetSchemaPropertiesInternalAsync( /// The cancellation token for the operation. /// The properties of the schema, including the schema content provided by the service. #pragma warning disable AZC0015 // Unexpected client method return type. - public virtual async ValueTask GetSchemaAsync(string schemaId, CancellationToken cancellationToken = default) => + public virtual async Task> GetSchemaAsync(string schemaId, CancellationToken cancellationToken = default) => #pragma warning restore AZC0015 // Unexpected client method return type. await GetSchemaInternalAsync(schemaId, true, cancellationToken).ConfigureAwait(false); @@ -234,17 +226,12 @@ public virtual async ValueTask GetSchemaAsync(string schem /// The cancellation token for the operation. /// The properties of the schema, including the schema content provided by the service. #pragma warning disable AZC0015 // Unexpected client method return type. - public virtual SchemaRegistrySchema GetSchema(string schemaId, CancellationToken cancellationToken = default) => + public virtual Response GetSchema(string schemaId, CancellationToken cancellationToken = default) => #pragma warning restore AZC0015 // Unexpected client method return type. GetSchemaInternalAsync(schemaId, false, cancellationToken).EnsureCompleted(); - private async ValueTask GetSchemaInternalAsync(string schemaId, bool async, CancellationToken cancellationToken) + private async Task> GetSchemaInternalAsync(string schemaId, bool async, CancellationToken cancellationToken) { - if (_schemaIdToSchemaMap.TryGetValue(schemaId, out SchemaRegistrySchema schema)) - { - return schema; - } - using DiagnosticScope scope = _clientDiagnostics.CreateScope(GetSchemaScopeName); scope.Start(); try @@ -260,9 +247,9 @@ private async ValueTask GetSchemaInternalAsync(string sche } var properties = new SchemaProperties(response.Headers.Location, response.Headers.SerializationType, response.Headers.SchemaId, response.Headers.SchemaVersion); - _schemaIdToSchemaMap[schemaId] = new SchemaRegistrySchema(properties, response.Value); + var schema = new SchemaRegistrySchema(properties, response.Value); - return _schemaIdToSchemaMap[schemaId]; + return Response.FromValue(schema, response); } catch (Exception e) { diff --git a/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/SerializationType.cs b/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/SerializationType.cs index 8f8f7c11c52cb..5751a59b693cb 100644 --- a/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/SerializationType.cs +++ b/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/SerializationType.cs @@ -9,7 +9,7 @@ namespace Azure.Data.SchemaRegistry /// The format used when serializing SchemaRegistry messages. /// [CodeGenModel("SerializationType")] - public readonly partial struct SerializationType + public readonly partial struct SchemaFormat { } } diff --git a/sdk/schemaregistry/Azure.Data.SchemaRegistry/tests/Samples/Sample01_ReadmeSnippets.cs b/sdk/schemaregistry/Azure.Data.SchemaRegistry/tests/Samples/Sample01_ReadmeSnippets.cs index a14dd22775048..e5ddad224b134 100644 --- a/sdk/schemaregistry/Azure.Data.SchemaRegistry/tests/Samples/Sample01_ReadmeSnippets.cs +++ b/sdk/schemaregistry/Azure.Data.SchemaRegistry/tests/Samples/Sample01_ReadmeSnippets.cs @@ -25,7 +25,7 @@ public void CreateSchemaRegistryClient() // Create a new SchemaRegistry client using the default credential from Azure.Identity using environment variables previously set, // including AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID. // For more information on Azure.Identity usage, see: https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/identity/Azure.Identity/README.md - var client = new SchemaRegistryClient(endpoint: endpoint, credential: new DefaultAzureCredential()); + var client = new SchemaRegistryClient(fullyQualifiedNamespace: endpoint, credential: new DefaultAzureCredential()); #endregion this.client = client; @@ -39,7 +39,7 @@ public void RegisterSchema() #region Snippet:SchemaRegistryRegisterSchema string name = "employeeSample"; - SerializationType type = SerializationType.Avro; + SchemaFormat format = SchemaFormat.Avro; // Example schema's content string content = @" { @@ -52,7 +52,7 @@ public void RegisterSchema() ] }"; - Response schemaProperties = client.RegisterSchema(groupName, name, content, type); + Response schemaProperties = client.RegisterSchema(groupName, name, content, format); #endregion Assert.NotNull(schemaProperties); @@ -68,7 +68,7 @@ public void RetrieveSchemaId() #region Snippet:SchemaRegistryRetrieveSchemaId string name = "employeeSample"; - SerializationType type = SerializationType.Avro; + SchemaFormat format = SchemaFormat.Avro; // Example schema's content string content = @" { @@ -81,7 +81,7 @@ public void RetrieveSchemaId() ] }"; - SchemaProperties schemaProperties = client.GetSchemaProperties(groupName, name, content, type); + SchemaProperties schemaProperties = client.GetSchemaProperties(groupName, name, content, format); string schemaId = schemaProperties.Id; #endregion diff --git a/sdk/schemaregistry/Azure.Data.SchemaRegistry/tests/SchemaRegistryClientLiveTests.cs b/sdk/schemaregistry/Azure.Data.SchemaRegistry/tests/SchemaRegistryClientLiveTests.cs index c6db9481dc608..5b57e1c20d213 100644 --- a/sdk/schemaregistry/Azure.Data.SchemaRegistry/tests/SchemaRegistryClientLiveTests.cs +++ b/sdk/schemaregistry/Azure.Data.SchemaRegistry/tests/SchemaRegistryClientLiveTests.cs @@ -31,24 +31,12 @@ public async Task CanRegisterSchema() var client = CreateClient(); var schemaName = "test1"; var groupName = TestEnvironment.SchemaRegistryGroup; - var schemaType = SerializationType.Avro; + var format = SchemaFormat.Avro; - SchemaProperties registerProperties = await client.RegisterSchemaAsync(groupName, schemaName, SchemaContent, schemaType); + SchemaProperties registerProperties = await client.RegisterSchemaAsync(groupName, schemaName, SchemaContent, format); AssertSchemaProperties(registerProperties); - // this should be a cached lookup - var schemaProperties = await client.GetSchemaPropertiesAsync(groupName, schemaName, SchemaContent, schemaType); - AssertSchemaProperties(schemaProperties); - AssertPropertiesAreEqual(registerProperties, schemaProperties); - - // this should be an uncached lookup - var client2 = CreateClient(); - schemaProperties = await client2.GetSchemaPropertiesAsync(groupName, schemaName, SchemaContent, schemaType); - AssertSchemaProperties(schemaProperties); - AssertPropertiesAreEqual(registerProperties, schemaProperties); - - // this should be a cached lookup - schemaProperties = await client2.GetSchemaPropertiesAsync(groupName, schemaName, SchemaContent, schemaType); + SchemaProperties schemaProperties = await client.GetSchemaPropertiesAsync(groupName, schemaName, SchemaContent, format); AssertSchemaProperties(schemaProperties); AssertPropertiesAreEqual(registerProperties, schemaProperties); } @@ -59,24 +47,12 @@ public async Task CanGetSchemaId() var client = CreateClient(); var schemaName = "test1"; var groupName = TestEnvironment.SchemaRegistryGroup; - var schemaType = SerializationType.Avro; + var format = SchemaFormat.Avro; - SchemaProperties registerProperties = await client.RegisterSchemaAsync(groupName, schemaName, SchemaContent, schemaType); + SchemaProperties registerProperties = await client.RegisterSchemaAsync(groupName, schemaName, SchemaContent, format); AssertSchemaProperties(registerProperties); - // this should be a cached lookup - var schemaProperties = await client.GetSchemaPropertiesAsync(groupName, schemaName, SchemaContent, schemaType); - AssertSchemaProperties(schemaProperties); - AssertPropertiesAreEqual(registerProperties, schemaProperties); - - // this should be an uncached lookup - var client2 = CreateClient(); - schemaProperties = await client2.GetSchemaPropertiesAsync(groupName, schemaName, SchemaContent, schemaType); - AssertSchemaProperties(schemaProperties); - AssertPropertiesAreEqual(registerProperties, schemaProperties); - - // this should be a cached lookup - schemaProperties = await client2.GetSchemaPropertiesAsync(groupName, schemaName, SchemaContent, schemaType); + SchemaProperties schemaProperties = await client.GetSchemaPropertiesAsync(groupName, schemaName, SchemaContent, format); AssertSchemaProperties(schemaProperties); AssertPropertiesAreEqual(registerProperties, schemaProperties); } @@ -87,13 +63,12 @@ public async Task CanGetSchema() var client = CreateClient(); var schemaName = "test1"; var groupName = TestEnvironment.SchemaRegistryGroup; - var schemaType = SerializationType.Avro; + var format = SchemaFormat.Avro; - var registerProperties = await client.RegisterSchemaAsync(groupName, schemaName, SchemaContent, schemaType); + var registerProperties = await client.RegisterSchemaAsync(groupName, schemaName, SchemaContent, format); AssertSchemaProperties(registerProperties); - // this should be an uncached lookup as we only cache the schema value when it comes back from the service - var schema = await client.GetSchemaAsync(registerProperties.Value.Id); + SchemaRegistrySchema schema = await client.GetSchemaAsync(registerProperties.Value.Id); AssertSchema(schema); AssertPropertiesAreEqual(registerProperties, schema.Properties); } diff --git a/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/README.md b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/README.md index 576b1474a0da7..05117c0ea05ac 100644 --- a/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/README.md +++ b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/README.md @@ -45,7 +45,7 @@ Once you have the Azure resource credentials and the Event Hubs namespace hostna // Create a new SchemaRegistry client using the default credential from Azure.Identity using environment variables previously set, // including AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID. // For more information on Azure.Identity usage, see: https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/identity/Azure.Identity/README.md -var schemaRegistryClient = new SchemaRegistryClient(endpoint: endpoint, credential: new DefaultAzureCredential()); +var schemaRegistryClient = new SchemaRegistryClient(fullyQualifiedNamespace: fullyQualifiedNamespace, credential: new DefaultAzureCredential()); ``` ## Key concepts diff --git a/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/SchemaRegistryAvroObjectSerializer.cs b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/SchemaRegistryAvroObjectSerializer.cs index d5f5b7e86e53c..d756ed4bc820f 100644 --- a/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/SchemaRegistryAvroObjectSerializer.cs +++ b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/src/SchemaRegistryAvroObjectSerializer.cs @@ -9,6 +9,7 @@ using Azure.Core.Serialization; using Azure.Data.SchemaRegistry; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; @@ -45,7 +46,8 @@ public SchemaRegistryAvroObjectSerializer(SchemaRegistryClient client, string gr private const int RecordFormatIndicatorLength = 4; private const int SchemaIdLength = 32; private const int PayloadStartPosition = RecordFormatIndicatorLength + SchemaIdLength; - private readonly Dictionary _cachedSchemas = new Dictionary(); + private readonly ConcurrentDictionary _idToSchemaMap = new(); + private readonly ConcurrentDictionary _schemaToIdMap = new(); private enum SupportedType { @@ -68,20 +70,36 @@ private static SupportedType GetSupportedTypeOrThrow(Type type) throw new ArgumentException($"Type {type.Name} is not supported for serialization operations."); } - private string GetSchemaId(Schema schema, CancellationToken cancellationToken) + private async Task GetSchemaIdAsync(Schema schema, bool async, CancellationToken cancellationToken) { - var schemaProperties = _options.AutoRegisterSchemas - ? _client.RegisterSchema(_groupName, schema.Fullname, schema.ToString(), SerializationType.Avro, cancellationToken) - : _client.GetSchemaProperties(_groupName, schema.Fullname, schema.ToString(), SerializationType.Avro, cancellationToken); - return schemaProperties.Id; - } + if (_schemaToIdMap.TryGetValue(schema, out string schemaId)) + { + return schemaId; + } - private async Task GetSchemaIdAsync(Schema schema, CancellationToken cancellationToken) - { - var schemaProperties = _options.AutoRegisterSchemas - ? (await _client.RegisterSchemaAsync(_groupName, schema.Fullname, schema.ToString(), SerializationType.Avro, cancellationToken).ConfigureAwait(false)).Value - : await _client.GetSchemaPropertiesAsync(_groupName, schema.Fullname, schema.ToString(), SerializationType.Avro, cancellationToken).ConfigureAwait(false); - return schemaProperties.Id; + SchemaProperties schemaProperties; + if (async) + { + schemaProperties = _options.AutoRegisterSchemas + ? (await _client + .RegisterSchemaAsync(_groupName, schema.Fullname, schema.ToString(), SchemaFormat.Avro, cancellationToken) + .ConfigureAwait(false)).Value + : await _client + .GetSchemaPropertiesAsync(_groupName, schema.Fullname, schema.ToString(), SchemaFormat.Avro, cancellationToken) + .ConfigureAwait(false); + } + else + { + schemaProperties = _options.AutoRegisterSchemas + ? _client.RegisterSchema(_groupName, schema.Fullname, schema.ToString(), SchemaFormat.Avro, cancellationToken) + : _client.GetSchemaProperties(_groupName, schema.Fullname, schema.ToString(), SchemaFormat.Avro, cancellationToken); + } + + string id = schemaProperties.Id; + + _schemaToIdMap.TryAdd(schema, id); + _idToSchemaMap.TryAdd(id, schema); + return id; } private static DatumWriter GetWriterAndSchema(object value, SupportedType supportedType, out Schema schema) @@ -124,11 +142,11 @@ private async ValueTask SerializeInternalAsync( string schemaId; if (async) { - schemaId = await GetSchemaIdAsync(schema, cancellationToken).ConfigureAwait(false); + schemaId = await GetSchemaIdAsync(schema, true, cancellationToken).ConfigureAwait(false); } else { - schemaId = GetSchemaId(schema, cancellationToken); + schemaId = GetSchemaIdAsync(schema, false, cancellationToken).EnsureCompleted(); } var binaryEncoder = new BinaryEncoder(stream); @@ -150,7 +168,7 @@ private async ValueTask SerializeInternalAsync( private async Task GetSchemaByIdAsync(string schemaId, bool async, CancellationToken cancellationToken) { - if (_cachedSchemas.TryGetValue(schemaId, out Schema cachedSchema)) + if (_idToSchemaMap.TryGetValue(schemaId, out Schema cachedSchema)) { return cachedSchema; } @@ -158,14 +176,15 @@ private async Task GetSchemaByIdAsync(string schemaId, bool async, Cance string schemaContent; if (async) { - schemaContent = (await _client.GetSchemaAsync(schemaId, cancellationToken).ConfigureAwait(false)).Content; + schemaContent = (await _client.GetSchemaAsync(schemaId, cancellationToken).ConfigureAwait(false)).Value.Content; } else { - schemaContent = _client.GetSchema(schemaId, cancellationToken).Content; + schemaContent = _client.GetSchema(schemaId, cancellationToken).Value.Content; } var schema = Schema.Parse(schemaContent); - _cachedSchemas.Add(schemaId, schema); + _idToSchemaMap.TryAdd(schemaId, schema); + _schemaToIdMap.TryAdd(schema, schemaId); return schema; } diff --git a/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/tests/Samples/Sample01_ReadmeSnippets.cs b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/tests/Samples/Sample01_ReadmeSnippets.cs index 2e14b6b279c4c..e2fcece238f5b 100644 --- a/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/tests/Samples/Sample01_ReadmeSnippets.cs +++ b/sdk/schemaregistry/Microsoft.Azure.Data.SchemaRegistry.ApacheAvro/tests/Samples/Sample01_ReadmeSnippets.cs @@ -21,13 +21,13 @@ public class Sample01_ReadmeSnippets : SamplesBase Date: Tue, 28 Sep 2021 14:15:08 -0700 Subject: [PATCH 2/2] PR fb --- sdk/schemaregistry/Azure.Data.SchemaRegistry/README.md | 2 +- .../src/{SerializationType.cs => SchemaFormat.cs} | 0 .../tests/Samples/Sample01_ReadmeSnippets.cs | 4 ++-- 3 files changed, 3 insertions(+), 3 deletions(-) rename sdk/schemaregistry/Azure.Data.SchemaRegistry/src/{SerializationType.cs => SchemaFormat.cs} (100%) diff --git a/sdk/schemaregistry/Azure.Data.SchemaRegistry/README.md b/sdk/schemaregistry/Azure.Data.SchemaRegistry/README.md index 0ba50d0297e78..38f967d5c1387 100644 --- a/sdk/schemaregistry/Azure.Data.SchemaRegistry/README.md +++ b/sdk/schemaregistry/Azure.Data.SchemaRegistry/README.md @@ -45,7 +45,7 @@ Once you have the Azure resource credentials and the Event Hubs namespace hostna // Create a new SchemaRegistry client using the default credential from Azure.Identity using environment variables previously set, // including AZURE_CLIENT_ID, AZURE_CLIENT_SECRET, and AZURE_TENANT_ID. // For more information on Azure.Identity usage, see: https://github.com/Azure/azure-sdk-for-net/blob/main/sdk/identity/Azure.Identity/README.md -var client = new SchemaRegistryClient(fullyQualifiedNamespace: endpoint, credential: new DefaultAzureCredential()); +var client = new SchemaRegistryClient(fullyQualifiedNamespace: fullyQualifiedNamespace, credential: new DefaultAzureCredential()); ``` ## Key concepts diff --git a/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/SerializationType.cs b/sdk/schemaregistry/Azure.Data.SchemaRegistry/src/SchemaFormat.cs similarity index 100% rename from sdk/schemaregistry/Azure.Data.SchemaRegistry/src/SerializationType.cs rename to sdk/schemaregistry/Azure.Data.SchemaRegistry/src/SchemaFormat.cs diff --git a/sdk/schemaregistry/Azure.Data.SchemaRegistry/tests/Samples/Sample01_ReadmeSnippets.cs b/sdk/schemaregistry/Azure.Data.SchemaRegistry/tests/Samples/Sample01_ReadmeSnippets.cs index e5ddad224b134..d14bdad30f1d6 100644 --- a/sdk/schemaregistry/Azure.Data.SchemaRegistry/tests/Samples/Sample01_ReadmeSnippets.cs +++ b/sdk/schemaregistry/Azure.Data.SchemaRegistry/tests/Samples/Sample01_ReadmeSnippets.cs @@ -19,13 +19,13 @@ public class Sample01_ReadmeSnippets : SamplesBase