diff --git a/suite/src/__tests__/fast/composedb-model-sync.test.ts b/suite/src/__tests__/fast/composedb-model-sync.test.ts index 25e96623..c676bc27 100644 --- a/suite/src/__tests__/fast/composedb-model-sync.test.ts +++ b/suite/src/__tests__/fast/composedb-model-sync.test.ts @@ -1,12 +1,11 @@ import { ComposeClient } from '@composedb/client' import { beforeAll, describe, test, expect } from '@jest/globals' import { Composite } from '@composedb/devtools' -import { newCeramic } from '../../utils/ceramicHelpers.js' +import { loadDocumentOrTimeout, newCeramic } from '../../utils/ceramicHelpers.js' import { createDid } from '../../utils/didHelper.js' import { BasicSchema } from '../../graphql-schemas/basicSchema' import { StreamID } from '@ceramicnetwork/streamid' import { waitForDocument } from '../../utils/composeDbHelpers.js' -import { CommonTestUtils as TestUtils } from '@ceramicnetwork/common-test-utils' const ComposeDbUrls = String(process.env.COMPOSEDB_URLS).split(',') const adminSeeds = String(process.env.COMPOSEDB_ADMIN_DID_SEEDS).split(',') @@ -40,14 +39,10 @@ describe('Sync Model and ModelInstanceDocument using ComposeDB GraphQL API', () const parts = String(resources[0]).split('model=') const modelId = parts[parts.length - 1] - await TestUtils.waitForConditionOrTimeout(async () => - ceramicInstance2 - .loadStream(modelId) - .then((_) => true) - .catch((_) => false), - ) + // Wait for model to be available on node 2. + await loadDocumentOrTimeout(ceramicInstance2, StreamID.fromString(modelId), 30 * 1000) - // start indexing for tha nodel on node 2 + // start indexing for the model on node 2 await ceramicInstance2.admin.startIndexingModels([StreamID.fromString(modelId)]) composeClient2 = await new ComposeClient({ ceramic: ceramicInstance2, diff --git a/suite/src/__tests__/fast/model-correctness.test.ts b/suite/src/__tests__/fast/model-correctness.test.ts index f5f315cd..cdc929c0 100644 --- a/suite/src/__tests__/fast/model-correctness.test.ts +++ b/suite/src/__tests__/fast/model-correctness.test.ts @@ -1,5 +1,5 @@ import { describe, test, beforeAll, expect } from '@jest/globals' -import { newCeramic } from '../../utils/ceramicHelpers.js' +import { loadDocumentOrTimeout, newCeramic } from '../../utils/ceramicHelpers.js' import { createDid } from '../../utils/didHelper.js' import { StreamID } from '@ceramicnetwork/streamid' import { Model } from '@ceramicnetwork/stream-model' @@ -7,7 +7,7 @@ import { ModelInstanceDocument } from '@ceramicnetwork/stream-model-instance' import { LIST_MODEL_DEFINITION } from '../../models/modelConstants' import { CeramicClient } from '@ceramicnetwork/http-client' import { CommonTestUtils as TestUtils } from '@ceramicnetwork/common-test-utils' -import { indexModelOnNode, loadDocumentOrTimeout } from '../../utils/composeDbHelpers.js' +import { indexModelOnNode } from '../../utils/composeDbHelpers.js' const ComposeDbUrls = String(process.env.COMPOSEDB_URLS).split(',') const adminSeeds = String(process.env.COMPOSEDB_ADMIN_DID_SEEDS).split(',') @@ -41,11 +41,11 @@ describe('Model Integration Test', () => { modelInstanceDocumentMetadata, { anchor: false }, ) - const document2 = await loadDocumentOrTimeout( + const document2 = (await loadDocumentOrTimeout( ceramicNode2, document1.id, 1000 * nodeSyncWaitTimeSec, - ) + )) as ModelInstanceDocument expect(document2.id).toEqual(document1.id) expect(document1.content).toEqual(document2.content) diff --git a/suite/src/__tests__/fast/update.test.ts b/suite/src/__tests__/fast/update.test.ts index d28c0c71..65478f58 100644 --- a/suite/src/__tests__/fast/update.test.ts +++ b/suite/src/__tests__/fast/update.test.ts @@ -3,8 +3,7 @@ import CeramicClient from '@ceramicnetwork/http-client' import { afterAll, beforeAll, expect, test, describe } from '@jest/globals' import * as helpers from '../../utils/dynamoDbHelpers.js' -import { utilities } from '../../utils/common.js' -import { newCeramic } from '../../utils/ceramicHelpers.js' +import { loadDocumentOrTimeout, newCeramic, waitForCondition } from '../../utils/ceramicHelpers.js' import { createDid } from '../../utils/didHelper.js' import { LIST_MODEL_DEFINITION } from '../../models/modelConstants.js' import { Model } from '@ceramicnetwork/stream-model' @@ -12,11 +11,10 @@ import { indexModelOnNode } from '../../utils/composeDbHelpers.js' import { StreamID } from '@ceramicnetwork/streamid' import { ModelInstanceDocument } from '@ceramicnetwork/stream-model-instance' -const delay = utilities.delay - // Environment variables const composeDbUrls = String(process.env.COMPOSEDB_URLS).split(',') const adminSeeds = String(process.env.COMPOSEDB_ADMIN_DID_SEEDS).split(',') +const SYNC_TIMEOUT_MS = 5 * 1000 // 5 seconds /////////////////////////////////////////////////////////////////////////////// /// Create/Update Tests @@ -94,12 +92,17 @@ function testUpdate(composeDbUrls: string[]) { const apiUrl = composeDbUrls[idx] let doc: ModelInstanceDocument test(`load stream on ${apiUrl}`, async () => { - await delay(5) const ceramic = await newCeramic(apiUrl) console.log( `Loading stream ${firstDocument.id.toString()} on ${apiUrl} with step ${content.step}`, ) - doc = await ModelInstanceDocument.load(ceramic, firstDocument.id) + doc = (await loadDocumentOrTimeout( + ceramic, + firstDocument.id, + SYNC_TIMEOUT_MS, + )) as ModelInstanceDocument + await waitForCondition(doc, (state) => state.content.step == content.step, SYNC_TIMEOUT_MS) + await doc.sync({ sync: SyncOptions.NEVER_SYNC }) expect(doc.content).toEqual(content) console.log( `Loaded stream on ${apiUrl}: ${firstDocument.id.toString()} successfully with step ${ @@ -120,9 +123,7 @@ function testUpdate(composeDbUrls: string[]) { }`, ) await firstDocument.replace(content, undefined, { anchor: isFinalWriter }) - console.log(`Updating complete, sleeping 5 seconds before syncing`) - await delay(5) - console.log(`Sleep complete, syncing`) + await waitForCondition(doc, (state) => state.content.step == content.step, SYNC_TIMEOUT_MS) await doc.sync({ sync: SyncOptions.NEVER_SYNC }) expect(doc.content).toEqual(firstDocument.content) console.log( diff --git a/suite/src/__tests__/slow/ceramic-ceramic-integration.test.ts b/suite/src/__tests__/slow/ceramic-ceramic-integration.test.ts index 187cef1e..1fc84e21 100644 --- a/suite/src/__tests__/slow/ceramic-ceramic-integration.test.ts +++ b/suite/src/__tests__/slow/ceramic-ceramic-integration.test.ts @@ -3,7 +3,7 @@ import { TileDocument } from '@ceramicnetwork/stream-tile' import { jest, describe, test, beforeAll, expect } from '@jest/globals' import { newCeramic, waitForAnchor, waitForCondition } from '../../utils/ceramicHelpers.js' -const UPDATE_TIMEOUT = 60 // 60 seconds for regular updates to propagate from one node to another +const UPDATE_TIMEOUT_MS = 60 * 1000 // 60 seconds for regular updates to propagate from one node to another const ComposeDbUrls = String(process.env.COMPOSEDB_URLS).split(',') const createWithOneLoadWithTheOther = async ( @@ -53,7 +53,7 @@ const updatesAreShared = async ( function (state) { return state.next?.content.foo == content1.foo || state.content.foo == content1.foo }, - UPDATE_TIMEOUT, + UPDATE_TIMEOUT_MS, ).catch((errStr) => { throw new Error(errStr) }) @@ -74,7 +74,7 @@ const updatesAreShared = async ( function (state) { return state.next?.content.foo == content2.foo || state.content.foo == content2.foo }, - UPDATE_TIMEOUT, + UPDATE_TIMEOUT_MS, ) expect(doc2.content).toEqual(content2) diff --git a/suite/src/utils/ceramicHelpers.ts b/suite/src/utils/ceramicHelpers.ts index 3173d1d7..54f3c824 100644 --- a/suite/src/utils/ceramicHelpers.ts +++ b/suite/src/utils/ceramicHelpers.ts @@ -5,6 +5,10 @@ import { Ed25519Provider } from 'key-did-provider-ed25519' import KeyDidResolver from 'key-did-resolver' import { AnchorStatus, Stream, StreamState, StreamUtils } from '@ceramicnetwork/common' import { filter, take } from 'rxjs/operators' +import { StreamID } from '@ceramicnetwork/streamid' +import { utilities } from './common.js' + +const delayMs = utilities.delayMs const seed = randomBytes(32) const provider = new Ed25519Provider(seed) @@ -12,7 +16,7 @@ const resolver = KeyDidResolver.getResolver() const did = new DID({ provider, resolver }) // 30 minutes for anchors to happen and be noticed (including potential failures and retries) -export const DEFAULT_ANCHOR_TIMEOUT = 60 * 30 +export const DEFAULT_ANCHOR_TIMEOUT_MS = 60 * 30 * 1000 export const newCeramic = async (apiUrl: string, didOverride?: DID) => { const ceramic = new CeramicClient(apiUrl, { syncInterval: 500 }) @@ -31,32 +35,66 @@ function defaultMsgGenerator(stream: Stream) { )}` } -async function withTimeout(prom: Promise, timeoutSecs: number) { +async function withTimeout(prom: Promise, timeoutMs: number) { const startTime = new Date().toISOString() return new Promise((resolve, reject) => { setTimeout(() => { const curTime = new Date().toISOString() reject( - `Timed out after ${timeoutSecs} seconds. Current time: ${curTime}, start time: ${startTime}`, + `Timed out after ${timeoutMs} millis. Current time: ${curTime}, start time: ${startTime}`, ) - }, timeoutSecs * 1000) + }, timeoutMs) prom.then(resolve) }) } +/** + * Loads a document from a ceramic node with a timeout. + * @param ceramicNode : CeramicClient to load the document from + * @param documentId : ID of the document to load + * @param timeoutMs : Timeout in milliseconds + * @returns The document if found, throws error on timeout + */ +export async function loadDocumentOrTimeout( + ceramicNode: CeramicClient, + documentId: StreamID, + timeoutMs: number, +): Promise { + let now = Date.now() + let count = 0 + const expirationTime = now + timeoutMs + let lastError = null + while (now < expirationTime) { + try { + count += 1 + return await ceramicNode.loadStream(documentId) + } catch (error) { + lastError = error + if (count % 10 === 0) { + console.log(`Error loading document : ${documentId} retrying`, error) + } + await delayMs(100) + now = Date.now() + } + } + throw Error( + `Timeout waiting for document ${documentId}. Last seen error when trying to load it: ${lastError}`, + ) +} + /** * Waits for 'timeoutSecs' for the given 'condition' to evaluate to try when applied to the current * stream state for 'stream'. * @param stream * @param condition - * @param timeoutSecs + * @param timeoutMs * @param msgGenerator - Function that takes a stream and returns a string to log every time * a new state is found that *doesn't* satisfy 'condition' */ export async function waitForCondition( stream: Stream, condition: (stream: StreamState) => boolean, - timeoutSecs: number, + timeoutMs: number, msgGenerator?: (stream: Stream) => string, ): Promise { const waiter = stream @@ -75,7 +113,7 @@ export async function waitForCondition( if (!condition(stream.state)) { // Only wait if condition isn't already true - await withTimeout(waiter, timeoutSecs) + await withTimeout(waiter, timeoutMs) } console.debug( @@ -87,7 +125,7 @@ export async function waitForCondition( export async function waitForAnchor( stream: any, - timeoutSecs: number = DEFAULT_ANCHOR_TIMEOUT, + timeoutMs: number = DEFAULT_ANCHOR_TIMEOUT_MS, ): Promise { const msgGenerator = function (stream: Stream) { const curTime = new Date().toISOString() @@ -100,7 +138,7 @@ export async function waitForAnchor( function (state) { return state.anchorStatus == AnchorStatus.ANCHORED }, - timeoutSecs, + timeoutMs, msgGenerator, ) } diff --git a/suite/src/utils/composeDbHelpers.ts b/suite/src/utils/composeDbHelpers.ts index fcdc234c..c79a06b0 100644 --- a/suite/src/utils/composeDbHelpers.ts +++ b/suite/src/utils/composeDbHelpers.ts @@ -1,7 +1,6 @@ import { ComposeClient } from '@composedb/client' import { utilities } from './common.js' import { CeramicClient } from '@ceramicnetwork/http-client' -import { ModelInstanceDocument } from '@ceramicnetwork/stream-model-instance' import { StreamID } from '@ceramicnetwork/streamid' import { CommonTestUtils as TestUtils } from '@ceramicnetwork/common-test-utils' @@ -43,40 +42,6 @@ export async function waitForDocument( } } -/** - * Loads a document from a ceramic node with a timeout. - * @param ceramicNode : Ceramic client to load the document from - * @param documentId : ID of the document to load - * @param timeoutMs : Timeout in milliseconds - * @returns The document if found, throws error on timeout - */ -export async function loadDocumentOrTimeout( - ceramicNode: CeramicClient, - documentId: StreamID, - timeoutMs: number, -) { - let now = Date.now() - let count = 0 - const expirationTime = now + timeoutMs - let lastError = null - while (now < expirationTime) { - try { - count += 1 - return await ModelInstanceDocument.load(ceramicNode, documentId) - } catch (error) { - lastError = error - if (count % 10 === 0) { - console.log(`Error loading document : ${documentId} retrying`, error) - } - await delayMs(100) - now = Date.now() - } - } - throw Error( - `Timeout waiting for document ${documentId}. Last seen error when trying to load it: ${lastError}`, - ) -} - /** * Checks if a model is indexed on a Ceramic node. * @param ceramicNode : Ceramic client to check indexing on @@ -96,7 +61,7 @@ async function isModelIndexed(ceramicNode: CeramicClient, modelId: StreamID): Pr /** * Waits for indexing to complete on both nodes or a timeout. - * @param ceramicNode1 : Ceramic client to check indexing on + * @param ceramicNode : Ceramic client to check indexing on * @param modelId : ID of the model to check indexing for * @param timeoutMs : Timeout in milliseconds * @returns True if indexing is complete, throws error on timeout