Skip to content

Commit

Permalink
Fixed bug where 'Segment doesn't have any more events exception' was …
Browse files Browse the repository at this point in the history
…throw when attempting to resume from a cusor pointed at a segment that had no more events, and newer segments exist in the Change Feed. (#22546)
  • Loading branch information
seanmcc-msft authored Jul 8, 2021
1 parent d59b3de commit 6dbac72
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 4 deletions.
4 changes: 1 addition & 3 deletions sdk/storage/Azure.Storage.Blobs.ChangeFeed/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
# Release History

## 12.0.0-preview.14 (Unreleased)

- TenantId can now be discovered through the service challenge response, when using a TokenCredential for authorization.
- A new property is now available on the ClientOptions called `EnableTenantDiscovery`. If set to true, the client will attempt an initial unauthorized request to the service to prompt a challenge containing the tenantId hint.

- This release contains bug fixes to improve quality.
- Fixed bug where "Segment doesn't have any more events" exception was throw when attempting to resume from a cusor pointed at a segment that had no more events, and newer segments exist in the Change Feed.

## 12.0.0-preview.13 (2021-06-08)
- This release contains bug fixes to improve quality.
Expand Down
2 changes: 1 addition & 1 deletion sdk/storage/Azure.Storage.Blobs.ChangeFeed/src/Segment.cs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public virtual async Task<List<BlobChangeFeedEvent>> GetPage(

if (!HasNext())
{
throw new InvalidOperationException("Segment doesn't have any more events");
return new List<BlobChangeFeedEvent>(capacity: 0);
}

int i = 0;
Expand Down
88 changes: 88 additions & 0 deletions sdk/storage/Azure.Storage.Blobs.ChangeFeed/tests/SegmentTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -272,5 +272,93 @@ public async Task GetPage()
shards[1].Verify(r => r.Next(IsAsync, default));
shards[1].Verify(r => r.HasNext());
}

[RecordedTest]
public async Task GetPage_NoMoreEvents()
{
// Arrange
string manifestPath = "idx/segments/2020/03/25/0200/meta.json";
int shardCount = 3;

Mock<BlobContainerClient> containerClient = new Mock<BlobContainerClient>(MockBehavior.Strict);
Mock<BlobClient> blobClient = new Mock<BlobClient>(MockBehavior.Strict);
Mock<ShardFactory> shardFactory = new Mock<ShardFactory>(MockBehavior.Strict);

List<Mock<Shard>> shards = new List<Mock<Shard>>();

for (int i = 0; i < shardCount; i++)
{
shards.Add(new Mock<Shard>(MockBehavior.Strict));
}

containerClient.Setup(r => r.GetBlobClient(It.IsAny<string>())).Returns(blobClient.Object);

using FileStream stream = File.OpenRead(
$"{Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location)}{Path.DirectorySeparatorChar}Resources{Path.DirectorySeparatorChar}{"SegmentManifest.json"}");
BlobDownloadStreamingResult blobDownloadStreamingResult = BlobsModelFactory.BlobDownloadStreamingResult(content: stream);
Response<BlobDownloadStreamingResult> downloadResponse = Response.FromValue(blobDownloadStreamingResult, new MockResponse(200));

if (IsAsync)
{
blobClient.Setup(r => r.DownloadStreamingAsync(default, default, default, default)).ReturnsAsync(downloadResponse);
}
else
{
blobClient.Setup(r => r.DownloadStreaming(default, default, default, default)).Returns(downloadResponse);
}

shardFactory.SetupSequence(r => r.BuildShard(
It.IsAny<bool>(),
It.IsAny<string>(),
It.IsAny<ShardCursor>()))
.ReturnsAsync(shards[0].Object)
.ReturnsAsync(shards[1].Object)
.ReturnsAsync(shards[2].Object);

// Set up Shards
shards[0].SetupSequence(r => r.HasNext())
.Returns(false);

shards[1].SetupSequence(r => r.HasNext())
.Returns(false);

shards[2].SetupSequence(r => r.HasNext())
.Returns(false);

SegmentFactory segmentFactory = new SegmentFactory(
containerClient.Object,
shardFactory.Object);
Segment segment = await segmentFactory.BuildSegment(
IsAsync,
manifestPath);

// Act
List<BlobChangeFeedEvent> events = await segment.GetPage(IsAsync, 25);

// Assert
Assert.AreEqual(0, events.Count);

containerClient.Verify(r => r.GetBlobClient(manifestPath));
if (IsAsync)
{
blobClient.Verify(r => r.DownloadStreamingAsync(default, default, default, default));
}
else
{
blobClient.Verify(r => r.DownloadStreaming(default, default, default, default));
}

for (int i = 0; i < shards.Count; i++)
{
shardFactory.Verify(r => r.BuildShard(
IsAsync,
$"log/0{i}/2020/03/25/0200/",
default));
}

shards[0].Verify(r => r.HasNext());
shards[1].Verify(r => r.HasNext());
shards[2].Verify(r => r.HasNext());
}
}
}

0 comments on commit 6dbac72

Please sign in to comment.