Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: update data feed test #121

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
23 changes: 14 additions & 9 deletions suite/src/__tests__/fast/data-feed-api.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ describe.skip('Datafeed SSE Api Test', () => {
expect(event).toHaveProperty("content")
expect(event).toHaveProperty("metadata")
expect(event).toHaveProperty("eventType")
expect(event).toHaveProperty("resumeToken")
} finally {
source.close()
}
Expand Down Expand Up @@ -166,19 +167,21 @@ describe.skip('Datafeed SSE Api Test', () => {
throw new Error(errStr)
})
expectedEvents.add(doc.tip.toString())
// By waiting for the expected events we confirm the api delivers all events)
// By waiting for the expected events we confirm the api delivers all events
await accumulator.waitForEvents(expectedEvents, 1000 * 60 * 2)
} finally {
source.close()
}
})
// this wont be tested until the feature its ready
test.skip('if a connection goes offline can resume the missed events upon reconnection', async () => {

test('if a connection goes offline can resume the missed events upon reconnection', async () => {
const resumeTokens: string[] = []
const source = new EventSource(
new URL('/api/v0/feed/aggregation/documents', ComposeDbUrls[0]).toString(),
)
const parseEventData = (eventData: any) => {
const decoded: any = decode(Codec, eventData)
resumeTokens.push(decoded.resumeToken)
return decoded.commitId.commit.toString()
}

Expand All @@ -189,18 +192,20 @@ describe.skip('Datafeed SSE Api Test', () => {
// genesis commit
const doc = await genesisCommit(ceramicNode1, modelInstanceDocumentMetadata, false)
expectedEvents.add(doc.tip.toString())
// disconnect
// wait for latest event and disconnect
await accumulator.waitForEvents(expectedEvents, 1000 * 60)
accumulator.stop()
// data commit offline
// data commits offline
await doc.replace({ myData: 41 })
expectedEvents.add(doc.tip.toString())

// connection after events
accumulator.start()
await doc.replace({ myData: 42 })
expectedEvents.add(doc.tip.toString())

// connection after events using latest resumeToken
accumulator.start(resumeTokens.pop())
// By waiting for the expected events we confirm the api delivers all events
await accumulator.waitForEvents(expectedEvents, 1000 * 60)

expect(accumulator.allEvents).toBe(expectedEvents)
} finally {
source.close()
}
Expand Down
10 changes: 9 additions & 1 deletion suite/src/utils/common.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

import { CommonTestUtils } from '@ceramicnetwork/common-test-utils'
import { EventSource } from 'cross-eventsource'

export const utilities = {
valid: (exp: any) => {
Expand Down Expand Up @@ -65,7 +66,14 @@ export class EventAccumulator<T> {
this.#source.close()
}

start() {
start(resumeToken?: string) {
if(resumeToken) {
const afterQueryParam = "after=" + encodeURIComponent(resumeToken)
const newUrl = new URL(this.#source.url)
newUrl.search = afterQueryParam
this.#source = new EventSource(newUrl.toString())
}

this.#source.addEventListener('message', (event) => {
const parsedEvent = this.#parseEventData(event.data)
this.allEvents.add(parsedEvent)
Expand Down
Loading