Skip to content

Commit

Permalink
Add cursor in GetEventResponse (#287)
Browse files Browse the repository at this point in the history
* Add endLedger in GetEventResponse

* Add Cursor to response and deprecate PagingToken

* Update cursor logic and tests

* update CHANGELOG.md
  • Loading branch information
psheth9 authored Sep 20, 2024
1 parent 3bca89b commit cae1740
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 14 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

### Added

- Add `Cursor` in `GetEventsResponse`. This tells the client until what ledger events are being queried. e.g.: `startLEdger` (inclusive) - `endLedger` (exclusive)
- Limitation: getEvents are capped by 10K `LedgerScanLimit` which means you can query events for 10K ledger at maximum for a given request.
- Add `EndLedger` in `GetEventsRequest`. This provides finer control and clarity on the range of ledgers being queried.
- Disk-Based Event Storage: Events are now stored on disk instead of in memory. For context, storing approximately 3 million events will require around 1.5 GB of disk space.
This change enhances the scalability and can now support a larger retention window (~7 days) for events.
Expand Down
32 changes: 26 additions & 6 deletions cmd/soroban-rpc/internal/methods/get_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"strings"
"time"

Expand Down Expand Up @@ -85,11 +86,13 @@ func (e eventTypeSet) matches(event xdr.ContractEvent) bool {
}

type EventInfo struct {
EventType string `json:"type"`
Ledger int32 `json:"ledger"`
LedgerClosedAt string `json:"ledgerClosedAt"`
ContractID string `json:"contractId"`
ID string `json:"id"`
EventType string `json:"type"`
Ledger int32 `json:"ledger"`
LedgerClosedAt string `json:"ledgerClosedAt"`
ContractID string `json:"contractId"`
ID string `json:"id"`

// Deprecated: PagingToken field is deprecated, please use Cursor at top level for pagination
PagingToken string `json:"pagingToken"`
InSuccessfulContractCall bool `json:"inSuccessfulContractCall"`
TransactionHash string `json:"txHash"`
Expand Down Expand Up @@ -336,6 +339,8 @@ type PaginationOptions struct {
type GetEventsResponse struct {
Events []EventInfo `json:"events"`
LatestLedger uint32 `json:"latestLedger"`
// Cursor represents last populated event ID if total events reach the limit or end of the search window
Cursor string `json:"cursor"`
}

type eventsRPCHandler struct {
Expand Down Expand Up @@ -439,7 +444,10 @@ func (h eventsRPCHandler) getEvents(ctx context.Context, request GetEventsReques
limit = request.Pagination.Limit
}
}
endLedger := request.StartLedger + LedgerScanLimit
endLedger := start.Ledger + LedgerScanLimit

// endLedger should not exceed ledger retention window
endLedger = min(ledgerRange.LastLedger.Sequence+1, endLedger)

if request.EndLedger != 0 {
endLedger = min(request.EndLedger, endLedger)
Expand Down Expand Up @@ -509,9 +517,21 @@ func (h eventsRPCHandler) getEvents(ctx context.Context, request GetEventsReques
results = append(results, info)
}

var cursor string
if uint(len(results)) == limit {
lastEvent := results[len(results)-1]
cursor = lastEvent.ID
} else {
// cursor represents end of the search window if events does not reach limit
// here endLedger is always exclusive when fetching events
// so search window is max Cursor value with endLedger - 1
cursor = db.Cursor{Ledger: endLedger - 1, Tx: math.MaxUint32, Event: math.MaxUint32 - 1}.String()
}

return GetEventsResponse{
LatestLedger: ledgerRange.LastLedger.Sequence,
Events: results,
Cursor: cursor,
}, nil
}

Expand Down
34 changes: 26 additions & 8 deletions cmd/soroban-rpc/internal/methods/get_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"math"
"path"
"strconv"
"strings"
Expand Down Expand Up @@ -655,7 +656,8 @@ func TestGetEvents(t *testing.T) {
TransactionHash: ledgerCloseMeta.TransactionHash(i).HexString(),
})
}
assert.Equal(t, GetEventsResponse{expected, 1}, results)
cursor := db.Cursor{Ledger: 1, Tx: math.MaxUint32, Event: math.MaxUint32 - 1}.String()
assert.Equal(t, GetEventsResponse{expected, 1, cursor}, results)
})

t.Run("filtering by contract id", func(t *testing.T) {
Expand Down Expand Up @@ -801,7 +803,9 @@ func TestGetEvents(t *testing.T) {
TransactionHash: ledgerCloseMeta.TransactionHash(4).HexString(),
},
}
assert.Equal(t, GetEventsResponse{expected, 1}, results)
cursor := db.Cursor{Ledger: 1, Tx: math.MaxUint32, Event: math.MaxUint32 - 1}.String()

assert.Equal(t, GetEventsResponse{expected, 1, cursor}, results)

results, err = handler.getEvents(ctx, GetEventsRequest{
StartLedger: 1,
Expand Down Expand Up @@ -835,7 +839,7 @@ func TestGetEvents(t *testing.T) {

expected[0].ValueJSON = valueJs
expected[0].TopicJSON = topicsJs
require.Equal(t, GetEventsResponse{expected, 1}, results)
require.Equal(t, GetEventsResponse{expected, 1, cursor}, results)
})

t.Run("filtering by both contract id and topic", func(t *testing.T) {
Expand Down Expand Up @@ -946,7 +950,9 @@ func TestGetEvents(t *testing.T) {
TransactionHash: ledgerCloseMeta.TransactionHash(3).HexString(),
},
}
assert.Equal(t, GetEventsResponse{expected, 1}, results)
cursor := db.Cursor{Ledger: 1, Tx: math.MaxUint32, Event: math.MaxUint32 - 1}.String()

assert.Equal(t, GetEventsResponse{expected, 1, cursor}, results)
})

t.Run("filtering by event type", func(t *testing.T) {
Expand Down Expand Up @@ -1021,7 +1027,9 @@ func TestGetEvents(t *testing.T) {
TransactionHash: ledgerCloseMeta.TransactionHash(0).HexString(),
},
}
assert.Equal(t, GetEventsResponse{expected, 1}, results)
cursor := db.Cursor{Ledger: 1, Tx: math.MaxUint32, Event: math.MaxUint32 - 1}.String()

assert.Equal(t, GetEventsResponse{expected, 1, cursor}, results)
})

t.Run("with limit", func(t *testing.T) {
Expand Down Expand Up @@ -1092,7 +1100,9 @@ func TestGetEvents(t *testing.T) {
TransactionHash: ledgerCloseMeta.TransactionHash(i).HexString(),
})
}
assert.Equal(t, GetEventsResponse{expected, 1}, results)
cursor := expected[len(expected)-1].ID

assert.Equal(t, GetEventsResponse{expected, 1, cursor}, results)
})

t.Run("with cursor", func(t *testing.T) {
Expand Down Expand Up @@ -1192,7 +1202,8 @@ func TestGetEvents(t *testing.T) {
TransactionHash: ledgerCloseMeta.TransactionHash(i).HexString(),
})
}
assert.Equal(t, GetEventsResponse{expected, 5}, results)
cursor := expected[len(expected)-1].ID
assert.Equal(t, GetEventsResponse{expected, 5, cursor}, results)

results, err = handler.getEvents(context.TODO(), GetEventsRequest{
Pagination: &PaginationOptions{
Expand All @@ -1201,7 +1212,14 @@ func TestGetEvents(t *testing.T) {
},
})
require.NoError(t, err)
assert.Equal(t, GetEventsResponse{[]EventInfo{}, 5}, results)

latestLedger := 5
endLedger := min(5+LedgerScanLimit, latestLedger+1)

// Note: endLedger is always exclusive when fetching events
// so search window is always max Cursor value with endLedger - 1
cursor = db.Cursor{Ledger: uint32(endLedger - 1), Tx: math.MaxUint32, Event: math.MaxUint32 - 1}.String()
assert.Equal(t, GetEventsResponse{[]EventInfo{}, 5, cursor}, results)
})
}

Expand Down

0 comments on commit cae1740

Please sign in to comment.