Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace Cursor#observeChanges calls with MongoDB change streams #1003

Draft
wants to merge 58 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
cc3de07
:construction: progress: First attempt at leveraging change streams.
make-github-pseudonymous-again May 30, 2024
ef50c02
:construction: progress: Attempt to fix first draft.
make-github-pseudonymous-again May 30, 2024
d282c5e
:construction: progress: Second attempt.
make-github-pseudonymous-again May 30, 2024
78de8b6
:construction: progress: Sketch polling implementation.
make-github-pseudonymous-again May 31, 2024
b3ab843
:construction: progress: Squash.
make-github-pseudonymous-again Jun 4, 2024
cff8a6c
:construction: progress: First working polling implementation.
make-github-pseudonymous-again Jun 5, 2024
330766e
:construction: progress: Fix type issues.
make-github-pseudonymous-again Jun 5, 2024
21454d4
:construction: progress: Fix lint.
make-github-pseudonymous-again Jun 5, 2024
ecb159a
:construction: progress: Rewrite note on diffing.
make-github-pseudonymous-again Jul 4, 2024
95c3636
:construction: progress: Replace more observe logic with changes stre…
make-github-pseudonymous-again Jul 4, 2024
17393f5
:construction: progress: Fix initial implementation of event observers.
make-github-pseudonymous-again Jul 4, 2024
c9b7496
:construction: progress: Take initial value in consideration in observe.
make-github-pseudonymous-again Jul 5, 2024
37e3ddc
:construction: progress: Add comment on how to make events views reac…
make-github-pseudonymous-again Jul 5, 2024
26f8b5b
:woman_technologist: dx: Add missing methods for `meteor/mongo`'s `Cu…
make-github-pseudonymous-again Jul 7, 2024
8db9c00
:lock: deps: Update lock file.
make-github-pseudonymous-again Jul 7, 2024
706a675
:recycle: refactor: Depend on internals instead of `Meteor` in `api/s…
make-github-pseudonymous-again Jul 7, 2024
3325d64
:recycle: refactor: Use change streams instead in subscriptions/publi…
make-github-pseudonymous-again Jul 7, 2024
348b882
:recycle: refactor(`api/consultations`): Leverage `observeSetChanges`.
make-github-pseudonymous-again Jul 7, 2024
06c7965
:recycle: refactor(`api/createTagCollection`): Leverage `observeSetCh…
make-github-pseudonymous-again Jul 7, 2024
79d0e33
:recycle: refactor(`api/makeCachedFindOneOpt`): Leverage `observeSetC…
make-github-pseudonymous-again Jul 7, 2024
c315675
:recycle: refactor(`api/makeFilteredCollection`): Leverage `observeSe…
make-github-pseudonymous-again Jul 7, 2024
ce18d10
:recycle: refactor(`api/publication/patient/noShows`): Use `observeSe…
make-github-pseudonymous-again Jul 7, 2024
0827375
:recycle: refactor(`api/.../frequencyBySex`): Use `observeSetChanges`.
make-github-pseudonymous-again Jul 7, 2024
712233f
:recycle: refactor(`api/stats`): Leverage `observeSetChanges`.
make-github-pseudonymous-again Jul 7, 2024
1e97eba
:recycle: refactor(`publishCursors`): Extract duplicates detection lo…
make-github-pseudonymous-again Jul 8, 2024
81e3732
:woman_technologist: dx: Allow to define event emitters.
make-github-pseudonymous-again Jul 8, 2024
8204efa
:recycle: refactor(`meteor/diff-sequence`): Rename `Options` to `Diff…
make-github-pseudonymous-again Jul 8, 2024
81d8aa1
:recycle: refactor: Rewrite cursor `watch` and `observe*Changes` as e…
make-github-pseudonymous-again Jul 8, 2024
d77b152
:bicyclist: perf(observers): Narrow projections to skip ignored changes.
make-github-pseudonymous-again Jul 8, 2024
b8878d0
:adhesive_bandage: fix: Call `WatchHandle#emit('change', init)` befor…
make-github-pseudonymous-again Jul 8, 2024
bbacb30
:recycle: refactor(`watch`): Add `enqueue` typings.
make-github-pseudonymous-again Jul 8, 2024
ed9332f
:microscope: test: Increase coverage of `duplicates` and `unique`.
make-github-pseudonymous-again Jul 9, 2024
ffa8d46
:recycle: refactor(`publishCursors`): Extract input validation function.
make-github-pseudonymous-again Jul 9, 2024
6764269
:woman_technologist: dx(tests): Allow to use `undefined` in template …
make-github-pseudonymous-again Jul 10, 2024
ad438f2
:adhesive_bandage: fix(`frequencyBySex`): Correctly handle patient de…
make-github-pseudonymous-again Jul 10, 2024
4a04ee3
:microscope: test(`ui/stats/useFrequencyStats`): Increase coverage.
make-github-pseudonymous-again Jul 10, 2024
ebf1571
:woman_technologist: dx(`_test/fixtures`): Add missing `Meteor` import.
make-github-pseudonymous-again Jul 10, 2024
5e18e13
:woman_technologist: dx(`Accounts`): Remove rate limiting in all tests.
make-github-pseudonymous-again Jul 10, 2024
f1915d9
:recycle: refactor(`frequencyBySex`): Hide 0 counts to simplify testing.
make-github-pseudonymous-again Jul 10, 2024
d1884ae
:microscope: test(`useFrequencyStats`): Cover patients updates.
make-github-pseudonymous-again Jul 10, 2024
e1d48a8
:woman_technologist: dx(tests): Add `dropOwners` fixture.
make-github-pseudonymous-again Jul 10, 2024
cbb8810
:microscope: test(`useAvailability`): Cover common usage.
make-github-pseudonymous-again Jul 10, 2024
17c3b1c
:woman_technologist: dx: Keep track of collections by name in a regis…
make-github-pseudonymous-again Jul 10, 2024
57fcca1
:woman_technologist: dx(tests): Increase timeout of appointments E2E …
make-github-pseudonymous-again Jul 10, 2024
e109072
:adhesive_bandage: fix(`publishCursors`): Correct collection mapping.
make-github-pseudonymous-again Jul 10, 2024
12f68d6
:adhesive_bandage: fix: Actually use `publishCursors` in `publication…
make-github-pseudonymous-again Jul 10, 2024
387e704
:microscope: test(`publishCursors`): Increase coverage.
make-github-pseudonymous-again Jul 10, 2024
cc8f4c6
:recycle: refactor(appointments): Make patient names optional for upd…
make-github-pseudonymous-again Jul 10, 2024
263f308
:recycle: refactor(user): Update `Account.createUser{Async}` usage.
make-github-pseudonymous-again Jul 10, 2024
bdbe099
:microscope: test(`useNoShowsForPatient`): Cover common usage.
make-github-pseudonymous-again Jul 10, 2024
ce3e52b
:microscope: test: Increase coverage target to 68%.
make-github-pseudonymous-again Jul 10, 2024
3912c1f
:recycle: refactor(`publishCursors`): Extract `publishCursorObserver`.
make-github-pseudonymous-again Jul 10, 2024
cb25469
:recycle: refactor(`makeFilteredCollection`): Leverage `publishCursor…
make-github-pseudonymous-again Jul 10, 2024
3275a7f
:adhesive_bandage: fix(`useSubscription`): Set `loading = false` on s…
make-github-pseudonymous-again Jul 11, 2024
6983357
:woman_technologist: dx(`events/intersects`): Use silent error on bad…
make-github-pseudonymous-again Jul 11, 2024
54dd72a
:microscope: test(`useIntersectingEvents`): Cover common usage.
make-github-pseudonymous-again Jul 11, 2024
fea1722
:adhesive_bandage: fix(`useSubscription`): Disable stale handles call…
make-github-pseudonymous-again Jul 11, 2024
db10212
:adhesive_bandage: fix(`makeObservedQueryHook`): Move updates to `use…
make-github-pseudonymous-again Jul 11, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ coverage:
status:
project:
default:
target: 67%
target: 68%
threshold: 1%
patch:
default:
Expand Down
32 changes: 27 additions & 5 deletions imports/_test/fixtures.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,14 @@ import 'regenerator-runtime/runtime.js';

import {assert, expect} from 'chai';

import {Meteor} from 'meteor/meteor';

import {cleanup as unmount} from '@testing-library/react';
import totalOrder from 'total-order';
import {sorted} from '@iterable-iterator/sorted';

// eslint-disable-next-line import/no-unassigned-import
import '../api/endpoint/_dev/_disableRateLimiting';
import logout from '../api/user/logout';
import invoke from '../api/endpoint/invoke';
import call from '../api/endpoint/call';
Expand Down Expand Up @@ -174,19 +178,36 @@ export const dropId = ({_id, ...rest}) => {

export const dropIds = (x) => x.map(dropId);

export const create = (template, extra) => {
if (typeof template === 'function') return extra ?? template();
export const dropOwner = ({owner, ...rest}) => {
assert(typeof owner === 'string');
return rest;
};

export const dropOwners = (x) => x.map(dropOwner);

export const create = (template, extra, hasExtra: boolean) => {
if (typeof template === 'function') return hasExtra ? extra : template();
if (Array.isArray(template)) {
return template
.map((x, i) => create(x, extra?.[i]))
.map((x, i) =>
create(
x,
extra?.[i],
Object.prototype.hasOwnProperty.call(extra ?? [], i),
),
)
.concat(extra?.slice(template.length) ?? []);
}

return Object.fromEntries(
(extra === undefined ? [] : Object.entries(extra)).concat(
Object.entries(template).map(([key, value]) => [
key,
create(value, extra?.[key]),
create(
value,
extra?.[key],
Object.prototype.hasOwnProperty.call(extra ?? {}, key),
),
]),
),
);
Expand All @@ -204,4 +225,5 @@ export const findOneOrThrow = async <T extends Document, U = T>(
return result!;
};

export const makeTemplate = (template) => (extra?) => create(template, extra);
export const makeTemplate = (template) => (extra?) =>
create(template, extra, extra !== undefined);
8 changes: 8 additions & 0 deletions imports/api/ObserveSequenceChangesCallbacks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import {type Mongo} from 'meteor/mongo';

type ObserveSequenceChangesCallbacks<T> = Pick<
Mongo.ObserveChangesCallbacks<T>,
'addedBefore' | 'movedBefore' | 'changed' | 'removed'
>;

export default ObserveSequenceChangesCallbacks;
8 changes: 8 additions & 0 deletions imports/api/ObserveSetChangesCallbacks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import {type Mongo} from 'meteor/mongo';

type ObserveSetChangesCallbacks<T> = Pick<
Mongo.ObserveChangesCallbacks<T>,
'added' | 'changed' | 'removed'
>;

export default ObserveSetChangesCallbacks;
4 changes: 2 additions & 2 deletions imports/api/appointments.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ export const appointmentUpdate = schema.union([
schema.object({
patient: schema.object({
_id: schema.string(),
firstname: schema.string(),
lastname: schema.string(),
firstname: schema.string().optional(),
lastname: schema.string().optional(),
}),
phone: schema.string().optional(),
datetime: schema.date().optional(),
Expand Down
12 changes: 11 additions & 1 deletion imports/api/collection/define.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,21 @@
import assert from 'assert';

import type Document from '../Document';
import Collection from '../Collection';

import {hasCollection, addCollection} from './registry';

const define = <T extends Document, U = T>(name: string) => {
return new Collection<T, U>(name, {
assert(!hasCollection(name));

const collection = new Collection<T, U>(name, {
idGeneration: 'STRING',
defineMutationMethods: false,
});

addCollection(name, collection);

return collection;
};

export default define;
26 changes: 26 additions & 0 deletions imports/api/collection/registry.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import assert from 'assert';

import type Document from '../Document';
import type Collection from '../Collection';

const _registry = new Map<string, Collection<any, any>>();

export const getCollection = <T extends Document, U = T>(
name: string,
): Collection<T, U> => {
const collection = _registry.get(name);
assert(collection !== undefined);
return collection;
};

export const hasCollection = (name: string) => {
return _registry.has(name);
};

export const addCollection = <T extends Document, U = T>(
name: string,
collection: Collection<T, U>,
) => {
assert(!hasCollection(name));
_registry.set(name, collection);
};
92 changes: 49 additions & 43 deletions imports/api/consultations.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import type Selector from './query/Selector';
import {type AuthenticatedContext} from './publication/Context';
import {type DocumentUpdate} from './DocumentUpdate';
import observeSetChanges from './query/observeSetChanges';

Check warning on line 35 in imports/api/consultations.ts

View check run for this annotation

Codecov / codecov/patch

imports/api/consultations.ts#L35

Added line #L35 was not covered by tests

export const DEFAULT_DURATION_IN_MINUTES = 15;
export const DEFAULT_DURATION_IN_SECONDS = DEFAULT_DURATION_IN_MINUTES * 60;
Expand Down Expand Up @@ -127,18 +128,18 @@
},
});

export function setupConsultationsStatsPublication(
export async function setupConsultationsStatsPublication(
this: AuthenticatedContext,
collectionName: string,
filter: Filter<ConsultationDocument>,
) {
// Generate unique key depending on parameters
const key = statsKey(filter);
const selector = {
const scopedFilter = {
...filter,
isDone: true,
owner: this.userId,
} as Selector<ConsultationDocument>;
} as Filter<ConsultationDocument>;
const options = {fields: {_id: 1, price: 1, datetime: 1}};

const minHeap = new PairingHeap(increasing);
Expand All @@ -158,48 +159,53 @@
// Until then, we don't want to send a lot of `changed` messages—hence
// tracking the `initializing` state.
let initializing = true;
const handle = Consultations.find(selector, options).observeChanges({
added: (_id, {price, datetime}) => {
count += 1;
if (price) total += price;
const minRef = minHeap.push(datetime);
const maxRef = maxHeap.push(datetime);
refs.set(_id, [price, minRef, maxRef]);

if (!initializing) {
this.changed(collectionName, key, state());
}
},

changed: (_id, fields) => {
const [oldPrice, minRef, maxRef] = refs.get(_id);
let newPrice: number = oldPrice;
if (Object.prototype.hasOwnProperty.call(fields, 'price')) {
newPrice = fields.price!;
if (oldPrice) total -= oldPrice;
if (newPrice) total += newPrice;
refs.set(_id, [newPrice, minRef, maxRef]);
}

if (Object.prototype.hasOwnProperty.call(fields, 'datetime')) {
const datetime = fields.datetime;
minHeap.update(minRef, datetime);
maxHeap.update(maxRef, datetime);
}

this.changed(collectionName, key, state());
},
const handle = await observeSetChanges(
Consultations,
scopedFilter,
options,
{
added: (_id, {price, datetime}) => {
count += 1;
if (price) total += price;
const minRef = minHeap.push(datetime);
const maxRef = maxHeap.push(datetime);
refs.set(_id, [price, minRef, maxRef]);

if (!initializing) {
this.changed(collectionName, key, state());
}
},

changed: (_id, fields) => {
const [oldPrice, minRef, maxRef] = refs.get(_id);
if (Object.prototype.hasOwnProperty.call(fields, 'price')) {
const newPrice = fields.price;
if (oldPrice) total -= oldPrice;
if (newPrice) total += newPrice;
refs.set(_id, [newPrice, minRef, maxRef]);
}

if (Object.prototype.hasOwnProperty.call(fields, 'datetime')) {
const datetime = fields.datetime;
minHeap.update(minRef, datetime);
maxHeap.update(maxRef, datetime);
}

removed: (_id) => {
count -= 1;
const [price, minRef, maxRef] = refs.get(_id);
if (price) total -= price;
minHeap.delete(minRef);
maxHeap.delete(maxRef);
refs.delete(_id);
this.changed(collectionName, key, state());
this.changed(collectionName, key, state());
},

removed: (_id) => {
count -= 1;
const [price, minRef, maxRef] = refs.get(_id);
if (price) total -= price;
minHeap.delete(minRef);
maxHeap.delete(maxRef);
refs.delete(_id);
this.changed(collectionName, key, state());
},
},
});
{projectionFn: ({price, datetime}) => ({price, datetime})},
);

// Instead, we'll send one `added` message right after `observeChanges` has
// returned, and mark the subscription as ready.
Expand Down
39 changes: 23 additions & 16 deletions imports/api/createTagCollection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import type UserFilter from './query/UserFilter';
import type Options from './query/Options';
import type Projection from './query/Projection';
import observeSetChanges from './query/observeSetChanges';

export const STATS_SUFFIX = '.stats';
export const FIND_CACHE_SUFFIX = '.find.cache';
Expand Down Expand Up @@ -190,12 +191,12 @@
name: statsPublication,
authentication: AuthenticationLoggedIn,
schema: schema.tuple([schema.string()]),
handle(name) {
async handle(name) {
const uid = JSON.stringify({name, owner: this.userId});
const query = {
[key]: {$elemMatch: {name}},
owner: this.userId,
} as Selector<P>;
} as Filter<P>;
// We only include relevant fields
const options = {fields: {_id: 1, [key]: 1}};

Expand All @@ -205,22 +206,28 @@
// `observeChanges` only returns after the initial `added` callbacks have run.
// Until then, we don't want to send a lot of `changed` messages—hence
// tracking the `initializing` state.
const handle = Parent.find(query, options).observeChanges({
added: () => {
count += 1;
const handle = await observeSetChanges(

Check warning on line 209 in imports/api/createTagCollection.ts

View check run for this annotation

Codecov / codecov/patch

imports/api/createTagCollection.ts#L209

Added line #L209 was not covered by tests
Parent,
query,
options,
{
added: () => {

Check warning on line 214 in imports/api/createTagCollection.ts

View check run for this annotation

Codecov / codecov/patch

imports/api/createTagCollection.ts#L214

Added line #L214 was not covered by tests
count += 1;

if (!initializing) {
if (!initializing) {

Check warning on line 217 in imports/api/createTagCollection.ts

View check run for this annotation

Codecov / codecov/patch

imports/api/createTagCollection.ts#L217

Added line #L217 was not covered by tests
this.changed(stats, uid, {count});
}
},

Check warning on line 220 in imports/api/createTagCollection.ts

View check run for this annotation

Codecov / codecov/patch

imports/api/createTagCollection.ts#L220

Added line #L220 was not covered by tests

removed: () => {
count -= 1;
this.changed(stats, uid, {count});
}
},
},

removed: () => {
count -= 1;
this.changed(stats, uid, {count});
// We don't care about `changed` events.
},

// We don't care about `changed` events.
});
{projectionFn: (_fields) => ({})},
);

// Instead, we'll send one `added` message right after `observeChanges` has
// returned, and mark the subscription as ready.
Expand All @@ -231,8 +238,8 @@
// Stop observing the cursor when the client unsubscribes. Stopping a
// subscription automatically takes care of sending the client any `removed`
// messages.
this.onStop(() => {
handle.stop();
this.onStop(async () => {
await handle.emit('stop');
});
},
});
Expand Down
9 changes: 9 additions & 0 deletions imports/api/endpoint/_dev/_disableRateLimiting.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
import {Meteor} from 'meteor/meteor';
import {Accounts} from 'meteor/accounts-base';

import isTest from '../../../app/isTest';

if (Meteor.isServer && isTest()) {
// @ts-expect-error Missing from type definitions.
Accounts.removeDefaultRateLimit();
}
Loading
Loading