Skip to content

Commit

Permalink
PHPLIB-814: Change stream support for point-in-time pre and post-images
Browse files Browse the repository at this point in the history
Introduces fullDocumentBeforeChange option. Tests for specifying "whenAvailable" and "required" for both fullDocument and fullDocumentBeforeChange (requires MongoDB 6.0+ with changeStreamPreAndPostImages enabled on the collection).

Tests synced with mongodb/specifications#1176
  • Loading branch information
jmikola committed Apr 8, 2022
1 parent bc60323 commit 504ca3d
Show file tree
Hide file tree
Showing 6 changed files with 1,491 additions and 15 deletions.
46 changes: 37 additions & 9 deletions src/Operation/Watch.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
{
public const FULL_DOCUMENT_DEFAULT = 'default';
public const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup';
public const FULL_DOCUMENT_WHEN_AVAILABLE = 'whenAvailable';
public const FULL_DOCUMENT_REQUIRED = 'required';

public const FULL_DOCUMENT_BEFORE_CHANGE_OFF = 'off';
public const FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE = 'whenAvailable';
public const FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED = 'required';

/** @var integer */
private static $wireVersionForStartAtOperationTime = 7;
Expand Down Expand Up @@ -105,15 +111,33 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
*
* * collation (document): Specifies a collation.
*
* * fullDocument (string): Determines whether the "fullDocument" field
* will be populated for update operations. By default, change streams
* only return the delta of fields during the update operation (via the
* "updateDescription" field). To additionally return the most current
* majority-committed version of the updated document, specify
* "updateLookup" for this option. Defaults to "default".
* * fullDocument (string): Determines how the "fullDocument" response
* field will be populated for update operations.
*
* By default, change streams only return the delta of fields (via an
* "updateDescription" field) for update operations and "fullDocument" is
* omitted. Insert and replace operations always include the
* "fullDocument" field. Delete operations omit the field as the document
* no longer exists.
*
* Specify "updateLookup" to return the current majority-committed
* version of the updated document.
*
* MongoDB 6.0+ allows returning the post-image of the modified document
* if the collection has changeStreamPreAndPostImages enabled. Specify
* "whenAvailable" to return the post-image if available or a null value
* if not. Specify "required" to return the post-image if available or
* raise an error if not.
*
* Insert and replace operations always include the "fullDocument" field
* and delete operations omit the field as the document no longer exists.
* * fullDocumentBeforeChange (string): Determines how the
* "fullDocumentBeforeChange" response field will be populated. By
* default, the field is omitted.
*
* MongoDB 6.0+ allows returning the pre-image of the modified document
* if the collection has changeStreamPreAndPostImages enabled. Specify
* "whenAvailable" to return the pre-image if available or a null value
* if not. Specify "required" to return the pre-image if available or
* raise an error if not.
*
* * maxAwaitTimeMS (integer): The maximum amount of time for the server to
* wait on new documents to satisfy a change stream query.
Expand Down Expand Up @@ -181,6 +205,10 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
throw InvalidArgumentException::invalidType('"fullDocument" option', $options['fullDocument'], 'string');
}

if (isset($options['fullDocumentBeforeChange']) && ! is_string($options['fullDocumentBeforeChange'])) {
throw InvalidArgumentException::invalidType('"fullDocumentBeforeChange" option', $options['fullDocumentBeforeChange'], 'string');
}

if (! $options['readPreference'] instanceof ReadPreference) {
throw InvalidArgumentException::invalidType('"readPreference" option', $options['readPreference'], ReadPreference::class);
}
Expand Down Expand Up @@ -212,7 +240,7 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
}

$this->aggregateOptions = array_intersect_key($options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]);
$this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'resumeAfter' => 1, 'startAfter' => 1, 'startAtOperationTime' => 1]);
$this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'fullDocumentBeforeChange' => 1, 'resumeAfter' => 1, 'startAfter' => 1, 'startAtOperationTime' => 1]);

// Null database name implies a cluster-wide change stream
if ($databaseName === null) {
Expand Down
4 changes: 4 additions & 0 deletions tests/Operation/WatchTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public function provideInvalidConstructorOptions()
$options[][] = ['fullDocument' => $value];
}

foreach ($this->getInvalidStringValues(true) as $value) {
$options[][] = ['fullDocumentBeforeChange' => $value];
}

foreach ($this->getInvalidIntegerValues() as $value) {
$options[][] = ['maxAwaitTimeMS' => $value];
}
Expand Down
4 changes: 4 additions & 0 deletions tests/UnifiedSpecTests/UnifiedSpecTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ class UnifiedSpecTest extends FunctionalTestCase
{
/** @var array */
private static $incompleteTests = [
'change-streams/change-streams: to field is set in a rename change event' => 'Not yet implemented (PHPLIB-652, PHPLIB-828)',
'change-streams/change-streams: Test with document comment' => 'Not yet implemented (PHPLIB-749)',
'change-streams/change-streams: Test with string comment' => 'Not yet implemented (PHPLIB-749)',
'change-streams/change-streams: Test that comment is set on getMore' => 'Not yet implemented (PHPLIB-749)',
'command-monitoring/pre-42-server-connection-id: command events do not include server connection id' => 'Not yet implemented (PHPC-1899, PHPLIB-718)',
'command-monitoring/server-connection-id: command events include server connection id' => 'Not yet implemented (PHPC-1899, PHPLIB-718)',
// Many load balancer tests use CMAP events and/or assertNumberConnectionsCheckedOut
Expand Down
2 changes: 1 addition & 1 deletion tests/UnifiedSpecTests/Util.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ final class Util
Collection::class => [
'aggregate' => ['pipeline', 'session', 'useCursor', 'allowDiskUse', 'batchSize', 'bypassDocumentValidation', 'collation', 'comment', 'explain', 'hint', 'let', 'maxAwaitTimeMS', 'maxTimeMS'],
'bulkWrite' => ['requests', 'session', 'ordered', 'bypassDocumentValidation'],
'createChangeStream' => ['pipeline', 'session', 'fullDocument', 'resumeAfter', 'startAfter', 'startAtOperationTime', 'batchSize', 'collation', 'maxAwaitTimeMS'],
'createChangeStream' => ['pipeline', 'session', 'fullDocument', 'fullDocumentBeforeChange', 'resumeAfter', 'startAfter', 'startAtOperationTime', 'batchSize', 'collation', 'maxAwaitTimeMS'],
'createFindCursor' => ['filter', 'session', 'allowDiskUse', 'allowPartialResults', 'batchSize', 'collation', 'comment', 'cursorType', 'hint', 'limit', 'max', 'maxAwaitTimeMS', 'maxScan', 'maxTimeMS', 'min', 'modifiers', 'noCursorTimeout', 'oplogReplay', 'projection', 'returnKey', 'showRecordId', 'skip', 'snapshot', 'sort'],
'createIndex' => ['keys', 'commitQuorum', 'maxTimeMS', 'name', 'session'],
'dropIndex' => ['name', 'session', 'maxTimeMS'],
Expand Down
Loading

0 comments on commit 504ca3d

Please sign in to comment.