Skip to content

Commit

Permalink
Merge pull request #15 from rocicorp/dragging-kills-server
Browse files Browse the repository at this point in the history
Collapse mutations a different way.
  • Loading branch information
aboodman authored Mar 4, 2021
2 parents 364727b + f6e194f commit 928b058
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 85 deletions.
2 changes: 1 addition & 1 deletion backend/rds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ async function executeStatementInDatabase(
name,
value,
}));
console.log("Executing", database, sql, JSON.stringify(params, null, ''));
console.log("Executing", database, sql, JSON.stringify(params, null, ""));
const command = new ExecuteStatementCommand({
database: database ?? "",
resourceArn,
Expand Down
8 changes: 4 additions & 4 deletions pages/api/replicache-pull.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import { getCookieVersion, getLastMutationID } from "../../backend/data";
import { must } from "../../backend/decode";

export default async (req: NextApiRequest, res: NextApiResponse) => {
console.log(`Processing pull`, JSON.stringify(req.body, null, ''));
console.log(`Processing pull`, JSON.stringify(req.body, null, ""));

const pull = must(pullRequest.decode(req.body));
let cookie = pull.baseStateID === "" ? 0 : parseInt(pull.baseStateID);
Expand All @@ -24,8 +24,8 @@ export default async (req: NextApiRequest, res: NextApiResponse) => {
getCookieVersion(executor),
]);
});
console.log('lastMutationID: ', lastMutationID);
console.log('Read all objects in', Date.now() - t0);
console.log("lastMutationID: ", lastMutationID);
console.log("Read all objects in", Date.now() - t0);

// Grump. Typescript seems to not understand that the argument to transact()
// is guaranteed to have been called before transact() exits.
Expand Down Expand Up @@ -68,7 +68,7 @@ export default async (req: NextApiRequest, res: NextApiResponse) => {
}
}

console.log(`Returning`, JSON.stringify(resp, null, ''));
console.log(`Returning`, JSON.stringify(resp, null, ""));
res.json(resp);
res.end();
};
Expand Down
156 changes: 76 additions & 80 deletions pages/api/replicache-push.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import type Storage from "../../shared/storage";
import { must } from "../../backend/decode";
import Pusher from "pusher";
import type { NextApiRequest, NextApiResponse } from "next";
import { JSONValue } from "replicache";

const mutation = t.union([
t.type({
Expand Down Expand Up @@ -72,72 +73,62 @@ type Mutation = t.TypeOf<typeof mutation>;

export default async (req: NextApiRequest, res: NextApiResponse) => {
const push = must(pushRequest.decode(req.body));
console.log("Processing push", JSON.stringify(push, null, ''));
console.log("Processing push", JSON.stringify(push, null, ""));

const t0 = Date.now();
for (let i = 0; i < push.mutations.length; i++) {
await transact(async (executor) => {
let lastMutationID = await getLastMutationID(executor, push.clientID);
console.log('lastMutationID:', lastMutationID);

// Scan forward from here collapsing any collapsable mutations.
for (; i < push.mutations.length; i++) {
let mutation = push.mutations[i];
console.log('mutation:', JSON.stringify(mutation, null, ''));

const expectedMutationID = lastMutationID + 1;
if (mutation.id < expectedMutationID) {
console.log(
"This mutation has already been processed. Nothing to do."
);
return;
}
if (mutation.id > expectedMutationID) {
console.log(
"This mutation is from the future. Nothing to do but wait."
);
return;
}

const next = push.mutations[i + 1];
console.log("Considering for collapse", JSON.stringify(mutation, null, ''), JSON.stringify(next, null, ''));
if (next) {
if (collapse(mutation, next)) {
lastMutationID = mutation.id;
console.log("Collapsed into", JSON.stringify(next, null, ''));
continue;
}
}

const s = storage(executor);
const t1 = Date.now();

switch (mutation.name) {
case "moveShape":
await moveShape(s, mutation.args);
break;
case "createShape":
await putShape(s, mutation.args);
break;
case "initClientState":
await initClientState(s, mutation.args);
break;
case "setCursor":
await setCursor(s, mutation.args);
break;
case "overShape":
await overShape(s, mutation.args);
break;
}

await setLastMutationID(executor, push.clientID, expectedMutationID);
console.log('Processed mutation in', Date.now() - t1);
await transact(async (executor) => {
const s = storage(executor);

let lastMutationID = await getLastMutationID(executor, push.clientID);
console.log("lastMutationID:", lastMutationID);

for (let i = 0; i < push.mutations.length; i++) {
const mutation = push.mutations[i];
const expectedMutationID = lastMutationID + 1;

if (mutation.id < expectedMutationID) {
console.log(
`Mutation ${mutation.id} has already been processed - skipping`
);
continue;
}
if (mutation.id > expectedMutationID) {
console.warn(`Mutation ${mutation.id} is from the future - aborting`);
break;
}
});
}

console.log('Processed all mutations in', Date.now() - t0);
console.log("Processing mutation:", JSON.stringify(mutation, null, ""));

const t1 = Date.now();
switch (mutation.name) {
case "moveShape":
await moveShape(s, mutation.args);
break;
case "createShape":
await putShape(s, mutation.args);
break;
case "initClientState":
await initClientState(s, mutation.args);
break;
case "setCursor":
await setCursor(s, mutation.args);
break;
case "overShape":
await overShape(s, mutation.args);
break;
}

lastMutationID = expectedMutationID;
console.log("Processed mutation in", Date.now() - t1);
}

await Promise.all([
s.flush(),
setLastMutationID(executor, push.clientID, lastMutationID),
]);
});

console.log("Processed all mutations in", Date.now() - t0);

const pusher = new Pusher({
appId: "1157097",
Expand All @@ -149,29 +140,34 @@ export default async (req: NextApiRequest, res: NextApiResponse) => {

const t2 = Date.now();
await pusher.trigger("default", "poke", {});
console.log('Sent poke in', Date.now() - t2);
console.log("Sent poke in", Date.now() - t2);

res.status(200).json({});
};

// If prev and next are collapsible, collapse them by mutating next.
function collapse(prev: Mutation, next: Mutation): boolean {
if (prev.name === "moveShape" && next.name === "moveShape") {
next.args.dx += prev.args.dx;
next.args.dy += prev.args.dy;
return true;
}
if (prev.name == "setCursor" && next.name == "setCursor") {
next.args.x = prev.args.x;
next.args.y = prev.args.y;
return true;
}
return false;
}

function storage(executor: ExecuteStatementFn): Storage {
function storage(executor: ExecuteStatementFn) {
// TODO: When we have the real mysql client, check whether it appears to do
// this caching internally.
const cache: { [key: string]: { value: JSONValue; dirty: boolean } } = {};
return {
getObject: getObject.bind(null, executor),
putObject: putObject.bind(null, executor),
getObject: async (key: string) => {
const entry = cache[key];
if (entry) {
return entry.value;
}
const value = await getObject(executor, key);
cache[key] = { value, dirty: false };
return value;
},
putObject: async (key: string, value: JSONValue) => {
cache[key] = { value, dirty: true };
},
flush: async () => {
await Promise.all(
Object.entries(cache)
.filter(([, { dirty }]) => dirty)
.map(([k, { value }]) => putObject(executor, k, value))
);
},
};
}

0 comments on commit 928b058

Please sign in to comment.