Skip to content

Commit

Permalink
Merge pull request #19 from ledjon-behluli/stream-loading-space
Browse files Browse the repository at this point in the history
Startup loading behavior batch vs al
  • Loading branch information
ledjon-behluli authored Sep 10, 2023
2 parents 121764c + 1a8b140 commit d9211f8
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 14 deletions.
25 changes: 17 additions & 8 deletions src/OrleanSpaces/Agents/BaseAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -41,16 +41,11 @@ public BaseAgent(

async ValueTask ISpaceRouter<TTuple, TTemplate>.RouteDirector(IStoreDirector<TTuple> director)
{
this.director = director;
if (options.LoadSpaceContentsUponStartup)
{
var tuples = await director.GetAll();
foreach (var tuple in tuples)
{
this.tuples.Add(tuple);
}
await ReloadAsync();
}

this.director = director;
}

async ValueTask ISpaceRouter<TTuple, TTemplate>.RouteAction(TupleAction<TTuple> action)
Expand Down Expand Up @@ -194,7 +189,21 @@ public async IAsyncEnumerable<TTuple> EnumerateAsync(TTemplate template)
}
}

public async Task ReloadAsync() => tuples = await director.GetAll();
public async Task ReloadAsync()
{
if (options.LoadingStrategy == SpaceLoadingStrategy.Sequential)
{
tuples = tuples.Clear();
await foreach (var items in director.GetBatch())
{
tuples = tuples.AddRange(items);
}
}
else
{
tuples = await director.GetAllBatches();
}
}

public async Task ClearAsync()
{
Expand Down
21 changes: 17 additions & 4 deletions src/OrleanSpaces/Agents/SpaceAgent.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,11 @@ public SpaceAgent(

async ValueTask ISpaceRouter<SpaceTuple, SpaceTemplate>.RouteDirector(IStoreDirector<SpaceTuple> director)
{
this.director = director;
if (options.LoadSpaceContentsUponStartup)
{
tuples = await director.GetAll();
await ReloadAsync();
}

this.director = director;
}

async ValueTask ISpaceRouter<SpaceTuple, SpaceTemplate>.RouteAction(TupleAction<SpaceTuple> action)
Expand Down Expand Up @@ -187,7 +186,21 @@ public async IAsyncEnumerable<SpaceTuple> EnumerateAsync(SpaceTemplate template)
}
}

public async Task ReloadAsync() => tuples = await director.GetAll();
public async Task ReloadAsync()
{
if (options.LoadingStrategy == SpaceLoadingStrategy.Sequential)
{
tuples = tuples.Clear();
await foreach (var items in director.GetBatch())
{
tuples = tuples.AddRange(items);
}
}
else
{
tuples = await director.GetAllBatches();
}
}

public async Task ClearAsync()
{
Expand Down
14 changes: 13 additions & 1 deletion src/OrleanSpaces/Grains/Directors/BaseDirector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,19 @@ public override async Task OnActivateAsync(CancellationToken cancellationToken)
}
}

public async Task<ImmutableArray<StoreTuple<TTuple>>> GetAll()
public async IAsyncEnumerable<ImmutableArray<StoreTuple<TTuple>>> GetBatch()
{
foreach (string storeKey in StoreKeys)
{
Guid storeId = ParseStoreKey(storeKey);
var content = await GrainFactory.GetGrain<TStore>(storeKey).GetAll();
var result = content.Tuples.Select(tuple => new StoreTuple<TTuple>(storeId, tuple)).ToImmutableArray();

yield return result;
}
}

public async Task<ImmutableArray<StoreTuple<TTuple>>> GetAllBatches()
{
List<Task<StoreContent<TTuple>>> tasks = new();
foreach (string storeKey in StoreKeys)
Expand Down
4 changes: 3 additions & 1 deletion src/OrleanSpaces/IStoreDirector.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ namespace OrleanSpaces;
internal interface IStoreDirector<T>
where T : ISpaceTuple
{
[ReadOnly] Task<ImmutableArray<StoreTuple<T>>> GetAll();
[ReadOnly] IAsyncEnumerable<ImmutableArray<StoreTuple<T>>> GetBatch();
[ReadOnly] Task<ImmutableArray<StoreTuple<T>>> GetAllBatches();

Task<Guid> Insert(TupleAction<T> action);
Task Remove(TupleAction<T> action);
Task RemoveAll(Guid agentId);
Expand Down
24 changes: 24 additions & 0 deletions src/OrleanSpaces/SpaceOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ public sealed class SpaceClientOptions
/// <see cref="ISpaceAgent{T, TTuple, TTemplate}.ReloadAsync"/>, depending on the agent type.
/// </i></remarks>
public bool LoadSpaceContentsUponStartup { get; set; } = true;

/// <summary>
/// Defines the way how the space contents (<i>i.e. the tuples</i>) are loaded by the agent.
/// </summary>
public SpaceLoadingStrategy LoadingStrategy { get; set; } = SpaceLoadingStrategy.Parallel;
}

/// <summary>
Expand Down Expand Up @@ -96,3 +101,22 @@ public enum SpaceKind
ULong = 262144,
UShort = 524288
}

/// <summary>
/// Strategies for space loading.
/// </summary>
public enum SpaceLoadingStrategy
{
/// <summary>
/// <para>Content from all partitions are loaded sequentially.</para>
/// <para>Results in a longer time to load the space. But ultimately results in less resource contention, and avoids potential <see cref="ThreadPool"/> starvation.</para>
/// </summary>
/// <remarks><i>Use if fast loading time is not important, and the space is heavily partitioned.</i></remarks>
Sequential,
/// <summary>
/// <para>Content from all partitions are loaded in parallel.</para>
/// <para>Results in less resource contention, and avoids potential <see cref="ThreadPool"/> starvation. But ultimately results in a longer time to load the space.</para>
/// </summary>
/// <remarks><i>Use if fast loading time is important, and the space is not heavily partitioned.</i></remarks>
Parallel
}

0 comments on commit d9211f8

Please sign in to comment.