From 53248f673c085c0ae3d30bf30511382d5c8733ec Mon Sep 17 00:00:00 2001 From: Spencer T Brody Date: Wed, 29 May 2024 12:42:07 -0400 Subject: [PATCH] feat: Restore test coverage of updates (#132) --- suite/src/__tests__/fast/anchor.test.ts | 17 ++-- suite/src/__tests__/fast/longevity.test.ts | 23 +++--- .../__tests__/fast/model-correctness.test.ts | 34 ++++---- suite/src/__tests__/fast/update.test.ts | 82 +++++++++++++------ suite/src/models/modelConstants.ts | 6 +- suite/src/utils/ceramicHelpers.ts | 22 +++-- suite/src/utils/composeDbHelpers.ts | 18 +++- 7 files changed, 124 insertions(+), 78 deletions(-) diff --git a/suite/src/__tests__/fast/anchor.test.ts b/suite/src/__tests__/fast/anchor.test.ts index de76c7b9..afa82a2e 100644 --- a/suite/src/__tests__/fast/anchor.test.ts +++ b/suite/src/__tests__/fast/anchor.test.ts @@ -1,5 +1,4 @@ import { AnchorStatus, StreamUtils } from '@ceramicnetwork/common' -import { TileDocument } from '@ceramicnetwork/stream-tile' import { afterAll, beforeAll, describe, expect, test } from '@jest/globals' import { DateTime } from 'luxon' @@ -18,8 +17,8 @@ describe('anchor', () => { for (const req of anchorReqs) { const ceramic = await newCeramic(ComposeDbUrls[0]) - const tile = await TileDocument.load(ceramic, req.StreamId.S) - console.log(`${tile.id}: anchor status = ${AnchorStatus[tile.state.anchorStatus]}`) + const doc = await ceramic.loadStream(req.StreamId.S) + console.log(`${doc.id}: anchor status = ${AnchorStatus[doc.state.anchorStatus]}`) const now = DateTime.utc() const createdAt = DateTime.fromSeconds(parseInt(req.Creation.N)) const deltaMinutes = now.diff(createdAt).as('minutes') @@ -29,20 +28,20 @@ describe('anchor', () => { // // Don't explicitly check for failures until a timeout because failed requests can be retried and successful // on subsequent attempts within the configured interval. - if (tile.state.anchorStatus == AnchorStatus.ANCHORED || deltaMinutes >= configMinutes) { + if (doc.state.anchorStatus == AnchorStatus.ANCHORED || deltaMinutes >= configMinutes) { try { - if (tile.state.anchorStatus != AnchorStatus.ANCHORED) { + if (doc.state.anchorStatus != AnchorStatus.ANCHORED) { // If the stream wasn't already anchored, make sure we haven't been waiting too long. This check // will also catch anchor failures (i.e. requests for which anchoring was attempted but failed // even after retries). expect(deltaMinutes).toBeLessThan(configMinutes) } - await helpers.markStreamReqAsAnchored(tile.id) + await helpers.markStreamReqAsAnchored(doc.id) } catch (err) { console.error( - `Test failed. StreamID: ${tile.id.toString()}, state:\n${JSON.stringify( - StreamUtils.serializeState(tile.state), + `Test failed. StreamID: ${doc.id.toString()}, state:\n${JSON.stringify( + StreamUtils.serializeState(doc.state), null, 2, )}`, @@ -50,7 +49,7 @@ describe('anchor', () => { // If the anchoring failed, we don't want to leave this stream in the database, // as it will cause all future test executions to fail as well. - await helpers.deleteStreamReq(tile.id) + await helpers.deleteStreamReq(doc.id) throw err } diff --git a/suite/src/__tests__/fast/longevity.test.ts b/suite/src/__tests__/fast/longevity.test.ts index e28b94a4..0eea9817 100644 --- a/suite/src/__tests__/fast/longevity.test.ts +++ b/suite/src/__tests__/fast/longevity.test.ts @@ -1,6 +1,5 @@ import { AnchorStatus, StreamUtils } from '@ceramicnetwork/common' import { StreamID } from '@ceramicnetwork/streamid' -import { TileDocument } from '@ceramicnetwork/stream-tile' import { afterAll, beforeAll, describe, expect, test } from '@jest/globals' import { newCeramic } from '../../utils/ceramicHelpers.js' @@ -34,35 +33,35 @@ describe('longevity', () => { const ceramic = await newCeramic(apiUrl) for (const streamId of streamIds) { console.log(`Loading stream ${streamId} on ${apiUrl}`) - const tile = await TileDocument.load(ceramic, streamId) + const doc = await ceramic.loadStream(streamId) try { - expect(tile.content).toEqual(expectedContent) - expect(tile.state.anchorStatus).toEqual(AnchorStatus.ANCHORED) + expect(doc.content).toEqual(expectedContent) + expect(doc.state.anchorStatus).toEqual(AnchorStatus.ANCHORED) // Now load the same stream but at a specific CommitID. Loading at a CommitID // means the Ceramic node can't get the state from the state store, but has // to actually load and apply the commits from IPFS, so it lets us check that the // underlying ipfs blocks are still available. - const commitIds = tile.allCommitIds + const commitIds = doc.allCommitIds const prevCommitId = commitIds[commitIds.length - 2] console.log( `Loading commit ${prevCommitId} on ${apiUrl} for stream state:\n${JSON.stringify( - StreamUtils.serializeState(tile.state), + StreamUtils.serializeState(doc.state), null, 2, )}`, ) - const tileAtPrevCommitId = await TileDocument.load(ceramic, prevCommitId) + const docAtPrevCommitId = await ceramic.loadStream(prevCommitId) // The last commit is an anchor commit, so the second to last commit will actually // have the same content as the current state with the most recent commit, it just // won't have the anchor information. - expect(tileAtPrevCommitId.content).toEqual(expectedContent) - expect(tileAtPrevCommitId.state.anchorStatus).not.toEqual(AnchorStatus.ANCHORED) + expect(docAtPrevCommitId.content).toEqual(expectedContent) + expect(docAtPrevCommitId.state.anchorStatus).not.toEqual(AnchorStatus.ANCHORED) } catch (err) { console.error( - `Test failed. StreamID: ${tile.id.toString()}, state:\n${JSON.stringify( - StreamUtils.serializeState(tile.state), + `Test failed. StreamID: ${doc.id.toString()}, state:\n${JSON.stringify( + StreamUtils.serializeState(doc.state), null, 2, )}`, @@ -70,7 +69,7 @@ describe('longevity', () => { // If the test failed, we don't want to leave this stream in the database, // as it will cause all future test executions to fail as well. - await helpers.deleteStreamReq(tile.id) + await helpers.deleteStreamReq(doc.id) throw err } diff --git a/suite/src/__tests__/fast/model-correctness.test.ts b/suite/src/__tests__/fast/model-correctness.test.ts index f957a7e3..7909212b 100644 --- a/suite/src/__tests__/fast/model-correctness.test.ts +++ b/suite/src/__tests__/fast/model-correctness.test.ts @@ -4,15 +4,14 @@ import { createDid } from '../../utils/didHelper.js' import { StreamID } from '@ceramicnetwork/streamid' import { Model } from '@ceramicnetwork/stream-model' import { ModelInstanceDocument } from '@ceramicnetwork/stream-model-instance' -import { newModel, basicModelDocumentContent } from '../../models/modelConstants' +import { newModel } from '../../models/modelConstants' import { CeramicClient } from '@ceramicnetwork/http-client' import { CommonTestUtils as TestUtils } from '@ceramicnetwork/common-test-utils' -import { loadDocumentOrTimeout, waitForIndexingOrTimeout } from '../../utils/composeDbHelpers.js' +import { indexModelOnNode, loadDocumentOrTimeout } from '../../utils/composeDbHelpers.js' const ComposeDbUrls = String(process.env.COMPOSEDB_URLS).split(',') const adminSeeds = String(process.env.COMPOSEDB_ADMIN_DID_SEEDS).split(',') const nodeSyncWaitTimeSec = 5 -const indexWaitTimeMin = 1 describe('Model Integration Test', () => { let ceramicNode1: CeramicClient @@ -28,25 +27,19 @@ describe('Model Integration Test', () => { ceramicNode1 = await newCeramic(ComposeDbUrls[0], did1) ceramicNode2 = await newCeramic(ComposeDbUrls[1], did2) const model = await Model.create(ceramicNode1, newModel) - await TestUtils.waitForConditionOrTimeout(async () => - ceramicNode2 - .loadStream(model.id) - .then((_) => true) - .catch((_) => false), - ) - await ceramicNode1.admin.startIndexingModels([model.id]) - await ceramicNode2.admin.startIndexingModels([model.id]) modelId = model.id - await waitForIndexingOrTimeout(ceramicNode1, modelId, 1000 * 60 * indexWaitTimeMin) - await waitForIndexingOrTimeout(ceramicNode2, modelId, 1000 * 60 * indexWaitTimeMin) + + await indexModelOnNode(ceramicNode1, model.id) + await indexModelOnNode(ceramicNode2, model.id) }) - test('Create a ModelInstanceDocument on one node and read it from another', async () => { + test('ModelInstanceDocuments sync between nodes', async () => { const modelInstanceDocumentMetadata = { model: modelId } const document1 = await ModelInstanceDocument.create( ceramicNode1, - basicModelDocumentContent, + { step: 1 }, modelInstanceDocumentMetadata, + { anchor: false }, ) const document2 = await loadDocumentOrTimeout( ceramicNode2, @@ -54,5 +47,16 @@ describe('Model Integration Test', () => { 1000 * nodeSyncWaitTimeSec, ) expect(document2.id).toEqual(document1.id) + expect(document1.content).toEqual(document2.content) + + // Now update on the second node and ensure update syncs back to the first node. + await document2.replace({ step: 2 }, null, { anchor: false }) + await TestUtils.waitForConditionOrTimeout(async () => { + await document1.sync() + return document1.content?.step == 2 + }) + expect(document1.content).toEqual(document2.content) + expect(document1.state.log.length).toEqual(2) + expect(document2.state.log.length).toEqual(2) }) }) diff --git a/suite/src/__tests__/fast/update.test.ts b/suite/src/__tests__/fast/update.test.ts index 9b4cb1f8..5ac477b8 100644 --- a/suite/src/__tests__/fast/update.test.ts +++ b/suite/src/__tests__/fast/update.test.ts @@ -1,27 +1,55 @@ import { StreamReaderWriter, SyncOptions } from '@ceramicnetwork/common' import CeramicClient from '@ceramicnetwork/http-client' -import { TileDocument } from '@ceramicnetwork/stream-tile' import { afterAll, beforeAll, expect, test, describe } from '@jest/globals' import * as helpers from '../../utils/dynamoDbHelpers.js' import { utilities } from '../../utils/common.js' -import { newCeramic, metadata } from '../../utils/ceramicHelpers.js' +import { newCeramic } from '../../utils/ceramicHelpers.js' +import { createDid } from '../../utils/didHelper.js' +import { newModel } from '../../models/modelConstants.js' +import { Model } from '@ceramicnetwork/stream-model' +import { indexModelOnNode } from '../../utils/composeDbHelpers.js' +import { StreamID } from '@ceramicnetwork/streamid' +import { ModelInstanceDocument } from '@ceramicnetwork/stream-model-instance' const delay = utilities.delay // Environment variables -const ComposeDbUrls = String(process.env.COMPOSEDB_URLS).split(',') +const composeDbUrls = String(process.env.COMPOSEDB_URLS).split(',') +const adminSeeds = String(process.env.COMPOSEDB_ADMIN_DID_SEEDS).split(',') /////////////////////////////////////////////////////////////////////////////// /// Create/Update Tests /////////////////////////////////////////////////////////////////////////////// -describe.skip('update', () => { - beforeAll(async () => await helpers.createTestTable()) +let modelId: StreamID + +describe('update', () => { + beforeAll(async () => { + await helpers.createTestTable() + + if (adminSeeds.length < composeDbUrls.length) { + throw new Error( + `Must provide an admin DID seed for each node. Number of nodes: ${composeDbUrls.length}, number of seeds: ${adminSeeds.length}`, + ) + } + + const did0 = await createDid(adminSeeds[0]) + const ceramicNode0 = await newCeramic(composeDbUrls[0], did0) + const model = await Model.create(ceramicNode0, newModel) + modelId = model.id + await indexModelOnNode(ceramicNode0, model.id) + + for (let i = 1; i < composeDbUrls.length; i++) { + const did = await createDid(adminSeeds[i]) + const node = await newCeramic(composeDbUrls[i], did) + await indexModelOnNode(node, model.id) + } + }) afterAll(async () => await helpers.cleanup()) // Run tests with each node being the node where a stream is created - generateUrlCombinations(ComposeDbUrls).forEach(testUpdate) + generateUrlCombinations(composeDbUrls).forEach(testUpdate) }) function generateUrlCombinations(urls: string[]): string[][] { @@ -33,24 +61,30 @@ function testUpdate(composeDbUrls: string[]) { const firstNodeUrl = composeDbUrls[0] const content = { step: 0 } let firstCeramic: CeramicClient.CeramicClient - let firstTile: TileDocument + let firstDocument: ModelInstanceDocument // Create and update on first node test(`create stream on ${firstNodeUrl}`, async () => { firstCeramic = await newCeramic(firstNodeUrl) - firstTile = await TileDocument.create(firstCeramic as StreamReaderWriter, content, metadata, { - anchor: false, - }) + const metadata = { controllers: [firstCeramic.did!.id], model: modelId } + firstDocument = await ModelInstanceDocument.create( + firstCeramic as StreamReaderWriter, + content, + metadata, + { + anchor: false, + }, + ) console.log( - `Created stream on ${firstNodeUrl}: ${firstTile.id.toString()} with step ${content.step}`, + `Created stream on ${firstNodeUrl}: ${firstDocument.id.toString()} with step ${content.step}`, ) }) test(`update stream on ${firstNodeUrl}`, async () => { content.step++ - await firstTile.update(content, undefined, { anchor: false }) + await firstDocument.replace(content, undefined, { anchor: false }) console.log( - `Updated stream on ${firstNodeUrl}: ${firstTile.id.toString()} with step ${content.step}`, + `Updated stream on ${firstNodeUrl}: ${firstDocument.id.toString()} with step ${content.step}`, ) }) @@ -58,17 +92,17 @@ function testUpdate(composeDbUrls: string[]) { // Skip first url because it was already handled in the previous tests for (let idx = 1; idx < composeDbUrls.length; idx++) { const apiUrl = composeDbUrls[idx] - let tile: TileDocument + let doc: ModelInstanceDocument test(`load stream on ${apiUrl}`, async () => { await delay(5) const ceramic = await newCeramic(apiUrl) console.log( - `Loading stream ${firstTile.id.toString()} on ${apiUrl} with step ${content.step}`, + `Loading stream ${firstDocument.id.toString()} on ${apiUrl} with step ${content.step}`, ) - tile = await TileDocument.load(ceramic, firstTile.id) - expect(tile.content).toEqual(content) + doc = await ModelInstanceDocument.load(ceramic, firstDocument.id) + expect(doc.content).toEqual(content) console.log( - `Loaded stream on ${apiUrl}: ${firstTile.id.toString()} successfully with step ${ + `Loaded stream on ${apiUrl}: ${firstDocument.id.toString()} successfully with step ${ content.step }`, ) @@ -81,25 +115,25 @@ function testUpdate(composeDbUrls: string[]) { // Update on first node and wait for update to propagate to other nodes via pubsub // Only anchor on the final write to avoid writes conflicting with anchors. console.log( - `Updating stream ${firstTile.id.toString()} on ${firstNodeUrl} so we can sync it on ${apiUrl} with step ${ + `Updating stream ${firstDocument.id.toString()} on ${firstNodeUrl} so we can sync it on ${apiUrl} with step ${ content.step }`, ) - await firstTile.update(content, undefined, { anchor: isFinalWriter }) + await firstDocument.replace(content, undefined, { anchor: isFinalWriter }) console.log(`Updating complete, sleeping 5 seconds before syncing`) await delay(5) console.log(`Sleep complete, syncing`) - await tile.sync({ sync: SyncOptions.NEVER_SYNC }) - expect(tile.content).toEqual(firstTile.content) + await doc.sync({ sync: SyncOptions.NEVER_SYNC }) + expect(doc.content).toEqual(firstDocument.content) console.log( - `Synced stream on ${apiUrl}: ${firstTile.id.toString()} successfully with step ${ + `Synced stream on ${apiUrl}: ${firstDocument.id.toString()} successfully with step ${ content.step }`, ) if (isFinalWriter) { // Store the anchor request in the DB - await helpers.storeStreamReq(firstTile.id) + await helpers.storeStreamReq(firstDocument.id) } }) } diff --git a/suite/src/models/modelConstants.ts b/suite/src/models/modelConstants.ts index 9f0ea379..ae998f64 100644 --- a/suite/src/models/modelConstants.ts +++ b/suite/src/models/modelConstants.ts @@ -7,7 +7,7 @@ export const newModel: ModelDefinition = { type: 'object', additionalProperties: false, properties: { - myData: { + step: { type: 'integer', minimum: 0, maximum: 10000, @@ -18,7 +18,3 @@ export const newModel: ModelDefinition = { type: 'list', }, } - -export const basicModelDocumentContent = { - myData: 2, -} diff --git a/suite/src/utils/ceramicHelpers.ts b/suite/src/utils/ceramicHelpers.ts index 5bbe87fe..3173d1d7 100644 --- a/suite/src/utils/ceramicHelpers.ts +++ b/suite/src/utils/ceramicHelpers.ts @@ -13,23 +13,21 @@ const did = new DID({ provider, resolver }) // 30 minutes for anchors to happen and be noticed (including potential failures and retries) export const DEFAULT_ANCHOR_TIMEOUT = 60 * 30 -export const metadata = { controllers: [] } export const newCeramic = async (apiUrl: string, didOverride?: DID) => { const ceramic = new CeramicClient(apiUrl, { syncInterval: 500 }) const effectiveDID = didOverride || did if (!effectiveDID.authenticated) { await effectiveDID.authenticate() - ;(metadata.controllers as string[]) = [effectiveDID.id] } - await ceramic.setDID(effectiveDID) + ceramic.did = effectiveDID return ceramic } function defaultMsgGenerator(stream: Stream) { const curTime = new Date().toISOString() return `Waiting for stream ${stream.id.toString()} to hit a specific stream state. Current time: ${curTime}. Current stream state: ${JSON.stringify( - StreamUtils.serializeState(stream.state) + StreamUtils.serializeState(stream.state), )}` } @@ -39,7 +37,7 @@ async function withTimeout(prom: Promise, timeoutSecs: number) { setTimeout(() => { const curTime = new Date().toISOString() reject( - `Timed out after ${timeoutSecs} seconds. Current time: ${curTime}, start time: ${startTime}` + `Timed out after ${timeoutSecs} seconds. Current time: ${curTime}, start time: ${startTime}`, ) }, timeoutSecs * 1000) prom.then(resolve) @@ -59,7 +57,7 @@ export async function waitForCondition( stream: Stream, condition: (stream: StreamState) => boolean, timeoutSecs: number, - msgGenerator?: (stream: Stream) => string + msgGenerator?: (stream: Stream) => string, ): Promise { const waiter = stream .pipe( @@ -71,7 +69,7 @@ export async function waitForCondition( console.debug(msg) return false }), - take(1) + take(1), ) .toPromise() @@ -82,19 +80,19 @@ export async function waitForCondition( console.debug( `Stream ${stream.id.toString()} successfully reached desired state. Current stream state: ${JSON.stringify( - StreamUtils.serializeState(stream.state) - )}` + StreamUtils.serializeState(stream.state), + )}`, ) } export async function waitForAnchor( stream: any, - timeoutSecs: number = DEFAULT_ANCHOR_TIMEOUT + timeoutSecs: number = DEFAULT_ANCHOR_TIMEOUT, ): Promise { const msgGenerator = function (stream: Stream) { const curTime = new Date().toISOString() return `Waiting for stream ${stream.id.toString()} to be anchored. Current time: ${curTime}. Current stream state: ${JSON.stringify( - StreamUtils.serializeState(stream.state) + StreamUtils.serializeState(stream.state), )}` } await waitForCondition( @@ -103,6 +101,6 @@ export async function waitForAnchor( return state.anchorStatus == AnchorStatus.ANCHORED }, timeoutSecs, - msgGenerator + msgGenerator, ) } diff --git a/suite/src/utils/composeDbHelpers.ts b/suite/src/utils/composeDbHelpers.ts index a2c07462..efe10e6a 100644 --- a/suite/src/utils/composeDbHelpers.ts +++ b/suite/src/utils/composeDbHelpers.ts @@ -1,8 +1,9 @@ import { ComposeClient } from '@composedb/client' -import { utilities } from './common' +import { utilities } from './common.js' import { CeramicClient } from '@ceramicnetwork/http-client' import { ModelInstanceDocument } from '@ceramicnetwork/stream-model-instance' import { StreamID } from '@ceramicnetwork/streamid' +import { CommonTestUtils as TestUtils } from '@ceramicnetwork/common-test-utils' const delay = utilities.delay const delayMs = utilities.delayMs @@ -114,3 +115,18 @@ export async function waitForIndexingOrTimeout( throw new Error(`Timeout waiting for indexing model: ${modelId}`) } + +export async function indexModelOnNode( + node: CeramicClient, + modelId: StreamID, + timeoutMs = 1000 * 10, +): Promise { + await TestUtils.waitForConditionOrTimeout(async () => + node + .loadStream(modelId) + .then((_) => true) + .catch((_) => false), + ) + await node.admin.startIndexingModels([modelId]) + await waitForIndexingOrTimeout(node, modelId, timeoutMs) +}