From 504ca3d43951b7dac952e63ddf60ae1d35d72e6e Mon Sep 17 00:00:00 2001 From: Jeremy Mikola Date: Fri, 8 Apr 2022 14:29:52 -0400 Subject: [PATCH] PHPLIB-814: Change stream support for point-in-time pre and post-images Introduces fullDocumentBeforeChange option. Tests for specifying "whenAvailable" and "required" for both fullDocument and fullDocumentBeforeChange (requires MongoDB 6.0+ with changeStreamPreAndPostImages enabled on the collection). Tests synced with mongodb/specifications#1176 --- src/Operation/Watch.php | 46 +- tests/Operation/WatchTest.php | 4 + tests/UnifiedSpecTests/UnifiedSpecTest.php | 4 + tests/UnifiedSpecTests/Util.php | 2 +- .../change-streams-pre_and_post_images.json | 826 ++++++++++++++++++ .../change-streams/change-streams.json | 624 ++++++++++++- 6 files changed, 1491 insertions(+), 15 deletions(-) create mode 100644 tests/UnifiedSpecTests/change-streams/change-streams-pre_and_post_images.json diff --git a/src/Operation/Watch.php b/src/Operation/Watch.php index 3b4927ce3..2e852941a 100644 --- a/src/Operation/Watch.php +++ b/src/Operation/Watch.php @@ -59,6 +59,12 @@ class Watch implements Executable, /* @internal */ CommandSubscriber { public const FULL_DOCUMENT_DEFAULT = 'default'; public const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup'; + public const FULL_DOCUMENT_WHEN_AVAILABLE = 'whenAvailable'; + public const FULL_DOCUMENT_REQUIRED = 'required'; + + public const FULL_DOCUMENT_BEFORE_CHANGE_OFF = 'off'; + public const FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE = 'whenAvailable'; + public const FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED = 'required'; /** @var integer */ private static $wireVersionForStartAtOperationTime = 7; @@ -105,15 +111,33 @@ class Watch implements Executable, /* @internal */ CommandSubscriber * * * collation (document): Specifies a collation. * - * * fullDocument (string): Determines whether the "fullDocument" field - * will be populated for update operations. By default, change streams - * only return the delta of fields during the update operation (via the - * "updateDescription" field). To additionally return the most current - * majority-committed version of the updated document, specify - * "updateLookup" for this option. Defaults to "default". + * * fullDocument (string): Determines how the "fullDocument" response + * field will be populated for update operations. + * + * By default, change streams only return the delta of fields (via an + * "updateDescription" field) for update operations and "fullDocument" is + * omitted. Insert and replace operations always include the + * "fullDocument" field. Delete operations omit the field as the document + * no longer exists. + * + * Specify "updateLookup" to return the current majority-committed + * version of the updated document. + * + * MongoDB 6.0+ allows returning the post-image of the modified document + * if the collection has changeStreamPreAndPostImages enabled. Specify + * "whenAvailable" to return the post-image if available or a null value + * if not. Specify "required" to return the post-image if available or + * raise an error if not. * - * Insert and replace operations always include the "fullDocument" field - * and delete operations omit the field as the document no longer exists. + * * fullDocumentBeforeChange (string): Determines how the + * "fullDocumentBeforeChange" response field will be populated. By + * default, the field is omitted. + * + * MongoDB 6.0+ allows returning the pre-image of the modified document + * if the collection has changeStreamPreAndPostImages enabled. Specify + * "whenAvailable" to return the pre-image if available or a null value + * if not. Specify "required" to return the pre-image if available or + * raise an error if not. * * * maxAwaitTimeMS (integer): The maximum amount of time for the server to * wait on new documents to satisfy a change stream query. @@ -181,6 +205,10 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar throw InvalidArgumentException::invalidType('"fullDocument" option', $options['fullDocument'], 'string'); } + if (isset($options['fullDocumentBeforeChange']) && ! is_string($options['fullDocumentBeforeChange'])) { + throw InvalidArgumentException::invalidType('"fullDocumentBeforeChange" option', $options['fullDocumentBeforeChange'], 'string'); + } + if (! $options['readPreference'] instanceof ReadPreference) { throw InvalidArgumentException::invalidType('"readPreference" option', $options['readPreference'], ReadPreference::class); } @@ -212,7 +240,7 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar } $this->aggregateOptions = array_intersect_key($options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]); - $this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'resumeAfter' => 1, 'startAfter' => 1, 'startAtOperationTime' => 1]); + $this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'fullDocumentBeforeChange' => 1, 'resumeAfter' => 1, 'startAfter' => 1, 'startAtOperationTime' => 1]); // Null database name implies a cluster-wide change stream if ($databaseName === null) { diff --git a/tests/Operation/WatchTest.php b/tests/Operation/WatchTest.php index ba790fb5f..ced86d62b 100644 --- a/tests/Operation/WatchTest.php +++ b/tests/Operation/WatchTest.php @@ -56,6 +56,10 @@ public function provideInvalidConstructorOptions() $options[][] = ['fullDocument' => $value]; } + foreach ($this->getInvalidStringValues(true) as $value) { + $options[][] = ['fullDocumentBeforeChange' => $value]; + } + foreach ($this->getInvalidIntegerValues() as $value) { $options[][] = ['maxAwaitTimeMS' => $value]; } diff --git a/tests/UnifiedSpecTests/UnifiedSpecTest.php b/tests/UnifiedSpecTests/UnifiedSpecTest.php index 0ae0f36c9..b52166803 100644 --- a/tests/UnifiedSpecTests/UnifiedSpecTest.php +++ b/tests/UnifiedSpecTests/UnifiedSpecTest.php @@ -21,6 +21,10 @@ class UnifiedSpecTest extends FunctionalTestCase { /** @var array */ private static $incompleteTests = [ + 'change-streams/change-streams: to field is set in a rename change event' => 'Not yet implemented (PHPLIB-652, PHPLIB-828)', + 'change-streams/change-streams: Test with document comment' => 'Not yet implemented (PHPLIB-749)', + 'change-streams/change-streams: Test with string comment' => 'Not yet implemented (PHPLIB-749)', + 'change-streams/change-streams: Test that comment is set on getMore' => 'Not yet implemented (PHPLIB-749)', 'command-monitoring/pre-42-server-connection-id: command events do not include server connection id' => 'Not yet implemented (PHPC-1899, PHPLIB-718)', 'command-monitoring/server-connection-id: command events include server connection id' => 'Not yet implemented (PHPC-1899, PHPLIB-718)', // Many load balancer tests use CMAP events and/or assertNumberConnectionsCheckedOut diff --git a/tests/UnifiedSpecTests/Util.php b/tests/UnifiedSpecTests/Util.php index fd9ca40d4..01d09b109 100644 --- a/tests/UnifiedSpecTests/Util.php +++ b/tests/UnifiedSpecTests/Util.php @@ -72,7 +72,7 @@ final class Util Collection::class => [ 'aggregate' => ['pipeline', 'session', 'useCursor', 'allowDiskUse', 'batchSize', 'bypassDocumentValidation', 'collation', 'comment', 'explain', 'hint', 'let', 'maxAwaitTimeMS', 'maxTimeMS'], 'bulkWrite' => ['requests', 'session', 'ordered', 'bypassDocumentValidation'], - 'createChangeStream' => ['pipeline', 'session', 'fullDocument', 'resumeAfter', 'startAfter', 'startAtOperationTime', 'batchSize', 'collation', 'maxAwaitTimeMS'], + 'createChangeStream' => ['pipeline', 'session', 'fullDocument', 'fullDocumentBeforeChange', 'resumeAfter', 'startAfter', 'startAtOperationTime', 'batchSize', 'collation', 'maxAwaitTimeMS'], 'createFindCursor' => ['filter', 'session', 'allowDiskUse', 'allowPartialResults', 'batchSize', 'collation', 'comment', 'cursorType', 'hint', 'limit', 'max', 'maxAwaitTimeMS', 'maxScan', 'maxTimeMS', 'min', 'modifiers', 'noCursorTimeout', 'oplogReplay', 'projection', 'returnKey', 'showRecordId', 'skip', 'snapshot', 'sort'], 'createIndex' => ['keys', 'commitQuorum', 'maxTimeMS', 'name', 'session'], 'dropIndex' => ['name', 'session', 'maxTimeMS'], diff --git a/tests/UnifiedSpecTests/change-streams/change-streams-pre_and_post_images.json b/tests/UnifiedSpecTests/change-streams/change-streams-pre_and_post_images.json new file mode 100644 index 000000000..b6f0e2c3b --- /dev/null +++ b/tests/UnifiedSpecTests/change-streams/change-streams-pre_and_post_images.json @@ -0,0 +1,826 @@ +{ + "description": "change-streams-pre_and_post_images", + "schemaVersion": "1.0", + "runOnRequirements": [ + { + "minServerVersion": "6.0.0", + "topologies": [ + "replicaset", + "sharded-replicaset", + "load-balanced" + ] + } + ], + "createEntities": [ + { + "client": { + "id": "client0", + "observeEvents": [ + "commandStartedEvent" + ], + "ignoreCommandMonitoringEvents": [ + "collMod", + "insert", + "update", + "getMore", + "killCursors" + ] + } + }, + { + "database": { + "id": "database0", + "client": "client0", + "databaseName": "change-stream-tests" + } + }, + { + "collection": { + "id": "collection0", + "database": "database0", + "collectionName": "test" + } + } + ], + "initialData": [ + { + "collectionName": "test", + "databaseName": "change-stream-tests", + "documents": [ + { + "_id": 1 + } + ] + } + ], + "tests": [ + { + "description": "fullDocument:whenAvailable with changeStreamPreAndPostImages enabled", + "operations": [ + { + "name": "runCommand", + "object": "database0", + "arguments": { + "commandName": "collMod", + "command": { + "collMod": "test", + "changeStreamPreAndPostImages": { + "enabled": true + } + } + } + }, + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "fullDocument": "whenAvailable" + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "updateOne", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$set": { + "x": 1 + } + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "update", + "ns": { + "db": "change-stream-tests", + "coll": "test" + }, + "updateDescription": { + "$$type": "object" + }, + "fullDocument": { + "_id": 1, + "x": 1 + } + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "test", + "pipeline": [ + { + "$changeStream": { + "fullDocument": "whenAvailable" + } + } + ] + } + } + } + ] + } + ] + }, + { + "description": "fullDocument:whenAvailable with changeStreamPreAndPostImages disabled", + "operations": [ + { + "name": "runCommand", + "object": "database0", + "arguments": { + "commandName": "collMod", + "command": { + "collMod": "test", + "changeStreamPreAndPostImages": { + "enabled": false + } + } + } + }, + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "fullDocument": "whenAvailable" + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "updateOne", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$set": { + "x": 1 + } + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "update", + "ns": { + "db": "change-stream-tests", + "coll": "test" + }, + "updateDescription": { + "$$type": "object" + }, + "fullDocument": null + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "test", + "pipeline": [ + { + "$changeStream": { + "fullDocument": "whenAvailable" + } + } + ] + } + } + } + ] + } + ] + }, + { + "description": "fullDocument:required with changeStreamPreAndPostImages enabled", + "operations": [ + { + "name": "runCommand", + "object": "database0", + "arguments": { + "commandName": "collMod", + "command": { + "collMod": "test", + "changeStreamPreAndPostImages": { + "enabled": true + } + } + } + }, + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "fullDocument": "required" + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "updateOne", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$set": { + "x": 1 + } + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "update", + "ns": { + "db": "change-stream-tests", + "coll": "test" + }, + "updateDescription": { + "$$type": "object" + }, + "fullDocument": { + "_id": 1, + "x": 1 + } + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "test", + "pipeline": [ + { + "$changeStream": { + "fullDocument": "required" + } + } + ] + } + } + } + ] + } + ] + }, + { + "description": "fullDocument:required with changeStreamPreAndPostImages disabled", + "operations": [ + { + "name": "runCommand", + "object": "database0", + "arguments": { + "commandName": "collMod", + "command": { + "collMod": "test", + "changeStreamPreAndPostImages": { + "enabled": false + } + } + } + }, + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "fullDocument": "required" + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "updateOne", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$set": { + "x": 1 + } + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectError": { + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "test", + "pipeline": [ + { + "$changeStream": { + "fullDocument": "required" + } + } + ] + } + } + } + ] + } + ] + }, + { + "description": "fullDocumentBeforeChange:whenAvailable with changeStreamPreAndPostImages enabled", + "operations": [ + { + "name": "runCommand", + "object": "database0", + "arguments": { + "commandName": "collMod", + "command": { + "collMod": "test", + "changeStreamPreAndPostImages": { + "enabled": true + } + } + } + }, + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "fullDocumentBeforeChange": "whenAvailable" + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "updateOne", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$set": { + "x": 1 + } + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "update", + "ns": { + "db": "change-stream-tests", + "coll": "test" + }, + "updateDescription": { + "$$type": "object" + }, + "fullDocumentBeforeChange": { + "_id": 1 + } + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "test", + "pipeline": [ + { + "$changeStream": { + "fullDocumentBeforeChange": "whenAvailable" + } + } + ] + } + } + } + ] + } + ] + }, + { + "description": "fullDocumentBeforeChange:whenAvailable with changeStreamPreAndPostImages disabled", + "operations": [ + { + "name": "runCommand", + "object": "database0", + "arguments": { + "commandName": "collMod", + "command": { + "collMod": "test", + "changeStreamPreAndPostImages": { + "enabled": false + } + } + } + }, + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "fullDocumentBeforeChange": "whenAvailable" + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "updateOne", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$set": { + "x": 1 + } + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "update", + "ns": { + "db": "change-stream-tests", + "coll": "test" + }, + "updateDescription": { + "$$type": "object" + }, + "fullDocumentBeforeChange": null + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "test", + "pipeline": [ + { + "$changeStream": { + "fullDocumentBeforeChange": "whenAvailable" + } + } + ] + } + } + } + ] + } + ] + }, + { + "description": "fullDocumentBeforeChange:required with changeStreamPreAndPostImages enabled", + "operations": [ + { + "name": "runCommand", + "object": "database0", + "arguments": { + "commandName": "collMod", + "command": { + "collMod": "test", + "changeStreamPreAndPostImages": { + "enabled": true + } + } + } + }, + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "fullDocumentBeforeChange": "required" + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "updateOne", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$set": { + "x": 1 + } + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "update", + "ns": { + "db": "change-stream-tests", + "coll": "test" + }, + "updateDescription": { + "$$type": "object" + }, + "fullDocumentBeforeChange": { + "_id": 1 + } + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "test", + "pipeline": [ + { + "$changeStream": { + "fullDocumentBeforeChange": "required" + } + } + ] + } + } + } + ] + } + ] + }, + { + "description": "fullDocumentBeforeChange:required with changeStreamPreAndPostImages disabled", + "operations": [ + { + "name": "runCommand", + "object": "database0", + "arguments": { + "commandName": "collMod", + "command": { + "collMod": "test", + "changeStreamPreAndPostImages": { + "enabled": false + } + } + } + }, + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "fullDocumentBeforeChange": "required" + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "updateOne", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$set": { + "x": 1 + } + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectError": { + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "test", + "pipeline": [ + { + "$changeStream": { + "fullDocumentBeforeChange": "required" + } + } + ] + } + } + } + ] + } + ] + }, + { + "description": "fullDocumentBeforeChange:off with changeStreamPreAndPostImages enabled", + "operations": [ + { + "name": "runCommand", + "object": "database0", + "arguments": { + "commandName": "collMod", + "command": { + "collMod": "test", + "changeStreamPreAndPostImages": { + "enabled": true + } + } + } + }, + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "fullDocumentBeforeChange": "off" + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "updateOne", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$set": { + "x": 1 + } + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "update", + "ns": { + "db": "change-stream-tests", + "coll": "test" + }, + "updateDescription": { + "$$type": "object" + }, + "fullDocumentBeforeChange": { + "$$exists": false + } + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "test", + "pipeline": [ + { + "$changeStream": { + "fullDocumentBeforeChange": "off" + } + } + ] + } + } + } + ] + } + ] + }, + { + "description": "fullDocumentBeforeChange:off with changeStreamPreAndPostImages disabled", + "operations": [ + { + "name": "runCommand", + "object": "database0", + "arguments": { + "commandName": "collMod", + "command": { + "collMod": "test", + "changeStreamPreAndPostImages": { + "enabled": false + } + } + } + }, + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "fullDocumentBeforeChange": "off" + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "updateOne", + "object": "collection0", + "arguments": { + "filter": { + "_id": 1 + }, + "update": { + "$set": { + "x": 1 + } + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "update", + "ns": { + "db": "change-stream-tests", + "coll": "test" + }, + "updateDescription": { + "$$type": "object" + }, + "fullDocumentBeforeChange": { + "$$exists": false + } + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "test", + "pipeline": [ + { + "$changeStream": { + "fullDocumentBeforeChange": "off" + } + } + ] + } + } + } + ] + } + ] + } + ] +} diff --git a/tests/UnifiedSpecTests/change-streams/change-streams.json b/tests/UnifiedSpecTests/change-streams/change-streams.json index adaf00de2..7f3acb96d 100644 --- a/tests/UnifiedSpecTests/change-streams/change-streams.json +++ b/tests/UnifiedSpecTests/change-streams/change-streams.json @@ -1,10 +1,22 @@ { "description": "change-streams", "schemaVersion": "1.0", + "runOnRequirements": [ + { + "minServerVersion": "3.6", + "topologies": [ + "replicaset", + "sharded-replicaset" + ] + } + ], "createEntities": [ { "client": { - "id": "client0" + "id": "client0", + "observeEvents": [ + "commandStartedEvent" + ] } }, { @@ -34,10 +46,7 @@ "description": "Test array truncation", "runOnRequirements": [ { - "minServerVersion": "4.7", - "topologies": [ - "replicaset" - ] + "minServerVersion": "4.7" } ], "operations": [ @@ -111,6 +120,611 @@ } } ] + }, + { + "description": "Test with document comment", + "runOnRequirements": [ + { + "minServerVersion": "4.4" + } + ], + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "comment": { + "name": "test1" + } + }, + "saveResultAsEntity": "changeStream0" + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "pipeline": [ + { + "$changeStream": {} + } + ], + "comment": { + "name": "test1" + } + } + } + } + ] + } + ] + }, + { + "description": "Test with document comment - pre 4.4", + "runOnRequirements": [ + { + "maxServerVersion": "4.2.99" + } + ], + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "comment": { + "name": "test1" + } + }, + "expectError": { + "isClientError": false + } + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "pipeline": [ + { + "$changeStream": {} + } + ], + "comment": { + "name": "test1" + } + } + } + } + ] + } + ] + }, + { + "description": "Test with string comment", + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "comment": "comment" + }, + "saveResultAsEntity": "changeStream0" + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "pipeline": [ + { + "$changeStream": {} + } + ], + "comment": "comment" + } + } + } + ] + } + ] + }, + { + "description": "Test that comment is set on getMore", + "runOnRequirements": [ + { + "minServerVersion": "4.4.0", + "topologies": [ + "replicaset" + ] + } + ], + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "comment": { + "key": "value" + } + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 1, + "a": 1 + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0" + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "pipeline": [ + { + "$changeStream": {} + } + ], + "comment": { + "key": "value" + } + } + } + }, + { + "commandStartedEvent": { + "command": { + "insert": "collection0", + "documents": [ + { + "_id": 1, + "a": 1 + } + ] + } + } + }, + { + "commandStartedEvent": { + "command": { + "getMore": { + "$$type": [ + "int", + "long" + ] + }, + "collection": "collection0", + "comment": { + "key": "value" + } + }, + "commandName": "getMore", + "databaseName": "database0" + } + } + ] + } + ] + }, + { + "description": "Test that comment is not set on getMore - pre 4.4", + "runOnRequirements": [ + { + "maxServerVersion": "4.3.99", + "topologies": [ + "replicaset" + ] + } + ], + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [], + "comment": "comment" + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 1, + "a": 1 + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0" + } + ], + "expectEvents": [ + { + "client": "client0", + "events": [ + { + "commandStartedEvent": { + "command": { + "aggregate": "collection0", + "pipeline": [ + { + "$changeStream": {} + } + ], + "comment": "comment" + } + } + }, + { + "commandStartedEvent": { + "command": { + "insert": "collection0", + "documents": [ + { + "_id": 1, + "a": 1 + } + ] + } + } + }, + { + "commandStartedEvent": { + "command": { + "getMore": { + "$$type": [ + "int", + "long" + ] + }, + "collection": "collection0", + "comment": { + "$$exists": false + } + }, + "commandName": "getMore", + "databaseName": "database0" + } + } + ] + } + ] + }, + { + "description": "to field is set in a rename change event", + "runOnRequirements": [ + { + "minServerVersion": "4.0.1" + } + ], + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [] + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "dropCollection", + "object": "database0", + "arguments": { + "collection": "collection1" + } + }, + { + "name": "rename", + "object": "collection0", + "arguments": { + "to": "collection1" + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "rename", + "ns": { + "db": "database0", + "coll": "collection0" + }, + "to": { + "db": "database0", + "coll": "collection1" + } + } + } + ] + }, + { + "description": "Test unknown operationType MUST NOT err", + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$project": { + "operationType": "addedInFutureMongoDBVersion", + "ns": 1 + } + } + ] + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 1, + "a": 1 + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "addedInFutureMongoDBVersion", + "ns": { + "db": "database0", + "coll": "collection0" + } + } + } + ] + }, + { + "description": "Test newField added in response MUST NOT err", + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$project": { + "operationType": 1, + "ns": 1, + "newField": "newFieldValue" + } + } + ] + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 1, + "a": 1 + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "insert", + "ns": { + "db": "database0", + "coll": "collection0" + }, + "newField": "newFieldValue" + } + } + ] + }, + { + "description": "Test new structure in ns document MUST NOT err", + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$project": { + "operationType": "insert", + "ns.viewOn": "db.coll" + } + } + ] + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 1, + "a": 1 + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "insert", + "ns": { + "viewOn": "db.coll" + } + } + } + ] + }, + { + "description": "Test modified structure in ns document MUST NOT err", + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$project": { + "operationType": "insert", + "ns": { + "db": "$ns.db", + "coll": "$ns.coll", + "viewOn": "db.coll" + } + } + } + ] + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 1, + "a": 1 + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "operationType": "insert", + "ns": { + "db": "database0", + "coll": "collection0", + "viewOn": "db.coll" + } + } + } + ] + }, + { + "description": "Test server error on projecting out _id", + "runOnRequirements": [ + { + "minServerVersion": "4.2" + } + ], + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$project": { + "_id": 0 + } + } + ] + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 1, + "a": 1 + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectError": { + "errorCode": 280, + "errorCodeName": "ChangeStreamFatalError", + "errorLabelsContain": [ + "NonResumableChangeStreamError" + ] + } + } + ] + }, + { + "description": "Test projection in change stream returns expected fields", + "operations": [ + { + "name": "createChangeStream", + "object": "collection0", + "arguments": { + "pipeline": [ + { + "$project": { + "optype": "$operationType", + "ns": 1, + "newField": "value" + } + } + ] + }, + "saveResultAsEntity": "changeStream0" + }, + { + "name": "insertOne", + "object": "collection0", + "arguments": { + "document": { + "_id": 1, + "a": 1 + } + } + }, + { + "name": "iterateUntilDocumentOrError", + "object": "changeStream0", + "expectResult": { + "optype": "insert", + "ns": { + "db": "database0", + "coll": "collection0" + }, + "newField": "value" + } + } + ] } ] }