-
Notifications
You must be signed in to change notification settings - Fork 4.8k
/
Sample01b_HelloWorldAsync.cs
159 lines (137 loc) · 6.6 KB
/
Sample01b_HelloWorldAsync.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;
using Azure.Storage;
using NUnit.Framework;
namespace Azure.Storage.Blobs.ChangeFeed.Samples
{
/// <summary>
/// Basic Azure ChangeFeed Storage samples.
/// </summary>
public class Sample01b_HelloWorldAsync : SampleTest
{
/// <summary>
/// Download every event in the change feed.
/// </summary>
[Test]
public async Task ChangeFeedAsync()
{
// Get a connection string to our Azure Storage account.
string connectionString = ConnectionString;
// Get a new change feed client.
BlobChangeFeedClient changeFeedClient = new BlobChangeFeedClient(connectionString);
#region Snippet:SampleSnippetsChangeFeed_GetAllEvents
// Get all the events in the change feed.
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync())
{
changeFeedEvents.Add(changeFeedEvent);
}
#endregion
}
/// <summary>
/// Download change feed events between a start and end time.
/// </summary>
[Test]
public async Task ChangeFeedBetweenDatesAsync()
{
// Get a connection string to our Azure Storage account.
string connectionString = ConnectionString;
// Get a new change feed client.
BlobChangeFeedClient changeFeedClient = new BlobChangeFeedClient(connectionString);
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
#region Snippet:SampleSnippetsChangeFeed_GetEventsBetweenStartAndEndTime
// Create the start and end time. The change feed client will round start time down to
// the nearest hour, and round endTime up to the next hour if you provide DateTimeOffsets
// with minutes and seconds.
DateTimeOffset startTime = new DateTimeOffset(2017, 3, 2, 15, 0, 0, TimeSpan.Zero);
DateTimeOffset endTime = new DateTimeOffset(2020, 10, 7, 2, 0, 0, TimeSpan.Zero);
// You can also provide just a start or end time.
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
start: startTime,
end: endTime))
{
changeFeedEvents.Add(changeFeedEvent);
}
#endregion
}
/// <summary>
/// You can use the change feed cursor to resume iterating throw the change feed
/// at a later time.
/// </summary>
[Test]
public async Task ChangeFeedResumeWithCursorAsync()
{
// Get a connection string to our Azure Storage account.
string connectionString = ConnectionString;
// Get a new change feed client.
BlobChangeFeedClient changeFeedClient = new BlobChangeFeedClient(connectionString);
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
#region Snippet:SampleSnippetsChangeFeed_ResumeWithCursor
string continuationToken = null;
await foreach (Page<BlobChangeFeedEvent> page in changeFeedClient.GetChangesAsync().AsPages(pageSizeHint: 10))
{
foreach (BlobChangeFeedEvent changeFeedEvent in page.Values)
{
changeFeedEvents.Add(changeFeedEvent);
}
// Get the change feed continuation token. The continuation token is not required to get each page of events,
// it is intended to be saved and used to resume iterating at a later date.
continuationToken = page.ContinuationToken;
break;
}
// Resume iterating from the pervious position with the continuation token.
await foreach (BlobChangeFeedEvent changeFeedEvent in changeFeedClient.GetChangesAsync(
continuationToken: continuationToken))
{
changeFeedEvents.Add(changeFeedEvent);
}
#endregion
}
/// <summary>
/// You can use the change feed cursor to periodically poll for new events.
/// </summary>
[Test]
public async Task ChangeFeedPollForEventsWithCursor()
{
// Get a connection string to our Azure Storage account.
string connectionString = ConnectionString;
// Get a new change feed client.
BlobChangeFeedClient changeFeedClient = new BlobChangeFeedClient(connectionString);
List<BlobChangeFeedEvent> changeFeedEvents = new List<BlobChangeFeedEvent>();
#region Snippet:SampleSnippetsChangeFeed_PollForEventsWithCursor
// Create the start time. The change feed client will round start time down to
// the nearest hour if you provide DateTimeOffsets
// with minutes and seconds.
DateTimeOffset startTime = DateTimeOffset.Now;
// Create polling interval.
TimeSpan pollingInterval = TimeSpan.FromMinutes(5);
// Get initial set of events.
IAsyncEnumerable<Page<BlobChangeFeedEvent>> pages = changeFeedClient.GetChangesAsync(start: startTime).AsPages();
string continuationToken = null;
while (true)
{
await foreach (Page<BlobChangeFeedEvent> page in pages)
{
foreach (BlobChangeFeedEvent changeFeedEvent in page.Values)
{
changeFeedEvents.Add(changeFeedEvent);
}
// Get the change feed continuation token. The continuation token is not required to get each page of events,
// it is intended to be saved and used to resume iterating at a later date.
// For the purpose of actively listening to events the continuation token from last page is used.
continuationToken = page.ContinuationToken;
}
// Wait before processing next batch of events.
await Task.Delay(pollingInterval);
// Resume from last continuation token and fetch latest set of events.
pages = changeFeedClient.GetChangesAsync(continuationToken).AsPages();
}
#endregion
}
}
}