diff --git a/psalm-baseline.xml b/psalm-baseline.xml
index e6b0a3436..17571a178 100644
--- a/psalm-baseline.xml
+++ b/psalm-baseline.xml
@@ -101,6 +101,7 @@
cursor->nextBatch]]>
+ $resumeToken
postBatchResumeToken]]>
@@ -170,6 +171,7 @@
+ options['codec']]]>
options['typeMap']]]>
@@ -191,6 +193,8 @@
$args[0]
$args[0]
$args[0]
+ $args[0]
+ $args[1]
$args[1]
$args[1]
$args[1]
@@ -205,6 +209,8 @@
$args[0]
$args[0]
$args[0]
+ $args[0]
+ $args[1]
$args[1]
$args[1]
$args[1]
@@ -244,6 +250,8 @@
$args[2]
+ $operations[$i][$type][0]
+ $operations[$i][$type][1]
$operations[$i][$type][1]
$operations[$i][$type][2]
$operations[$i][$type][2]
@@ -401,6 +409,7 @@
+ options['codec']]]>
options['typeMap']]]>
@@ -427,18 +436,34 @@
+ $value
array|object|null
+ decode
isInTransaction
+ options['codec']->decode($value)]]>
+ options['codec']->decode($value)]]>
value ?? null) : null]]>
value ?? null) : null]]>
+
+
+ $document
+
+
+ array|object|null
+
+
+ $document === false ? null : $document
+ $document === false ? null : $document
+
+
@@ -448,6 +473,9 @@
+
+ $replacement
+
@@ -464,6 +492,9 @@
isInTransaction
+
+ $document
+
@@ -475,6 +506,9 @@
isInTransaction
+
+ $document
+
@@ -522,6 +556,11 @@
isInTransaction
+
+
+ $replacement
+
+
options['writeConcern']]]>
diff --git a/src/ChangeStream.php b/src/ChangeStream.php
index 6fc3bbaa1..8193a35eb 100644
--- a/src/ChangeStream.php
+++ b/src/ChangeStream.php
@@ -18,6 +18,8 @@
namespace MongoDB;
use Iterator;
+use MongoDB\BSON\Document;
+use MongoDB\Codec\DocumentCodec;
use MongoDB\Driver\CursorId;
use MongoDB\Driver\Exception\ConnectionException;
use MongoDB\Driver\Exception\RuntimeException;
@@ -27,6 +29,7 @@
use MongoDB\Model\ChangeStreamIterator;
use ReturnTypeWillChange;
+use function assert;
use function call_user_func;
use function in_array;
@@ -82,15 +85,22 @@ class ChangeStream implements Iterator
*/
private bool $hasAdvanced = false;
+ private ?DocumentCodec $codec;
+
/**
* @internal
*
* @param ResumeCallable $resumeCallable
*/
- public function __construct(ChangeStreamIterator $iterator, callable $resumeCallable)
+ public function __construct(ChangeStreamIterator $iterator, callable $resumeCallable, ?DocumentCodec $codec = null)
{
$this->iterator = $iterator;
$this->resumeCallable = $resumeCallable;
+ $this->codec = $codec;
+
+ if ($codec) {
+ $this->iterator->getInnerIterator()->setTypeMap(['root' => 'bson']);
+ }
}
/**
@@ -100,7 +110,15 @@ public function __construct(ChangeStreamIterator $iterator, callable $resumeCall
#[ReturnTypeWillChange]
public function current()
{
- return $this->iterator->current();
+ $value = $this->iterator->current();
+
+ if (! $this->codec) {
+ return $value;
+ }
+
+ assert($value instanceof Document);
+
+ return $this->codec->decode($value);
}
/** @return CursorId */
@@ -252,6 +270,10 @@ private function resume(): void
$this->iterator->rewind();
+ if ($this->codec) {
+ $this->iterator->getInnerIterator()->setTypeMap(['root' => 'bson']);
+ }
+
$this->onIteration($this->hasAdvanced);
}
diff --git a/src/Collection.php b/src/Collection.php
index 801511951..b5a672308 100644
--- a/src/Collection.php
+++ b/src/Collection.php
@@ -17,8 +17,10 @@
namespace MongoDB;
+use Iterator;
use MongoDB\BSON\JavascriptInterface;
-use MongoDB\Driver\Cursor;
+use MongoDB\Codec\DocumentCodec;
+use MongoDB\Driver\CursorInterface;
use MongoDB\Driver\Exception\RuntimeException as DriverRuntimeException;
use MongoDB\Driver\Manager;
use MongoDB\Driver\ReadConcern;
@@ -62,6 +64,7 @@
use function array_diff_key;
use function array_intersect_key;
+use function array_key_exists;
use function current;
use function is_array;
use function strlen;
@@ -76,6 +79,8 @@ class Collection
private const WIRE_VERSION_FOR_READ_CONCERN_WITH_WRITE_STAGE = 8;
+ private ?DocumentCodec $codec = null;
+
private string $collectionName;
private string $databaseName;
@@ -98,6 +103,9 @@ class Collection
*
* Supported options:
*
+ * * codec (MongoDB\Codec\DocumentCodec): Codec used to decode documents
+ * from BSON to PHP objects.
+ *
* * readConcern (MongoDB\Driver\ReadConcern): The default read concern to
* use for collection operations. Defaults to the Manager's read concern.
*
@@ -127,6 +135,10 @@ public function __construct(Manager $manager, string $databaseName, string $coll
throw new InvalidArgumentException('$collectionName is invalid: ' . $collectionName);
}
+ if (isset($options['codec']) && ! $options['codec'] instanceof DocumentCodec) {
+ throw InvalidArgumentException::invalidType('"codec" option', $options['codec'], DocumentCodec::class);
+ }
+
if (isset($options['readConcern']) && ! $options['readConcern'] instanceof ReadConcern) {
throw InvalidArgumentException::invalidType('"readConcern" option', $options['readConcern'], ReadConcern::class);
}
@@ -146,6 +158,8 @@ public function __construct(Manager $manager, string $databaseName, string $coll
$this->manager = $manager;
$this->databaseName = $databaseName;
$this->collectionName = $collectionName;
+
+ $this->codec = $options['codec'] ?? null;
$this->readConcern = $options['readConcern'] ?? $this->manager->getReadConcern();
$this->readPreference = $options['readPreference'] ?? $this->manager->getReadPreference();
$this->typeMap = $options['typeMap'] ?? self::DEFAULT_TYPE_MAP;
@@ -161,6 +175,7 @@ public function __construct(Manager $manager, string $databaseName, string $coll
public function __debugInfo()
{
return [
+ 'codec' => $this->codec,
'collectionName' => $this->collectionName,
'databaseName' => $this->databaseName,
'manager' => $this->manager,
@@ -188,7 +203,7 @@ public function __toString()
* @see Aggregate::__construct() for supported options
* @param array $pipeline Aggregation pipeline
* @param array $options Command options
- * @return Cursor
+ * @return CursorInterface&Iterator
* @throws UnexpectedValueException if the command response was malformed
* @throws UnsupportedException if options are not supported by the selected server
* @throws InvalidArgumentException for parameter/option parsing errors
@@ -198,9 +213,7 @@ public function aggregate(array $pipeline, array $options = [])
{
$hasWriteStage = is_last_pipeline_operator_write($pipeline);
- if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
- $options['readPreference'] = $this->readPreference;
- }
+ $options = $this->inheritReadPreference($options);
$server = $hasWriteStage
? select_server_for_aggregate_write_stage($this->manager, $options)
@@ -208,23 +221,15 @@ public function aggregate(array $pipeline, array $options = [])
/* MongoDB 4.2 and later supports a read concern when an $out stage is
* being used, but earlier versions do not.
- *
- * A read concern is also not compatible with transactions.
*/
- if (
- ! isset($options['readConcern']) &&
- ! is_in_transaction($options) &&
- ( ! $hasWriteStage || server_supports_feature($server, self::WIRE_VERSION_FOR_READ_CONCERN_WITH_WRITE_STAGE))
- ) {
- $options['readConcern'] = $this->readConcern;
+ if (! $hasWriteStage || server_supports_feature($server, self::WIRE_VERSION_FOR_READ_CONCERN_WITH_WRITE_STAGE)) {
+ $options = $this->inheritReadConcern($options);
}
- if (! isset($options['typeMap'])) {
- $options['typeMap'] = $this->typeMap;
- }
+ $options = $this->inheritCodecOrTypeMap($options);
- if ($hasWriteStage && ! isset($options['writeConcern']) && ! is_in_transaction($options)) {
- $options['writeConcern'] = $this->writeConcern;
+ if ($hasWriteStage) {
+ $options = $this->inheritWriteOptions($options);
}
$operation = new Aggregate($this->databaseName, $this->collectionName, $pipeline, $options);
@@ -245,14 +250,12 @@ public function aggregate(array $pipeline, array $options = [])
*/
public function bulkWrite(array $operations, array $options = [])
{
- if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
- $options['writeConcern'] = $this->writeConcern;
- }
+ $options = $this->inheritWriteOptions($options);
+ $options = $this->inheritCodec($options);
$operation = new BulkWrite($this->databaseName, $this->collectionName, $operations, $options);
- $server = select_server($this->manager, $options);
- return $operation->execute($server);
+ return $operation->execute(select_server($this->manager, $options));
}
/**
@@ -271,19 +274,11 @@ public function bulkWrite(array $operations, array $options = [])
*/
public function count($filter = [], array $options = [])
{
- if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
- $options['readPreference'] = $this->readPreference;
- }
-
- $server = select_server($this->manager, $options);
-
- if (! isset($options['readConcern']) && ! is_in_transaction($options)) {
- $options['readConcern'] = $this->readConcern;
- }
+ $options = $this->inheritReadOptions($options);
$operation = new Count($this->databaseName, $this->collectionName, $filter, $options);
- return $operation->execute($server);
+ return $operation->execute(select_server($this->manager, $options));
}
/**
@@ -300,19 +295,11 @@ public function count($filter = [], array $options = [])
*/
public function countDocuments($filter = [], array $options = [])
{
- if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
- $options['readPreference'] = $this->readPreference;
- }
-
- $server = select_server($this->manager, $options);
-
- if (! isset($options['readConcern']) && ! is_in_transaction($options)) {
- $options['readConcern'] = $this->readConcern;
- }
+ $options = $this->inheritReadOptions($options);
$operation = new CountDocuments($this->databaseName, $this->collectionName, $filter, $options);
- return $operation->execute($server);
+ return $operation->execute(select_server($this->manager, $options));
}
/**
@@ -366,15 +353,11 @@ public function createIndex($key, array $options = [])
*/
public function createIndexes(array $indexes, array $options = [])
{
- $server = select_server($this->manager, $options);
-
- if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
- $options['writeConcern'] = $this->writeConcern;
- }
+ $options = $this->inheritWriteOptions($options);
$operation = new CreateIndexes($this->databaseName, $this->collectionName, $indexes, $options);
- return $operation->execute($server);
+ return $operation->execute(select_server($this->manager, $options));
}
/**
@@ -391,14 +374,11 @@ public function createIndexes(array $indexes, array $options = [])
*/
public function deleteMany($filter, array $options = [])
{
- if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
- $options['writeConcern'] = $this->writeConcern;
- }
+ $options = $this->inheritWriteOptions($options);
$operation = new DeleteMany($this->databaseName, $this->collectionName, $filter, $options);
- $server = select_server($this->manager, $options);
- return $operation->execute($server);
+ return $operation->execute(select_server($this->manager, $options));
}
/**
@@ -415,14 +395,11 @@ public function deleteMany($filter, array $options = [])
*/
public function deleteOne($filter, array $options = [])
{
- if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
- $options['writeConcern'] = $this->writeConcern;
- }
+ $options = $this->inheritWriteOptions($options);
$operation = new DeleteOne($this->databaseName, $this->collectionName, $filter, $options);
- $server = select_server($this->manager, $options);
- return $operation->execute($server);
+ return $operation->execute(select_server($this->manager, $options));
}
/**
@@ -440,23 +417,12 @@ public function deleteOne($filter, array $options = [])
*/
public function distinct(string $fieldName, $filter = [], array $options = [])
{
- if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
- $options['readPreference'] = $this->readPreference;
- }
-
- if (! isset($options['typeMap'])) {
- $options['typeMap'] = $this->typeMap;
- }
-
- $server = select_server($this->manager, $options);
-
- if (! isset($options['readConcern']) && ! is_in_transaction($options)) {
- $options['readConcern'] = $this->readConcern;
- }
+ $options = $this->inheritReadOptions($options);
+ $options = $this->inheritTypeMap($options);
$operation = new Distinct($this->databaseName, $this->collectionName, $fieldName, $filter, $options);
- return $operation->execute($server);
+ return $operation->execute(select_server($this->manager, $options));
}
/**
@@ -471,16 +437,11 @@ public function distinct(string $fieldName, $filter = [], array $options = [])
*/
public function drop(array $options = [])
{
- if (! isset($options['typeMap'])) {
- $options['typeMap'] = $this->typeMap;
- }
+ $options = $this->inheritWriteOptions($options);
+ $options = $this->inheritTypeMap($options);
$server = select_server($this->manager, $options);
- if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
- $options['writeConcern'] = $this->writeConcern;
- }
-
if (! isset($options['encryptedFields'])) {
$options['encryptedFields'] = get_encrypted_fields_from_driver($this->databaseName, $this->collectionName, $this->manager)
?? get_encrypted_fields_from_server($this->databaseName, $this->collectionName, $this->manager, $server);
@@ -512,19 +473,12 @@ public function dropIndex($indexName, array $options = [])
throw new InvalidArgumentException('dropIndexes() must be used to drop multiple indexes');
}
- if (! isset($options['typeMap'])) {
- $options['typeMap'] = $this->typeMap;
- }
-
- $server = select_server($this->manager, $options);
-
- if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
- $options['writeConcern'] = $this->writeConcern;
- }
+ $options = $this->inheritWriteOptions($options);
+ $options = $this->inheritTypeMap($options);
$operation = new DropIndexes($this->databaseName, $this->collectionName, $indexName, $options);
- return $operation->execute($server);
+ return $operation->execute(select_server($this->manager, $options));
}
/**
@@ -539,19 +493,12 @@ public function dropIndex($indexName, array $options = [])
*/
public function dropIndexes(array $options = [])
{
- if (! isset($options['typeMap'])) {
- $options['typeMap'] = $this->typeMap;
- }
-
- $server = select_server($this->manager, $options);
-
- if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
- $options['writeConcern'] = $this->writeConcern;
- }
+ $options = $this->inheritWriteOptions($options);
+ $options = $this->inheritTypeMap($options);
$operation = new DropIndexes($this->databaseName, $this->collectionName, '*', $options);
- return $operation->execute($server);
+ return $operation->execute(select_server($this->manager, $options));
}
/**
@@ -567,19 +514,11 @@ public function dropIndexes(array $options = [])
*/
public function estimatedDocumentCount(array $options = [])
{
- if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
- $options['readPreference'] = $this->readPreference;
- }
-
- $server = select_server($this->manager, $options);
-
- if (! isset($options['readConcern']) && ! is_in_transaction($options)) {
- $options['readConcern'] = $this->readConcern;
- }
+ $options = $this->inheritReadOptions($options);
$operation = new EstimatedDocumentCount($this->databaseName, $this->collectionName, $options);
- return $operation->execute($server);
+ return $operation->execute(select_server($this->manager, $options));
}
/**
@@ -596,19 +535,12 @@ public function estimatedDocumentCount(array $options = [])
*/
public function explain(Explainable $explainable, array $options = [])
{
- if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
- $options['readPreference'] = $this->readPreference;
- }
-
- if (! isset($options['typeMap'])) {
- $options['typeMap'] = $this->typeMap;
- }
-
- $server = select_server($this->manager, $options);
+ $options = $this->inheritReadPreference($options);
+ $options = $this->inheritTypeMap($options);
$operation = new Explain($this->databaseName, $explainable, $options);
- return $operation->execute($server);
+ return $operation->execute(select_server($this->manager, $options));
}
/**
@@ -618,30 +550,19 @@ public function explain(Explainable $explainable, array $options = [])
* @see https://mongodb.com/docs/manual/crud/#read-operations
* @param array|object $filter Query by which to filter documents
* @param array $options Additional options
- * @return Cursor
+ * @return CursorInterface&Iterator
* @throws UnsupportedException if options are not supported by the selected server
* @throws InvalidArgumentException for parameter/option parsing errors
* @throws DriverRuntimeException for other driver errors (e.g. connection errors)
*/
public function find($filter = [], array $options = [])
{
- if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
- $options['readPreference'] = $this->readPreference;
- }
-
- $server = select_server($this->manager, $options);
-
- if (! isset($options['readConcern']) && ! is_in_transaction($options)) {
- $options['readConcern'] = $this->readConcern;
- }
-
- if (! isset($options['typeMap'])) {
- $options['typeMap'] = $this->typeMap;
- }
+ $options = $this->inheritReadOptions($options);
+ $options = $this->inheritCodecOrTypeMap($options);
$operation = new Find($this->databaseName, $this->collectionName, $filter, $options);
- return $operation->execute($server);
+ return $operation->execute(select_server($this->manager, $options));
}
/**
@@ -658,23 +579,12 @@ public function find($filter = [], array $options = [])
*/
public function findOne($filter = [], array $options = [])
{
- if (! isset($options['readPreference']) && ! is_in_transaction($options)) {
- $options['readPreference'] = $this->readPreference;
- }
-
- $server = select_server($this->manager, $options);
-
- if (! isset($options['readConcern']) && ! is_in_transaction($options)) {
- $options['readConcern'] = $this->readConcern;
- }
-
- if (! isset($options['typeMap'])) {
- $options['typeMap'] = $this->typeMap;
- }
+ $options = $this->inheritReadOptions($options);
+ $options = $this->inheritCodecOrTypeMap($options);
$operation = new FindOne($this->databaseName, $this->collectionName, $filter, $options);
- return $operation->execute($server);
+ return $operation->execute(select_server($this->manager, $options));
}
/**
@@ -694,19 +604,12 @@ public function findOne($filter = [], array $options = [])
*/
public function findOneAndDelete($filter, array $options = [])
{
- $server = select_server($this->manager, $options);
-
- if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
- $options['writeConcern'] = $this->writeConcern;
- }
-
- if (! isset($options['typeMap'])) {
- $options['typeMap'] = $this->typeMap;
- }
+ $options = $this->inheritWriteOptions($options);
+ $options = $this->inheritCodecOrTypeMap($options);
$operation = new FindOneAndDelete($this->databaseName, $this->collectionName, $filter, $options);
- return $operation->execute($server);
+ return $operation->execute(select_server($this->manager, $options));
}
/**
@@ -731,19 +634,12 @@ public function findOneAndDelete($filter, array $options = [])
*/
public function findOneAndReplace($filter, $replacement, array $options = [])
{
- $server = select_server($this->manager, $options);
-
- if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
- $options['writeConcern'] = $this->writeConcern;
- }
-
- if (! isset($options['typeMap'])) {
- $options['typeMap'] = $this->typeMap;
- }
+ $options = $this->inheritWriteOptions($options);
+ $options = $this->inheritCodecOrTypeMap($options);
$operation = new FindOneAndReplace($this->databaseName, $this->collectionName, $filter, $replacement, $options);
- return $operation->execute($server);
+ return $operation->execute(select_server($this->manager, $options));
}
/**
@@ -768,19 +664,12 @@ public function findOneAndReplace($filter, $replacement, array $options = [])
*/
public function findOneAndUpdate($filter, $update, array $options = [])
{
- $server = select_server($this->manager, $options);
-
- if (! isset($options['writeConcern']) && ! is_in_transaction($options)) {
- $options['writeConcern'] = $this->writeConcern;
- }
-
- if (! isset($options['typeMap'])) {
- $options['typeMap'] = $this->typeMap;
- }
+ $options = $this->inheritWriteOptions($options);
+ $options = $this->inheritCodecOrTypeMap($options);
$operation = new FindOneAndUpdate($this->databaseName, $this->collectionName, $filter, $update, $options);
- return $operation->execute($server);
+ return $operation->execute(select_server($this->manager, $options));
}
/**
@@ -871,22 +760,20 @@ public function getWriteConcern()
*
* @see InsertMany::__construct() for supported options
* @see https://mongodb.com/docs/manual/reference/command/insert/
- * @param array[]|object[] $documents The documents to insert
- * @param array $options Command options
+ * @param list