Skip to content

Commit

Permalink
Fix schedule concurrency errors (#2277)
Browse files Browse the repository at this point in the history
  • Loading branch information
Fosol authored Sep 11, 2024
1 parent 58b19cb commit c96b8ca
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 13 deletions.
45 changes: 43 additions & 2 deletions libs/net/services/Helpers/ApiService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,41 @@ protected async Task<T> RetryRequestAsync<T>(Func<Task<T>> callbackDelegate)
return await RetryRequestAsync<T>(callbackDelegate);
}
}

/// <summary>
/// Keep trying a request if the failure is caused by an optimistic concurrency error.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="callbackDelegate"></param>
/// <returns></returns>
public async Task<T> HandleConcurrencyAsync<T>(Func<Task<T>> callbackDelegate)
{
// Keep trying to update the record and handle concurrency errors.
while (true)
{
try
{
return await callbackDelegate();
}
catch (HttpClientRequestException ex)
{
// If it's a concurrency error, keep trying. Otherwise throw the error.
this.Logger.LogError(ex, "Failed to complete request. Determining if this is a concurrency error.");
var data = ex.Data["Body"] as string;
if (!String.IsNullOrWhiteSpace(data))
{
var json = JsonSerializer.Deserialize<API.Models.ErrorResponseModel>(data, _serializerOptions);
if (json != null && json.Type == nameof(Microsoft.EntityFrameworkCore.DbUpdateConcurrencyException))
{
// A concurrency error can be resolved by loading the latest and reapplying the values.
continue;
}
}
// It wasn't a concurrency error, throw as a real failure.
throw;
}
}
}
#endregion

#region Kafka Methods
Expand Down Expand Up @@ -826,13 +861,19 @@ protected async Task<T> RetryRequestAsync<T>(Func<Task<T>> callbackDelegate)

/// <summary>
/// Make a request to the API to update the event schedule for the specified 'model'.
/// The most common issue with this endpoint is concurrency errors. Retrying won't fix that, so use the HandleConcurrency function and set retry = false.
/// </summary>
/// <param name="model"></param>
/// <param name="retry"></param>
/// <returns></returns>
public async Task<API.Areas.Services.Models.EventSchedule.EventScheduleModel?> UpdateEventScheduleAsync(API.Areas.Services.Models.EventSchedule.EventScheduleModel model)
public async Task<API.Areas.Services.Models.EventSchedule.EventScheduleModel?> UpdateEventScheduleAsync(API.Areas.Services.Models.EventSchedule.EventScheduleModel model, bool retry = true)
{
var url = this.Options.ApiUrl.Append($"services/events/schedules/{model.Id}");
return await RetryRequestAsync(async () => await this.OpenClient.PutAsync<API.Areas.Services.Models.EventSchedule.EventScheduleModel?>(url, JsonContent.Create(model)));

if (retry)
return await RetryRequestAsync(async () => await this.OpenClient.PutAsync<API.Areas.Services.Models.EventSchedule.EventScheduleModel?>(url, JsonContent.Create(model)));
else
return await this.OpenClient.PutAsync<API.Areas.Services.Models.EventSchedule.EventScheduleModel?>(url, JsonContent.Create(model));
}
#endregion

Expand Down
13 changes: 11 additions & 2 deletions libs/net/services/Helpers/IApiService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,15 @@ public interface IApiService
#endregion

#region Helper Methods
public Task<T> HandleRequestFailure<T>(Func<Task<T>> callbackDelegate, bool ignoreError, T defaultResponse);
Task<T> HandleRequestFailure<T>(Func<Task<T>> callbackDelegate, bool ignoreError, T defaultResponse);

/// <summary>
/// Keep trying a request if the failure is caused by an optimistic concurrency error.
/// </summary>
/// <typeparam name="T"></typeparam>
/// <param name="callbackDelegate"></param>
/// <returns></returns>
Task<T> HandleConcurrencyAsync<T>(Func<Task<T>> callbackDelegate);
#endregion

#region Kafka
Expand Down Expand Up @@ -436,8 +444,9 @@ public interface IApiService
/// Make a request to the API to update the event schedule for the specified 'model'.
/// </summary>
/// <param name="model"></param>
/// <param name="retry"></param>
/// <returns></returns>
Task<API.Areas.Services.Models.EventSchedule.EventScheduleModel?> UpdateEventScheduleAsync(API.Areas.Services.Models.EventSchedule.EventScheduleModel model);
Task<API.Areas.Services.Models.EventSchedule.EventScheduleModel?> UpdateEventScheduleAsync(API.Areas.Services.Models.EventSchedule.EventScheduleModel model, bool retry = true);
#endregion

#region AV Overview
Expand Down
11 changes: 7 additions & 4 deletions services/net/event-handler/EventHandlerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -244,10 +244,13 @@ private async Task ProcessEventHandlerAsync(ConsumeResult<string, EventScheduleR
await this.Api.RemoveContentFromFolder(eventSchedule.FolderId.Value);
this.Logger.LogInformation("Event schedule cleaned folder. Key: {key}, Event ID: {eventId}", result.Message.Key, request.EventScheduleId);

// Need to fetch the latest because it could have been updated recently.
eventSchedule = await this.Api.GetEventScheduleAsync(request.EventScheduleId) ?? throw new NoContentException("Event schedule no longer exists");
eventSchedule.LastRanOn = DateTime.UtcNow;
await this.Api.UpdateEventScheduleAsync(eventSchedule);
await this.Api.HandleConcurrencyAsync<API.Areas.Services.Models.EventSchedule.EventScheduleModel?>(async () =>
{
// Need to fetch the latest because it could have been updated recently.
eventSchedule = await this.Api.GetEventScheduleAsync(request.EventScheduleId) ?? throw new NoContentException($"Event schedule {eventSchedule.Id}:{eventSchedule.Name} does not exist.");
eventSchedule.LastRanOn = DateTime.UtcNow;
return await this.Api.UpdateEventScheduleAsync(eventSchedule, false);
});
}
else
{
Expand Down
9 changes: 6 additions & 3 deletions services/net/reporting/ReportingManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -380,9 +380,12 @@ private async Task ProcessReportAsync(ConsumeResult<string, ReportRequestModel>
// If the request originated from the scheduler service, update the last ran one.
if (request.EventScheduleId.HasValue)
{
var scheduledEvent = await this.Api.GetEventScheduleAsync(request.EventScheduleId.Value) ?? throw new NoContentException($"Event schedule '{request.EventScheduleId}' does not exist.");
scheduledEvent.LastRanOn = DateTime.UtcNow;
await this.Api.UpdateEventScheduleAsync(scheduledEvent);
await this.Api.HandleConcurrencyAsync<API.Areas.Services.Models.EventSchedule.EventScheduleModel?>(async () =>
{
var eventSchedule = await this.Api.GetEventScheduleAsync(request.EventScheduleId.Value) ?? throw new NoContentException($"Event schedule {request.EventScheduleId.Value} does not exist.");
eventSchedule.LastRanOn = DateTime.UtcNow;
return await this.Api.UpdateEventScheduleAsync(eventSchedule, false);
});
}
}

Expand Down
8 changes: 6 additions & 2 deletions services/net/scheduler/SchedulerManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,12 @@ public override async Task RunAsync()
else
await GenerateEventScheduleRequestAsync(scheduledEvent);

scheduledEvent.RequestSentOn = DateTime.UtcNow;
await this.Api.UpdateEventScheduleAsync(scheduledEvent);
await this.Api.HandleConcurrencyAsync<EventScheduleModel?>(async () =>
{
scheduledEvent = await this.Api.GetEventScheduleAsync(scheduledEvent.Id) ?? throw new NoContentException($"Event schedule {scheduledEvent.Id}:{scheduledEvent.Name} does not exist.");
scheduledEvent.RequestSentOn = DateTime.UtcNow;
return await this.Api.UpdateEventScheduleAsync(scheduledEvent, false);
});
}
}
}
Expand Down

0 comments on commit c96b8ca

Please sign in to comment.