Skip to content

Commit

Permalink
chore: Poll for data to sync instead of sleeping (#137)
Browse files Browse the repository at this point in the history
  • Loading branch information
stbrody authored Jun 13, 2024
1 parent 6da2ea4 commit 67863e2
Show file tree
Hide file tree
Showing 6 changed files with 69 additions and 70 deletions.
13 changes: 4 additions & 9 deletions suite/src/__tests__/fast/composedb-model-sync.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
import { ComposeClient } from '@composedb/client'
import { beforeAll, describe, test, expect } from '@jest/globals'
import { Composite } from '@composedb/devtools'
import { newCeramic } from '../../utils/ceramicHelpers.js'
import { loadDocumentOrTimeout, newCeramic } from '../../utils/ceramicHelpers.js'
import { createDid } from '../../utils/didHelper.js'
import { BasicSchema } from '../../graphql-schemas/basicSchema'
import { StreamID } from '@ceramicnetwork/streamid'
import { waitForDocument } from '../../utils/composeDbHelpers.js'
import { CommonTestUtils as TestUtils } from '@ceramicnetwork/common-test-utils'

const ComposeDbUrls = String(process.env.COMPOSEDB_URLS).split(',')
const adminSeeds = String(process.env.COMPOSEDB_ADMIN_DID_SEEDS).split(',')
Expand Down Expand Up @@ -40,14 +39,10 @@ describe('Sync Model and ModelInstanceDocument using ComposeDB GraphQL API', ()
const parts = String(resources[0]).split('model=')
const modelId = parts[parts.length - 1]

await TestUtils.waitForConditionOrTimeout(async () =>
ceramicInstance2
.loadStream(modelId)
.then((_) => true)
.catch((_) => false),
)
// Wait for model to be available on node 2.
await loadDocumentOrTimeout(ceramicInstance2, StreamID.fromString(modelId), 30 * 1000)

// start indexing for tha nodel on node 2
// start indexing for the model on node 2
await ceramicInstance2.admin.startIndexingModels([StreamID.fromString(modelId)])
composeClient2 = await new ComposeClient({
ceramic: ceramicInstance2,
Expand Down
8 changes: 4 additions & 4 deletions suite/src/__tests__/fast/model-correctness.test.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { describe, test, beforeAll, expect } from '@jest/globals'
import { newCeramic } from '../../utils/ceramicHelpers.js'
import { loadDocumentOrTimeout, newCeramic } from '../../utils/ceramicHelpers.js'
import { createDid } from '../../utils/didHelper.js'
import { StreamID } from '@ceramicnetwork/streamid'
import { Model } from '@ceramicnetwork/stream-model'
import { ModelInstanceDocument } from '@ceramicnetwork/stream-model-instance'
import { LIST_MODEL_DEFINITION } from '../../models/modelConstants'
import { CeramicClient } from '@ceramicnetwork/http-client'
import { CommonTestUtils as TestUtils } from '@ceramicnetwork/common-test-utils'
import { indexModelOnNode, loadDocumentOrTimeout } from '../../utils/composeDbHelpers.js'
import { indexModelOnNode } from '../../utils/composeDbHelpers.js'

const ComposeDbUrls = String(process.env.COMPOSEDB_URLS).split(',')
const adminSeeds = String(process.env.COMPOSEDB_ADMIN_DID_SEEDS).split(',')
Expand Down Expand Up @@ -41,11 +41,11 @@ describe('Model Integration Test', () => {
modelInstanceDocumentMetadata,
{ anchor: false },
)
const document2 = await loadDocumentOrTimeout(
const document2 = (await loadDocumentOrTimeout(
ceramicNode2,
document1.id,
1000 * nodeSyncWaitTimeSec,
)
)) as ModelInstanceDocument
expect(document2.id).toEqual(document1.id)
expect(document1.content).toEqual(document2.content)

Expand Down
19 changes: 10 additions & 9 deletions suite/src/__tests__/fast/update.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,18 @@ import CeramicClient from '@ceramicnetwork/http-client'
import { afterAll, beforeAll, expect, test, describe } from '@jest/globals'

import * as helpers from '../../utils/dynamoDbHelpers.js'
import { utilities } from '../../utils/common.js'
import { newCeramic } from '../../utils/ceramicHelpers.js'
import { loadDocumentOrTimeout, newCeramic, waitForCondition } from '../../utils/ceramicHelpers.js'
import { createDid } from '../../utils/didHelper.js'
import { LIST_MODEL_DEFINITION } from '../../models/modelConstants.js'
import { Model } from '@ceramicnetwork/stream-model'
import { indexModelOnNode } from '../../utils/composeDbHelpers.js'
import { StreamID } from '@ceramicnetwork/streamid'
import { ModelInstanceDocument } from '@ceramicnetwork/stream-model-instance'

const delay = utilities.delay

// Environment variables
const composeDbUrls = String(process.env.COMPOSEDB_URLS).split(',')
const adminSeeds = String(process.env.COMPOSEDB_ADMIN_DID_SEEDS).split(',')
const SYNC_TIMEOUT_MS = 5 * 1000 // 5 seconds

///////////////////////////////////////////////////////////////////////////////
/// Create/Update Tests
Expand Down Expand Up @@ -94,12 +92,17 @@ function testUpdate(composeDbUrls: string[]) {
const apiUrl = composeDbUrls[idx]
let doc: ModelInstanceDocument
test(`load stream on ${apiUrl}`, async () => {
await delay(5)
const ceramic = await newCeramic(apiUrl)
console.log(
`Loading stream ${firstDocument.id.toString()} on ${apiUrl} with step ${content.step}`,
)
doc = await ModelInstanceDocument.load(ceramic, firstDocument.id)
doc = (await loadDocumentOrTimeout(
ceramic,
firstDocument.id,
SYNC_TIMEOUT_MS,
)) as ModelInstanceDocument
await waitForCondition(doc, (state) => state.content.step == content.step, SYNC_TIMEOUT_MS)
await doc.sync({ sync: SyncOptions.NEVER_SYNC })
expect(doc.content).toEqual(content)
console.log(
`Loaded stream on ${apiUrl}: ${firstDocument.id.toString()} successfully with step ${
Expand All @@ -120,9 +123,7 @@ function testUpdate(composeDbUrls: string[]) {
}`,
)
await firstDocument.replace(content, undefined, { anchor: isFinalWriter })
console.log(`Updating complete, sleeping 5 seconds before syncing`)
await delay(5)
console.log(`Sleep complete, syncing`)
await waitForCondition(doc, (state) => state.content.step == content.step, SYNC_TIMEOUT_MS)
await doc.sync({ sync: SyncOptions.NEVER_SYNC })
expect(doc.content).toEqual(firstDocument.content)
console.log(
Expand Down
6 changes: 3 additions & 3 deletions suite/src/__tests__/slow/ceramic-ceramic-integration.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { TileDocument } from '@ceramicnetwork/stream-tile'
import { jest, describe, test, beforeAll, expect } from '@jest/globals'
import { newCeramic, waitForAnchor, waitForCondition } from '../../utils/ceramicHelpers.js'

const UPDATE_TIMEOUT = 60 // 60 seconds for regular updates to propagate from one node to another
const UPDATE_TIMEOUT_MS = 60 * 1000 // 60 seconds for regular updates to propagate from one node to another
const ComposeDbUrls = String(process.env.COMPOSEDB_URLS).split(',')

const createWithOneLoadWithTheOther = async (
Expand Down Expand Up @@ -53,7 +53,7 @@ const updatesAreShared = async (
function (state) {
return state.next?.content.foo == content1.foo || state.content.foo == content1.foo
},
UPDATE_TIMEOUT,
UPDATE_TIMEOUT_MS,
).catch((errStr) => {
throw new Error(errStr)
})
Expand All @@ -74,7 +74,7 @@ const updatesAreShared = async (
function (state) {
return state.next?.content.foo == content2.foo || state.content.foo == content2.foo
},
UPDATE_TIMEOUT,
UPDATE_TIMEOUT_MS,
)

expect(doc2.content).toEqual(content2)
Expand Down
56 changes: 47 additions & 9 deletions suite/src/utils/ceramicHelpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ import { Ed25519Provider } from 'key-did-provider-ed25519'
import KeyDidResolver from 'key-did-resolver'
import { AnchorStatus, Stream, StreamState, StreamUtils } from '@ceramicnetwork/common'
import { filter, take } from 'rxjs/operators'
import { StreamID } from '@ceramicnetwork/streamid'
import { utilities } from './common.js'

const delayMs = utilities.delayMs

const seed = randomBytes(32)
const provider = new Ed25519Provider(seed)
const resolver = KeyDidResolver.getResolver()
const did = new DID({ provider, resolver })

// 30 minutes for anchors to happen and be noticed (including potential failures and retries)
export const DEFAULT_ANCHOR_TIMEOUT = 60 * 30
export const DEFAULT_ANCHOR_TIMEOUT_MS = 60 * 30 * 1000

export const newCeramic = async (apiUrl: string, didOverride?: DID) => {
const ceramic = new CeramicClient(apiUrl, { syncInterval: 500 })
Expand All @@ -31,32 +35,66 @@ function defaultMsgGenerator(stream: Stream) {
)}`
}

async function withTimeout(prom: Promise<any>, timeoutSecs: number) {
async function withTimeout(prom: Promise<any>, timeoutMs: number) {
const startTime = new Date().toISOString()
return new Promise((resolve, reject) => {
setTimeout(() => {
const curTime = new Date().toISOString()
reject(
`Timed out after ${timeoutSecs} seconds. Current time: ${curTime}, start time: ${startTime}`,
`Timed out after ${timeoutMs} millis. Current time: ${curTime}, start time: ${startTime}`,
)
}, timeoutSecs * 1000)
}, timeoutMs)
prom.then(resolve)
})
}

/**
* Loads a document from a ceramic node with a timeout.
* @param ceramicNode : CeramicClient to load the document from
* @param documentId : ID of the document to load
* @param timeoutMs : Timeout in milliseconds
* @returns The document if found, throws error on timeout
*/
export async function loadDocumentOrTimeout(
ceramicNode: CeramicClient,
documentId: StreamID,
timeoutMs: number,
): Promise<Stream> {
let now = Date.now()
let count = 0
const expirationTime = now + timeoutMs
let lastError = null
while (now < expirationTime) {
try {
count += 1
return await ceramicNode.loadStream(documentId)
} catch (error) {
lastError = error
if (count % 10 === 0) {
console.log(`Error loading document : ${documentId} retrying`, error)
}
await delayMs(100)
now = Date.now()
}
}
throw Error(
`Timeout waiting for document ${documentId}. Last seen error when trying to load it: ${lastError}`,
)
}

/**
* Waits for 'timeoutSecs' for the given 'condition' to evaluate to try when applied to the current
* stream state for 'stream'.
* @param stream
* @param condition
* @param timeoutSecs
* @param timeoutMs
* @param msgGenerator - Function that takes a stream and returns a string to log every time
* a new state is found that *doesn't* satisfy 'condition'
*/
export async function waitForCondition(
stream: Stream,
condition: (stream: StreamState) => boolean,
timeoutSecs: number,
timeoutMs: number,
msgGenerator?: (stream: Stream) => string,
): Promise<void> {
const waiter = stream
Expand All @@ -75,7 +113,7 @@ export async function waitForCondition(

if (!condition(stream.state)) {
// Only wait if condition isn't already true
await withTimeout(waiter, timeoutSecs)
await withTimeout(waiter, timeoutMs)
}

console.debug(
Expand All @@ -87,7 +125,7 @@ export async function waitForCondition(

export async function waitForAnchor(
stream: any,
timeoutSecs: number = DEFAULT_ANCHOR_TIMEOUT,
timeoutMs: number = DEFAULT_ANCHOR_TIMEOUT_MS,
): Promise<void> {
const msgGenerator = function (stream: Stream) {
const curTime = new Date().toISOString()
Expand All @@ -100,7 +138,7 @@ export async function waitForAnchor(
function (state) {
return state.anchorStatus == AnchorStatus.ANCHORED
},
timeoutSecs,
timeoutMs,
msgGenerator,
)
}
37 changes: 1 addition & 36 deletions suite/src/utils/composeDbHelpers.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import { ComposeClient } from '@composedb/client'
import { utilities } from './common.js'
import { CeramicClient } from '@ceramicnetwork/http-client'
import { ModelInstanceDocument } from '@ceramicnetwork/stream-model-instance'
import { StreamID } from '@ceramicnetwork/streamid'
import { CommonTestUtils as TestUtils } from '@ceramicnetwork/common-test-utils'

Expand Down Expand Up @@ -43,40 +42,6 @@ export async function waitForDocument(
}
}

/**
* Loads a document from a ceramic node with a timeout.
* @param ceramicNode : Ceramic client to load the document from
* @param documentId : ID of the document to load
* @param timeoutMs : Timeout in milliseconds
* @returns The document if found, throws error on timeout
*/
export async function loadDocumentOrTimeout(
ceramicNode: CeramicClient,
documentId: StreamID,
timeoutMs: number,
) {
let now = Date.now()
let count = 0
const expirationTime = now + timeoutMs
let lastError = null
while (now < expirationTime) {
try {
count += 1
return await ModelInstanceDocument.load(ceramicNode, documentId)
} catch (error) {
lastError = error
if (count % 10 === 0) {
console.log(`Error loading document : ${documentId} retrying`, error)
}
await delayMs(100)
now = Date.now()
}
}
throw Error(
`Timeout waiting for document ${documentId}. Last seen error when trying to load it: ${lastError}`,
)
}

/**
* Checks if a model is indexed on a Ceramic node.
* @param ceramicNode : Ceramic client to check indexing on
Expand All @@ -96,7 +61,7 @@ async function isModelIndexed(ceramicNode: CeramicClient, modelId: StreamID): Pr

/**
* Waits for indexing to complete on both nodes or a timeout.
* @param ceramicNode1 : Ceramic client to check indexing on
* @param ceramicNode : Ceramic client to check indexing on
* @param modelId : ID of the model to check indexing for
* @param timeoutMs : Timeout in milliseconds
* @returns True if indexing is complete, throws error on timeout
Expand Down

0 comments on commit 67863e2

Please sign in to comment.