Skip to content

Commit

Permalink
🚧 progress: First attempt at leveraging change streams.
Browse files Browse the repository at this point in the history
This is progress on #999.
  • Loading branch information
make-github-pseudonymous-again committed Jul 7, 2024
1 parent ab34112 commit f0c8fa6
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 22 deletions.
60 changes: 38 additions & 22 deletions imports/api/makeObservedQueryPublication.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ import schema from '../lib/schema';

import type Collection from './Collection';
import type Document from './Document';
import type ObserveChangesCallbacks from './ObserveChangesCallbacks';
import type Filter from './query/Filter';
import queryToSelectorOptionsPair from './query/queryToSelectorOptionsPair';
import {userQuery} from './query/UserQuery';
import type UserQuery from './query/UserQuery';

import watch from './watch';

const observeOptions = schema
.object({
added: schema.boolean().optional(),
Expand All @@ -28,7 +30,11 @@ const makeObservedQueryPublication = <T extends Document, U = T>(
QueriedCollection: Collection<T, U>,
observedQueryCacheCollectionName: string,
) =>
function (key: string, query: UserQuery<T>, observe: ObserveOptions | null) {
async function (
key: string,
query: UserQuery<T>,
observe: ObserveOptions | null,
) {
let [selector, options] = queryToSelectorOptionsPair(query);
selector = {
...selector,
Expand All @@ -45,41 +51,51 @@ const makeObservedQueryPublication = <T extends Document, U = T>(
options,
observe,
});
const results: T[] = [];
let initializing = true;

const stop = () => {
this.stop();
};

const observers: ObserveChangesCallbacks<T> = {
added(_id, fields) {
if (initializing) results.push({_id, ...fields} as unknown as T);
else if (callbacks.added) stop();
},
};
const handle = await watch<T, U>(
QueriedCollection,
selector as Filter<T>,
options,
({operationType}) => {
switch (operationType) {
case 'replace':
case 'update': {
if (callbacks.changed) stop();
break;
}

if (callbacks.removed) observers.removed = stop;
if (callbacks.changed) observers.changed = stop;
case 'insert': {
if (callbacks.added) stop();
break;
}

const handle = QueriedCollection.find(selector, options).observeChanges(
observers,
case 'delete': {
if (callbacks.removed) stop();
break;
}

default: {
stop();
}
}
},
);

// Instead, we'll send one `added` message right after `observeChanges` has
// returned, and mark the subscription as ready.
initializing = false;
const results = handle.init;

this.added(observedQueryCacheCollectionName, uid, {
key,
results,
});
this.ready();

// Stop observing the cursor when the client unsubscribes. Stopping a
// subscription automatically takes care of sending the client any `removed`
// messages.
this.onStop(() => {
handle.stop();
// NOTE: Stop observing the cursor when the client unsubscribes.
this.onStop(async () => {
await handle.stop();
});
};

Expand Down
118 changes: 118 additions & 0 deletions imports/api/watch.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
import assert from 'assert';

import {
type ChangeStreamDocument,
type ClientSessionOptions,
type TransactionOptions,
type ChangeStreamOptions,
} from 'mongodb';

import type Document from './Document';

import {type Options} from './transaction/TransactionDriver';

import type Filter from './query/Filter';
import type Collection from './Collection';
import withSession from './transaction/withSession';
import withTransactionDriver from './transaction/withTransactionDriver';

const _watchInit = async <T extends Document, U = T>(
collection: Collection<T, U>,
filter: Filter<T>,
options: Options,
transactionOptions?: TransactionOptions,
sessionOptions?: ClientSessionOptions,
) =>
withSession(
async (session) =>
withTransactionDriver(
session,
async (driver) => {
const init = await driver.fetch(collection, filter, options);
const {operationTime} = session;
assert(operationTime !== undefined);
return {init, operationTime};
},
transactionOptions,
),
sessionOptions,
);

const _filterToMatch = <T>(filter: Filter<T>) => {
return Object.fromEntries(
Object.entries(filter).map(([key, value]) => [
`fulldocument.${key}`,
value,
]),
);
};

const _watchStream = <T extends Document, U = T>(
collection: Collection<T, U>,
filter: Filter<T>,
options: Options,
startAtOperationTime: any, // TODO
changeStreamOptions?: ChangeStreamOptions,
) =>
collection
.rawCollection()
.watch([{$match: _filterToMatch(filter)}, {$project: options.project}], {
startAtOperationTime,
...changeStreamOptions,
});

const _watchSetup = async <T extends Document, U = T>(
collection: Collection<T, U>,
filter: Filter<T>,
options: Options,
changeStreamOptions?: ChangeStreamOptions,
transactionOptions?: TransactionOptions,
sessionOptions?: ClientSessionOptions,
) => {
const {init, operationTime} = await _watchInit(
collection,
filter,
options,
transactionOptions,
sessionOptions,
);

const stream = _watchStream(
collection,
filter,
options,
operationTime,
changeStreamOptions,
);

return {init, stream};
};

type OnChange<T extends Document> = (doc: ChangeStreamDocument<T>) => void;

const watch = async <T extends Document, U = T>(
collection: Collection<T, U>,
filter: Filter<T>,
options: Options,
onChange: OnChange<T>,
changeStreamOptions?: ChangeStreamOptions,
transactionOptions?: TransactionOptions,
sessionOptions?: ClientSessionOptions,
) => {
const {init, stream} = await _watchSetup(
collection,
filter,
options,
changeStreamOptions,
transactionOptions,
sessionOptions,
);

stream.on('change', onChange);

const stop = async () => stream.close();

return {init, stop};
};

export default watch;

0 comments on commit f0c8fa6

Please sign in to comment.