Skip to content

Commit

Permalink
feat(shell-api): add stream processor modify MONGOSH-1864 (#2245)
Browse files Browse the repository at this point in the history
Co-authored-by: Anna Henningsen <anna.henningsen@mongodb.com>
  • Loading branch information
mongodb-matthew-normyle and addaleax authored Nov 1, 2024
1 parent a1c1485 commit 0a85da0
Show file tree
Hide file tree
Showing 3 changed files with 153 additions and 0 deletions.
3 changes: 3 additions & 0 deletions packages/i18n/src/locales/en_US.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2771,6 +2771,9 @@ const translations: Catalog = {
description:
'Return stats captured from a named stream processor.',
},
modify: {
description: 'Modify a stream processor definition.',
},
},
},
},
Expand Down
41 changes: 41 additions & 0 deletions packages/shell-api/src/stream-processor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Document } from '@mongosh/service-provider-core';
import { CommonErrors, MongoshInvalidInputError } from '@mongosh/errors';

import type Mongo from './mongo';
import { asPrintable } from './enums';
Expand Down Expand Up @@ -57,6 +58,46 @@ export default class StreamProcessor extends ShellApiWithMongoClass {
});
}

/**
* modify is used to modify a stream processor definition, like below:
* Change the pipeline:
* sp.name.modify(newPipeline)
* Keep the same pipeline, change other options:
* sp.name.modify({resumeFromCheckpoint: false})
* Change the pipeline and set additional options:
* sp.name.modify(newPipeline, {resumeFromCheckpoint: false})
*/
async modify(options: Document): Promise<Document>;
async modify(pipeline: Document[], options?: Document): Promise<Document>;

@returnsPromise
async modify(
pipelineOrOptions: Document[] | Document,
options?: Document
): Promise<Document> {
if (Array.isArray(pipelineOrOptions)) {
options = { ...options, pipeline: pipelineOrOptions };
} else if (typeof pipelineOrOptions === 'object') {
if (options) {
throw new MongoshInvalidInputError(
'If the first argument to modify is an object, the second argument should not be specified.',
CommonErrors.InvalidArgument
);
}
options = { ...pipelineOrOptions };
} else {
throw new MongoshInvalidInputError(
'The first argument to modify must be an array or object.',
CommonErrors.InvalidArgument
);
}

return this._streams._runStreamCommand({
modifyStreamProcessor: this.name,
...options,
});
}

@returnsPromise
async sample(options: Document = {}) {
const r = await this._streams._runStreamCommand({
Expand Down
109 changes: 109 additions & 0 deletions packages/shell-api/src/streams.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import type Mongo from './mongo';
import Database from './database';
import { Streams } from './streams';
import { InterruptFlag, MongoshInterruptedError } from './interruptor';
import type { MongoshInvalidInputError } from '@mongosh/errors';

describe('Streams', function () {
let mongo: Mongo;
Expand Down Expand Up @@ -162,4 +163,112 @@ describe('Streams', function () {
).to.be.true;
});
});

describe('modify', function () {
it('throws with invalid parameters', async function () {
// Create the stream processor.
const runCmdStub = sinon
.stub(mongo._serviceProvider, 'runCommand')
.resolves({ ok: 1 });
const name = 'p1';
const pipeline = [{ $match: { foo: 'bar' } }];
const processor = await streams.createStreamProcessor(name, pipeline);
expect(processor).to.eql(streams.getProcessor(name));
const cmd = { createStreamProcessor: name, pipeline };
expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true;

// No arguments to modify.
const caught = await processor
.modify()
.catch((e: MongoshInvalidInputError) => e);
expect(caught.message).to.contain(
'[COMMON-10001] The first argument to modify must be an array or object.'
);

// A single numeric argument to modify.
const caught2 = await processor
.modify(1)
.catch((e: MongoshInvalidInputError) => e);
expect(caught2.message).to.contain(
'[COMMON-10001] The first argument to modify must be an array or object.'
);

// Two object arguments to modify.
const caught3 = await processor
.modify(
{ resumeFromCheckpoint: false },
{ dlq: { connectionName: 'foo' } }
)
.catch((e: MongoshInvalidInputError) => e);
expect(caught3.message).to.contain(
'[COMMON-10001] If the first argument to modify is an object, the second argument should not be specified.'
);
});

it('works with pipeline and options arguments', async function () {
const runCmdStub = sinon
.stub(mongo._serviceProvider, 'runCommand')
.resolves({ ok: 1 });

// Create the stream processor.
const name = 'p1';
const pipeline = [{ $match: { foo: 'bar' } }];
const processor = await streams.createStreamProcessor(name, pipeline);
expect(processor).to.eql(streams.getProcessor(name));
const cmd = { createStreamProcessor: name, pipeline };
expect(runCmdStub.calledOnceWithExactly('admin', cmd, {})).to.be.true;

// Start the stream processor.
await processor.start();
expect(
runCmdStub.calledWithExactly(
'admin',
{ startStreamProcessor: name },
{}
)
).to.be.true;

// Stop the stream processor.
await processor.stop();
expect(
runCmdStub.calledWithExactly('admin', { stopStreamProcessor: name }, {})
).to.be.true;

// Modify the stream processor.
const pipeline2 = [{ $match: { foo: 'baz' } }];
processor.modify(pipeline2);
expect(
runCmdStub.calledWithExactly(
'admin',
{ modifyStreamProcessor: name, pipeline: pipeline2 },
{}
)
).to.be.true;

// Modify the stream processor with extra options.
const pipeline3 = [{ $match: { foo: 'bat' } }];
processor.modify(pipeline3, { resumeFromCheckpoint: false });
expect(
runCmdStub.calledWithExactly(
'admin',
{
modifyStreamProcessor: name,
pipeline: pipeline3,
resumeFromCheckpoint: false,
},
{}
)
).to.be.true;

// Modify the stream processor without changing pipeline.
processor.modify({ resumeFromCheckpoint: false });
expect(
runCmdStub.calledWithExactly(
'admin',
{ modifyStreamProcessor: name, resumeFromCheckpoint: false },
{}
)
).to.be.true;
});
});
});

0 comments on commit 0a85da0

Please sign in to comment.