diff --git a/imports/api/publication/useItem.ts b/imports/api/publication/useItem.ts index db93f7326..6794de0d2 100644 --- a/imports/api/publication/useItem.ts +++ b/imports/api/publication/useItem.ts @@ -23,15 +23,6 @@ const useItem = ( deps, ); - console.debug({ - what: 'USE_ITEM', - collection: collection?._collection.name, - selector, - options, - deps, - items, - }); - assert(items.length <= 1, `useItem got items.length === ${items.length}`); const result = items[0]; const found = Boolean(result); diff --git a/imports/api/query/watch.ts b/imports/api/query/watch.ts index 3af2878d6..610242058 100644 --- a/imports/api/query/watch.ts +++ b/imports/api/query/watch.ts @@ -4,10 +4,13 @@ import {type Filter as MongoFilter} from 'mongodb'; import { type ClientSessionOptions, + type ChangeStream, type ChangeStreamOptions, type Timestamp, } from 'mongodb'; +import debounce from 'debounce'; + import {isObject} from '@functional-abstraction/type'; import type Collection from '../Collection'; @@ -16,7 +19,7 @@ import type Document from '../Document'; import {type Options} from '../transaction/TransactionDriver'; import withSession from '../transaction/withSession'; -import {type EventEmitter, eventEmitter} from '../../lib/events'; +import {EventEmitter, eventEmitter} from '../../lib/events'; import type Filter from './Filter'; @@ -54,12 +57,23 @@ const _filterToFullDocumentFilter = ( ]), ); +type Match = { + $match: {} +}; + + +type Pipeline = { + pipeline: Match[]; + isSuperset: boolean; +}; + + const _fullDocumentMissingFilter = {fullDocument: undefined}; const _fullDocumentBeforeChangeMissingFilter = { fullDocumentBeforeChange: undefined, }; -const _filterToMatch = (filter: Filter) => ({ +const _filterToMatch = (filter: Filter): Match => ({ $match: { $or: [ // TODO Correctly configure collections to define fullDocument* @@ -75,7 +89,7 @@ const _filterToMatch = (filter: Filter) => ({ }, }); -const _filterToPipeline = ({$text, ...rest}: Filter) => { +const _filterToPipeline = ({$text, ...rest}: Filter): Pipeline => { return { pipeline: [_filterToMatch(rest as Filter)], // TODO Any occurrence of $text should yield this, not just top-level. @@ -83,7 +97,7 @@ const _filterToPipeline = ({$text, ...rest}: Filter) => { }; }; -const _noFullDocumentMatch = () => ({ +const _noFullDocumentMatch = (): Match => ({ $match: { // NOTE This matches everything if pre- or post- images are not // configured, which is very inefficient. @@ -91,7 +105,7 @@ const _noFullDocumentMatch = () => ({ }, }); -const _noFullDocumentPipeline = () => { +const _noFullDocumentPipeline = (): Pipeline => { return { pipeline: [_noFullDocumentMatch()], isSuperset: true, @@ -101,9 +115,11 @@ const _noFullDocumentPipeline = () => { const _optionsToPipeline = (options: Options) => options.project === undefined ? 
[] : [{$project: options.project}];
 
+let watchCount = 0;
+
 const _watchStream = (
 	collection: Collection,
-	filterPipeline,
+	filterPipeline: Match[],
 	options: Options,
 	startAtOperationTime: Timestamp,
 	changeStreamOptions?: ChangeStreamOptions,
@@ -117,14 +133,66 @@ const _watchStream = (
 		{$changeStreamSplitLargeEvent: {}},
 	];
 
-	return collection.rawCollection().watch(pipeline, {
+	const rawCollection = collection.rawCollection();
+	const {collectionName} = rawCollection;
+
+	console.debug({collection: collectionName, watchCount: ++watchCount});
+	const stream = rawCollection.watch(pipeline, {
 		startAtOperationTime,
 		fullDocument: 'whenAvailable',
 		fullDocumentBeforeChange: 'whenAvailable',
 		...changeStreamOptions,
 	});
+
+	return _groupFragments(stream);
 };
 
+const _groupFragments = (stream: ChangeStream) => {
+	const emitter = eventEmitter<{entry: ChangeStreamEvent; close: undefined}>();
+
+	let event: Fragment = {
+		_id: {
+			_data: '',
+		},
+		splitEvent: {
+			fragment: 1,
+			of: 1,
+		},
+	};
+
+	stream.on('change', (fragment: ChangeStreamEvent | Fragment) => {
+		if (fragment.splitEvent === undefined) {
+			assert(fragment._id._data !== event._id._data);
+			assert(event.splitEvent.fragment === event.splitEvent.of);
+			event = {...fragment, splitEvent: {fragment: 1, of: 1}};
+		} else if (fragment.splitEvent.fragment === 1) {
+			assert(fragment._id._data !== event._id._data);
+			assert(event.splitEvent.fragment === event.splitEvent.of);
+			assert(fragment.splitEvent.fragment === 1);
+			event = fragment;
+		} else {
+			assert(fragment._id._data === event._id._data);
+			assert(fragment.splitEvent.fragment === event.splitEvent.fragment + 1);
+			assert(fragment.splitEvent.of === event.splitEvent.of);
+			assert(fragment.splitEvent.fragment <= fragment.splitEvent.of);
+			event = {...event, ...fragment};
+		}
+
+		if (event.splitEvent.fragment !== event.splitEvent.of) return;
+
+		const {splitEvent, ...rest} = event;
+
+		void emitter.emitSerial('entry', rest);
+	});
+
+	void emitter.once('close').then(
+		async () => stream.close(),
+	);
+
+	return emitter;
+};
+
+
 const _watchSetup = async (
 	collection: Collection,
 	filter: Filter,
@@ -169,7 +237,6 @@ const _makeQueue = async () => {
 	await queue;
 
 	const enqueue = (task: () => Promise | void) => {
-		// TODO Throttle.
 		if (queued !== 0) return;
 		++queued;
 		queue = queue
@@ -185,12 +252,6 @@
 	return enqueue;
 };
 
-export type WatchHandle = EventEmitter<{
-	change: T[];
-	start: undefined;
-	stop: undefined;
-}>;
-
 type Fragment = {
 	_id: {
 		_data: string;
@@ -205,7 +266,76 @@ type ChangeStreamEvent = {
 	_id: {
 		_data: string;
 	};
-	splitEvent: undefined;
+	splitEvent?: undefined;
+};
+
+
+export type FilteredOplogHandle = EventEmitter<{
+	entry: ChangeStreamEvent;
+	close: undefined;
+}>;
+
+export type WatchHandle = EventEmitter<{
+	change: T[];
+	start: undefined;
+	stop: undefined;
+}>;
+
+class Watch extends EventEmitter {
+	collection: Collection;
+	filter: Filter;
+	options: Options;
+	changeStreamOptions?: ChangeStreamOptions;
+	sessionOptions?: ClientSessionOptions;
+
+	constructor(collection: Collection, filter: Filter, options: Options, sessionOptions?: ClientSessionOptions) {
+		super();
+		this.collection = collection;
+		this.filter = filter;
+		this.options = options;
+		this.sessionOptions = sessionOptions;
+	}
+
+	get init() {
+		return [];
+	}
+
+	stop() {
+
+	}
+}
+
+const PIPE_DEBOUNCE = 50;
+let changeCount = 0;
+
+const _pipe = async (
+	handle: WatchHandle,
+	emitter: FilteredOplogHandle,
+	w: Watch,
+) => {
+
+	const enqueue = await _makeQueue();
+
+	const onEntry = debounce(
+		() => {
+			enqueue(async () => {
+				const {init} = await _watchInit(
+					w.collection,
+					w.filter,
+					w.options,
+					w.sessionOptions,
+				);
+
+				console.debug({changeCount: ++changeCount});
+				await handle.emitSerial('change', init);
+			});
+		},
+		PIPE_DEBOUNCE,
+	);
+
+	emitter.on('entry', onEntry);
+
+	// TODO stream.on('stop', ???)
 };
 
 const _watch = async (
@@ -224,54 +354,12 @@
 		sessionOptions,
 	);
 
-	const enqueue = await _makeQueue();
-
 	await handle.emitSerial('change', init);
 
-	let event: Fragment = {
-		_id: {
-			_data: '',
-		},
-		splitEvent: {
-			fragment: 1,
-			of: 1,
-		},
-	};
-
-	stream.on('change', (fragment: ChangeStreamEvent | Fragment) => {
-		if (fragment.splitEvent === undefined) {
-			assert(fragment._id._data !== event._id._data);
-			assert(event.splitEvent.fragment === event.splitEvent.of);
-			event = {...fragment, splitEvent: {fragment: 1, of: 1}};
-		} else if (fragment.splitEvent.fragment === 1) {
-			assert(fragment._id._data !== event._id._data);
-			assert(event.splitEvent.fragment === event.splitEvent.of);
-			assert(fragment.splitEvent.fragment === 1);
-			event = fragment;
-		} else {
-			assert(fragment._id._data === event._id._data);
-			assert(fragment.splitEvent.fragment === event.splitEvent.fragment + 1);
-			assert(fragment.splitEvent.of === event.splitEvent.of);
-			assert(fragment.splitEvent.fragment <= fragment.splitEvent.of);
-			event = {...event, ...fragment};
-		}
-
-		if (event.splitEvent.fragment !== event.splitEvent.of) return;
-
-		enqueue(async () => {
-			const {init} = await _watchInit(
-				collection,
-				filter,
-				options,
-				sessionOptions,
-			);
+	const w = new Watch(collection, filter, options, sessionOptions);
+	await _pipe(handle, stream, w);
-
-			await handle.emitSerial('change', init);
-		});
-	});
 
-	// TODO stream.on('stop', ???)
-	const stop = async () => stream.close();
+	const stop = async () => stream.emitSerial('close');
 
 	return stop;
 };
diff --git a/imports/lib/events.ts b/imports/lib/events.ts
index bcb2603cd..427fbcaf7 100644
--- a/imports/lib/events.ts
+++ b/imports/lib/events.ts
@@ -1,4 +1,5 @@
 import Emittery from 'emittery';
 
 export type EventEmitter<T> = Emittery<T>;
+export const EventEmitter = Emittery;
 export const eventEmitter = <T>() => new Emittery<T>();
diff --git a/types/meteor/mongo.d.ts b/types/meteor/mongo.d.ts
index f335db054..3a6f6f698 100644
--- a/types/meteor/mongo.d.ts
+++ b/types/meteor/mongo.d.ts
@@ -17,13 +17,6 @@ declare module 'meteor/mongo' {
 		observeAsync(options: ObserveCallbacks): Promise;
 	}
 
-	// eslint-disable-next-line @typescript-eslint/consistent-type-definitions
-	interface Collection {
-		_collection: {
-			name: string;
-		};
-	}
-
 	type ObserveHandle = {
 		stop(): void;
 	};
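
For illustration, a minimal sketch of the contract the watch.ts changes introduce, kept separate from the patch itself: the raw change stream is wrapped in an emitter that owns closing it, and each oplog 'entry' is debounced into a single re-query whose result is published as a 'change' event. It assumes the emittery and debounce packages used above; Entry, Oplog, Handle, wrapStream, pipe and fetchInit are illustrative names, not code from this patch.

import Emittery from 'emittery';
import debounce from 'debounce';

type Entry = {_id: {_data: string}};
type Oplog = Emittery<{entry: Entry; close: undefined}>;
type Handle = Emittery<{change: unknown[]}>;

// Wrap a raw change stream: consumers request shutdown by emitting 'close'
// on the wrapper instead of calling rawStream.close() directly.
const wrapStream = (rawStream: {close: () => Promise<void>}): Oplog => {
	const oplog = new Emittery<{entry: Entry; close: undefined}>();
	void oplog.once('close').then(async () => rawStream.close());
	return oplog;
};

// Collapse bursts of oplog entries: re-run the initial query only after
// `wait` ms without a new entry, then publish the result set on the handle.
const pipe = (
	oplog: Oplog,
	handle: Handle,
	fetchInit: () => Promise<unknown[]>,
	wait = 50,
) => {
	const onEntry = debounce(() => {
		void fetchInit().then(async (init) => handle.emitSerial('change', init));
	}, wait);
	oplog.on('entry', onEntry);
};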