Skip to content

Commit

Permalink
Add ff.
Browse files Browse the repository at this point in the history
  • Loading branch information
aboodman committed Jan 23, 2022
1 parent bfac12f commit 3131b53
Show file tree
Hide file tree
Showing 7 changed files with 464 additions and 13 deletions.
74 changes: 74 additions & 0 deletions src/ff/fast-forward.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
import { ClientRecord } from "../types/client-record";
import { ClientID } from "../types/client-state";
import { NullableVersion, Version } from "../types/version";
import { ClientPokeBody } from "../types/client-poke-body";
import { getPatch } from "./get-patch";
import { Patch } from "../protocol/poke";
import { must } from "../util/must";

export type GetClientRecord = (clientID: ClientID) => Promise<ClientRecord>;

/**
* Returns zero or more pokes necessary to fast forward any clients in a room
* that are behind head.
* @param roomID room to fast-forward
* @param clients clients active in room
* @param getClientRecord function to get a client record by ID
* @param currentVersion head version to fast-forward to
* @param executor raw DB executor for finding entries by version quickly
* @param timestamp for resulting pokes
* @returns
*/
export async function fastForwardRoom(
clients: ClientID[],
getClientRecord: GetClientRecord,
currentVersion: Version,
durable: DurableObjectStorage,
timestamp: number
): Promise<ClientPokeBody[]> {
// Load all the client records in parallel
const getMapEntry = async (clientID: ClientID) =>
[clientID, await getClientRecord(clientID)] as [ClientID, ClientRecord];
const records = new Map(await Promise.all(clients.map(getMapEntry)));

// Get all of the distinct base cookies. Typically almost all members of
// room will have same base cookie. No need to recalculate over and over.
const distinctBaseCookies = new Set(
[...records.values()].map((r) => r.baseCookie)
);

// No need to calculate a patch for the current version!
distinctBaseCookies.delete(currentVersion);

// Calculate all the distinct patches in parallel
const getPatchEntry = async (baseCookie: NullableVersion) =>
[baseCookie, await getPatch(durable, baseCookie ?? 0)] as [
NullableVersion,
Patch
];
const distinctPatches = new Map(
await Promise.all([...distinctBaseCookies].map(getPatchEntry))
);

const ret: ClientPokeBody[] = [];
for (const clientID of clients) {
const record = must(records.get(clientID));
if (record.baseCookie === currentVersion) {
continue;
}
const patch = must(distinctPatches.get(record.baseCookie));
const poke: ClientPokeBody = {
clientID,
poke: {
baseCookie: record.baseCookie,
cookie: currentVersion,
lastMutationID: record.lastMutationID,
timestamp,
patch,
},
};
ret.push(poke);
}

return ret;
}
39 changes: 39 additions & 0 deletions src/ff/get-patch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { Patch } from "../protocol/poke";
import { userValuePrefix, userValueSchema } from "../types/user-value";
import { Version } from "../types/version";

export async function getPatch(
durable: DurableObjectStorage,
fromCookie: Version
): Promise<Patch> {
const result = await durable.list({
prefix: userValuePrefix,
allowConcurrency: true,
});

const patch: Patch = [];
for (const [key, value] of result) {
const validValue = userValueSchema.parse(value);

// TODO: More efficient way of finding changed values.
if (validValue.version <= fromCookie) {
continue;
}

const unwrappedKey = key.substring(userValuePrefix.length);
const unwrappedValue = validValue.value;
if (validValue.deleted) {
patch.push({
op: "del",
key: unwrappedKey,
});
} else {
patch.push({
op: "put",
key: unwrappedKey,
value: unwrappedValue,
});
}
}
return patch;
}
9 changes: 3 additions & 6 deletions test/db/data.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,15 +138,12 @@ test("delEntry", async () => {
];

for (const c of cases) {
storage.delete("foo");
await storage.delete("foo");
if (c.exists) {
storage.put("foo", 42);
await storage.put("foo", 42);
}

// eslint-disable-next-line @typescript-eslint/no-explicit-any
let error: any | undefined;
await delEntry(storage, "foo").catch((e) => (error = String(e)));

await delEntry(storage, "foo");
const value = storage.get("foo");
expect(value).toBeUndefined;
}
Expand Down
216 changes: 216 additions & 0 deletions test/ff/fast-forward.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
import { DurableStorage } from "../../src/storage/durable-storage";
import { ClientPokeBody } from "../../src/types/client-poke-body";
import {
ClientRecord,
getClientRecord,
putClientRecord,
} from "../../src/types/client-record";
import { ClientID } from "../../src/types/client-state";
import { RoomID } from "../../src/types/room-state";
import { putUserValue, UserValue } from "../../src/types/user-value";
import { must } from "../../src/util/must";
import { fastForwardRoom } from "../../src/ff/fast-forward";

const { COUNTER } = getMiniflareBindings();
const id = COUNTER.newUniqueId();

test("fastForward", async () => {
type Case = {
name: string;
state: Map<string, UserValue>;
clientRecords: Map<string, ClientRecord>;
roomID: RoomID;
clients: ClientID[];
timestamp: number;
expectedError?: string;
expectedPokes?: ClientPokeBody[];
};

const cases: Case[] = [
{
name: "no clients",
state: new Map([["foo", { value: "bar", version: 1, deleted: false }]]),
clientRecords: new Map([["c1", { lastMutationID: 1, baseCookie: 0 }]]),
roomID: "r1",
clients: [],
timestamp: 1,
expectedPokes: [],
},
{
name: "no data",
state: new Map(),
clientRecords: new Map([["c1", { lastMutationID: 1, baseCookie: 0 }]]),
roomID: "r1",
clients: ["c1"],
timestamp: 1,
expectedPokes: [
{
clientID: "c1",
poke: {
baseCookie: 0,
cookie: 42,
lastMutationID: 1,
patch: [],
timestamp: 1,
},
},
],
},
{
name: "up to date",
state: new Map(),
clientRecords: new Map([["c1", { lastMutationID: 1, baseCookie: 42 }]]),
roomID: "r1",
clients: ["c1"],
timestamp: 1,
expectedPokes: [],
},
{
name: "one client two changes",
state: new Map([
["foo", { value: "bar", version: 42, deleted: false }],
["hot", { value: "dog", version: 42, deleted: true }],
]),
clientRecords: new Map([["c1", { lastMutationID: 3, baseCookie: 41 }]]),
roomID: "r1",
clients: ["c1"],
timestamp: 1,
expectedPokes: [
{
clientID: "c1",
poke: {
baseCookie: 41,
cookie: 42,
lastMutationID: 3,
patch: [
{
op: "put",
key: "foo",
value: "bar",
},
{
op: "del",
key: "hot",
},
],
timestamp: 1,
},
},
],
},
{
name: "two clients different changes",
state: new Map([
["foo", { value: "bar", version: 41, deleted: false }],
["hot", { value: "dog", version: 42, deleted: true }],
]),
clientRecords: new Map([
["c1", { lastMutationID: 3, baseCookie: 40 }],
["c2", { lastMutationID: 1, baseCookie: 41 }],
]),
roomID: "r1",
clients: ["c1", "c2"],
timestamp: 1,
expectedPokes: [
{
clientID: "c1",
poke: {
baseCookie: 40,
cookie: 42,
lastMutationID: 3,
patch: [
{
op: "put",
key: "foo",
value: "bar",
},
{
op: "del",
key: "hot",
},
],
timestamp: 1,
},
},
{
clientID: "c2",
poke: {
baseCookie: 41,
cookie: 42,
lastMutationID: 1,
patch: [
{
op: "del",
key: "hot",
},
],
timestamp: 1,
},
},
],
},
{
name: "two clients with changes but only one active",
state: new Map([
["foo", { value: "bar", version: 41, deleted: false }],
["hot", { value: "dog", version: 42, deleted: true }],
]),
clientRecords: new Map([
["c1", { lastMutationID: 3, baseCookie: 40 }],
["c2", { lastMutationID: 1, baseCookie: 41 }],
]),
roomID: "r1",
clients: ["c1"],
timestamp: 1,
expectedPokes: [
{
clientID: "c1",
poke: {
baseCookie: 40,
cookie: 42,
lastMutationID: 3,
patch: [
{
op: "put",
key: "foo",
value: "bar",
},
{
op: "del",
key: "hot",
},
],
timestamp: 1,
},
},
],
},
];

const durable = await getMiniflareDurableObjectStorage(id);

for (const c of cases) {
await durable.deleteAll();
const storage = new DurableStorage(durable);
for (const [clientID, clientRecord] of c.clientRecords) {
await putClientRecord(clientID, clientRecord, storage);
}
for (const [key, value] of c.state) {
await putUserValue(key, value, storage);
}

const gcr = async (clientID: ClientID) => {
return must(await getClientRecord(clientID, storage));
};

const pokes = await fastForwardRoom(
c.clients,
gcr,
42,
durable,
c.timestamp
);

expect(pokes).toEqual(c.expectedPokes);
}
});
Loading

0 comments on commit 3131b53

Please sign in to comment.