diff --git a/src/ff/fast-forward.ts b/src/ff/fast-forward.ts new file mode 100644 index 0000000..b023a1a --- /dev/null +++ b/src/ff/fast-forward.ts @@ -0,0 +1,74 @@ +import { ClientRecord } from "../types/client-record"; +import { ClientID } from "../types/client-state"; +import { NullableVersion, Version } from "../types/version"; +import { ClientPokeBody } from "../types/client-poke-body"; +import { getPatch } from "./get-patch"; +import { Patch } from "../protocol/poke"; +import { must } from "../util/must"; + +export type GetClientRecord = (clientID: ClientID) => Promise; + +/** + * Returns zero or more pokes necessary to fast forward any clients in a room + * that are behind head. + * @param roomID room to fast-forward + * @param clients clients active in room + * @param getClientRecord function to get a client record by ID + * @param currentVersion head version to fast-forward to + * @param executor raw DB executor for finding entries by version quickly + * @param timestamp for resulting pokes + * @returns + */ +export async function fastForwardRoom( + clients: ClientID[], + getClientRecord: GetClientRecord, + currentVersion: Version, + durable: DurableObjectStorage, + timestamp: number +): Promise { + // Load all the client records in parallel + const getMapEntry = async (clientID: ClientID) => + [clientID, await getClientRecord(clientID)] as [ClientID, ClientRecord]; + const records = new Map(await Promise.all(clients.map(getMapEntry))); + + // Get all of the distinct base cookies. Typically almost all members of + // room will have same base cookie. No need to recalculate over and over. + const distinctBaseCookies = new Set( + [...records.values()].map((r) => r.baseCookie) + ); + + // No need to calculate a patch for the current version! + distinctBaseCookies.delete(currentVersion); + + // Calculate all the distinct patches in parallel + const getPatchEntry = async (baseCookie: NullableVersion) => + [baseCookie, await getPatch(durable, baseCookie ?? 0)] as [ + NullableVersion, + Patch + ]; + const distinctPatches = new Map( + await Promise.all([...distinctBaseCookies].map(getPatchEntry)) + ); + + const ret: ClientPokeBody[] = []; + for (const clientID of clients) { + const record = must(records.get(clientID)); + if (record.baseCookie === currentVersion) { + continue; + } + const patch = must(distinctPatches.get(record.baseCookie)); + const poke: ClientPokeBody = { + clientID, + poke: { + baseCookie: record.baseCookie, + cookie: currentVersion, + lastMutationID: record.lastMutationID, + timestamp, + patch, + }, + }; + ret.push(poke); + } + + return ret; +} diff --git a/src/ff/get-patch.ts b/src/ff/get-patch.ts new file mode 100644 index 0000000..b8d46de --- /dev/null +++ b/src/ff/get-patch.ts @@ -0,0 +1,39 @@ +import { Patch } from "../protocol/poke"; +import { userValuePrefix, userValueSchema } from "../types/user-value"; +import { Version } from "../types/version"; + +export async function getPatch( + durable: DurableObjectStorage, + fromCookie: Version +): Promise { + const result = await durable.list({ + prefix: userValuePrefix, + allowConcurrency: true, + }); + + const patch: Patch = []; + for (const [key, value] of result) { + const validValue = userValueSchema.parse(value); + + // TODO: More efficient way of finding changed values. + if (validValue.version <= fromCookie) { + continue; + } + + const unwrappedKey = key.substring(userValuePrefix.length); + const unwrappedValue = validValue.value; + if (validValue.deleted) { + patch.push({ + op: "del", + key: unwrappedKey, + }); + } else { + patch.push({ + op: "put", + key: unwrappedKey, + value: unwrappedValue, + }); + } + } + return patch; +} diff --git a/test/db/data.test.ts b/test/db/data.test.ts index 0903459..57da306 100644 --- a/test/db/data.test.ts +++ b/test/db/data.test.ts @@ -138,15 +138,12 @@ test("delEntry", async () => { ]; for (const c of cases) { - storage.delete("foo"); + await storage.delete("foo"); if (c.exists) { - storage.put("foo", 42); + await storage.put("foo", 42); } - // eslint-disable-next-line @typescript-eslint/no-explicit-any - let error: any | undefined; - await delEntry(storage, "foo").catch((e) => (error = String(e))); - + await delEntry(storage, "foo"); const value = storage.get("foo"); expect(value).toBeUndefined; } diff --git a/test/ff/fast-forward.test.ts b/test/ff/fast-forward.test.ts new file mode 100644 index 0000000..8d71932 --- /dev/null +++ b/test/ff/fast-forward.test.ts @@ -0,0 +1,216 @@ +import { DurableStorage } from "../../src/storage/durable-storage"; +import { ClientPokeBody } from "../../src/types/client-poke-body"; +import { + ClientRecord, + getClientRecord, + putClientRecord, +} from "../../src/types/client-record"; +import { ClientID } from "../../src/types/client-state"; +import { RoomID } from "../../src/types/room-state"; +import { putUserValue, UserValue } from "../../src/types/user-value"; +import { must } from "../../src/util/must"; +import { fastForwardRoom } from "../../src/ff/fast-forward"; + +const { COUNTER } = getMiniflareBindings(); +const id = COUNTER.newUniqueId(); + +test("fastForward", async () => { + type Case = { + name: string; + state: Map; + clientRecords: Map; + roomID: RoomID; + clients: ClientID[]; + timestamp: number; + expectedError?: string; + expectedPokes?: ClientPokeBody[]; + }; + + const cases: Case[] = [ + { + name: "no clients", + state: new Map([["foo", { value: "bar", version: 1, deleted: false }]]), + clientRecords: new Map([["c1", { lastMutationID: 1, baseCookie: 0 }]]), + roomID: "r1", + clients: [], + timestamp: 1, + expectedPokes: [], + }, + { + name: "no data", + state: new Map(), + clientRecords: new Map([["c1", { lastMutationID: 1, baseCookie: 0 }]]), + roomID: "r1", + clients: ["c1"], + timestamp: 1, + expectedPokes: [ + { + clientID: "c1", + poke: { + baseCookie: 0, + cookie: 42, + lastMutationID: 1, + patch: [], + timestamp: 1, + }, + }, + ], + }, + { + name: "up to date", + state: new Map(), + clientRecords: new Map([["c1", { lastMutationID: 1, baseCookie: 42 }]]), + roomID: "r1", + clients: ["c1"], + timestamp: 1, + expectedPokes: [], + }, + { + name: "one client two changes", + state: new Map([ + ["foo", { value: "bar", version: 42, deleted: false }], + ["hot", { value: "dog", version: 42, deleted: true }], + ]), + clientRecords: new Map([["c1", { lastMutationID: 3, baseCookie: 41 }]]), + roomID: "r1", + clients: ["c1"], + timestamp: 1, + expectedPokes: [ + { + clientID: "c1", + poke: { + baseCookie: 41, + cookie: 42, + lastMutationID: 3, + patch: [ + { + op: "put", + key: "foo", + value: "bar", + }, + { + op: "del", + key: "hot", + }, + ], + timestamp: 1, + }, + }, + ], + }, + { + name: "two clients different changes", + state: new Map([ + ["foo", { value: "bar", version: 41, deleted: false }], + ["hot", { value: "dog", version: 42, deleted: true }], + ]), + clientRecords: new Map([ + ["c1", { lastMutationID: 3, baseCookie: 40 }], + ["c2", { lastMutationID: 1, baseCookie: 41 }], + ]), + roomID: "r1", + clients: ["c1", "c2"], + timestamp: 1, + expectedPokes: [ + { + clientID: "c1", + poke: { + baseCookie: 40, + cookie: 42, + lastMutationID: 3, + patch: [ + { + op: "put", + key: "foo", + value: "bar", + }, + { + op: "del", + key: "hot", + }, + ], + timestamp: 1, + }, + }, + { + clientID: "c2", + poke: { + baseCookie: 41, + cookie: 42, + lastMutationID: 1, + patch: [ + { + op: "del", + key: "hot", + }, + ], + timestamp: 1, + }, + }, + ], + }, + { + name: "two clients with changes but only one active", + state: new Map([ + ["foo", { value: "bar", version: 41, deleted: false }], + ["hot", { value: "dog", version: 42, deleted: true }], + ]), + clientRecords: new Map([ + ["c1", { lastMutationID: 3, baseCookie: 40 }], + ["c2", { lastMutationID: 1, baseCookie: 41 }], + ]), + roomID: "r1", + clients: ["c1"], + timestamp: 1, + expectedPokes: [ + { + clientID: "c1", + poke: { + baseCookie: 40, + cookie: 42, + lastMutationID: 3, + patch: [ + { + op: "put", + key: "foo", + value: "bar", + }, + { + op: "del", + key: "hot", + }, + ], + timestamp: 1, + }, + }, + ], + }, + ]; + + const durable = await getMiniflareDurableObjectStorage(id); + + for (const c of cases) { + await durable.deleteAll(); + const storage = new DurableStorage(durable); + for (const [clientID, clientRecord] of c.clientRecords) { + await putClientRecord(clientID, clientRecord, storage); + } + for (const [key, value] of c.state) { + await putUserValue(key, value, storage); + } + + const gcr = async (clientID: ClientID) => { + return must(await getClientRecord(clientID, storage)); + }; + + const pokes = await fastForwardRoom( + c.clients, + gcr, + 42, + durable, + c.timestamp + ); + + expect(pokes).toEqual(c.expectedPokes); + } +}); diff --git a/test/ff/get-patch.test.ts b/test/ff/get-patch.test.ts new file mode 100644 index 0000000..f6b8f8e --- /dev/null +++ b/test/ff/get-patch.test.ts @@ -0,0 +1,125 @@ +import { PatchOperation } from "replicache"; +import { getPatch } from "../../src/ff/get-patch"; +import { Version } from "../../src/types/version"; +import { ReplicacheTransaction } from "../../src/storage/replicache-transaction"; +import { DurableStorage } from "../../src/storage/durable-storage"; + +const { COUNTER } = getMiniflareBindings(); +const id = COUNTER.newUniqueId(); + +test("getPatch", async () => { + type Case = { + name: string; + // undefined value means delete + muts?: { key: string; value?: number; version: number }[]; + fromCookie: Version; + expected: PatchOperation[]; + }; + + const cases: Case[] = [ + { + name: "add a, diff from null", + muts: [{ key: "a", value: 1, version: 2 }], + fromCookie: 0, + expected: [ + { + op: "put", + key: "a", + value: 1, + }, + ], + }, + { + name: "add a, diff from 1", + fromCookie: 1, + expected: [ + { + op: "put", + key: "a", + value: 1, + }, + ], + }, + { + name: "add a, diff from 2", + fromCookie: 2, + expected: [], + }, + { + name: "add a + b, diff from null", + muts: [{ key: "b", value: 2, version: 3 }], + fromCookie: 0, + expected: [ + { + op: "put", + key: "a", + value: 1, + }, + { + op: "put", + key: "b", + value: 2, + }, + ], + }, + { + name: "add a + b, diff from 2", + muts: [], + fromCookie: 2, + expected: [ + { + op: "put", + key: "b", + value: 2, + }, + ], + }, + { + name: "add a + b, diff from 3", + muts: [], + fromCookie: 3, + expected: [], + }, + { + name: "add a + b, diff from 4", + muts: [], + fromCookie: 4, + expected: [], + }, + { + name: "del a, diff from 3", + muts: [{ key: "a", version: 4 }], + fromCookie: 3, + expected: [ + { + op: "del", + key: "a", + }, + ], + }, + { + name: "del a, diff from 4", + fromCookie: 4, + expected: [], + }, + ]; + + const storage = await getMiniflareDurableObjectStorage(id); + + for (const c of cases) { + for (const p of c.muts || []) { + const tx = new ReplicacheTransaction( + new DurableStorage(storage), + "c1", + p.version + ); + if (p.value !== undefined) { + await tx.put(p.key, p.value); + } else { + await tx.del(p.key); + } + } + const patch = await getPatch(storage, c.fromCookie); + expect(patch).toEqual(c.expected); + } +}); diff --git a/test/util/test-utils.ts b/test/util/test-utils.ts index 02fba76..116a14e 100644 --- a/test/util/test-utils.ts +++ b/test/util/test-utils.ts @@ -1,9 +1,9 @@ -import { JSONType } from "../protocol/json"; -import { Mutation } from "../protocol/push"; -import { ClientMutation } from "../types/client-mutation"; -import { ClientID, ClientState, Socket } from "../types/client-state"; -import { RoomID, RoomMap, RoomState } from "../types/room-state"; -import { NullableVersion } from "../types/version"; +import { JSONType } from "../../src/protocol/json"; +import { Mutation } from "../../src/protocol/push"; +import { ClientMutation } from "../../src/types/client-mutation"; +import { ClientID, ClientState, Socket } from "../../src/types/client-state"; +import { RoomID, RoomMap, RoomState } from "../../src/types/room-state"; +import { NullableVersion } from "../../src/types/version"; export function roomMap(...rooms: [RoomID, RoomState][]): RoomMap { return new Map(rooms); diff --git a/tsconfig.json b/tsconfig.json index 5ce8215..454944b 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -13,5 +13,5 @@ "@/*": ["src/*"] } }, - "include": ["src/**/*", "bindings.d.ts", "test/storage/replicache-transaction.test.ts", "test/storage/entry-cache.text.ts", "test/storage/db-storage.test.ts", "test/util/test-utils.ts", "test/util/peek-iterator.test.ts", "test/util/lock.test.ts", "test/util/deep-clone.test.ts", "test/db/data.test.ts"] + "include": ["src/**/*", "bindings.d.ts", "test/storage/replicache-transaction.test.ts", "test/storage/entry-cache.text.ts", "test/storage/db-storage.test.ts", "test/util/test-utils.ts", "test/util/peek-iterator.test.ts", "test/util/lock.test.ts", "test/util/deep-clone.test.ts", "test/db/data.test.ts", "test/ff/fast-forward.test.ts", "test/ff/get-patch.test.ts"] }