Skip to content

Commit

Permalink
Improve batch error handling (#16121)
Browse files Browse the repository at this point in the history
* Improve batch error handling
  • Loading branch information
christothes authored Oct 21, 2020
1 parent 1bd5b91 commit 834b0f9
Show file tree
Hide file tree
Showing 9 changed files with 807 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -129,8 +129,8 @@ protected TableTransactionalBatch() { }
public virtual void AddEntities<T>(System.Collections.Generic.IEnumerable<T> entities) where T : class, Azure.Data.Tables.ITableEntity, new() { }
public virtual void AddEntity<T>(T entity) where T : class, Azure.Data.Tables.ITableEntity, new() { }
public virtual void DeleteEntity(string partitionKey, string rowKey, Azure.ETag ifMatch = default(Azure.ETag)) { }
public virtual Azure.Response<Azure.Data.Tables.Models.TableBatchResponse> SubmitBatch(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual System.Threading.Tasks.Task<Azure.Response<Azure.Data.Tables.Models.TableBatchResponse>> SubmitBatchAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual Azure.Response<Azure.Data.Tables.Models.TableBatchResponse> SubmitBatch<T>(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) { throw null; }
public virtual void UpdateEntity<T>(T entity, Azure.ETag ifMatch, Azure.Data.Tables.TableUpdateMode mode = Azure.Data.Tables.TableUpdateMode.Merge) where T : class, Azure.Data.Tables.ITableEntity, new() { }
}
public enum TableUpdateMode
Expand Down
64 changes: 56 additions & 8 deletions sdk/tables/Azure.Data.Tables/src/TableRestClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,13 @@
#nullable disable

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
Expand All @@ -19,6 +24,7 @@ internal partial class TableRestClient
internal ClientDiagnostics clientDiagnostics => _clientDiagnostics;
internal string endpoint => url;
internal string clientVersion => version;
private static readonly Regex s_entityIndexRegex = new Regex(@"""value"":""(?<index>[\d]+):", RegexOptions.Compiled);

internal HttpMessage CreateBatchRequest(MultipartContent content, string requestId, ResponseFormat? responsePreference)
{
Expand Down Expand Up @@ -51,11 +57,12 @@ internal static MultipartContent CreateBatchContent(Guid batchGuid)
return new MultipartContent("mixed", $"batch_{guid}");
}

/// <summary> Insert entity in a table. </summary>
/// <param name="message"></param>
/// <summary> Submits a batch operation to a table. </summary>
/// <param name="message">The message to send.</param>
/// <param name="messageList">TRhe ordered list of messages and entities.</param>
/// <param name="cancellationToken"> The cancellation token to use. </param>
/// <exception cref="ArgumentNullException"> <paramref name="message"/> is null. </exception>
public async Task<Response<List<Response>>> SendBatchRequestAsync(HttpMessage message, CancellationToken cancellationToken = default)
public async Task<Response<List<Response>>> SendBatchRequestAsync(HttpMessage message, List<(ITableEntity Entity, HttpMessage HttpMessage)> messageList, CancellationToken cancellationToken = default)
{
if (message == null)
{
Expand All @@ -76,7 +83,24 @@ public async Task<Response<List<Response>>> SendBatchRequestAsync(HttpMessage me

if (responses.Length == 1 && responses.Any(r => r.Status >= 400))
{
throw await _clientDiagnostics.CreateRequestFailedExceptionAsync(responses[0]).ConfigureAwait(false);
// Batch error messages should be formatted as follows:
// "0:<some error message>"
// where the number prefix is the index of the sub request that failed.
var ex = await _clientDiagnostics.CreateRequestFailedExceptionAsync(responses[0]).ConfigureAwait(false);

//Get the failed index
var match = s_entityIndexRegex.Match(ex.Message);

if (match.Success && int.TryParse(match.Groups["index"].Value, out int failedEntityIndex))
{
// create a new exception with the additional info populated.
var appendedMessage = AppendEntityInfoToMessage(ex.Message, messageList[failedEntityIndex].Entity);
throw new RequestFailedException(ex.Status, appendedMessage, ex.ErrorCode, ex.InnerException);
}
else
{
throw ex;
}
}

return Response.FromValue(responses.ToList(), message.Response);
Expand All @@ -86,11 +110,12 @@ public async Task<Response<List<Response>>> SendBatchRequestAsync(HttpMessage me
}
}

/// <summary> Insert entity in a table. </summary>
/// <param name="message"></param>
/// <summary> Submits a batch operation to a table. </summary>
/// <param name="message">The message to send.</param>
/// <param name="messageList">TRhe ordered list of messages and entities.</param>
/// <param name="cancellationToken"> The cancellation token to use. </param>
/// <exception cref="ArgumentNullException"> <paramref name="message"/> is null. </exception>
public Response<List<Response>> SendBatchRequest(HttpMessage message, CancellationToken cancellationToken = default)
public Response<List<Response>> SendBatchRequest(HttpMessage message, List<(ITableEntity Entity, HttpMessage HttpMessage)> messageList, CancellationToken cancellationToken = default)
{
if (message == null)
{
Expand All @@ -111,7 +136,25 @@ public Response<List<Response>> SendBatchRequest(HttpMessage message, Cancellati

if (responses.Length == 1 && responses.Any(r => r.Status >= 400))
{
throw _clientDiagnostics.CreateRequestFailedException(responses[0]);
// Batch error messages should be formatted as follows:
// "0:<some error message>"
// where the number prefix is the index of the sub request that failed.
var ex = _clientDiagnostics.CreateRequestFailedException(responses[0]);

//Get the failed index
var match = s_entityIndexRegex.Match(ex.Message);

if (match.Success && int.TryParse(match.Groups["index"].Value, out int failedEntityIndex))
{
// create a new exception with the additional info populated.
// reset the response stream position so we can read it again
var appendedMessage = AppendEntityInfoToMessage(ex.Message, messageList[failedEntityIndex].Entity);
throw new RequestFailedException(ex.Status, appendedMessage, ex.ErrorCode, ex.InnerException);
}
else
{
throw ex;
}
}

return Response.FromValue(responses.ToList(), message.Response);
Expand All @@ -120,5 +163,10 @@ public Response<List<Response>> SendBatchRequest(HttpMessage message, Cancellati
throw _clientDiagnostics.CreateRequestFailedException(message.Response);
}
}

private static string AppendEntityInfoToMessage(string messsage, ITableEntity entity)
{
return messsage += $"\n\nRowKey={entity.RowKey}";
}
}
}
80 changes: 55 additions & 25 deletions sdk/tables/Azure.Data.Tables/src/TableTransactionalBatch.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public partial class TableTransactionalBatch
internal Guid _batchGuid = default;
internal Guid _changesetGuid = default;
internal ConcurrentDictionary<string, (HttpMessage Message, RequestType RequestType)> _requestLookup = new ConcurrentDictionary<string, (HttpMessage Message, RequestType RequestType)>();
internal ConcurrentQueue<(string RowKey, HttpMessage HttpMessage)> _requestMessages = new ConcurrentQueue<(string RowKey, HttpMessage HttpMessage)>();
internal ConcurrentQueue<(ITableEntity Entity, HttpMessage HttpMessage)> _requestMessages = new ConcurrentQueue<(ITableEntity Entity, HttpMessage HttpMessage)>();

/// <summary>
/// Initializes a new instance of the <see cref="TableTransactionalBatch"/> class.
Expand Down Expand Up @@ -78,10 +78,10 @@ internal void SetBatchGuids(Guid batchGuid, Guid changesetGuid)
}

/// <summary>
/// Add an AddEntity requests to the batch.
/// Adds a Table Entity of type <typeparamref name="T"/> to the batch.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="entity"></param>
/// <typeparam name="T">A custom model type that implements <see cref="ITableEntity" /> or an instance of <see cref="TableEntity"/>.</typeparam>
/// <param name="entity">The entity to add.</param>
public virtual void AddEntity<T>(T entity) where T : class, ITableEntity, new()
{
var message = _batchOperations.CreateInsertEntityRequest(
Expand All @@ -92,13 +92,26 @@ internal void SetBatchGuids(Guid batchGuid, Guid changesetGuid)
tableEntityProperties: entity.ToOdataAnnotatedDictionary(),
queryOptions: new QueryOptions() { Format = _format });

AddMessage(entity.RowKey, message, RequestType.Create);
AddMessage(entity, message, RequestType.Create);
}

/// <summary>
/// Add an UpdateEntity request to the batch.
/// Adds an UpdateEntity request to the batch which
/// updates the specified table entity of type <typeparamref name="T"/>, if it exists.
/// If the <paramref name="mode"/> is <see cref="TableUpdateMode.Replace"/>, the entity will be replaced.
/// If the <paramref name="mode"/> is <see cref="TableUpdateMode.Merge"/>, the property values present in the <paramref name="entity"/> will be merged with the existing entity.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <remarks>
/// See <see cref="TableUpdateMode"/> for more information about the behavior of the <paramref name="mode"/>.
/// </remarks>
/// <typeparam name="T">A custom model type that implements <see cref="ITableEntity" /> or an instance of <see cref="TableEntity"/>.</typeparam>
/// <param name="entity">The entity to update.</param>
/// <param name="ifMatch">
/// The If-Match value to be used for optimistic concurrency.
/// If <see cref="ETag.All"/> is specified, the operation will be executed unconditionally.
/// If the <see cref="ITableEntity.ETag"/> value is specified, the operation will fail with a status of 412 (Precondition Failed) if the <see cref="ETag"/> value of the entity in the table does not match.
/// </param>
/// <param name="mode">Determines the behavior of the Update operation.</param>
public virtual void UpdateEntity<T>(T entity, ETag ifMatch, TableUpdateMode mode = TableUpdateMode.Merge) where T : class, ITableEntity, new()
{
var message = _batchOperations.CreateUpdateEntityRequest(
Expand All @@ -111,15 +124,20 @@ internal void SetBatchGuids(Guid batchGuid, Guid changesetGuid)
tableEntityProperties: entity.ToOdataAnnotatedDictionary(),
queryOptions: new QueryOptions() { Format = _format });

AddMessage(entity.RowKey, message, RequestType.Update);
AddMessage(entity, message, RequestType.Update);
}

/// <summary>
/// Add a DeleteEntity request to the batch.
/// </summary>
/// <param name="partitionKey"></param>
/// <param name="rowKey"></param>
/// <param name="ifMatch"></param>
/// <param name="partitionKey">The partition key of the entity to delete.</param>
/// <param name="rowKey">The row key of the entity to delete.</param>
/// <param name="ifMatch">
/// The If-Match value to be used for optimistic concurrency.
/// If <see cref="ETag.All"/> is specified, the operation will be executed unconditionally.
/// If the <see cref="ITableEntity.ETag"/> value is specified, the operation will fail with a status of 412 (Precondition Failed) if the <see cref="ETag"/> value of the entity in the table does not match.
/// The default is to delete unconditionally.
/// </param>
public virtual void DeleteEntity(string partitionKey, string rowKey, ETag ifMatch = default)
{
var message = _batchOperations.CreateDeleteEntityRequest(
Expand All @@ -131,14 +149,16 @@ public virtual void DeleteEntity(string partitionKey, string rowKey, ETag ifMatc
null,
queryOptions: new QueryOptions() { Format = _format });

AddMessage(rowKey, message, RequestType.Delete);
AddMessage(new TableEntity(partitionKey, rowKey) { ETag = ifMatch }, message, RequestType.Delete);
}

/// <summary>
/// Placeholder for batch operations. This is just being used for testing.
/// Submits the batch transaction to the service for execution.
/// The sub-operations contained in the batch will either succeed or fail together as a transaction.
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
/// <param name="cancellationToken">A <see cref="CancellationToken"/> controlling the request lifetime.</param>
/// <returns><see cref="Response{T}"/> containing a <see cref="TableBatchResponse"/>.</returns>
/// <exception cref="RequestFailedException"/> if the batch transaction fails./>
public virtual async Task<Response<TableBatchResponse>> SubmitBatchAsync(CancellationToken cancellationToken = default)
{
var messageList = BuildOrderedBatchRequests();
Expand All @@ -148,7 +168,7 @@ public virtual async Task<Response<TableBatchResponse>> SubmitBatchAsync(Cancell
try
{
var request = _tableOperations.CreateBatchRequest(_batch, null, null);
var response = await _tableOperations.SendBatchRequestAsync(request, cancellationToken).ConfigureAwait(false);
var response = await _tableOperations.SendBatchRequestAsync(request, messageList, cancellationToken).ConfigureAwait(false);

for (int i = 0; i < response.Value.Count; i++)
{
Expand All @@ -165,11 +185,13 @@ public virtual async Task<Response<TableBatchResponse>> SubmitBatchAsync(Cancell
}

/// <summary>
/// Placeholder for batch operations. This is just being used for testing.
/// Submits the batch transaction to the service for execution.
/// The sub-operations contained in the batch will either succeed or fail together as a transaction.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <returns></returns>
public virtual Response<TableBatchResponse> SubmitBatch<T>(CancellationToken cancellationToken = default)
/// <param name="cancellationToken">A <see cref="CancellationToken"/> controlling the request lifetime.</param>
/// <returns><see cref="Response{T}"/> containing a <see cref="TableBatchResponse"/>.</returns>
/// <exception cref="RequestFailedException"/> if the batch transaction fails./>
public virtual Response<TableBatchResponse> SubmitBatch(CancellationToken cancellationToken = default)
{
var messageList = BuildOrderedBatchRequests();

Expand All @@ -178,7 +200,7 @@ public virtual Response<TableBatchResponse> SubmitBatch<T>(CancellationToken can
try
{
var request = _tableOperations.CreateBatchRequest(_batch, null, null);
var response = _tableOperations.SendBatchRequest(request, cancellationToken);
var response = _tableOperations.SendBatchRequest(request, messageList, cancellationToken);

for (int i = 0; i < response.Value.Count; i++)
{
Expand All @@ -194,17 +216,25 @@ public virtual Response<TableBatchResponse> SubmitBatch<T>(CancellationToken can
}
}

private bool AddMessage(string rowKey, HttpMessage message, RequestType requestType)
/// <summary>
/// Adds a message to the batch to preserve sub-request ordering.
/// </summary>
/// <returns><c>true</c>if the add succeeded, else <c>false</c>.</returns>
private bool AddMessage(ITableEntity entity, HttpMessage message, RequestType requestType)
{
if (_requestLookup.TryAdd(rowKey, (message, requestType)))
if (_requestLookup.TryAdd(entity.RowKey, (message, requestType)))
{
_requestMessages.Enqueue((rowKey, message));
_requestMessages.Enqueue((entity, message));
return true;
}
throw new InvalidOperationException("Each entity can only be represented once per batch.");
}

private List<(string RowKey, HttpMessage HttpMessage)> BuildOrderedBatchRequests()
/// <summary>
/// Builds an ordered list of <see cref="HttpMessage"/>s containing the batch sub-requests.
/// </summary>
/// <returns></returns>
private List<(ITableEntity entity, HttpMessage HttpMessage)> BuildOrderedBatchRequests()
{
var orderedList = _requestMessages.ToList();
foreach (var item in orderedList)
Expand Down
Loading

0 comments on commit 834b0f9

Please sign in to comment.