Skip to content

Commit

Permalink
Implement processRoom()
Browse files Browse the repository at this point in the history
  • Loading branch information
aboodman committed Dec 17, 2021
1 parent 1c9e2d0 commit 2d4fc0c
Show file tree
Hide file tree
Showing 9 changed files with 521 additions and 111 deletions.
2 changes: 1 addition & 1 deletion backend/process/generate-merged-mutations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ export function* generateMergedMutations(clients: ClientMap) {
}
const { value, done } = next.peek();
assert(!done);
yield value;
yield value as ClientMutation;
next.next();
insertIterator(next);
//dumpIterators("after insert");
Expand Down
141 changes: 62 additions & 79 deletions backend/process/process-frame.test.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
import { MemStorage } from "../storage/mem-storage";
import { ClientMutation } from "../types/client-mutation";
import { ClientPokeBody } from "../types/client-poke-body";
import { clientRecordKey, putClientRecord } from "../types/client-record";
import { ClientID } from "../types/client-state";
import { versionKey, versionSchema } from "../types/version";
import { clientMutation, clientRecord, userValue } from "../util/test-utils";
import { expect } from "chai";
import { test } from "mocha";
import { JSONType } from "protocol/json";
import { WriteTransaction } from "replicache";
import { z } from "zod";
import { MemStorage } from "../storage/mem-storage";
import { ClientMutation } from "../types/client-mutation";
import { ClientPokeBody } from "../types/client-poke-body";
import { ClientRecord, clientRecordKey } from "../types/client-record";
import { ClientID } from "../types/client-state";
import { UserValue, userValueKey } from "../types/user-value";
import { Version, versionKey } from "../types/version";
import { PeekIterator } from "../util/peek-iterator";
import { clientMutation, clientRecord, userValue } from "../util/test-utils";
import { processFrame } from "./process-frame";
import { userValueKey } from "../types/user-value";

test("processFrame", async () => {
const records = new Map([
["c1", clientRecord(null, 1)],
["c2", clientRecord(1, 7)],
[clientRecordKey("c1"), clientRecord(null, 1)],
[clientRecordKey("c2"), clientRecord(1, 7)],
]);
const startTime = 100;
const endTime = 200;
Expand All @@ -28,7 +29,9 @@ test("processFrame", async () => {
mutations: ClientMutation[];
clients: ClientID[];
expectedPokes: ClientPokeBody[];
expectedState: Record<string, JSONType>;
expectedUserValues: Map<string, UserValue>;
expectedClientRecords: Map<string, ClientRecord>;
expectedVersion: Version;
};

const mutators = new Map(
Expand All @@ -45,43 +48,24 @@ test("processFrame", async () => {
})
);

const baseExpectedState = {
[versionKey]: endVersion,
[clientRecordKey("c1")]: records.get("c1")!,
[clientRecordKey("c2")]: records.get("c2")!,
};

const cases: Case[] = [
{
name: "no mutations, no clients",
mutations: [],
clients: [],
expectedPokes: [],
expectedState: baseExpectedState,
expectedUserValues: new Map(),
expectedClientRecords: records,
expectedVersion: startVersion,
},
{
name: "no mutations, one client",
mutations: [],
clients: ["c1"],
expectedPokes: [
{
clientID: "c1",
poke: {
baseCookie: startVersion,
cookie: endVersion,
lastMutationID: 1,
patch: [],
timestamp: startTime,
},
},
],
expectedState: {
...baseExpectedState,
[clientRecordKey("c1")]: {
baseCookie: endVersion,
lastMutationID: 1,
},
},
expectedPokes: [],
expectedUserValues: new Map(),
expectedClientRecords: records,
expectedVersion: startVersion,
},
{
name: "one mutation, one client",
Expand All @@ -105,14 +89,14 @@ test("processFrame", async () => {
},
},
],
expectedState: {
...baseExpectedState,
[clientRecordKey("c1")]: {
baseCookie: endVersion,
lastMutationID: 2,
},
[userValueKey("foo")]: userValue("bar", endVersion),
},
expectedUserValues: new Map([
[userValueKey("foo"), userValue("bar", endVersion)],
]),
expectedClientRecords: new Map([
...records,
[clientRecordKey("c1"), clientRecord(endVersion, 2)],
]),
expectedVersion: endVersion,
},
{
name: "one mutation, two clients",
Expand Down Expand Up @@ -152,18 +136,14 @@ test("processFrame", async () => {
},
},
],
expectedState: {
...baseExpectedState,
[clientRecordKey("c1")]: {
baseCookie: endVersion,
lastMutationID: 2,
},
[clientRecordKey("c2")]: {
baseCookie: endVersion,
lastMutationID: 7,
},
[userValueKey("foo")]: userValue("bar", endVersion),
},
expectedUserValues: new Map([
[userValueKey("foo"), userValue("bar", endVersion)],
]),
expectedClientRecords: new Map([
[clientRecordKey("c1"), clientRecord(endVersion, 2)],
[clientRecordKey("c2"), clientRecord(endVersion, 7)],
]),
expectedVersion: endVersion,
},
{
name: "two mutations, one client, one key",
Expand All @@ -190,14 +170,14 @@ test("processFrame", async () => {
},
},
],
expectedState: {
...baseExpectedState,
[clientRecordKey("c1")]: {
baseCookie: endVersion,
lastMutationID: 3,
},
[userValueKey("foo")]: userValue("baz", endVersion),
},
expectedUserValues: new Map([
[userValueKey("foo"), userValue("baz", endVersion)],
]),
expectedClientRecords: new Map([
...records,
[clientRecordKey("c1"), clientRecord(endVersion, 3)],
]),
expectedVersion: endVersion,
},
{
name: "frame cutoff",
Expand Down Expand Up @@ -225,14 +205,14 @@ test("processFrame", async () => {
},
},
],
expectedState: {
...baseExpectedState,
[clientRecordKey("c1")]: {
baseCookie: endVersion,
lastMutationID: 3,
},
[userValueKey("foo")]: userValue("baz", endVersion),
},
expectedUserValues: new Map([
[userValueKey("foo"), userValue("baz", endVersion)],
]),
expectedClientRecords: new Map([
...records,
[clientRecordKey("c1"), clientRecord(endVersion, 3)],
]),
expectedVersion: endVersion,
},
];

Expand All @@ -241,11 +221,11 @@ test("processFrame", async () => {

await storage.put(versionKey, startVersion);
for (const [key, value] of records) {
await putClientRecord(key, value, storage);
await storage.put(key, value);
}

const result = await processFrame(
c.mutations[Symbol.iterator](),
new PeekIterator(c.mutations[Symbol.iterator]()),
mutators,
c.clients,
storage,
Expand All @@ -255,10 +235,13 @@ test("processFrame", async () => {

expect(result, c.name).deep.equal(c.expectedPokes);

expect(await storage.get(versionKey, versionSchema)).equal(endVersion);

expect(storage.size, c.name).equal(Object.keys(c.expectedState).length);
for (const [key, value] of Object.entries(c.expectedState)) {
const expectedState = new Map([
...(c.expectedUserValues as Map<string, JSONType>),
...(c.expectedClientRecords as Map<string, JSONType>),
[versionKey, c.expectedVersion],
]);
expect(storage.size, c.name).equal(expectedState.size);
for (const [key, value] of expectedState) {
expect(await storage.get(key, z.any()), c.name).deep.equal(value);
}
}
Expand Down
27 changes: 16 additions & 11 deletions backend/process/process-frame.ts
Original file line number Diff line number Diff line change
@@ -1,39 +1,44 @@
import { EntryCache } from "../storage/entry-cache";
import { unwrapPatch } from "../storage/replicache-transaction";
import { Storage } from "../storage/storage";
import { ClientPokeBody } from "../types/client-poke-body";
import { PeekIterator } from "../util/peek-iterator";
import { ClientMutation } from "../types/client-mutation";
import { ClientPokeBody } from "../types/client-poke-body";
import { getClientRecord, putClientRecord } from "../types/client-record";
import { ClientID } from "../types/client-state";
import { versionKey, versionSchema } from "../types/version";
import { getVersion } from "../types/version";
import { must } from "../util/must";
import { PeekIterator } from "../util/peek-iterator";
import { MutatorMap, processMutation } from "./process-mutation";
import { unwrapPatch } from "../storage/replicache-transaction";

// Processes zero or more mutations as a single "frame", returning pokes.
// Pokes are returned if the version changes, even if there is no patch,
// because we need clients to be in sync with server version so that pokes
// can continue to apply.
export async function processFrame(
mutations: Iterator<ClientMutation>,
mutations: PeekIterator<ClientMutation>,
mutators: MutatorMap,
clients: ClientID[],
storage: Storage,
startTime: number,
endTime: number
): Promise<ClientPokeBody[]> {
const cache = new EntryCache(storage);
const prevVersion = (await cache.get(versionKey, versionSchema))!;
const prevVersion = must(await getVersion(cache));
const nextVersion = (prevVersion ?? 0) + 1;

await cache.put(versionKey, nextVersion);

for (const it = new PeekIterator(mutations); !it.peek().done; it.next()) {
const { value: mutation } = it.peek();
for (; !mutations.peek().done; mutations.next()) {
const { value: mutation } = mutations.peek();
if (mutation!.timestamp >= endTime) {
break;
}
await processMutation(mutation!, mutators, cache, nextVersion);
}

const patch = unwrapPatch(cache.pending());
if (must(await getVersion(cache)) === prevVersion) {
return [];
}

const patch = unwrapPatch(cache.pending());
const ret: ClientPokeBody[] = [];
for (const clientID of clients) {
const clientRecord = (await getClientRecord(clientID, cache))!;
Expand Down
27 changes: 19 additions & 8 deletions backend/process/process-mutation.test.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
import { expect } from "chai";
import { test } from "mocha";
import { JSONType } from "protocol/json";
import { WriteTransaction } from "replicache";
import { MemStorage } from "../storage/mem-storage";
import { ClientMutation } from "../types/client-mutation";
import {
ClientRecord,
getClientRecord,
putClientRecord,
} from "../types/client-record";
import { MemStorage } from "../storage/mem-storage";
import { clientMutation, clientRecord, mutation } from "../util/test-utils";
import { expect } from "chai";
import { test } from "mocha";
import { JSONType } from "protocol/json";
import { WriteTransaction } from "replicache";
import { MutatorMap, processMutation } from "./process-mutation";
import { getUserValue } from "../types/user-value";
import { ClientMutation } from "backend/types/client-mutation";
import { getVersion } from "../types/version";
import { clientMutation, clientRecord } from "../util/test-utils";
import { MutatorMap, processMutation } from "./process-mutation";

test("processMutation", async () => {
type Case = {
Expand All @@ -21,6 +22,7 @@ test("processMutation", async () => {
expectedError?: string;
expectedRecord?: ClientRecord;
expectAppWrite: boolean;
expectVersionWrite: boolean;
};

const cases: Case[] = [
Expand All @@ -29,41 +31,47 @@ test("processMutation", async () => {
mutation: clientMutation("c1", 1),
expectedError: "Error: Client c1 not found",
expectAppWrite: false,
expectVersionWrite: false,
},
{
name: "duplicate mutation",
existingRecord: clientRecord(null, 1),
mutation: clientMutation("c1", 1),
expectedRecord: clientRecord(null, 1),
expectAppWrite: false,
expectVersionWrite: false,
},
{
name: "ooo mutation",
existingRecord: clientRecord(null, 1),
mutation: clientMutation("c1", 3),
expectedRecord: clientRecord(null, 1),
expectAppWrite: false,
expectVersionWrite: false,
},
{
name: "unknown mutator",
existingRecord: clientRecord(null, 1),
mutation: clientMutation("c1", 2, "unknown"),
expectedRecord: clientRecord(null, 2),
expectAppWrite: false,
expectVersionWrite: true,
},
{
name: "mutator throws",
existingRecord: clientRecord(null, 1),
mutation: clientMutation("c1", 2, "throws"),
expectedRecord: clientRecord(null, 2),
expectAppWrite: false,
expectVersionWrite: true,
},
{
name: "success",
existingRecord: clientRecord(null, 1),
mutation: clientMutation("c1", 2, "foo"),
expectedRecord: clientRecord(null, 2),
expectAppWrite: true,
expectVersionWrite: true,
},
];

Expand Down Expand Up @@ -105,5 +113,8 @@ test("processMutation", async () => {
expect(await getUserValue("foo", storage), c.name).deep.equal(
c.expectAppWrite ? { version, deleted: false, value: "bar" } : undefined
);

const expectedVersion = c.expectVersionWrite ? version : undefined;
expect(await getVersion(storage), c.name).equal(expectedVersion);
}
});
Loading

0 comments on commit 2d4fc0c

Please sign in to comment.