Skip to content

Commit

Permalink
Added GetStream function (mentioned in #7)
Browse files Browse the repository at this point in the history
  • Loading branch information
Dzoukr committed Jan 2, 2019
1 parent 5e8f8ec commit d822308
Show file tree
Hide file tree
Showing 7 changed files with 54 additions and 3 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ type EventStore = {
GetEvent : string -> int64 -> Task<EventRead>
GetEvents : string -> EventsReadRange -> Task<EventRead list>
GetStreams : StreamsReadFilter -> Task<Stream list>
GetStream : string -> Task<Stream>
EventAppended : IObservable<EventRead>
}
Expand Down Expand Up @@ -208,7 +209,7 @@ type Stream = {
}
```

Streams can be queried using `GetStreams` function. The querying works similar way as filtering Events by range, but here you can query Streams by string `Id`:
If you know exact value of Stream `Id`, you can use function `GetStream`. To query more Streams, use `GetStreams` function. The querying works similar way as filtering Events by range, but here you can query Streams by `Id`:

```fsharp
let allAmazingStream = StreamsReadFilter.StartsWith("MyAmazing") |> eventStore.GetStreams
Expand Down
3 changes: 3 additions & 0 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
### 1.5.0 - January 02 2019
* Added GetStream function (mentioned in #7)

### 1.4.0 - December 15 2018
* Appended events can be observed over IObservable

Expand Down
1 change: 1 addition & 0 deletions src/CosmoStore/CosmoStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -52,5 +52,6 @@ type EventStore = {
GetEvent : string -> int64 -> Task<EventRead>
GetEvents : string -> EventsReadRange -> Task<EventRead list>
GetStreams : StreamsReadFilter -> Task<Stream list>
GetStream : string -> Task<Stream>
EventAppended : IObservable<EventRead>
}
11 changes: 11 additions & 0 deletions src/CosmoStore/CosmosDb/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,16 @@ let private getEvent (client:DocumentClient) (collectionUri:Uri) streamId positi
return events.Head
}

let private getStream (client:DocumentClient) (collectionUri:Uri) streamId =
task {
return createQuery
(sprintf "SELECT * FROM %s e WHERE e.type = 'Stream' AND e.streamId = @streamId" collectionName) [("@streamId", streamId :> obj)]
|> runQuery<Document> client collectionUri
|> Seq.toList
|> List.map Conversion.documentToStream
|> List.head
}

let private getRequestOptions streamId = RequestOptions(PartitionKey = PartitionKey(streamId))

let getEventStore (configuration:Configuration) =
Expand Down Expand Up @@ -160,5 +170,6 @@ let getEventStore (configuration:Configuration) =
GetEvent = getEvent client eventsCollectionUri
GetEvents = getEvents client eventsCollectionUri
GetStreams = getStreams client eventsCollectionUri
GetStream = getStream client eventsCollectionUri
EventAppended = Observable.ObserveOn(eventAppended.Publish :> IObservable<_>, ThreadPoolScheduler.Instance)
}
13 changes: 13 additions & 0 deletions src/CosmoStore/TableStorage/EventStore.fs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,18 @@ let private getStreams (table:CloudTable) (streamsRead:StreamsReadFilter) =
|> List.sortBy (fun x -> x.Id)
}

let private getStream (table:CloudTable) streamId =
let q = Querying.oneStream streamId
task {
let token = TableContinuationToken()
let! results = executeQuery table q token (Collections.Generic.List())
return
results
|> Seq.toList
|> List.map Conversion.entityToStream
|> List.head
}

let private getEvents (table:CloudTable) streamId (eventsRead:EventsReadRange) =
let q = Querying.allEventsFiltered streamId eventsRead
task {
Expand Down Expand Up @@ -152,5 +164,6 @@ let getEventStore (configuration:Configuration) =
GetEvent = getEvent table
GetEvents = getEvents table
GetStreams = getStreams table
GetStream = getStream table
EventAppended = Observable.ObserveOn(eventAppended.Publish :> IObservable<_>, ThreadPoolScheduler.Instance)
}
11 changes: 9 additions & 2 deletions src/CosmoStore/TableStorage/Querying.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,24 @@ module CosmoStore.TableStorage.Querying
open Microsoft.WindowsAzure.Storage.Table
open CosmoStore

let private toQuery filter = TableQuery<DynamicTableEntity>().Where(filter)

let allStreams = TableQuery<DynamicTableEntity>().Where(TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, Conversion.streamRowKey))

let oneStream streamId =
TableQuery.CombineFilters(
TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, streamId),
TableOperators.And,
TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.Equal, Conversion.streamRowKey)
) |> toQuery

let private allEventsFilter streamId =
TableQuery.CombineFilters(
TableQuery.GenerateFilterCondition("PartitionKey", QueryComparisons.Equal, streamId),
TableOperators.And,
TableQuery.GenerateFilterCondition("RowKey", QueryComparisons.NotEqual, Conversion.streamRowKey)
)

let private toQuery filter = TableQuery<DynamicTableEntity>().Where(filter)

let private withPositionGreaterOrEqual pos filter =
TableQuery.CombineFilters(
filter,
Expand Down
15 changes: 15 additions & 0 deletions tests/CosmoStore.Tests/BasicTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,21 @@ let ``Get streams (contains)`` ([<Values(StoreType.CosmosDB, StoreType.TableStor
Assert.AreEqual(3, streams.Length)
Assert.AreEqual(sprintf "C_%s_1" contains, streams.Head.Id)

[<Test>]
let ``Get stream`` ([<Values(StoreType.CosmosDB, StoreType.TableStorage)>] (typ:StoreType)) =
let store = typ |> getEventStore
let streamId = (sprintf "OS_%s" (Guid.NewGuid().ToString("N")))
[1..10]
|> List.map getEvent
|> store.AppendEvents streamId ExpectedPosition.Any
|> Async.AwaitTask
|> Async.RunSynchronously
|> ignore

let stream = store.GetStream streamId |> Async.AwaitTask |> Async.RunSynchronously
Assert.AreEqual(10, stream.LastPosition)
Assert.AreEqual(streamId, stream.Id)

[<Test>]
let ``Fails to append to existing position`` ([<Values(StoreType.CosmosDB, StoreType.TableStorage)>] (typ:StoreType)) =
let store = typ |> getEventStore
Expand Down

0 comments on commit d822308

Please sign in to comment.