Skip to content

Commit

Permalink
PHPLIB-814: Change stream support for point-in-time pre and post-images
Browse files Browse the repository at this point in the history
Introduces fullDocumentBeforeChange option. Tests for specifying "whenAvailable" and "required" for both fullDocument and fullDocumentBeforeChange (requires MongoDB 6.0+ with changeStreamPreAndPostImages enabled on the collection).

Intentionally omits mention of FULL_DOCUMENT_DEFAULT (related to PHPLIB-808) and FULL_DOCUMENT_BEFORE_CHANGE_OFF constants, since those redundantly specified default behavior.

Tests synced with mongodb/specifications#1176
  • Loading branch information
jmikola committed Apr 12, 2022
1 parent aefff4c commit 126b906
Show file tree
Hide file tree
Showing 10 changed files with 1,548 additions and 21 deletions.
4 changes: 4 additions & 0 deletions docs/includes/apiargs-MongoDBClient-method-watch-option.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ source:
file: apiargs-method-watch-option.yaml
ref: fullDocument
---
source:
file: apiargs-method-watch-option.yaml
ref: fullDocumentBeforeChange
---
source:
file: apiargs-method-watch-option.yaml
ref: maxAwaitTimeMS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ source:
file: apiargs-method-watch-option.yaml
ref: fullDocument
---
source:
file: apiargs-method-watch-option.yaml
ref: fullDocumentBeforeChange
---
source:
file: apiargs-method-watch-option.yaml
ref: maxAwaitTimeMS
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@ source:
file: apiargs-method-watch-option.yaml
ref: fullDocument
---
source:
file: apiargs-method-watch-option.yaml
ref: fullDocumentBeforeChange
---
source:
file: apiargs-method-watch-option.yaml
ref: maxAwaitTimeMS
Expand Down
51 changes: 45 additions & 6 deletions docs/includes/apiargs-method-watch-option.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,53 @@ arg_name: option
name: fullDocument
type: string
description: |
Allowed values are 'default' and 'updateLookup'. Defaults to 'default'.
When set to 'updateLookup', the change notification for partial updates will
include both a delta describing the changes to the document, as well as a
copy of the entire document that was changed from some time after the change
occurred. The following values are supported:
Determines how the "fullDocument" response field will be populated for update
operations.
By default, change streams only return the delta of fields (via an
"udateDescription" field) for update operations and "fullDocument" is omitted.
Insert and replace operations always include the "fullDocument" field. Delete
operations omit the field as the document no longer exists.
Specify "updateLookup" to return the current majority-committed version of the
updated document.
MongoDB 6.0+ allows returning the post-image of the modified document if the
collection has ``changeStreamPreAndPostImages`` enabled. Specify
"whenAvailable" to return the post-image if available or a null value if not.
Specify "required" to return the post-image if available or raise an error if
not.
The following values are supported:
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_DEFAULT`` (*default*)
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_UPDATE_LOOKUP``
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_WHEN_AVAILABLE``
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_REQUIRED``
.. note::
This is an option of the ``$changeStream`` pipeline stage.
interface: phpmethod
operation: ~
optional: true
---
arg_name: option
name: fullDocumentBeforeChange
type: string
description: |
Determines how the "fullDocumentBeforeChange" response field will be
populated. By default, the field is omitted.
MongoDB 6.0+ allows returning the pre-image of the modified document if the
collection has ``changeStreamPreAndPostImages`` enabled. Specify
"whenAvailable" to return the pre-image if available or a null value if not.
Specify "required" to return the pre-image if available or raise an error if
not.
The following values are supported:
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE``
- ``MongoDB\Operation\Watch::FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED``
.. note::
Expand Down
46 changes: 37 additions & 9 deletions src/Operation/Watch.php
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,12 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
{
public const FULL_DOCUMENT_DEFAULT = 'default';
public const FULL_DOCUMENT_UPDATE_LOOKUP = 'updateLookup';
public const FULL_DOCUMENT_WHEN_AVAILABLE = 'whenAvailable';
public const FULL_DOCUMENT_REQUIRED = 'required';

public const FULL_DOCUMENT_BEFORE_CHANGE_OFF = 'off';
public const FULL_DOCUMENT_BEFORE_CHANGE_WHEN_AVAILABLE = 'whenAvailable';
public const FULL_DOCUMENT_BEFORE_CHANGE_REQUIRED = 'required';

/** @var integer */
private static $wireVersionForStartAtOperationTime = 7;
Expand Down Expand Up @@ -105,15 +111,33 @@ class Watch implements Executable, /* @internal */ CommandSubscriber
*
* * collation (document): Specifies a collation.
*
* * fullDocument (string): Determines whether the "fullDocument" field
* will be populated for update operations. By default, change streams
* only return the delta of fields during the update operation (via the
* "updateDescription" field). To additionally return the most current
* majority-committed version of the updated document, specify
* "updateLookup" for this option. Defaults to "default".
* * fullDocument (string): Determines how the "fullDocument" response
* field will be populated for update operations.
*
* By default, change streams only return the delta of fields (via an
* "updateDescription" field) for update operations and "fullDocument" is
* omitted. Insert and replace operations always include the
* "fullDocument" field. Delete operations omit the field as the document
* no longer exists.
*
* Specify "updateLookup" to return the current majority-committed
* version of the updated document.
*
* MongoDB 6.0+ allows returning the post-image of the modified document
* if the collection has changeStreamPreAndPostImages enabled. Specify
* "whenAvailable" to return the post-image if available or a null value
* if not. Specify "required" to return the post-image if available or
* raise an error if not.
*
* Insert and replace operations always include the "fullDocument" field
* and delete operations omit the field as the document no longer exists.
* * fullDocumentBeforeChange (string): Determines how the
* "fullDocumentBeforeChange" response field will be populated. By
* default, the field is omitted.
*
* MongoDB 6.0+ allows returning the pre-image of the modified document
* if the collection has changeStreamPreAndPostImages enabled. Specify
* "whenAvailable" to return the pre-image if available or a null value
* if not. Specify "required" to return the pre-image if available or
* raise an error if not.
*
* * maxAwaitTimeMS (integer): The maximum amount of time for the server to
* wait on new documents to satisfy a change stream query.
Expand Down Expand Up @@ -181,6 +205,10 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
throw InvalidArgumentException::invalidType('"fullDocument" option', $options['fullDocument'], 'string');
}

if (isset($options['fullDocumentBeforeChange']) && ! is_string($options['fullDocumentBeforeChange'])) {
throw InvalidArgumentException::invalidType('"fullDocumentBeforeChange" option', $options['fullDocumentBeforeChange'], 'string');
}

if (! $options['readPreference'] instanceof ReadPreference) {
throw InvalidArgumentException::invalidType('"readPreference" option', $options['readPreference'], ReadPreference::class);
}
Expand Down Expand Up @@ -212,7 +240,7 @@ public function __construct(Manager $manager, $databaseName, $collectionName, ar
}

$this->aggregateOptions = array_intersect_key($options, ['batchSize' => 1, 'collation' => 1, 'maxAwaitTimeMS' => 1, 'readConcern' => 1, 'readPreference' => 1, 'session' => 1, 'typeMap' => 1]);
$this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'resumeAfter' => 1, 'startAfter' => 1, 'startAtOperationTime' => 1]);
$this->changeStreamOptions = array_intersect_key($options, ['fullDocument' => 1, 'fullDocumentBeforeChange' => 1, 'resumeAfter' => 1, 'startAfter' => 1, 'startAtOperationTime' => 1]);

// Null database name implies a cluster-wide change stream
if ($databaseName === null) {
Expand Down
4 changes: 4 additions & 0 deletions tests/Operation/WatchTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ public function provideInvalidConstructorOptions()
$options[][] = ['fullDocument' => $value];
}

foreach ($this->getInvalidStringValues() as $value) {
$options[][] = ['fullDocumentBeforeChange' => $value];
}

foreach ($this->getInvalidIntegerValues() as $value) {
$options[][] = ['maxAwaitTimeMS' => $value];
}
Expand Down
4 changes: 4 additions & 0 deletions tests/UnifiedSpecTests/UnifiedSpecTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ class UnifiedSpecTest extends FunctionalTestCase
{
/** @var array */
private static $incompleteTests = [
'change-streams/change-streams: to field is set in a rename change event' => 'Not yet implemented (PHPLIB-652, PHPLIB-828)',
'change-streams/change-streams: Test with document comment' => 'Not yet implemented (PHPLIB-749)',
'change-streams/change-streams: Test with string comment' => 'Not yet implemented (PHPLIB-749)',
'change-streams/change-streams: Test that comment is set on getMore' => 'Not yet implemented (PHPLIB-749)',
'command-monitoring/pre-42-server-connection-id: command events do not include server connection id' => 'Not yet implemented (PHPC-1899, PHPLIB-718)',
'command-monitoring/server-connection-id: command events include server connection id' => 'Not yet implemented (PHPC-1899, PHPLIB-718)',
// Many load balancer tests use CMAP events and/or assertNumberConnectionsCheckedOut
Expand Down
2 changes: 1 addition & 1 deletion tests/UnifiedSpecTests/Util.php
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ final class Util
Collection::class => [
'aggregate' => ['pipeline', 'session', 'useCursor', 'allowDiskUse', 'batchSize', 'bypassDocumentValidation', 'collation', 'comment', 'explain', 'hint', 'let', 'maxAwaitTimeMS', 'maxTimeMS'],
'bulkWrite' => ['requests', 'session', 'ordered', 'bypassDocumentValidation'],
'createChangeStream' => ['pipeline', 'session', 'fullDocument', 'resumeAfter', 'startAfter', 'startAtOperationTime', 'batchSize', 'collation', 'maxAwaitTimeMS'],
'createChangeStream' => ['pipeline', 'session', 'fullDocument', 'fullDocumentBeforeChange', 'resumeAfter', 'startAfter', 'startAtOperationTime', 'batchSize', 'collation', 'maxAwaitTimeMS'],
'createFindCursor' => ['filter', 'session', 'allowDiskUse', 'allowPartialResults', 'batchSize', 'collation', 'comment', 'cursorType', 'hint', 'limit', 'max', 'maxAwaitTimeMS', 'maxScan', 'maxTimeMS', 'min', 'modifiers', 'noCursorTimeout', 'oplogReplay', 'projection', 'returnKey', 'showRecordId', 'skip', 'snapshot', 'sort'],
'createIndex' => ['keys', 'commitQuorum', 'maxTimeMS', 'name', 'session'],
'dropIndex' => ['name', 'session', 'maxTimeMS'],
Expand Down
Loading

0 comments on commit 126b906

Please sign in to comment.