From af0e92a78eb3bdc25d8880e0278aaa5bac03c672 Mon Sep 17 00:00:00 2001 From: Roger Qiu Date: Tue, 7 Jun 2022 14:14:41 +1000 Subject: [PATCH] WIP: DBTransaction --- .eslintrc | 9 + .gitlab-ci.yml | 1 + package-lock.json | 60 +- package.json | 12 +- src/DB.ts | 231 +++-- src/DBIterator.ts | 136 +-- src/DBTransaction.ts | 889 +++++++----------- src/errors.ts | 12 +- src/index.ts | 4 +- src/rocksdb/napi/index.cpp | 61 +- src/rocksdb/napi/transaction.cpp | 4 + src/rocksdb/napi/workers/iterator_workers.cpp | 82 +- src/rocksdb/napi/workers/iterator_workers.h | 33 +- src/rocksdb/rocksdb.ts | 58 +- src/rocksdb/rocksdbP.ts | 97 +- src/rocksdb/types.ts | 52 +- src/types.ts | 42 +- src/utils.ts | 74 +- tests/DB.test.ts | 148 ++- tests/DBIterator.test.ts | 69 +- tests/DBTransaction.test.ts | 230 +++-- tests/rocksdb/rocksdbP.test.ts | 442 ++++++--- 22 files changed, 1594 insertions(+), 1152 deletions(-) diff --git a/.eslintrc b/.eslintrc index 9d5a9935..85ab771b 100644 --- a/.eslintrc +++ b/.eslintrc @@ -50,6 +50,11 @@ "ignoreConsecutiveComments": true } ], + "curly": [ + "error", + "multi-line", + "consistent" + ], "import/order": [ "error", { @@ -137,6 +142,10 @@ "format": ["PascalCase"], "trailingUnderscore": "allowSingleOrDouble" }, + { + "selector": "enumMember", + "format": ["PascalCase", "UPPER_CASE"] + }, { "selector": "objectLiteralProperty", "format": null diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 20d6e6af..4f687dde 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -177,6 +177,7 @@ build:macos: HOMEBREW_NO_INSTALL_UPGRADE: "true" HOMEBREW_NO_INSTALL_CLEANUP: "true" before_script: + - eval "$(brew shellenv)" - brew install node@16 - brew link --overwrite node@16 - brew install python@3.9 diff --git a/package-lock.json b/package-lock.json index 2f9342bb..2ca511eb 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,10 +10,10 @@ "hasInstallScript": true, "license": "Apache-2.0", "dependencies": { - "@matrixai/async-init": "^1.7.3", - "@matrixai/async-locks": "^2.2.0", - "@matrixai/errors": "^1.1.1", - "@matrixai/logger": "^2.1.1", + "@matrixai/async-init": "^1.8.1", + "@matrixai/async-locks": "^2.3.1", + "@matrixai/errors": "^1.1.2", + "@matrixai/logger": "^2.2.2", "@matrixai/resources": "^1.1.3", "@matrixai/workers": "^1.3.3", "node-gyp-build": "4.4.0", @@ -1330,18 +1330,18 @@ } }, "node_modules/@matrixai/async-init": { - "version": "1.7.3", - "resolved": "https://registry.npmjs.org/@matrixai/async-init/-/async-init-1.7.3.tgz", - "integrity": "sha512-Sf3q5ODhVJqrYiAdGXmwj606956lgEMKGM9LMFU5scIOh13WokHo3GthjB1yh/umCV75NYvHJn60R9gnudVZ3Q==", + "version": "1.8.1", + "resolved": "https://registry.npmjs.org/@matrixai/async-init/-/async-init-1.8.1.tgz", + "integrity": "sha512-ZAS1yd/PC+r3NwvT9fEz3OtAm68A8mKXXGdZRcYQF1ajl43jsV8/B4aDwr2oLFlV+RYZgWl7UwjZj4rtoZSycQ==", "dependencies": { - "@matrixai/async-locks": "^2.2.4", + "@matrixai/async-locks": "^2.3.1", "@matrixai/errors": "^1.1.1" } }, "node_modules/@matrixai/async-locks": { - "version": "2.2.4", - "resolved": "https://registry.npmjs.org/@matrixai/async-locks/-/async-locks-2.2.4.tgz", - "integrity": "sha512-AEGQMM7zw8Mkcc0hbNpOCNKa6DW+04rVIwyZgUnPWawPqwUt5HSGaQwdXI3dXO+35G/vjJppggv+JJZsGfEjvA==", + "version": "2.3.1", + "resolved": "https://registry.npmjs.org/@matrixai/async-locks/-/async-locks-2.3.1.tgz", + "integrity": "sha512-STz8VyiIXleaa72zMsq01x/ZO1gPzukUgMe25+uqMWn/nPrC9EtJOR7e3CW0DODfYDZ0748z196GeOjS3jh+4g==", "dependencies": { "@matrixai/errors": "^1.1.1", "@matrixai/resources": "^1.1.3", @@ -1357,9 +1357,9 @@ } }, 
"node_modules/@matrixai/logger": { - "version": "2.1.1", - "resolved": "https://registry.npmjs.org/@matrixai/logger/-/logger-2.1.1.tgz", - "integrity": "sha512-79KM0PyJTpfkALf9DK2xGniU+9gngsb5O8hcdUviWz+zR2W0hnTQq/g7tJW0YnIEhmDe/GkJf0Bnbs+gWfj3BA==" + "version": "2.2.2", + "resolved": "https://registry.npmjs.org/@matrixai/logger/-/logger-2.2.2.tgz", + "integrity": "sha512-6/G1svkcFiBMvmIdBv6YbxoLKwMWpXNzt93Cc4XbXXygCQrsn6oYwLvnRk/JNr6uM29M2T+Aa7K1o3n2XMTuLw==" }, "node_modules/@matrixai/resources": { "version": "1.1.3", @@ -2037,14 +2037,16 @@ }, "node_modules/async-mutex": { "version": "0.3.2", - "license": "MIT", + "resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.3.2.tgz", + "integrity": "sha512-HuTK7E7MT7jZEh1P9GtRW9+aTWiDWWi9InbZ5hjxrnRa39KS4BW04+xLBhYNS2aXhHUIKZSw3gj4Pn1pj+qGAA==", "dependencies": { "tslib": "^2.3.1" } }, "node_modules/async-mutex/node_modules/tslib": { "version": "2.4.0", - "license": "0BSD" + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.4.0.tgz", + "integrity": "sha512-d6xOpEDfsi2CZVlPQzGeux8XMwLT9hssAsaPYExaQMuYskwb+x1x7J371tWlbBdWHroy99KnVB6qIkUbs5X3UQ==" }, "node_modules/asynckit": { "version": "0.4.0", @@ -8458,18 +8460,18 @@ } }, "@matrixai/async-init": { - "version": "1.7.3", - "resolved": "https://registry.npmjs.org/@matrixai/async-init/-/async-init-1.7.3.tgz", - "integrity": "sha512-Sf3q5ODhVJqrYiAdGXmwj606956lgEMKGM9LMFU5scIOh13WokHo3GthjB1yh/umCV75NYvHJn60R9gnudVZ3Q==", + "version": "1.8.1", + "resolved": "https://registry.npmjs.org/@matrixai/async-init/-/async-init-1.8.1.tgz", + "integrity": "sha512-ZAS1yd/PC+r3NwvT9fEz3OtAm68A8mKXXGdZRcYQF1ajl43jsV8/B4aDwr2oLFlV+RYZgWl7UwjZj4rtoZSycQ==", "requires": { - "@matrixai/async-locks": "^2.2.4", + "@matrixai/async-locks": "^2.3.1", "@matrixai/errors": "^1.1.1" } }, "@matrixai/async-locks": { - "version": "2.2.4", - "resolved": "https://registry.npmjs.org/@matrixai/async-locks/-/async-locks-2.2.4.tgz", - "integrity": "sha512-AEGQMM7zw8Mkcc0hbNpOCNKa6DW+04rVIwyZgUnPWawPqwUt5HSGaQwdXI3dXO+35G/vjJppggv+JJZsGfEjvA==", + "version": "2.3.1", + "resolved": "https://registry.npmjs.org/@matrixai/async-locks/-/async-locks-2.3.1.tgz", + "integrity": "sha512-STz8VyiIXleaa72zMsq01x/ZO1gPzukUgMe25+uqMWn/nPrC9EtJOR7e3CW0DODfYDZ0748z196GeOjS3jh+4g==", "requires": { "@matrixai/errors": "^1.1.1", "@matrixai/resources": "^1.1.3", @@ -8485,9 +8487,9 @@ } }, "@matrixai/logger": { - "version": "2.1.1", - "resolved": "https://registry.npmjs.org/@matrixai/logger/-/logger-2.1.1.tgz", - "integrity": "sha512-79KM0PyJTpfkALf9DK2xGniU+9gngsb5O8hcdUviWz+zR2W0hnTQq/g7tJW0YnIEhmDe/GkJf0Bnbs+gWfj3BA==" + "version": "2.2.2", + "resolved": "https://registry.npmjs.org/@matrixai/logger/-/logger-2.2.2.tgz", + "integrity": "sha512-6/G1svkcFiBMvmIdBv6YbxoLKwMWpXNzt93Cc4XbXXygCQrsn6oYwLvnRk/JNr6uM29M2T+Aa7K1o3n2XMTuLw==" }, "@matrixai/resources": { "version": "1.1.3", @@ -8956,12 +8958,16 @@ }, "async-mutex": { "version": "0.3.2", + "resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.3.2.tgz", + "integrity": "sha512-HuTK7E7MT7jZEh1P9GtRW9+aTWiDWWi9InbZ5hjxrnRa39KS4BW04+xLBhYNS2aXhHUIKZSw3gj4Pn1pj+qGAA==", "requires": { "tslib": "^2.3.1" }, "dependencies": { "tslib": { - "version": "2.4.0" + "version": "2.4.0", + "resolved": "https://registry.npmjs.org/tslib/-/tslib-2.4.0.tgz", + "integrity": "sha512-d6xOpEDfsi2CZVlPQzGeux8XMwLT9hssAsaPYExaQMuYskwb+x1x7J371tWlbBdWHroy99KnVB6qIkUbs5X3UQ==" } } }, diff --git a/package.json b/package.json index ac2b01fe..82f4d00a 100644 --- a/package.json +++ b/package.json @@ 
-21,16 +21,16 @@ "test": "jest", "lint": "eslint '{src,tests,benches}/**/*.{js,ts}'", "lintfix": "eslint '{src,tests,benches}/**/*.{js,ts}' --fix", - "lintnative": "find ./src -type f -regextype posix-extended -regex '.*\\.(c|cc|cpp|h|hh|hpp)' -exec clang-format --dry-run -Werror {} +", - "lintnativefix": "find ./src -type f -regextype posix-extended -regex '.*\\.(c|cc|cpp|h|hh|hpp)' -exec clang-format -i {} +", + "lint-native": "find ./src -type f -regextype posix-extended -regex '.*\\.(c|cc|cpp|h|hh|hpp)' -exec clang-format --dry-run -Werror {} +", + "lintfix-native": "find ./src -type f -regextype posix-extended -regex '.*\\.(c|cc|cpp|h|hh|hpp)' -exec clang-format -i {} +", "docs": "rimraf ./docs && typedoc --gitRevision master --tsconfig ./tsconfig.build.json --out ./docs src", "bench": "rimraf ./benches/results && ts-node -r tsconfig-paths/register ./benches" }, "dependencies": { - "@matrixai/async-init": "^1.7.3", - "@matrixai/async-locks": "^2.2.0", - "@matrixai/errors": "^1.1.1", - "@matrixai/logger": "^2.1.1", + "@matrixai/async-init": "^1.8.1", + "@matrixai/async-locks": "^2.3.1", + "@matrixai/errors": "^1.1.2", + "@matrixai/logger": "^2.2.2", "@matrixai/resources": "^1.1.3", "@matrixai/workers": "^1.3.3", "node-gyp-build": "4.4.0", diff --git a/src/DB.ts b/src/DB.ts index 96954e33..389800ad 100644 --- a/src/DB.ts +++ b/src/DB.ts @@ -1,7 +1,3 @@ -import type { - RocksDBDatabase, - RocksDBDatabaseOptions, -} from './rocksdb'; import type { ResourceAcquire } from '@matrixai/resources'; import type { KeyPath, @@ -9,11 +5,14 @@ import type { FileSystem, Crypto, DBWorkerManagerInterface, - DBOptions, - DBIteratorOptions, DBBatch, DBOps, + DBOptions, + DBIteratorOptions, + DBClearOptions, + DBCountOptions, } from './types'; +import type { RocksDBDatabase, RocksDBDatabaseOptions } from './rocksdb'; import { Transfer } from 'threads'; import Logger from '@matrixai/logger'; import { withF, withG } from '@matrixai/resources'; @@ -22,7 +21,7 @@ import { ready, } from '@matrixai/async-init/dist/CreateDestroyStartStop'; import DBIterator from './DBIterator'; -// Import DBTransaction from './DBTransaction'; +import DBTransaction from './DBTransaction'; import { rocksdbP } from './rocksdb'; import * as utils from './utils'; import * as errors from './errors'; @@ -63,7 +62,6 @@ class DB { } public readonly dbPath: string; - protected crypto?: { key: Buffer; ops: Crypto; @@ -72,20 +70,32 @@ class DB { protected logger: Logger; protected workerManager?: DBWorkerManagerInterface; protected _db: RocksDBDatabase; - protected transactionCounter: number = 0; - /** * References to iterators - * This set must be empty when stopping the DB */ protected _iteratorRefs: Set> = new Set(); - /** * References to transactions - * This set must be empty when stopping the DB */ - // TODO: fix this to DBTransaction - protected _transactionRefs: Set = new Set(); + protected _transactionRefs: Set = new Set(); + + get db(): Readonly { + return this._db; + } + + /** + * @internal + */ + get iteratorRefs(): Readonly>> { + return this._iteratorRefs; + } + + /** + * @internal + */ + get transactionRefs(): Readonly> { + return this._transactionRefs; + } constructor({ dbPath, @@ -107,24 +117,6 @@ class DB { this.fs = fs; } - get db(): Readonly { - return this._db; - } - - /** - * @internal - */ - get iteratorRefs(): Readonly>> { - return this._iteratorRefs; - } - - // /** - // * @internal - // */ - // get transactionRefs(): Readonly> { - // return this._transactionRefs; - // } - public async start({ fresh = false, 
...dbOptions @@ -165,8 +157,11 @@ class DB { public async stop(): Promise { this.logger.info(`Stopping ${this.constructor.name}`); - if (this._iteratorRefs.size > 0 || this._transactionRefs.size > 0) { - throw new errors.ErrorDBLiveReference(); + for (const iterator of this._iteratorRefs) { + await iterator.destroy(); + } + for (const transaction of this._transactionRefs) { + await transaction.rollback(); } await rocksdbP.dbClose(this._db); this.logger.info(`Stopped ${this.constructor.name}`); @@ -193,49 +188,41 @@ class DB { delete this.workerManager; } - // @ready(new errors.ErrorDBNotRunning()) - // public transaction(): ResourceAcquire { - // return async () => { - // const transactionId = this.transactionCounter++; - // const tran = await DBTransaction.createTransaction({ - // db: this, - // transactionId, - // logger: this.logger, - // }); - // return [ - // async (e?: Error) => { - // try { - // if (e == null) { - // try { - // await tran.commit(); - // } catch (e) { - // await tran.rollback(e); - // throw e; - // } - // await tran.finalize(); - // } else { - // await tran.rollback(e); - // } - // } finally { - // await tran.destroy(); - // } - // }, - // tran, - // ]; - // }; - // } - - // public async withTransactionF( - // f: (tran: DBTransaction) => Promise, - // ): Promise { - // return withF([this.transaction()], ([tran]) => f(tran)); - // } - - // public withTransactionG( - // g: (tran: DBTransaction) => AsyncGenerator, - // ): AsyncGenerator { - // return withG([this.transaction()], ([tran]) => g(tran)); - // } + @ready(new errors.ErrorDBNotRunning()) + public transaction(): ResourceAcquire { + return async () => { + const tran = new DBTransaction({ + db: this, + logger: this.logger, + }); + return [ + async (e?: Error) => { + try { + if (e == null) { + await tran.commit(); + } else { + await tran.rollback(e); + } + } finally { + await tran.destroy(); + } + }, + tran, + ]; + }; + } + + public async withTransactionF( + f: (tran: DBTransaction) => Promise, + ): Promise { + return withF([this.transaction()], ([tran]) => f(tran)); + } + + public withTransactionG( + g: (tran: DBTransaction) => AsyncGenerator, + ): AsyncGenerator { + return withG([this.transaction()], ([tran]) => g(tran)); + } /** * Gets a value from the DB @@ -442,39 +429,39 @@ class DB { * You must have at least one of them being true or undefined */ public iterator( + levelPath: LevelPath | undefined, options: DBIteratorOptions & { keys: false; values: false }, - levelPath?: LevelPath, ): DBIterator; public iterator( + levelPath: LevelPath | undefined, options: DBIteratorOptions & { keys: false; valueAsBuffer: false }, - levelPath?: LevelPath, ): DBIterator; public iterator( + levelPath: LevelPath | undefined, options: DBIteratorOptions & { keys: false }, - levelPath?: LevelPath, ): DBIterator; public iterator( + levelPath: LevelPath | undefined, options: DBIteratorOptions & { values: false }, - levelPath?: LevelPath, ): DBIterator; public iterator( + levelPath: LevelPath | undefined, options: DBIteratorOptions & { valueAsBuffer: false }, - levelPath?: LevelPath, ): DBIterator; public iterator( - options?: DBIteratorOptions, levelPath?: LevelPath, + options?: DBIteratorOptions, ): DBIterator; @ready(new errors.ErrorDBNotRunning()) public iterator( + levelPath: LevelPath = [], options: DBIteratorOptions & { keyAsBuffer?: any; valueAsBuffer?: any; } = {}, - levelPath: LevelPath = [], ): DBIterator { levelPath = ['data', ...levelPath]; - return this._iterator(options, levelPath); + return this._iterator(levelPath, 
options); } /** @@ -482,47 +469,47 @@ class DB { * @internal */ public _iterator( + levelPath: LevelPath | undefined, options: DBIteratorOptions & { keys: false; values: false }, - levelPath?: LevelPath, ): DBIterator; /** * @internal */ public _iterator( + levelPath: LevelPath | undefined, options: DBIteratorOptions & { keys: false; valueAsBuffer: false }, - levelPath?: LevelPath, ): DBIterator; /** * @internal */ public _iterator( + levelPath: LevelPath | undefined, options: DBIteratorOptions & { keys: false }, - levelPath?: LevelPath, ): DBIterator; /** * @internal */ public _iterator( + levelPath: LevelPath | undefined, options: DBIteratorOptions & { values: false }, - levelPath?: LevelPath, ): DBIterator; /** * @internal */ public _iterator( + levelPath: LevelPath | undefined, options?: DBIteratorOptions & { valueAsBuffer: false }, - levelPath?: LevelPath, ): DBIterator; /** * @internal */ public _iterator( + levelPath?: LevelPath | undefined, options?: DBIteratorOptions, - levelPath?: LevelPath, ): DBIterator; public _iterator( - options: DBIteratorOptions = {}, levelPath: LevelPath = [], + options: DBIteratorOptions = {}, ): DBIterator { return new DBIterator({ db: this, @@ -537,31 +524,45 @@ class DB { * This is not atomic, it will iterate over a snapshot of the DB */ @ready(new errors.ErrorDBNotRunning()) - public async clear(levelPath: LevelPath = []): Promise { + public async clear( + levelPath: LevelPath = [], + options: DBClearOptions = {}, + ): Promise { levelPath = ['data', ...levelPath]; - await this._clear(levelPath); + await this._clear(levelPath, options); } /** * Clear from root level * @internal */ - public async _clear(levelPath: LevelPath = []): Promise { - for await (const [keyPath] of this._iterator( - { values: false }, - levelPath, - )) { - await this._del(levelPath.concat(keyPath)); - } + public async _clear( + levelPath: LevelPath = [], + options: DBClearOptions = {}, + ): Promise { + const options_ = utils.iterationOptions(options, levelPath); + return rocksdbP.dbClear(this._db, options_); } @ready(new errors.ErrorDBNotRunning()) - public async count(levelPath: LevelPath = []): Promise { - let count = 0; - for await (const _ of this.iterator({ values: false }, levelPath)) { - count++; - } - return count; + public async count( + levelPath: LevelPath = [], + options: DBCountOptions = {}, + ): Promise { + levelPath = ['data', ...levelPath]; + return this._count(levelPath, options); + } + + /** + * Count from root level + * @internal + */ + public async _count( + levelPath: LevelPath = [], + options: DBCountOptions = {}, + ): Promise { + const options_ = utils.iterationOptions(options, levelPath); + return rocksdbP.dbCount(this._db, options_); } /** @@ -591,13 +592,10 @@ class DB { levelPath = ['data', ...levelPath]; } const records: Array<[KeyPath, any]> = []; - for await (const [keyPath, v] of this._iterator( - { - keyAsBuffer: raw, - valueAsBuffer: raw, - }, - levelPath, - )) { + for await (const [keyPath, v] of this._iterator(levelPath, { + keyAsBuffer: raw, + valueAsBuffer: raw, + })) { records.push([keyPath, v]); } return records; @@ -701,8 +699,7 @@ class DB { } protected async setupRootLevels(): Promise { - // Clear any dirty state in transactions - await this._clear(['transactions']); + // Nothing to do yet } protected async canaryCheck(): Promise { diff --git a/src/DBIterator.ts b/src/DBIterator.ts index aaa310e3..c24b255e 100644 --- a/src/DBIterator.ts +++ b/src/DBIterator.ts @@ -1,7 +1,13 @@ -import type { KeyPath, LevelPath, DBIteratorOptions } from 
'./types'; -import type { RocksDBIterator, RocksDBIteratorOptions } from './rocksdb'; import type DB from './DB'; -import type Logger from '@matrixai/logger'; +import type DBTransaction from './DBTransaction'; +import type { Merge, KeyPath, LevelPath, DBIteratorOptions } from './types'; +import type { + RocksDBIterator, + RocksDBIteratorOptions, + RocksDBSnapshot, + RocksDBTransactionSnapshot, +} from './rocksdb'; +import Logger from '@matrixai/logger'; import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy'; import { Lock } from '@matrixai/async-locks'; import { rocksdbP } from './rocksdb'; @@ -12,81 +18,101 @@ import * as utils from './utils'; interface DBIterator extends CreateDestroy {} @CreateDestroy() class DBIterator { - protected db: DB; - protected levelPath: LevelPath; protected logger: Logger; + protected levelPath: LevelPath; + protected _db: DB; + protected _transaction?: DBTransaction; + protected _options: Merge< + DBIteratorOptions, + { + gt?: Buffer; + gte?: Buffer; + lt?: Buffer; + lte?: Buffer; + keyEncoding: 'buffer'; + valueEncoding: 'buffer'; + } + >; + protected _iterator: RocksDBIterator; protected first: boolean = true; protected finished: boolean = false; protected cache: Array<[Buffer, Buffer]> = []; protected cachePos: number = 0; protected lock: Lock = new Lock(); - protected _options: DBIteratorOptions & RocksDBIteratorOptions; - protected _iterator: RocksDBIterator; + public constructor( + options: { + db: DB; + levelPath: LevelPath; + logger?: Logger; + } & DBIteratorOptions, + ); + public constructor( + options: { + db: DB; + transaction: DBTransaction; + levelPath: LevelPath; + logger?: Logger; + } & DBIteratorOptions, + ); public constructor({ db, + transaction, levelPath, logger, ...options }: { db: DB; + transaction?: DBTransaction; levelPath: LevelPath; - logger: Logger; - } & DBIteratorOptions) { + logger?: Logger; + } & DBIteratorOptions) { + logger = logger ?? 
new Logger(this.constructor.name); logger.debug(`Constructing ${this.constructor.name}`); this.logger = logger; - this.db = db; this.levelPath = levelPath; - const options_ = { - ...options, - // Internally we always use the buffer - keyEncoding: 'buffer', - valueEncoding: 'buffer', - } as DBIteratorOptions & - RocksDBIteratorOptions & { - keyEncoding: 'buffer'; - valueEncoding: 'buffer'; - }; - if (options?.gt != null) { - options_.gt = utils.keyPathToKey( - levelPath.concat(utils.toKeyPath(options.gt)), - ); - } - if (options?.gte != null) { - options_.gte = utils.keyPathToKey( - levelPath.concat(utils.toKeyPath(options.gte)), - ); - } - if (options?.gt == null && options?.gte == null) { - options_.gt = utils.levelPathToKey(levelPath); - } - if (options?.lt != null) { - options_.lt = utils.keyPathToKey( - levelPath.concat(utils.toKeyPath(options.lt)), + const options_ = utils.iterationOptions>( + options, + levelPath, + ); + this._options = options_; + this._db = db; + if (transaction != null) { + this._transaction = transaction; + this._iterator = rocksdbP.transactionIteratorInit( + transaction.transaction, + options_ as RocksDBIteratorOptions & { + keyEncoding: 'buffer'; + valueEncoding: 'buffer'; + }, ); - } - if (options?.lte != null) { - options_.lte = utils.keyPathToKey( - levelPath.concat(utils.toKeyPath(options.lte)), + transaction.iteratorRefs.add(this); + } else { + this._iterator = rocksdbP.iteratorInit( + db.db, + options_ as RocksDBIteratorOptions & { + keyEncoding: 'buffer'; + valueEncoding: 'buffer'; + }, ); + db.iteratorRefs.add(this); } - if (options?.lt == null && options?.lte == null) { - const levelKeyEnd = utils.levelPathToKey(levelPath); - levelKeyEnd[levelKeyEnd.length - 1] += 1; - options_.lt = levelKeyEnd; - } - utils.filterUndefined(options_); - this._options = options_; - this._iterator = rocksdbP.iteratorInit(db.db, options_); - db.iteratorRefs.add(this); logger.debug(`Constructed ${this.constructor.name}`); } + get db(): Readonly { + return this._db; + } + + get transaction(): Readonly | undefined { + return this._transaction; + } + get iterator(): Readonly> { return this._iterator; } - get options(): Readonly { + get options(): Readonly> { return this._options; } @@ -94,7 +120,11 @@ class DBIterator { this.logger.debug(`Destroying ${this.constructor.name}`); this.cache = []; await rocksdbP.iteratorClose(this._iterator); - this.db.iteratorRefs.delete(this); + if (this._transaction != null) { + this._transaction.iteratorRefs.delete(this); + } else { + this._db.iteratorRefs.delete(this); + } this.logger.debug(`Destroyed ${this.constructor.name}`); } @@ -174,9 +204,9 @@ class DBIterator { value = undefined; } else { if (this._options.valueAsBuffer === false) { - value = await this.db.deserializeDecrypt(entry[1], false); + value = await this._db.deserializeDecrypt(entry[1], false); } else { - value = await this.db.deserializeDecrypt(entry[1], true); + value = await this._db.deserializeDecrypt(entry[1], true); } } return [keyPath, value] as [K, V]; diff --git a/src/DBTransaction.ts b/src/DBTransaction.ts index 53f1e650..2518a5e7 100644 --- a/src/DBTransaction.ts +++ b/src/DBTransaction.ts @@ -1,549 +1,390 @@ -// import type DB from './DB'; -// import type { -// KeyPath, -// LevelPath, -// DBIterator, -// DBOps, -// DBIteratorOptions, -// } from './types'; -// import Logger from '@matrixai/logger'; -// import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy'; -// import { Lock } from '@matrixai/async-locks'; -// import * as utils from 
'./utils'; -// import * as errors from './errors'; +import type DB from './DB'; +import type { + KeyPath, + LevelPath, + DBIteratorOptions, + DBClearOptions, + DBCountOptions, +} from './types'; +import type { + RocksDBTransaction, + RocksDBTransactionOptions, + RocksDBTransactionSnapshot, +} from './rocksdb/types'; +import Logger from '@matrixai/logger'; +import { CreateDestroy, ready } from '@matrixai/async-init/dist/CreateDestroy'; +import DBIterator from './DBIterator'; +import { rocksdbP } from './rocksdb'; +import * as utils from './utils'; +import * as errors from './errors'; -// /** -// * Minimal read-committed transaction system -// * -// * Properties: -// * - No dirty reads - cannot read uncommitted writes from other transactions -// * - Non-repeatable reads - multiple reads on the same key may read -// * different values due to other committed -// * transactions -// * - Phantom reads - can read entries that are added or deleted by other -// * transactions -// * - Lost updates - can lose writes if 2 transactions commit writes to the -// * same key -// * -// * To prevent non-repeatable reads, phantom-reads or lost-updates, it is up to the -// * user to use advisory read/write locking on relevant keys or ranges of keys. -// * -// * This does not use LevelDB snapshots provided by the `iterator` method -// * which would provide "repeatable-read" isolation level by default -// * -// * See: https://en.wikipedia.org/wiki/Isolation_(database_systems) -// */ -// interface DBTransaction extends CreateDestroy {} -// @CreateDestroy() -// class DBTransaction { -// public static async createTransaction({ -// db, -// transactionId, -// logger = new Logger(this.name), -// }: { -// db: DB; -// transactionId: number; -// logger?: Logger; -// }): Promise { -// logger.debug(`Creating ${this.name} ${transactionId}`); -// const tran = new this({ -// db, -// transactionId, -// logger, -// }); -// logger.debug(`Created ${this.name} ${transactionId}`); -// return tran; -// } +interface DBTransaction extends CreateDestroy {} +@CreateDestroy() +class DBTransaction { + protected _db: DB; + protected logger: Logger; -// public readonly transactionId: number; -// public readonly transactionPath: LevelPath; -// public readonly transactionDataPath: LevelPath; -// public readonly transactionTombstonePath: LevelPath; + protected _options: RocksDBTransactionOptions; + protected _transaction: RocksDBTransaction; + protected _id: number; + protected _snapshot: RocksDBTransactionSnapshot; -// protected db: DB; -// protected logger: Logger; -// /** -// * LevelDB snapshots can only be accessed via an iterator -// * This maintains a consistent read-only snapshot of the DB -// * when `DBTransaction` is constructed -// */ -// protected snapshot: DBIterator; -// /** -// * Reading from the snapshot iterator needs to be an atomic operation -// * involving a synchronus seek and asynchronous next -// */ -// protected snapshotLock = new Lock(); -// protected _ops: DBOps = []; -// protected _callbacksSuccess: Array<() => any> = []; -// protected _callbacksFailure: Array<(e?: Error) => any> = []; -// protected _callbacksFinally: Array<(e?: Error) => any> = []; -// protected _committed: boolean = false; -// protected _rollbacked: boolean = false; + protected _callbacksSuccess: Array<() => any> = []; + protected _callbacksFailure: Array<(e?: Error) => any> = []; + protected _callbacksFinally: Array<(e?: Error) => any> = []; + protected _committed: boolean = false; + protected _rollbacked: boolean = false; -// public constructor({ 
-// db, -// transactionId, -// logger, -// }: { -// db: DB; -// transactionId: number; -// logger: Logger; -// }) { -// this.logger = logger; -// this.db = db; -// this.snapshot = db._iterator(undefined, ['data']); -// this.transactionId = transactionId; -// this.transactionPath = ['transactions', this.transactionId.toString()]; -// // Data path contains the COW overlay -// this.transactionDataPath = [...this.transactionPath, 'data']; -// // Tombstone path tracks whether key has been deleted -// // If `undefined`, it has not been deleted -// // If `true`, then it has been deleted -// // When deleted, the COW overlay entry must also be deleted -// this.transactionTombstonePath = [...this.transactionPath, 'tombstone']; -// } + /** + * References to iterators + */ + protected _iteratorRefs: Set> = new Set(); -// public async destroy() { -// this.logger.debug( -// `Destroying ${this.constructor.name} ${this.transactionId}`, -// ); -// await Promise.all([ -// this.snapshot.end(), -// this.db._clear(this.transactionPath), -// ]); -// this.logger.debug( -// `Destroyed ${this.constructor.name} ${this.transactionId}`, -// ); -// } + public constructor({ + db, + logger, + ...options + }: { + db: DB; + logger?: Logger; + } & RocksDBTransactionOptions) { + logger = logger ?? new Logger(this.constructor.name); + logger.debug(`Constructing ${this.constructor.name}`); + this.logger = logger; + this._db = db; + const options_ = { + ...options, + // Transactions should be synchronous + sync: true, + }; + utils.filterUndefined(options_); + this._options = options_; + this._transaction = rocksdbP.transactionInit(db.db, options_); + db.transactionRefs.add(this); + this._id = rocksdbP.transactionId(this._transaction); + logger.debug(`Constructed ${this.constructor.name} ${this._id}`); + } -// get ops(): Readonly { -// return this._ops; -// } + /** + * Destroy the transaction + * This cannot be called until the transaction is committed or rollbacked + */ + public async destroy() { + this.logger.debug(`Destroying ${this.constructor.name} ${this._id}`); + this._db.transactionRefs.delete(this); + if (!this._committed && !this._rollbacked) { + throw new errors.ErrorDBTransactionNotCommittedNorRollbacked(); + } + this.logger.debug(`Destroyed ${this.constructor.name} ${this._id}`); + } -// get callbacksSuccess(): Readonly any>> { -// return this._callbacksSuccess; -// } + get db(): Readonly { + return this._db; + } -// get callbacksFailure(): Readonly any>> { -// return this._callbacksFailure; -// } + get transaction(): Readonly { + return this._transaction; + } -// get committed(): boolean { -// return this._committed; -// } + get id(): number { + return this._id; + } -// get rollbacked(): boolean { -// return this._rollbacked; -// } + /** + * @internal + */ + get iteratorRefs(): Readonly>> { + return this._iteratorRefs; + } -// public async get( -// keyPath: KeyPath | string | Buffer, -// raw?: false, -// ): Promise; -// public async get( -// keyPath: KeyPath | string | Buffer, -// raw: true, -// ): Promise; -// @ready(new errors.ErrorDBTransactionDestroyed()) -// public async get( -// keyPath: KeyPath | string | Buffer, -// raw: boolean = false, -// ): Promise { -// keyPath = utils.toKeyPath(keyPath); -// let value = await this.db._get( -// [...this.transactionDataPath, ...keyPath], -// raw as any, -// ); -// if (value === undefined) { -// if ( -// (await this.db._get([ -// ...this.transactionTombstonePath, -// ...keyPath, -// ])) !== true -// ) { -// value = await this.getSnapshot(keyPath, raw as any); -// } -// 
} -// return value; -// } + get callbacksSuccess(): Readonly any>> { + return this._callbacksSuccess; + } -// public async put( -// keyPath: KeyPath | string | Buffer, -// value: any, -// raw?: false, -// ): Promise; -// public async put( -// keyPath: KeyPath | string | Buffer, -// value: Buffer, -// raw: true, -// ): Promise; -// @ready(new errors.ErrorDBTransactionDestroyed()) -// public async put( -// keyPath: KeyPath | string | Buffer, -// value: any, -// raw: boolean = false, -// ): Promise { -// keyPath = utils.toKeyPath(keyPath); -// await this.db._put( -// [...this.transactionDataPath, ...keyPath], -// value, -// raw as any, -// ); -// await this.db._del([...this.transactionTombstonePath, ...keyPath]); -// this._ops.push({ -// type: 'put', -// keyPath, -// value, -// raw, -// }); -// } + get callbacksFailure(): Readonly any>> { + return this._callbacksFailure; + } -// @ready(new errors.ErrorDBTransactionDestroyed()) -// public async del(keyPath: KeyPath | string | Buffer): Promise { -// keyPath = utils.toKeyPath(keyPath); -// await this.db._del([...this.transactionDataPath, ...keyPath]); -// await this.db._put([...this.transactionTombstonePath, ...keyPath], true); -// this._ops.push({ -// type: 'del', -// keyPath, -// }); -// } + get callbacksFinally(): Readonly any>> { + return this._callbacksFinally; + } -// public iterator( -// options: DBIteratorOptions & { values: false }, -// levelPath?: LevelPath, -// ): DBIterator; -// public iterator( -// options?: DBIteratorOptions & { valueAsBuffer?: true }, -// levelPath?: LevelPath, -// ): DBIterator; -// public iterator( -// options?: DBIteratorOptions & { valueAsBuffer: false }, -// levelPath?: LevelPath, -// ): DBIterator; -// @ready(new errors.ErrorDBTransactionDestroyed()) -// public iterator( -// options?: DBIteratorOptions, -// levelPath: LevelPath = [], -// ): DBIterator { -// const dataIterator = this.db._iterator( -// { -// ...options, -// keys: true, -// keyAsBuffer: true, -// }, -// ['data', ...levelPath], -// ); -// const tranIterator = this.db._iterator( -// { -// ...options, -// keys: true, -// keyAsBuffer: true, -// }, -// [...this.transactionDataPath, ...levelPath], -// ); -// const order = options?.reverse ? 
'desc' : 'asc'; -// const processKV = ( -// kv: [KeyPath, Buffer | V | undefined], -// ): [KeyPath, Buffer | V | undefined] => { -// if (options?.keyAsBuffer === false) { -// kv[0] = kv[0].map((k) => k.toString('utf-8')); -// } -// return kv; -// }; -// const iterator = { -// _ended: false, -// _nexting: false, -// seek: (keyPath: KeyPath | Buffer | string): void => { -// if (iterator._ended) { -// throw new Error('cannot call seek() after end()'); -// } -// if (iterator._nexting) { -// throw new Error('cannot call seek() before next() has completed'); -// } -// dataIterator.seek(keyPath); -// tranIterator.seek(keyPath); -// }, -// end: async () => { -// if (iterator._ended) { -// throw new Error('end() already called on iterator'); -// } -// iterator._ended = true; -// await dataIterator.end(); -// await tranIterator.end(); -// }, -// next: async () => { -// if (iterator._ended) { -// throw new Error('cannot call next() after end()'); -// } -// if (iterator._nexting) { -// throw new Error( -// 'cannot call next() before previous next() has completed', -// ); -// } -// iterator._nexting = true; -// try { -// while (true) { -// const tranKV = (await tranIterator.next()) as -// | [KeyPath, Buffer | undefined] -// | undefined; -// const dataKV = (await dataIterator.next()) as -// | [KeyPath, Buffer | undefined] -// | undefined; -// // If both are finished, iterator is finished -// if (tranKV == null && dataKV == null) { -// return undefined; -// } -// // If tranIterator is not finished but dataIterator is finished -// // continue with tranIterator -// if (tranKV != null && dataKV == null) { -// return processKV(tranKV); -// } -// // If tranIterator is finished but dataIterator is not finished -// // continue with the dataIterator -// if (tranKV == null && dataKV != null) { -// // If the dataKey is entombed, skip iteration -// if ( -// (await this.db._get( -// this.transactionTombstonePath.concat(levelPath, dataKV[0]), -// )) === true -// ) { -// continue; -// } -// return processKV(dataKV); -// } -// const [tranKeyPath, tranData] = tranKV as [ -// KeyPath, -// Buffer | V | undefined, -// ]; -// const [dataKeyPath, dataData] = dataKV as [ -// KeyPath, -// Buffer | V | undefined, -// ]; -// const keyCompare = Buffer.compare( -// utils.keyPathToKey(tranKeyPath), -// utils.keyPathToKey(dataKeyPath), -// ); -// if (keyCompare < 0) { -// if (order === 'asc') { -// dataIterator.seek(tranKeyPath); -// return processKV([tranKeyPath, tranData]); -// } else if (order === 'desc') { -// tranIterator.seek(dataKeyPath); -// // If the dataKey is entombed, skip iteration -// if ( -// (await this.db._get( -// this.transactionTombstonePath.concat( -// levelPath, -// dataKeyPath, -// ), -// )) === true -// ) { -// continue; -// } -// return processKV([dataKeyPath, dataData]); -// } -// } else if (keyCompare > 0) { -// if (order === 'asc') { -// tranIterator.seek(dataKeyPath); -// // If the dataKey is entombed, skip iteration -// if ( -// (await this.db._get( -// this.transactionTombstonePath.concat( -// levelPath, -// dataKeyPath, -// ), -// )) === true -// ) { -// continue; -// } -// return processKV([dataKeyPath, dataData]); -// } else if (order === 'desc') { -// dataIterator.seek(tranKeyPath); -// return processKV([tranKeyPath, tranData]); -// } -// } else { -// return processKV([tranKeyPath, tranData]); -// } -// } -// } finally { -// iterator._nexting = false; -// } -// }, -// [Symbol.asyncIterator]: async function* () { -// try { -// let kv: [KeyPath, any] | undefined; -// while ((kv = await 
iterator.next()) !== undefined) { -// yield kv; -// } -// } finally { -// if (!iterator._ended) await iterator.end(); -// } -// }, -// }; -// return iterator; -// } + get committed(): boolean { + return this._committed; + } -// @ready(new errors.ErrorDBTransactionDestroyed()) -// public async clear(levelPath: LevelPath = []): Promise { -// for await (const [keyPath] of this.iterator({ values: false }, levelPath)) { -// await this.del(levelPath.concat(keyPath)); -// } -// } + get rollbacked(): boolean { + return this._rollbacked; + } -// @ready(new errors.ErrorDBTransactionDestroyed()) -// public async count(levelPath: LevelPath = []): Promise { -// let count = 0; -// for await (const _ of this.iterator({ values: false }, levelPath)) { -// count++; -// } -// return count; -// } + public async get( + keyPath: KeyPath | string | Buffer, + raw?: false, + ): Promise; + public async get( + keyPath: KeyPath | string | Buffer, + raw: true, + ): Promise; + @ready(new errors.ErrorDBTransactionDestroyed()) + public async get( + keyPath: KeyPath | string | Buffer, + raw: boolean = false, + ): Promise { + keyPath = utils.toKeyPath(keyPath); + keyPath = ['data', ...keyPath]; + let data: Buffer; + try { + const key = utils.keyPathToKey(keyPath); + data = await rocksdbP.transactionGet(this._transaction, key, { + valueEncoding: 'buffer', + snapshot: this.setupSnapshot(), + }); + } catch (e) { + if (e.code === 'NOT_FOUND') { + return undefined; + } + throw e; + } + return this._db.deserializeDecrypt(data, raw as any); + } -// /** -// * Dump from transaction level path -// * This will only show entries for the current transaction -// * It is intended for diagnostics -// */ -// public async dump( -// levelPath?: LevelPath, -// raw?: false, -// ): Promise>; -// public async dump( -// levelPath: LevelPath | undefined, -// raw: true, -// ): Promise>; -// @ready(new errors.ErrorDBTransactionDestroyed()) -// public async dump( -// levelPath: LevelPath = [], -// raw: boolean = false, -// ): Promise> { -// return await this.db.dump( -// this.transactionPath.concat(levelPath), -// raw as any, -// true, -// ); -// } + public async put( + keyPath: KeyPath | string | Buffer, + value: any, + raw?: false, + ): Promise; + public async put( + keyPath: KeyPath | string | Buffer, + value: Buffer, + raw: true, + ): Promise; + @ready(new errors.ErrorDBTransactionDestroyed()) + public async put( + keyPath: KeyPath | string | Buffer, + value: any, + raw: boolean = false, + ): Promise { + this.setupSnapshot(); + keyPath = utils.toKeyPath(keyPath); + keyPath = ['data', ...keyPath]; + const key = utils.keyPathToKey(keyPath); + const data = await this._db.serializeEncrypt(value, raw as any); + return rocksdbP.transactionPut(this._transaction, key, data); + } -// @ready(new errors.ErrorDBTransactionDestroyed()) -// public queueSuccess(f: () => any): void { -// this._callbacksSuccess.push(f); -// } + @ready(new errors.ErrorDBTransactionDestroyed()) + public async del(keyPath: KeyPath | string | Buffer): Promise { + this.setupSnapshot(); + keyPath = utils.toKeyPath(keyPath); + keyPath = ['data', ...keyPath]; + const key = utils.keyPathToKey(keyPath); + return rocksdbP.transactionDel(this._transaction, key); + } -// @ready(new errors.ErrorDBTransactionDestroyed()) -// public queueFailure(f: (e?: Error) => any): void { -// this._callbacksFailure.push(f); -// } + public iterator( + levelPath: LevelPath | undefined, + options: DBIteratorOptions & { + keys: false; + values: false; + }, + ): DBIterator; + public iterator( + levelPath: 
LevelPath | undefined, + options: DBIteratorOptions & { + keys: false; + valueAsBuffer: false; + }, + ): DBIterator; + public iterator( + levelPath: LevelPath | undefined, + options: DBIteratorOptions & { keys: false }, + ): DBIterator; + public iterator( + levelPath: LevelPath | undefined, + options: DBIteratorOptions & { values: false }, + ): DBIterator; + public iterator( + levelPath: LevelPath | undefined, + options: DBIteratorOptions & { + valueAsBuffer: false; + }, + ): DBIterator; + public iterator( + levelPath?: LevelPath | undefined, + options?: DBIteratorOptions, + ): DBIterator; + @ready(new errors.ErrorDBTransactionDestroyed()) + public iterator( + levelPath: LevelPath = [], + options: DBIteratorOptions = {}, + ): DBIterator { + levelPath = ['data', ...levelPath]; + return new DBIterator({ + ...options, + db: this._db, + transaction: this, + levelPath, + logger: this.logger.getChild(DBIterator.name), + snapshot: this.setupSnapshot(), + }); + } -// @ready(new errors.ErrorDBTransactionDestroyed()) -// public queueFinally(f: (e?: Error) => any): void { -// this._callbacksFinally.push(f); -// } + @ready(new errors.ErrorDBTransactionDestroyed()) + public async clear( + levelPath: LevelPath = [], + options: DBClearOptions = {}, + ): Promise { + levelPath = ['data', ...levelPath]; + const options_ = utils.iterationOptions(options, levelPath); + return rocksdbP.transactionClear(this._transaction, { + ...options_, + snapshot: this.setupSnapshot(), + }); + } -// @ready(new errors.ErrorDBTransactionDestroyed()) -// public async commit(): Promise { -// if (this._rollbacked) { -// throw new errors.ErrorDBTransactionRollbacked(); -// } -// if (this._committed) { -// return; -// } -// this.logger.debug( -// `Committing ${this.constructor.name} ${this.transactionId}`, -// ); -// this._committed = true; -// try { -// await this.db.batch(this._ops); -// } catch (e) { -// this._committed = false; -// throw e; -// } -// this.logger.debug( -// `Committed ${this.constructor.name} ${this.transactionId}`, -// ); -// } + @ready(new errors.ErrorDBTransactionDestroyed()) + public async count( + levelPath: LevelPath = [], + options: DBCountOptions = {}, + ): Promise { + const options_ = { + ...options, + keys: true, + values: false, + }; + let count = 0; + for await (const _ of this.iterator(levelPath, options_)) { + count++; + } + return count; + } -// @ready(new errors.ErrorDBTransactionDestroyed()) -// public async rollback(e?: Error): Promise { -// if (this._committed) { -// throw new errors.ErrorDBTransactionCommitted(); -// } -// if (this._rollbacked) { -// return; -// } -// this.logger.debug( -// `Rollbacking ${this.constructor.name} ${this.transactionId}`, -// ); -// this._rollbacked = true; -// for (const f of this._callbacksFailure) { -// await f(e); -// } -// for (const f of this._callbacksFinally) { -// await f(e); -// } -// this.logger.debug( -// `Rollbacked ${this.constructor.name} ${this.transactionId}`, -// ); -// } + /** + * Dump from transaction level path + * This will only show entries for the current transaction + * It is intended for diagnostics + */ + public async dump( + levelPath?: LevelPath, + raw?: false, + ): Promise>; + public async dump( + levelPath: LevelPath | undefined, + raw: true, + ): Promise>; + @ready(new errors.ErrorDBTransactionDestroyed()) + public async dump( + levelPath: LevelPath = [], + raw: boolean = false, + ): Promise> { + const records: Array<[KeyPath, any]> = []; + for await (const [keyPath, v] of this.iterator(levelPath, { + keyAsBuffer: raw, + 
valueAsBuffer: raw, + })) { + records.push([keyPath, v]); + } + return records; + } -// @ready(new errors.ErrorDBTransactionDestroyed()) -// public async finalize(): Promise { -// if (this._rollbacked) { -// throw new errors.ErrorDBTransactionRollbacked(); -// } -// if (!this._committed) { -// throw new errors.ErrorDBTransactionNotCommitted(); -// } -// this.logger.debug( -// `Finalize ${this.constructor.name} ${this.transactionId}`, -// ); -// for (const f of this._callbacksSuccess) { -// await f(); -// } -// for (const f of this._callbacksFinally) { -// await f(); -// } -// this.logger.debug( -// `Finalized ${this.constructor.name} ${this.transactionId}`, -// ); -// } + @ready(new errors.ErrorDBTransactionDestroyed()) + public queueSuccess(f: () => any): void { + this._callbacksSuccess.push(f); + } -// /** -// * Get value from the snapshot iterator -// * This is an atomic operation -// * It will seek to the key path and await the next entry -// * If the entry's key equals the desired key, the entry is returned -// */ -// protected async getSnapshot( -// keyPath: KeyPath, -// raw?: false, -// ): Promise; -// protected async getSnapshot( -// keyPath: KeyPath, -// raw: true, -// ): Promise; -// protected async getSnapshot( -// keyPath: KeyPath, -// raw: boolean = false, -// ): Promise { -// return await this.snapshotLock.withF(async () => { -// const key = utils.keyPathToKey(keyPath); -// this.snapshot.seek(utils.keyPathToKey(keyPath)); -// const snapKV = await this.snapshot.next(); -// if (snapKV == null) { -// return undefined; -// } -// const [snapKey, snapData] = snapKV; -// if (!key.equals(snapKey)) { -// return undefined; -// } -// if (raw) { -// return snapData; -// } else { -// return utils.deserialize(snapData); -// } -// }); -// } -// } + @ready(new errors.ErrorDBTransactionDestroyed()) + public queueFailure(f: (e?: Error) => any): void { + this._callbacksFailure.push(f); + } -// export default DBTransaction; + @ready(new errors.ErrorDBTransactionDestroyed()) + public queueFinally(f: (e?: Error) => any): void { + this._callbacksFinally.push(f); + } + + @ready(new errors.ErrorDBTransactionDestroyed()) + public async commit(): Promise { + if (this._rollbacked) { + throw new errors.ErrorDBTransactionRollbacked(); + } + if (this._committed) { + return; + } + this.logger.debug(`Committing ${this.constructor.name} ${this._id}`); + for (const iterator of this._iteratorRefs) { + await iterator.destroy(); + } + this._committed = true; + try { + try { + // If this fails, the `DBTransaction` is still considered committed + // it must be destroyed, it cannot be reused + await rocksdbP.transactionCommit(this._transaction); + } catch (e) { + if (e.code === 'TRANSACTION_CONFLICT') { + this.logger.debug( + `Failed Committing ${this.constructor.name} ${this._id} due to ${errors.ErrorDBTransactionConflict.name}`, + ); + throw new errors.ErrorDBTransactionConflict(undefined, { cause: e }); + } else { + this.logger.debug( + `Failed Committing ${this.constructor.name} ${this._id} due to ${e.message}`, + ); + throw e; + } + } + for (const f of this._callbacksSuccess) { + await f(); + } + } finally { + for (const f of this._callbacksFinally) { + await f(); + } + } + await this.destroy(); + this.logger.debug(`Committed ${this.constructor.name} ${this._id}`); + } + + @ready(new errors.ErrorDBTransactionDestroyed()) + public async rollback(e?: Error): Promise { + if (this._committed) { + throw new errors.ErrorDBTransactionCommitted(); + } + if (this._rollbacked) { + return; + } + 
this.logger.debug(`Rollbacking ${this.constructor.name} ${this._id}`);
+    for (const iterator of this._iteratorRefs) {
+      await iterator.destroy();
+    }
+    this._rollbacked = true;
+    try {
+      // If this fails, the `DBTransaction` is still considered rollbacked
+      // it must be destroyed, it cannot be reused
+      await rocksdbP.transactionRollback(this._transaction);
+      for (const f of this._callbacksFailure) {
+        await f(e);
+      }
+    } finally {
+      for (const f of this._callbacksFinally) {
+        await f(e);
+      }
+    }
+    await this.destroy();
+    this.logger.debug(`Rollbacked ${this.constructor.name} ${this._id}`);
+  }
+
+  /**
+   * Sets up the snapshot
+   * This is executed lazily, not at construction,
+   * but at the first transactional operation
+   */
+  protected setupSnapshot(): RocksDBTransactionSnapshot {
+    if (this._snapshot == null) {
+      this._snapshot = rocksdbP.transactionSnapshot(this._transaction);
+    }
+    return this._snapshot;
+  }
+}
+
+export default DBTransaction;
diff --git a/src/errors.ts b/src/errors.ts
index c6aaa580..4ad601af 100644
--- a/src/errors.ts
+++ b/src/errors.ts
@@ -40,10 +40,6 @@ class ErrorDBParseValue<T> extends ErrorDB<T> {
   static description = 'DB value parsing failed';
 }
 
-class ErrorDBLiveReference<T> extends ErrorDB<T> {
-  static description = 'DB has live DBIterator or DBTransaction references';
-}
-
 class ErrorDBIterator<T> extends ErrorDB<T> {
   static description = 'DBIterator error';
 }
@@ -76,6 +72,12 @@ class ErrorDBTransactionRollbacked<T> extends ErrorDBTransaction<T> {
   static description = 'DBTransaction is rollbacked';
 }
 
+class ErrorDBTransactionNotCommittedNorRollbacked<
+  T,
+> extends ErrorDBTransaction<T> {
+  static description = 'DBTransaction is neither committed nor rollbacked';
+}
+
 class ErrorDBTransactionConflict<T> extends ErrorDBTransaction<T> {
   static description = 'DBTransaction cannot commit due to conflicting writes';
 }
@@ -91,7 +93,6 @@ export {
   ErrorDBDecrypt,
   ErrorDBParseKey,
   ErrorDBParseValue,
-  ErrorDBLiveReference,
   ErrorDBIterator,
   ErrorDBIteratorDestroyed,
   ErrorDBIteratorBusy,
@@ -100,5 +101,6 @@ export {
   ErrorDBTransactionCommitted,
   ErrorDBTransactionNotCommitted,
   ErrorDBTransactionRollbacked,
+  ErrorDBTransactionNotCommittedNorRollbacked,
   ErrorDBTransactionConflict,
 };
diff --git a/src/index.ts b/src/index.ts
index 9ebe3224..791148a6 100644
--- a/src/index.ts
+++ b/src/index.ts
@@ -1,5 +1,7 @@
 export { default as DB } from './DB';
-// export { default as DBTransaction } from './DBTransaction';
+export { default as DBTransaction } from './DBTransaction';
+export { default as DBIterator } from './DBIterator';
 export * as utils from './utils';
 export * as errors from './errors';
+export * as rocksdb from './rocksdb';
 export * from './types';
diff --git a/src/rocksdb/napi/index.cpp b/src/rocksdb/napi/index.cpp
index 98ef71f9..c632add8 100644
--- a/src/rocksdb/napi/index.cpp
+++ b/src/rocksdb/napi/index.cpp
@@ -467,7 +467,6 @@ NAPI_METHOD(dbClear) {
   NAPI_DB_CONTEXT();
   napi_value options = argv[1];
   napi_value callback = argv[2];
-  const bool reverse = BooleanProperty(env, options, "reverse", false);
   const int limit = Int32Property(env, options, "limit", -1);
   std::string* lt = RangeOption(env, options, "lt");
   std::string* lte = RangeOption(env, options, "lte");
@@ -475,9 +474,28 @@
   std::string* gte = RangeOption(env, options, "gte");
   const Snapshot* snapshot = SnapshotProperty(env, options, "snapshot");
   const bool sync = BooleanProperty(env, options, "sync", false);
-  IteratorClearWorker* worker =
-      new IteratorClearWorker(env, database, callback, reverse, limit, lt,
lte, - gt, gte, sync, snapshot); + IteratorClearWorker* worker = new IteratorClearWorker( + env, database, callback, limit, lt, lte, gt, gte, sync, snapshot); + worker->Queue(env); + NAPI_RETURN_UNDEFINED(); +} + +/** + * Count a range from a database. + */ +NAPI_METHOD(dbCount) { + NAPI_ARGV(3); + NAPI_DB_CONTEXT(); + napi_value options = argv[1]; + napi_value callback = argv[2]; + const int limit = Int32Property(env, options, "limit", -1); + std::string* lt = RangeOption(env, options, "lt"); + std::string* lte = RangeOption(env, options, "lte"); + std::string* gt = RangeOption(env, options, "gt"); + std::string* gte = RangeOption(env, options, "gte"); + const Snapshot* snapshot = SnapshotProperty(env, options, "snapshot"); + IteratorCountWorker* worker = new IteratorCountWorker( + env, database, callback, limit, lt, lte, gt, gte, snapshot); worker->Queue(env); NAPI_RETURN_UNDEFINED(); } @@ -807,6 +825,16 @@ NAPI_METHOD(transactionInit) { return transaction_ref; } +NAPI_METHOD(transactionId) { + NAPI_ARGV(1); + NAPI_TRANSACTION_CONTEXT(); + ASSERT_TRANSACTION_READY(env, transaction); + // This uses our own id instead of `Transaction::GetID()` and + // `Transaction::GetId()` + const uint32_t id = transaction->id_; + NAPI_RETURN_UINT32(id); +} + /** * Commit transaction */ @@ -1032,7 +1060,6 @@ NAPI_METHOD(transactionClear) { ASSERT_TRANSACTION_READY(env, transaction); napi_value options = argv[1]; napi_value callback = argv[2]; - const bool reverse = BooleanProperty(env, options, "reverse", false); const int limit = Int32Property(env, options, "limit", -1); std::string* lt = RangeOption(env, options, "lt"); std::string* lte = RangeOption(env, options, "lte"); @@ -1041,7 +1068,26 @@ NAPI_METHOD(transactionClear) { const TransactionSnapshot* snapshot = TransactionSnapshotProperty(env, options, "snapshot"); IteratorClearWorker* worker = new IteratorClearWorker( - env, transaction, callback, reverse, limit, lt, lte, gt, gte, snapshot); + env, transaction, callback, limit, lt, lte, gt, gte, snapshot); + worker->Queue(env); + NAPI_RETURN_UNDEFINED(); +} + +NAPI_METHOD(transactionCount) { + NAPI_ARGV(3); + NAPI_TRANSACTION_CONTEXT(); + ASSERT_TRANSACTION_READY(env, transaction); + napi_value options = argv[1]; + napi_value callback = argv[2]; + const int limit = Int32Property(env, options, "limit", -1); + std::string* lt = RangeOption(env, options, "lt"); + std::string* lte = RangeOption(env, options, "lte"); + std::string* gt = RangeOption(env, options, "gt"); + std::string* gte = RangeOption(env, options, "gte"); + const TransactionSnapshot* snapshot = + TransactionSnapshotProperty(env, options, "snapshot"); + IteratorCountWorker* worker = new IteratorCountWorker( + env, transaction, callback, limit, lt, lte, gt, gte, snapshot); worker->Queue(env); NAPI_RETURN_UNDEFINED(); } @@ -1061,6 +1107,7 @@ NAPI_INIT() { NAPI_EXPORT_FUNCTION(dbPut); NAPI_EXPORT_FUNCTION(dbDel); NAPI_EXPORT_FUNCTION(dbClear); + NAPI_EXPORT_FUNCTION(dbCount); NAPI_EXPORT_FUNCTION(dbApproximateSize); NAPI_EXPORT_FUNCTION(dbCompactRange); NAPI_EXPORT_FUNCTION(dbGetProperty); @@ -1084,6 +1131,7 @@ NAPI_INIT() { NAPI_EXPORT_FUNCTION(batchWrite); NAPI_EXPORT_FUNCTION(transactionInit); + NAPI_EXPORT_FUNCTION(transactionId); NAPI_EXPORT_FUNCTION(transactionCommit); NAPI_EXPORT_FUNCTION(transactionRollback); NAPI_EXPORT_FUNCTION(transactionGet); @@ -1095,4 +1143,5 @@ NAPI_INIT() { NAPI_EXPORT_FUNCTION(transactionSnapshot); NAPI_EXPORT_FUNCTION(transactionIteratorInit); NAPI_EXPORT_FUNCTION(transactionClear); + 
NAPI_EXPORT_FUNCTION(transactionCount);
 }
diff --git a/src/rocksdb/napi/transaction.cpp b/src/rocksdb/napi/transaction.cpp
index e6828f05..84a04014 100644
--- a/src/rocksdb/napi/transaction.cpp
+++ b/src/rocksdb/napi/transaction.cpp
@@ -75,6 +75,8 @@ rocksdb::Status Transaction::Commit() {
   }
   hasCommitted_ = true;
   rocksdb::Status status = tran_->Commit();
+  // If the commit failed, this object is still considered committed,
+  // meaning this object can no longer be used
   // Early deletion
   delete tran_;
   tran_ = nullptr;
@@ -93,6 +95,8 @@ rocksdb::Status Transaction::Rollback() {
   }
   hasRollbacked_ = true;
   rocksdb::Status status = tran_->Rollback();
+  // If the rollback failed, this object is still considered rollbacked,
+  // meaning this object can no longer be used
   // Early deletion
   delete tran_;
   tran_ = nullptr;
diff --git a/src/rocksdb/napi/workers/iterator_workers.cpp b/src/rocksdb/napi/workers/iterator_workers.cpp
index 620acaad..adf14ed7 100644
--- a/src/rocksdb/napi/workers/iterator_workers.cpp
+++ b/src/rocksdb/napi/workers/iterator_workers.cpp
@@ -81,25 +81,26 @@ void IteratorNextWorker::DoFinally(napi_env env) {
   BaseWorker::DoFinally(env);
 }
 
-IteratorClearWorker::IteratorClearWorker(
-    napi_env env, Database* database, napi_value callback, const bool reverse,
-    const int limit, std::string* lt, std::string* lte, std::string* gt,
-    std::string* gte, const bool sync, const Snapshot* snapshot)
+IteratorClearWorker::IteratorClearWorker(napi_env env, Database* database,
+                                         napi_value callback, const int limit,
+                                         std::string* lt, std::string* lte,
+                                         std::string* gt, std::string* gte,
+                                         const bool sync,
+                                         const Snapshot* snapshot)
     : PriorityWorker(env, database, callback, "rocksdb.iterator.clear") {
-  iterator_ = new BaseIterator(database, reverse, lt, lte, gt, gte, limit,
-                               false, snapshot);
+  iterator_ = new BaseIterator(database, false, lt, lte, gt, gte, limit, false,
+                               snapshot);
   writeOptions_ = new rocksdb::WriteOptions();
   writeOptions_->sync = sync;
 }
 
 IteratorClearWorker::IteratorClearWorker(napi_env env,
                                          Transaction* transaction,
-                                         napi_value callback,
-                                         const bool reverse, const int limit,
+                                         napi_value callback, const int limit,
                                          std::string* lt, std::string* lte,
                                          std::string* gt, std::string* gte,
                                          const TransactionSnapshot* snapshot)
     : PriorityWorker(env, transaction, callback, "rocksdb.iterator.clear") {
-  iterator_ = new BaseIterator(transaction, reverse, lt, lte, gt, gte, limit,
+  iterator_ = new BaseIterator(transaction, false, lt, lte, gt, gte, limit,
                                false, snapshot);
   writeOptions_ = nullptr;
 }
@@ -149,3 +150,66 @@ void IteratorClearWorker::DoExecute() {
   }
   iterator_->Close();
 }
+
+IteratorCountWorker::IteratorCountWorker(napi_env env, Database* database,
+                                         napi_value callback, const int limit,
+                                         std::string* lt, std::string* lte,
+                                         std::string* gt, std::string* gte,
+                                         const Snapshot* snapshot)
+    : PriorityWorker(env, database, callback, "rocksdb.iterator.count") {
+  iterator_ = new BaseIterator(database, false, lt, lte, gt, gte, limit, false,
+                               snapshot);
+}
+
+IteratorCountWorker::IteratorCountWorker(napi_env env, Transaction* transaction,
+                                         napi_value callback, const int limit,
+                                         std::string* lt, std::string* lte,
+                                         std::string* gt, std::string* gte,
+                                         const TransactionSnapshot* snapshot)
+    : PriorityWorker(env, transaction, callback, "rocksdb.iterator.count") {
+  iterator_ = new BaseIterator(transaction, false, lt, lte, gt, gte, limit,
+                               false, snapshot);
+}
+
+IteratorCountWorker::~IteratorCountWorker() { delete iterator_; }
+
+void IteratorCountWorker::DoExecute() {
+ 
assert(database_ != nullptr || transaction_ != nullptr); + iterator_->SeekToRange(); + uint32_t hwm = 16 * 1024; + if (database_ != nullptr) { + while (true) { + size_t bytesRead = 0; + while (bytesRead <= hwm && iterator_->Valid() && iterator_->Increment()) { + rocksdb::Slice key = iterator_->CurrentKey(); + count_++; + bytesRead += key.size(); + iterator_->Next(); + } + if (!SetStatus(iterator_->Status()) || bytesRead == 0) { + break; + } + } + } else if (transaction_ != nullptr) { + while (true) { + size_t bytesRead = 0; + while (bytesRead <= hwm && iterator_->Valid() && iterator_->Increment()) { + rocksdb::Slice key = iterator_->CurrentKey(); + count_++; + bytesRead += key.size(); + iterator_->Next(); + } + if (!SetStatus(iterator_->Status()) || bytesRead == 0) { + break; + } + } + } + iterator_->Close(); +} + +void IteratorCountWorker::HandleOKCallback(napi_env env, napi_value callback) { + napi_value argv[2]; + napi_get_null(env, &argv[0]); + napi_create_uint32(env, count_, &argv[1]); + CallFunction(env, callback, 2, argv); +} diff --git a/src/rocksdb/napi/workers/iterator_workers.h b/src/rocksdb/napi/workers/iterator_workers.h index 1a681489..878654bf 100644 --- a/src/rocksdb/napi/workers/iterator_workers.h +++ b/src/rocksdb/napi/workers/iterator_workers.h @@ -58,14 +58,13 @@ struct IteratorNextWorker final : public BaseWorker { */ struct IteratorClearWorker final : public PriorityWorker { IteratorClearWorker(napi_env env, Database* database, napi_value callback, - const bool reverse, const int limit, std::string* lt, - std::string* lte, std::string* gt, std::string* gte, - const bool sync, const Snapshot* snapshot = nullptr); + const int limit, std::string* lt, std::string* lte, + std::string* gt, std::string* gte, const bool sync, + const Snapshot* snapshot = nullptr); IteratorClearWorker(napi_env env, Transaction* transaction, - napi_value callback, const bool reverse, const int limit, - std::string* lt, std::string* lte, std::string* gt, - std::string* gte, + napi_value callback, const int limit, std::string* lt, + std::string* lte, std::string* gt, std::string* gte, const TransactionSnapshot* snapshot = nullptr); ~IteratorClearWorker(); @@ -76,3 +75,25 @@ struct IteratorClearWorker final : public PriorityWorker { BaseIterator* iterator_; rocksdb::WriteOptions* writeOptions_; }; + +struct IteratorCountWorker final : public PriorityWorker { + IteratorCountWorker(napi_env env, Database* database, napi_value callback, + const int limit, std::string* lt, std::string* lte, + std::string* gt, std::string* gte, + const Snapshot* snapshot = nullptr); + + IteratorCountWorker(napi_env env, Transaction* transaction, + napi_value callback, const int limit, std::string* lt, + std::string* lte, std::string* gt, std::string* gte, + const TransactionSnapshot* snapshot = nullptr); + + ~IteratorCountWorker(); + + void DoExecute() override; + + void HandleOKCallback(napi_env env, napi_value callback) override; + + private: + BaseIterator* iterator_; + uint32_t count_ = 0; +}; diff --git a/src/rocksdb/rocksdb.ts b/src/rocksdb/rocksdb.ts index 1e6ce112..0b3158eb 100644 --- a/src/rocksdb/rocksdb.ts +++ b/src/rocksdb/rocksdb.ts @@ -16,6 +16,7 @@ import type { RocksDBBatchOptions, RocksDBBatchDelOperation, RocksDBBatchPutOperation, + RocksDBCountOptions, } from './types'; import path from 'path'; import nodeGypBuild from 'node-gyp-build'; @@ -71,6 +72,11 @@ interface RocksDB { options: RocksDBClearOptions, callback: Callback<[], void>, ): void; + dbCount( + database: RocksDBDatabase, + options: 
RocksDBCountOptions, + callback: Callback<[number], void>, + ): void; dbApproximateSize( database: RocksDBDatabase, start: string | Buffer, @@ -141,62 +147,79 @@ ): void; transactionInit( database: RocksDBDatabase, - options: RocksDBTransactionOptions + options: RocksDBTransactionOptions, ): RocksDBTransaction; + transactionId(transaction: RocksDBTransaction): number; transactionCommit( transaction: RocksDBTransaction, - callback: Callback<[], void> + callback: Callback<[], void>, ): void; transactionRollback( transaction: RocksDBTransaction, - callback: Callback<[], void> + callback: Callback<[], void>, ): void; transactionGet( transaction: RocksDBTransaction, key: string | Buffer, - options: RocksDBGetOptions & { valueEncoding?: 'utf8' }, + options: RocksDBGetOptions & { + valueEncoding?: 'utf8'; + }, callback: Callback<[string], void>, ): void; transactionGet( transaction: RocksDBTransaction, key: string | Buffer, - options: RocksDBGetOptions & { valueEncoding: 'buffer' }, + options: RocksDBGetOptions & { + valueEncoding: 'buffer'; + }, callback: Callback<[Buffer], void>, ): void; transactionGetForUpdate( transaction: RocksDBTransaction, key: string | Buffer, - options: RocksDBGetOptions & { valueEncoding?: 'utf8' }, + options: RocksDBGetOptions & { + valueEncoding?: 'utf8'; + }, callback: Callback<[string], void>, ): void; transactionGetForUpdate( transaction: RocksDBTransaction, key: string | Buffer, - options: RocksDBGetOptions & { valueEncoding: 'buffer' }, + options: RocksDBGetOptions & { + valueEncoding: 'buffer'; + }, callback: Callback<[Buffer], void>, ): void; transactionMultiGet( transaction: RocksDBTransaction, keys: Array<string | Buffer>, - options: RocksDBGetOptions & { valueEncoding?: 'utf8' }, + options: RocksDBGetOptions & { + valueEncoding?: 'utf8'; + }, callback: Callback<[Array<string>], void>, ): void; transactionMultiGet( transaction: RocksDBTransaction, keys: Array<string | Buffer>, - options: RocksDBGetOptions & { valueEncoding: 'buffer' }, + options: RocksDBGetOptions & { + valueEncoding: 'buffer'; + }, callback: Callback<[Array<Buffer>], void>, ): void; transactionMultiGetForUpdate( transaction: RocksDBTransaction, keys: Array<string | Buffer>, - options: RocksDBGetOptions & { valueEncoding?: 'utf8' }, + options: RocksDBGetOptions & { + valueEncoding?: 'utf8'; + }, callback: Callback<[Array<string>], void>, ): void; transactionMultiGetForUpdate( transaction: RocksDBTransaction, keys: Array<string | Buffer>, - options: RocksDBGetOptions & { valueEncoding: 'buffer' }, + options: RocksDBGetOptions & { + valueEncoding: 'buffer'; + }, callback: Callback<[Array<Buffer>], void>, ): void; transactionPut( @@ -222,11 +245,15 @@ ): RocksDBIterator<Buffer, Buffer>; transactionIteratorInit( transaction: RocksDBTransaction, - options: RocksDBIteratorOptions & { keyEncoding: 'buffer' }, + options: RocksDBIteratorOptions & { + keyEncoding: 'buffer'; + }, ): RocksDBIterator<Buffer, string>; transactionIteratorInit( transaction: RocksDBTransaction, - options: RocksDBIteratorOptions & { valueEncoding: 'buffer' }, + options: RocksDBIteratorOptions & { + valueEncoding: 'buffer'; + }, ): RocksDBIterator<string, Buffer>; transactionIteratorInit( transaction: RocksDBTransaction, @@ -237,6 +264,11 @@ options: RocksDBClearOptions, callback: Callback<[], void>, ): void; + transactionCount( + transaction: RocksDBTransaction, + options: RocksDBCountOptions, + callback: Callback<[number], void>, + ): void; } const rocksdb: RocksDB = nodeGypBuild(path.join(__dirname, '../../'));
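As a quick orientation to the two layers: the interface above is the raw callback-style native binding, while `rocksdbP` below wraps each method with `utils.promisify`. A minimal sketch for the new `dbCount` (illustrative only; the key range and the already-opened `database` handle are assumptions):

```ts
import type { RocksDBDatabase } from './types';
import rocksdb from './rocksdb';
import * as utils from '../utils';

async function demo(database: RocksDBDatabase): Promise<void> {
  // Callback style, as the native binding exposes it
  rocksdb.dbCount(database, { gte: 'a', lt: 'z' }, (e, count) => {
    if (e != null) throw e;
    console.log(count); // Number of keys in ['a', 'z')
  });
  // Promise style, derived exactly the way rocksdbP does below
  const dbCountP = utils.promisify(rocksdb.dbCount).bind(rocksdb);
  console.log(await dbCountP(database, { gte: 'a', lt: 'z' }));
}
```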
diff --git a/src/rocksdb/rocksdbP.ts b/src/rocksdb/rocksdbP.ts index 709d0f89..c9fdead0 100644 --- a/src/rocksdb/rocksdbP.ts +++ b/src/rocksdb/rocksdbP.ts @@ -10,6 +10,7 @@ import type { RocksDBPutOptions, RocksDBDelOptions, RocksDBClearOptions, + RocksDBCountOptions, RocksDBIteratorOptions, RocksDBTransactionOptions, RocksDBBatchOptions, @@ -62,6 +63,10 @@ interface RocksDBP { database: RocksDBDatabase, options: RocksDBClearOptions, ): Promise<void>; + dbCount( + database: RocksDBDatabase, + options: RocksDBCountOptions, + ): Promise<number>; dbApproximateSize( database: RocksDBDatabase, start: string | Buffer, @@ -74,7 +79,7 @@ interface RocksDBP { ): Promise<void>; dbGetProperty(database: RocksDBDatabase, property: string): string; snapshotInit(database: RocksDBDatabase): RocksDBSnapshot; - snapshotRelease(snap: RocksDBSnapshot): Promise<void>; + snapshotRelease(snapshot: RocksDBSnapshot): Promise<void>; destroyDb(location: string): Promise<void>; repairDb(location: string): Promise<void>; iteratorInit( @@ -121,60 +126,79 @@ interface RocksDBP { batchWrite(batch: RocksDBBatch, options: RocksDBBatchOptions): Promise<void>; transactionInit( database: RocksDBDatabase, - options: RocksDBTransactionOptions + options: RocksDBTransactionOptions, ): RocksDBTransaction; - transactionCommit(tran: RocksDBTransaction): Promise<void>; - transactionRollback(tran: RocksDBTransaction): Promise<void>; + transactionId(transaction: RocksDBTransaction): number; + transactionCommit(transaction: RocksDBTransaction): Promise<void>; + transactionRollback(transaction: RocksDBTransaction): Promise<void>; transactionGet( - tran: RocksDBTransaction, + transaction: RocksDBTransaction, key: string | Buffer, - options: RocksDBGetOptions & { valueEncoding?: 'utf8' }, + options: RocksDBGetOptions & { + valueEncoding?: 'utf8'; + }, ): Promise<string>; transactionGet( - tran: RocksDBTransaction, + transaction: RocksDBTransaction, key: string | Buffer, - options: RocksDBGetOptions & { valueEncoding: 'buffer' }, + options: RocksDBGetOptions & { + valueEncoding: 'buffer'; + }, ): Promise<Buffer>; transactionGetForUpdate( - tran: RocksDBTransaction, + transaction: RocksDBTransaction, key: string | Buffer, - options: RocksDBGetOptions & { valueEncoding?: 'utf8' }, + options: RocksDBGetOptions & { + valueEncoding?: 'utf8'; + }, ): Promise<string>; transactionGetForUpdate( - tran: RocksDBTransaction, + transaction: RocksDBTransaction, key: string | Buffer, - options: RocksDBGetOptions & { valueEncoding: 'buffer' }, + options: RocksDBGetOptions & { + valueEncoding: 'buffer'; + }, ): Promise<Buffer>; transactionMultiGet( transaction: RocksDBTransaction, keys: Array<string | Buffer>, - options: RocksDBGetOptions & { valueEncoding?: 'utf8' }, + options: RocksDBGetOptions & { + valueEncoding?: 'utf8'; + }, ): Promise<Array<string>>; transactionMultiGet( transaction: RocksDBTransaction, keys: Array<string | Buffer>, - options: RocksDBGetOptions & { valueEncoding: 'buffer' }, + options: RocksDBGetOptions & { + valueEncoding: 'buffer'; + }, ): Promise<Array<Buffer>>; transactionMultiGetForUpdate( transaction: RocksDBTransaction, keys: Array<string | Buffer>, - options: RocksDBGetOptions & { valueEncoding?: 'utf8' }, + options: RocksDBGetOptions & { + valueEncoding?: 'utf8'; + }, ): Promise<Array<string>>; transactionMultiGetForUpdate( transaction: RocksDBTransaction, keys: Array<string | Buffer>, - options: RocksDBGetOptions & { valueEncoding: 'buffer' }, + options: RocksDBGetOptions & { + valueEncoding: 'buffer'; + }, ): Promise<Array<Buffer>>; transactionPut( - tran: RocksDBTransaction, + transaction: RocksDBTransaction, key: string | Buffer, value: string | Buffer, ): Promise<void>; transactionDel( - tran: RocksDBTransaction, + transaction: RocksDBTransaction, key: string | Buffer, ): Promise<void>; - transactionSnapshot(tran: RocksDBTransaction):
RocksDBTransactionSnapshot; + transactionSnapshot( + transaction: RocksDBTransaction, + ): RocksDBTransactionSnapshot; transactionIteratorInit( transaction: RocksDBTransaction, options: RocksDBIteratorOptions & { @@ -184,11 +208,15 @@ ): RocksDBIterator<Buffer, Buffer>; transactionIteratorInit( transaction: RocksDBTransaction, - options: RocksDBIteratorOptions & { keyEncoding: 'buffer' }, + options: RocksDBIteratorOptions & { + keyEncoding: 'buffer'; + }, ): RocksDBIterator<Buffer, string>; transactionIteratorInit( transaction: RocksDBTransaction, - options: RocksDBIteratorOptions & { valueEncoding: 'buffer' }, + options: RocksDBIteratorOptions & { + valueEncoding: 'buffer'; + }, ): RocksDBIterator<string, Buffer>; transactionIteratorInit( database: RocksDBTransaction, @@ -198,6 +226,10 @@ transaction: RocksDBTransaction, options: RocksDBClearOptions, ): Promise<void>; + transactionCount( + transaction: RocksDBTransaction, + options: RocksDBCountOptions, + ): Promise<number>; } /** @@ -212,9 +244,8 @@ const rocksdbP: RocksDBP = { dbPut: utils.promisify(rocksdb.dbPut).bind(rocksdb), dbDel: utils.promisify(rocksdb.dbDel).bind(rocksdb), dbClear: utils.promisify(rocksdb.dbClear).bind(rocksdb), - dbApproximateSize: utils - .promisify(rocksdb.dbApproximateSize) - .bind(rocksdb), + dbCount: utils.promisify(rocksdb.dbCount).bind(rocksdb), + dbApproximateSize: utils.promisify(rocksdb.dbApproximateSize).bind(rocksdb), dbCompactRange: utils.promisify(rocksdb.dbCompactRange).bind(rocksdb), dbGetProperty: rocksdb.dbGetProperty.bind(rocksdb), snapshotInit: rocksdb.snapshotInit.bind(rocksdb), @@ -232,17 +263,27 @@ const rocksdbP: RocksDBP = { batchClear: rocksdb.batchClear.bind(rocksdb), batchWrite: rocksdb.batchWrite.bind(rocksdb), transactionInit: rocksdb.transactionInit.bind(rocksdb), + transactionId: rocksdb.transactionId.bind(rocksdb), transactionCommit: utils.promisify(rocksdb.transactionCommit).bind(rocksdb), - transactionRollback: utils.promisify(rocksdb.transactionRollback).bind(rocksdb), + transactionRollback: utils + .promisify(rocksdb.transactionRollback) + .bind(rocksdb), transactionGet: utils.promisify(rocksdb.transactionGet).bind(rocksdb), - transactionGetForUpdate: utils.promisify(rocksdb.transactionGetForUpdate).bind(rocksdb), - transactionMultiGet: utils.promisify(rocksdb.transactionMultiGet).bind(rocksdb), - transactionMultiGetForUpdate: utils.promisify(rocksdb.transactionMultiGetForUpdate).bind(rocksdb), + transactionGetForUpdate: utils + .promisify(rocksdb.transactionGetForUpdate) + .bind(rocksdb), + transactionMultiGet: utils + .promisify(rocksdb.transactionMultiGet) + .bind(rocksdb), + transactionMultiGetForUpdate: utils + .promisify(rocksdb.transactionMultiGetForUpdate) + .bind(rocksdb), transactionPut: utils.promisify(rocksdb.transactionPut).bind(rocksdb), transactionDel: utils.promisify(rocksdb.transactionDel).bind(rocksdb), transactionSnapshot: rocksdb.transactionSnapshot.bind(rocksdb), transactionIteratorInit: rocksdb.transactionIteratorInit.bind(rocksdb), transactionClear: utils.promisify(rocksdb.transactionClear).bind(rocksdb), + transactionCount: utils.promisify(rocksdb.transactionCount).bind(rocksdb), }; export default rocksdbP;
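A usage sketch of the promisified count API (illustrative; the path handling and the expected counts are assumptions that follow the patterns in the tests further below):

```ts
import rocksdbP from './rocksdbP';

async function demo(dbPath: string): Promise<void> {
  const db = rocksdbP.dbInit();
  await rocksdbP.dbOpen(db, dbPath, {});
  await rocksdbP.dbPut(db, 'K1', '100', {});
  await rocksdbP.dbPut(db, 'K2', '100', {});
  // Count everything, or bound the range with gt/gte/lt/lte/limit
  console.log(await rocksdbP.dbCount(db, {})); // 2
  // Counting inside a transaction goes through the transaction's overlay
  const tran = rocksdbP.transactionInit(db, {});
  await rocksdbP.transactionPut(tran, 'K3', '100');
  console.log(await rocksdbP.transactionCount(tran, {})); // 3
  await rocksdbP.transactionRollback(tran);
  await rocksdbP.dbClose(db);
}
```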
diff --git a/src/rocksdb/types.ts b/src/rocksdb/types.ts index c8917452..5a7e326b 100644 --- a/src/rocksdb/types.ts +++ b/src/rocksdb/types.ts @@ -1,5 +1,10 @@ import type { Opaque } from '../types'; +/** + * Note that `undefined` is not a valid value for these options + * If properties exist, they must have the correct type + */ + /** * RocksDBDatabase object * A `napi_external` type @@ -50,9 +55,6 @@ type RocksDBTransactionSnapshot = Opaque<'RocksDBTransactionSnapshot', object>; /** * RocksDB database options - * Note that `undefined` is not a valid value for these options - * Make sure that the property either exists and it is a correct type - * or that it does not exist */ type RocksDBDatabaseOptions = { createIfMissing?: boolean; // Default true @@ -69,11 +71,10 @@ type RocksDBDatabaseOptions = { /** * Get options - * Note that `undefined` is not a valid value for these options - * Make sure that the property either exists and it is a correct type - * or that it does not exist */ -type RocksDBGetOptions<S extends RocksDBSnapshot | RocksDBTransactionSnapshot = RocksDBSnapshot> = { +type RocksDBGetOptions< + S extends RocksDBSnapshot | RocksDBTransactionSnapshot = RocksDBSnapshot, +> = { valueEncoding?: 'utf8' | 'buffer'; // Default 'utf8'; fillCache?: boolean; // Default true snapshot?: S; @@ -81,9 +82,6 @@ type RocksDBPutOptions -type RocksDBClearOptions< - S extends RocksDBSnapshot | RocksDBTransactionSnapshot = RocksDBSnapshot> -= RocksDBRangeOptions & { +type RocksDBClearOptions< + S extends RocksDBSnapshot | RocksDBTransactionSnapshot = RocksDBSnapshot, +> = Omit<RocksDBRangeOptions, 'reverse'> & { snapshot?: S; sync?: S extends RocksDBSnapshot ? boolean : void; // Default false }; +/** + * Count options + */ +type RocksDBCountOptions< + S extends RocksDBSnapshot | RocksDBTransactionSnapshot = RocksDBSnapshot, +> = Omit<RocksDBRangeOptions, 'reverse'> & { + snapshot?: S; +}; + /** * Iterator options - * Note that `undefined` is not a valid value for these options - * If properties exist, they must have the correct type */ -type RocksDBIteratorOptions< - S extends RocksDBSnapshot | RocksDBTransactionSnapshot = RocksDBSnapshot> -= RocksDBGetOptions<S> & +type RocksDBIteratorOptions< + S extends RocksDBSnapshot | RocksDBTransactionSnapshot = RocksDBSnapshot, +> = RocksDBGetOptions<S> & RocksDBRangeOptions & { keys?: boolean; values?: boolean; @@ -146,15 +145,11 @@ type RocksDBIteratorOptions /** * Transaction options - * Note that `undefined` is not a valid value for these options - * If properties exist, they must have the correct type */ type RocksDBTransactionOptions = RocksDBPutOptions; /** * Batch options - * Note that `undefined` is not a valid value for these options - * If properties exist, they must have the correct type */ type RocksDBBatchOptions = RocksDBPutOptions; @@ -182,6 +177,7 @@ export type { RocksDBDelOptions, RocksDBRangeOptions, RocksDBClearOptions, + RocksDBCountOptions, RocksDBIteratorOptions, RocksDBTransactionOptions, RocksDBBatchOptions,
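The `S` type parameter threads the snapshot type through each option type: database-side calls default to `RocksDBSnapshot`, while transaction-side calls use `RocksDBTransactionSnapshot`. A small type-level sketch (illustrative; the particular option values are assumptions, and `limit` is taken to come from `RocksDBRangeOptions` as the worker signatures suggest):

```ts
import type {
  RocksDBCountOptions,
  RocksDBSnapshot,
  RocksDBTransactionSnapshot,
} from './types';

// Database-side count options: `snapshot`, if present, must be a RocksDBSnapshot
const dbCountOptions: RocksDBCountOptions<RocksDBSnapshot> = {
  gte: 'a',
  lt: 'z',
  limit: 100,
};

// Transaction-side count options: `snapshot` must be a RocksDBTransactionSnapshot;
// `reverse` is omitted entirely since counting has no direction
const tranCountOptions: RocksDBCountOptions<RocksDBTransactionSnapshot> = {
  gte: 'a',
  lt: 'z',
};
```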
diff --git a/src/types.ts b/src/types.ts index e522493c..bb85e39e 100644 --- a/src/types.ts +++ b/src/types.ts @@ -3,6 +3,10 @@ import type { RocksDBIteratorOptions, RocksDBBatchPutOperation, RocksDBBatchDelOperation, + RocksDBClearOptions, + RocksDBCountOptions, + RocksDBSnapshot, + RocksDBTransactionSnapshot, } from './rocksdb/types'; import type fs from 'fs'; import type { WorkerManagerInterface } from '@matrixai/workers'; @@ -85,21 +89,41 @@ type DBOptions = Omit< * The `valueAsBuffer` property controls value type * It should be considered to default to true */ -type DBIteratorOptions = Merge< - Omit<RocksDBIteratorOptions, 'keyEncoding' | 'valueEncoding'>, +type DBIteratorOptions< + S extends RocksDBSnapshot | RocksDBTransactionSnapshot = RocksDBSnapshot, +> = Merge< + Omit<RocksDBIteratorOptions<S>, 'keyEncoding' | 'valueEncoding'>, { gt?: KeyPath | Buffer | string; gte?: KeyPath | Buffer | string; lt?: KeyPath | Buffer | string; lte?: KeyPath | Buffer | string; - limit?: number; - keys?: boolean; - values?: boolean; keyAsBuffer?: boolean; valueAsBuffer?: boolean; - reverse?: boolean; - fillCache?: boolean; - highWaterMarkBytes?: number; + } +>; + +type DBClearOptions< + S extends RocksDBSnapshot | RocksDBTransactionSnapshot = RocksDBSnapshot, +> = Merge< + RocksDBClearOptions<S>, + { + gt?: KeyPath | Buffer | string; + gte?: KeyPath | Buffer | string; + lt?: KeyPath | Buffer | string; + lte?: KeyPath | Buffer | string; + } +>; + +type DBCountOptions< + S extends RocksDBSnapshot | RocksDBTransactionSnapshot = RocksDBSnapshot, +> = Merge< + RocksDBCountOptions<S>, + { + gt?: KeyPath | Buffer | string; + gte?: KeyPath | Buffer | string; + lt?: KeyPath | Buffer | string; + lte?: KeyPath | Buffer | string; } >; @@ -139,6 +163,8 @@ export type { LevelPath, DBOptions, DBIteratorOptions, + DBClearOptions, + DBCountOptions, DBBatch, DBOp, DBOps, diff --git a/src/utils.ts b/src/utils.ts index c014ce3c..d87d0177 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,4 +1,4 @@ -import type { Callback, KeyPath, LevelPath } from './types'; +import type { Callback, Merge, KeyPath, LevelPath } from './types'; import * as errors from './errors'; /** @@ -90,6 +90,7 @@ function decodePart(data: Buffer): Buffer { /** * Used to convert possible KeyPath into legal KeyPath + * Returns a copy which can be mutated */ function toKeyPath(keyPath: KeyPath | string | Buffer): KeyPath { if (!Array.isArray(keyPath)) { @@ -323,6 +324,76 @@ function filterUndefined(o: object): void { }); } +function iterationOptions< + O extends { + gt?: KeyPath | Buffer | string; + gte?: KeyPath | Buffer | string; + lt?: KeyPath | Buffer | string; + lte?: KeyPath | Buffer | string; + }, +>( + options: O, + levelPath: LevelPath, +): Merge< + O, + { + gt?: Buffer; + gte?: Buffer; + lt?: Buffer; + lte?: Buffer; + keyEncoding: 'buffer'; + valueEncoding: 'buffer'; + } +> { + const options_ = { + ...options, + // Internally we always use the buffer + keyEncoding: 'buffer' as const, + valueEncoding: 'buffer' as const, + } as Merge< + O, + { + gt?: Buffer; + gte?: Buffer; + lt?: Buffer; + lte?: Buffer; + keyEncoding: 'buffer'; + valueEncoding: 'buffer'; + } + >; + if (options?.gt != null) { + options_.gt = keyPathToKey(levelPath.concat(toKeyPath(options.gt))); + } + if (options?.gte != null) { + options_.gte = keyPathToKey(levelPath.concat(toKeyPath(options.gte))); + } + if (options?.gt == null && options?.gte == null) { + // If the level path is empty then all keys are allowed + if (levelPath.length > 0) { + options_.gt = levelPathToKey(levelPath); + } + } + if (options?.lt != null) { + options_.lt = keyPathToKey(levelPath.concat(toKeyPath(options.lt))); + } + if (options?.lte != null) { + options_.lte = keyPathToKey(levelPath.concat(toKeyPath(options.lte))); + } + if (options?.lt == null && options?.lte == null) { + // If the level path is empty then all keys are allowed + if (levelPath.length > 0) { + const levelKeyEnd = levelPathToKey(levelPath); + // This works because the separator byte is 0x00 + // Therefore we have `sep level sep` + // and we can acquire keys less than `sep level sep+1` + levelKeyEnd[levelKeyEnd.length - 1] += 1; + options_.lt = levelKeyEnd; + } + } + filterUndefined(options_); + return options_; +} + export { sep, encodePart, @@ -338,4 +409,5 @@ fromArrayBuffer, promisify, filterUndefined, + iterationOptions, };
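The bound computation at the end of `iterationOptions` deserves a worked example: a level is framed as `0x00 level 0x00`, and since `0x00` is the lowest possible byte, bumping the trailing separator to `0x01` produces an exclusive upper bound for every key under that level. A sketch of the arithmetic (illustrative; it assumes `levelPathToKey` is exported from utils alongside `keyPathToKey`):

```ts
import * as utils from '@/utils';

// `0x00 'level' 0x00` -- every key under ['level'] starts with this prefix
const gt = utils.levelPathToKey(['level']);
// Copy, then bump the trailing separator: `0x00 'level' 0x01`
const lt = Buffer.from(gt);
lt[lt.length - 1] += 1;
// Any key path under the level falls strictly inside (gt, lt)
const key = utils.keyPathToKey(['level', 'a']);
console.log(Buffer.compare(key, gt) > 0 && Buffer.compare(key, lt) < 0); // true
```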
diff --git a/tests/DB.test.ts b/tests/DB.test.ts index 8133fded..440b80ec 100644 --- a/tests/DB.test.ts +++ b/tests/DB.test.ts @@ -4,27 +4,24 @@ import os from 'os'; import path from 'path'; import fs from 'fs'; import nodeCrypto from 'crypto'; -import nodeUtil from 'util'; -import lexi from 'lexicographic-integer'; import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; import { WorkerManager } from '@matrixai/workers'; import { withF } from '@matrixai/resources'; import { spawn, Worker } from 'threads'; import DB from '@/DB'; -import rocksdbP from '@/rocksdb/rocksdbP'; import * as errors from '@/errors'; import * as utils from '@/utils'; -import * as testUtils from './utils'; +import * as testsUtils from './utils'; describe(DB.name, () => { const logger = new Logger(`${DB.name} Test`, LogLevel.WARN, [ new StreamHandler(), ]); const crypto = { - key: testUtils.generateKeySync(256), + key: testsUtils.generateKeySync(256), ops: { - encrypt: testUtils.encrypt, - decrypt: testUtils.decrypt, + encrypt: testsUtils.encrypt, + decrypt: testsUtils.decrypt, }, }; let dataDir: string; @@ -82,33 +79,13 @@ describe(DB.name, () => { expect(await db2.get('key')).toBeUndefined(); await db2.stop(); }); - test('start wipes dirty transaction state', async () => { - const dbPath = `${dataDir}/db`; - const db = await DB.createDB({ dbPath, crypto, logger }); - const data = await db.serializeEncrypt('bar', false); - // Put in dirty transaction state - await rocksdbP.dbPut( - db.db, - utils.keyPathToKey(['transactions', 'foo']), - data, - {}, - ); - expect(await db.dump(['transactions'], false, true)).toStrictEqual([ - [['foo'], 'bar'], - ]); - await db.stop(); - // Should wipe the transaction state - await db.start(); - expect(await db.dump(['transactions'], false, true)).toStrictEqual([]); - await db.stop(); - }); test('start performs canary check to validate key', async () => { const dbPath = `${dataDir}/db`; let db = await DB.createDB({ dbPath, crypto, logger }); await db.stop(); const crypto_ = { ...crypto, - key: testUtils.generateKeySync(256), + key: testsUtils.generateKeySync(256), }; await expect( DB.createDB({ dbPath, crypto: crypto_, logger }), @@ -154,8 +131,8 @@ describe(DB.name, () => { const dbPath = `${dataDir}/db`; const db = await DB.createDB({ dbPath, crypto, logger }); const keyPaths: Array<KeyPath> = Array.from({ length: 1000 }, () => - Array.from({ length: testUtils.getRandomInt(0, 11) }, () => - nodeCrypto.randomBytes(testUtils.getRandomInt(0, 11)), + Array.from({ length: testsUtils.getRandomInt(0, 11) }, () => + nodeCrypto.randomBytes(testsUtils.getRandomInt(0, 11)), ), ); for (const kP of keyPaths) { @@ -203,7 +180,7 @@ describe(DB.name, () => { 'key', ]); const records: Array<[KeyPath, Buffer]> = []; - for await (const [kP, v] of db.iterator(undefined, [ + for await (const [kP, v] of db.iterator([ Buffer.concat([utils.sep, Buffer.from('level')]), ])) { records.push([kP, v]); @@ -213,24 +190,24 @@ describe(DB.name, () => { ]); await db.stop(); }); - // Test('keys that are empty arrays are converted to empty string', async () => { - // const dbPath = `${dataDir}/db`; - // const db = await DB.createDB({ dbPath, crypto, logger }); - // await db.put([], 'value'); - // expect(await db.get([])).toBe('value'); - // await db.del([]); - // expect(await db.get([])).toBeUndefined(); - // await withF([db.transaction()], async ([tran]) => { - // await tran.put([], 'value'); - // expect(await tran.get([])).toBe('value'); - // await tran.del([]); - // }); - // await withF([db.transaction()], async ([tran]) => { - // await tran.put([], 'value'); - // }); - // expect(await db.get([])).toBe('value'); - // await db.stop(); - // }); + test('keys that are empty arrays are converted to empty string', async () => { + const dbPath = `${dataDir}/db`; + const db = await DB.createDB({ dbPath, crypto, logger }); + await db.put([], 'value'); + expect(await db.get([])).toBe('value'); + await db.del([]); + expect(await db.get([])).toBeUndefined(); + await
withF([db.transaction()], async ([tran]) => { + await tran.put([], 'value'); + expect(await tran.get([])).toBe('value'); + await tran.del([]); + }); + await withF([db.transaction()], async ([tran]) => { + await tran.put([], 'value'); + }); + expect(await db.get([])).toBe('value'); + await db.stop(); + }); test('keys can contain separator buffer', async () => { const dbPath = `${dataDir}/db`; const db = await DB.createDB({ dbPath, crypto, logger }); @@ -519,4 +496,77 @@ describe(DB.name, () => { expect(await db.get('d')).toBe('value3'); await db.stop(); }); + test('debug dumping', async () => { + const dbPath = `${dataDir}/db`; + const db = await DB.createDB({ dbPath, crypto, logger }); + await db.put('a', 'value0'); + await db.put('b', 'value1'); + await db.put('c', 'value2'); + await db.put('d', 'value3'); + expect(await db.dump()).toStrictEqual([ + [['a'], 'value0'], + [['b'], 'value1'], + [['c'], 'value2'], + [['d'], 'value3'], + ]); + // Remember non-raw data is always encoded with JSON + // So the raw dump is a buffer version of the JSON string + expect(await db.dump([], true)).toStrictEqual([ + [[Buffer.from('a')], Buffer.from(JSON.stringify('value0'))], + [[Buffer.from('b')], Buffer.from(JSON.stringify('value1'))], + [[Buffer.from('c')], Buffer.from(JSON.stringify('value2'))], + [[Buffer.from('d')], Buffer.from(JSON.stringify('value3'))], + ]); + // Raw dumping will acquire values from the `data` root level + // and also the canary key + expect(await db.dump([], true, true)).toStrictEqual([ + [ + [Buffer.from('data'), Buffer.from('a')], + Buffer.from(JSON.stringify('value0')), + ], + [ + [Buffer.from('data'), Buffer.from('b')], + Buffer.from(JSON.stringify('value1')), + ], + [ + [Buffer.from('data'), Buffer.from('c')], + Buffer.from(JSON.stringify('value2')), + ], + [ + [Buffer.from('data'), Buffer.from('d')], + Buffer.from(JSON.stringify('value3')), + ], + [[Buffer.from('canary')], Buffer.from(JSON.stringify('deadbeef'))], + ]); + // It is also possible to insert at root level + await db._put([], 'value0'); + await db._put(['a'], 'value1'); + await db._put(['a', 'b'], 'value2'); + expect(await db.dump([], true, true)).toStrictEqual([ + [ + [Buffer.from('a'), Buffer.from('b')], + Buffer.from(JSON.stringify('value2')), + ], + [ + [Buffer.from('data'), Buffer.from('a')], + Buffer.from(JSON.stringify('value0')), + ], + [ + [Buffer.from('data'), Buffer.from('b')], + Buffer.from(JSON.stringify('value1')), + ], + [ + [Buffer.from('data'), Buffer.from('c')], + Buffer.from(JSON.stringify('value2')), + ], + [ + [Buffer.from('data'), Buffer.from('d')], + Buffer.from(JSON.stringify('value3')), + ], + [[Buffer.from('')], Buffer.from(JSON.stringify('value0'))], + [[Buffer.from('a')], Buffer.from(JSON.stringify('value1'))], + [[Buffer.from('canary')], Buffer.from(JSON.stringify('deadbeef'))], + ]); + await db.stop(); + }); }); diff --git a/tests/DBIterator.test.ts b/tests/DBIterator.test.ts index cabe4ee6..aa2066dd 100644 --- a/tests/DBIterator.test.ts +++ b/tests/DBIterator.test.ts @@ -9,17 +9,17 @@ import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; import DB from '@/DB'; import DBIterator from '@/DBIterator'; import rocksdbP from '@/rocksdb/rocksdbP'; -import * as testUtils from './utils'; +import * as testsUtils from './utils'; describe(DBIterator.name, () => { const logger = new Logger(`${DBIterator.name} test`, LogLevel.WARN, [ new StreamHandler(), ]); const crypto = { - key: testUtils.generateKeySync(256), + key: testsUtils.generateKeySync(256), ops: { - encrypt: 
testUtils.encrypt, - decrypt: testUtils.decrypt, + encrypt: testsUtils.encrypt, + decrypt: testsUtils.decrypt, }, }; let dataDir: string; @@ -74,7 +74,7 @@ describe(DBIterator.name, () => { await db.put(Buffer.from([0x00, 0x00]), Buffer.alloc(0)); await db.put(Buffer.from([]), Buffer.alloc(0)); const keyPaths: Array<KeyPath> = []; - for await (const [kP] of db.iterator({ values: false })) { + for await (const [kP] of db.iterator([], { values: false })) { keyPaths.push(kP); } expect(keyPaths).toEqual([ @@ -98,13 +98,13 @@ }); test('lexicographic iteration order fuzzing', async () => { const keys: Array<Buffer> = Array.from({ length: 1000 }, () => - nodeCrypto.randomBytes(testUtils.getRandomInt(0, 101)), + nodeCrypto.randomBytes(testsUtils.getRandomInt(0, 101)), ); for (const k of keys) { await db.put(k, 'value'); } const keyPaths: Array<KeyPath> = []; - for await (const [kP] of db.iterator({ values: false })) { + for await (const [kP] of db.iterator([], { values: false })) { keyPaths.push(kP); } // Check that this matches Buffer.compare order @@ -125,7 +125,7 @@ await db.put(Buffer.from(lexi.pack(k)), 'value'); } const keysIterated: Array<number> = []; - for await (const [kP] of db.iterator({ values: false })) { + for await (const [kP] of db.iterator([], { values: false })) { keysIterated.push(lexi.unpack([...kP[0]])); } expect(keys).not.toEqual(keysIterated); @@ -169,7 +169,7 @@ Buffer.alloc(0), ); const keyPaths: Array<KeyPath> = []; - for await (const [kP] of db.iterator({ values: false })) { + for await (const [kP] of db.iterator([], { values: false })) { keyPaths.push(kP); } /** @@ -212,21 +212,21 @@ }); test('lexicographic level iteration order fuzzing', async () => { const keyPathsInput: Array<KeyPath> = Array.from({ length: 5000 }, () => - Array.from({ length: testUtils.getRandomInt(0, 11) }, () => - nodeCrypto.randomBytes(testUtils.getRandomInt(0, 11)), + Array.from({ length: testsUtils.getRandomInt(0, 11) }, () => + nodeCrypto.randomBytes(testsUtils.getRandomInt(0, 11)), ), ); for (const kP of keyPathsInput) { await db.put(kP, 'value'); } const keyPathsOutput: Array<KeyPath> = []; - for await (const [kP] of db.iterator({ values: false })) { + for await (const [kP] of db.iterator([], { values: false })) { keyPathsOutput.push(kP); } // Copy the DB sorted key paths const keyPathsOutput_ = [...keyPathsOutput]; // Shuffle the DB sorted key paths - testUtils.arrayShuffle(keyPathsOutput_); + testsUtils.arrayShuffle(keyPathsOutput_); keyPathsOutput_.sort((kP1: Array<Buffer>, kP2: Array<Buffer>) => { const lP1 = kP1.slice(0, kP1.length - 1); const lP2 = kP2.slice(0, kP2.length - 1); @@ -284,7 +284,7 @@ await db.put(['level1', 'level2', 'b'], 'value1'); let results: Array<[KeyPath, string]>; results = []; - for await (const [kP, v] of db.iterator({ + for await (const [kP, v] of db.iterator([], { keyAsBuffer: false, valueAsBuffer: false, })) { @@ -299,10 +299,10 @@ [['b'], 'value1'], ]); results = []; - for await (const [kP, v] of db.iterator( - { keyAsBuffer: false, valueAsBuffer: false }, - ['level1'], - )) { + for await (const [kP, v] of db.iterator(['level1'], { + keyAsBuffer: false, + valueAsBuffer: false, + })) { results.push([kP, v]); } expect(results).toStrictEqual([
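These hunks all reflect the same signature change: `db.iterator` now takes the level path first and the options second, instead of options first with an optional trailing level path. A sketch of the new convention (illustrative; an empty array targets the root level):

```ts
import type { KeyPath } from '@/types';
import DB from '@/DB';

async function listKeys(db: DB): Promise<Array<KeyPath>> {
  const keyPaths: Array<KeyPath> = [];
  // Level path first, options second; use [] to iterate the whole database
  for await (const [kP] of db.iterator(['level'], { values: false })) {
    keyPaths.push(kP); // kP is a KeyPath relative to ['level']
  }
  return keyPaths;
}
```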
@@ -330,13 +330,10 @@ // Notice that this will not cover the key of `0x30 0x34` // That's because of rule 3 // 3. Key parts with degree n are sorted in front of key parts with degree n - 1 - for await (const [kP] of db.iterator( - { - lte: [Buffer.from([0x30, 0x35]), ''], - values: false, - }, - ['level'], - )) { + for await (const [kP] of db.iterator(['level'], { + lte: [Buffer.from([0x30, 0x35]), ''], + values: false, + })) { keyPaths.push(kP); } expect(keyPaths).toStrictEqual([ @@ -348,13 +345,10 @@ // this would not work because of rule 3 // The deeper level is in front keyPaths = []; - for await (const [kP] of db.iterator( - { - gte: [Buffer.from([0x30, 0x35]), ''], - lt: [Buffer.from([0x30, 0x36]), ''], - }, - ['level'], - )) { + for await (const [kP] of db.iterator(['level'], { + gte: [Buffer.from([0x30, 0x35]), ''], + lt: [Buffer.from([0x30, 0x36]), ''], + })) { keyPaths.push(kP); } expect(keyPaths).toStrictEqual([ @@ -364,7 +358,7 @@ ]); // To actually do it, we need to specify it as part of the level path parameter keyPaths = []; - for await (const [kP] of db.iterator(undefined, [ + for await (const [kP] of db.iterator([ 'level', Buffer.from([0x30, 0x35]), ])) { @@ -378,12 +372,9 @@ // However the deeper level is still there // But because of rule 3, we can do this instead keyPaths = []; - for await (const [kP] of db.iterator( - { - gte: '', - }, - ['level', Buffer.from([0x30, 0x35])], - )) { + for await (const [kP] of db.iterator(['level', Buffer.from([0x30, 0x35])], { + gte: '', + })) { keyPaths.push(kP); } expect(keyPaths).toStrictEqual([[Buffer.from([])], [Buffer.from([0x61])]]);
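The DBTransaction tests below pin down the new optimistic, snapshot-isolated semantics: overlapping writers no longer win by committing last; the later committer throws `ErrorDBTransactionConflict`. A behavioural sketch mirroring those tests (illustrative):

```ts
import DB from '@/DB';
import * as errors from '@/errors';

// Two overlapping transactions write the same key; the inner one commits
// first, so the outer one fails on commit with ErrorDBTransactionConflict
async function demo(db: DB): Promise<void> {
  try {
    await db.withTransactionF(async (tran1) => {
      await tran1.put('hello', 'foo');
      await db.withTransactionF(async (tran2) => {
        await tran2.put('hello', 'bar');
      });
    });
  } catch (e) {
    if (!(e instanceof errors.ErrorDBTransactionConflict)) throw e;
    // tran1's write lost the race; tran2's committed value remains
  }
  console.log(await db.get('hello')); // 'bar'
}
```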
diff --git a/tests/DBTransaction.test.ts b/tests/DBTransaction.test.ts index 190f8e7a..9ddf8f0d 100644 --- a/tests/DBTransaction.test.ts +++ b/tests/DBTransaction.test.ts @@ -1,4 +1,4 @@ -import type { KeyPath } from '@'; +import type { KeyPath } from '@/types'; import os from 'os'; import path from 'path'; import fs from 'fs'; @@ -6,17 +6,18 @@ import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; import { withF } from '@matrixai/resources'; import DB from '@/DB'; import DBTransaction from '@/DBTransaction'; -import * as testUtils from './utils'; +import * as errors from '@/errors'; +import * as testsUtils from './utils'; describe(DBTransaction.name, () => { const logger = new Logger(`${DBTransaction.name} test`, LogLevel.WARN, [ new StreamHandler(), ]); const crypto = { - key: testUtils.generateKeySync(256), + key: testsUtils.generateKeySync(256), ops: { - encrypt: testUtils.encrypt, - decrypt: testUtils.decrypt, + encrypt: testsUtils.encrypt, + decrypt: testsUtils.decrypt, }, }; let dataDir: string; @@ -35,24 +36,6 @@ describe(DBTransaction.name, () => { recursive: true, }); }); - test('snapshot state is cleared after releasing transactions', async () => { - const acquireTran1 = db.transaction(); - const [releaseTran1, tran1] = await acquireTran1(); - await tran1!.put('hello', 'world'); - const acquireTran2 = db.transaction(); - const [releaseTran2, tran2] = await acquireTran2(); - await tran2!.put('hello', 'world'); - expect(await db.dump(['transactions'], false, true)).toStrictEqual([ - [['0', 'data', 'hello'], 'world'], - [['1', 'data', 'hello'], 'world'], - ]); - await releaseTran1(); - expect(await db.dump(['transactions'], false, true)).toStrictEqual([ - [['1', 'data', 'hello'], 'world'], - ]); - await releaseTran2(); - expect(await db.dump(['transactions'], false, true)).toStrictEqual([]); - }); test('get, put and del', async () => { const p = withF([db.transaction()], async ([tran]) => { expect(await tran.get('foo')).toBeUndefined(); @@ -63,15 +46,11 @@ describe(DBTransaction.name, () => { expect(await tran.get('foo')).toBe('bar'); expect(await tran.get('hello')).toBe('world'); expect(await tran.dump()).toStrictEqual([ - [['data', 'foo'], 'bar'], - [['data', 'hello'], 'world'], + [['foo'], 'bar'], + [['hello'], 'world'], ]); // Delete hello -> world await tran.del('hello'); - // Transaction state should be used - expect( - Object.entries(await db.dump(['transactions'], false, true)).length > 0, - ).toBe(true); }); // While the transaction is executing, there is no data expect(await db.dump(['data'], false, true)).toStrictEqual([]); await p; expect(await db.dump(['data'], false, true)).toStrictEqual([ [['foo'], 'bar'], ]); - // Transaction state is cleared - expect(await db.dump(['transactions'], false, true)).toStrictEqual([]); }); test('transactional clear', async () => { await db.put('1', '1'); @@ -117,6 +94,20 @@ describe(DBTransaction.name, () => { expect(await tran.count(['level1'])).toBe(2); }); }); + test('snapshot is lazily initiated on the first operation', async () => { + await db.put('foo', 'first'); + expect(await db.get('foo')).toBe('first'); + await withF([db.transaction()], async ([tran]) => { + await db.put('foo', 'second'); + expect(await tran.get('foo')).toBe('second'); + expect(await db.get('foo')).toBe('second'); + await db.put('foo', 'third'); + // Transaction still sees it as `second` + expect(await tran.get('foo')).toBe('second'); + // Database sees it as `third` + expect(await db.get('foo')).toBe('third'); + }); + }); test('no dirty reads', async () => { await withF([db.transaction()], async ([tran1]) => { expect(await tran1.get('hello')).toBeUndefined(); await db.withTransactionF(async (tran2) => { await tran2.put('hello', 'world'); // `tran2` has not yet committed expect(await tran1.get('hello')).toBeUndefined(); }); }); await db.clear(); - await withF([db.transaction()], async ([tran1]) => { - expect(await tran1.get('hello')).toBeUndefined(); - await tran1.put('hello', 'foo'); - await withF([db.transaction()], async ([tran2]) => { - // `tran1` has not yet committed - expect(await tran2.get('hello')).toBeUndefined(); - await tran2.put('hello', 'bar'); - // `tran2` has not yet committed - expect(await tran1.get('hello')).toBe('foo'); - }); - }); + await expect( + withF([db.transaction()], async ([tran1]) => { + expect(await tran1.get('hello')).toBeUndefined(); + await tran1.put('hello', 'foo'); + // This transaction commits, but the outside transaction will fail + await withF([db.transaction()], async ([tran2]) => { + // `tran1` has not yet committed + expect(await tran2.get('hello')).toBeUndefined(); + // This will cause a conflict with the external transaction + await tran2.put('hello', 'bar'); + // `tran2` has not yet committed + expect(await tran1.get('hello')).toBe('foo'); + }); + }), + ).rejects.toThrow(errors.ErrorDBTransactionConflict); }); - test('non-repeatable reads', async () => { + test('repeatable reads', async () => { await withF([db.transaction()], async ([tran1]) => { + expect(await tran1.get('hello')).toBeUndefined(); + await db.put('hello', '?'); expect(await tran1.get('hello')).toBeUndefined(); await db.withTransactionF(async (tran2) => { await tran2.put('hello', 'world'); }); - // `tran2` is now committed - expect(await tran1.get('hello')).toBe('world'); - }); - await db.clear(); - await db.withTransactionF(async (tran1) => { + // Even though `tran2` is now committed + // the snapshot was taken when `hello` was still undefined expect(await tran1.get('hello')).toBeUndefined(); - await
tran1.put('hello', 'foo'); - await withF([db.transaction()], async ([tran2]) => { - // `tran1` has not yet committed - expect(await tran2.get('hello')).toBeUndefined(); - await tran2.put('hello', 'bar'); - }); - // `tran2` is now committed - // however because `foo` has been written in tran1, it stays as `foo` - expect(await tran1.get('hello')).toBe('foo'); }); + expect(await db.get('hello')).toBe('world'); + await db.clear(); + await expect( + db.withTransactionF(async (tran1) => { + expect(await tran1.get('hello')).toBeUndefined(); + await tran1.put('hello', 'foo'); + await expect( + withF([db.transaction()], async ([tran2]) => { + // `tran1` has not yet committed + expect(await tran2.get('hello')).toBeUndefined(); + await tran2.put('hello', 'bar'); + }), + ).resolves.toBeUndefined(); + // `tran2` is now committed + // however because `foo` has been written in tran1, it stays as `foo` + expect(await tran1.get('hello')).toBe('foo'); + // `hello` -> `foo` conflicts with `hello` -> `bar` + }), + ).rejects.toThrow(errors.ErrorDBTransactionConflict); + expect(await db.get('hello')).toBe('bar'); }); - test('phantom reads', async () => { + test('no phantom reads', async () => { await db.put('1', '1'); await db.put('2', '2'); await db.put('3', '3'); let rows: Array<[string, string]>; await withF([db.transaction()], async ([tran1]) => { rows = []; - for await (const [k, v] of tran1.iterator({ + for await (const [kP, v] of tran1.iterator([], { keyAsBuffer: false, valueAsBuffer: false, })) { - rows.push([k.toString(), v]); + rows.push([kP.join(), v]); } expect(rows).toStrictEqual([ ['1', '1'], @@ -184,11 +189,11 @@ describe(DBTransaction.name, () => { await tran2.del('1'); await tran2.put('4', '4'); rows = []; - for await (const [k, v] of tran1.iterator({ + for await (const [kP, v] of tran1.iterator([], { keyAsBuffer: false, valueAsBuffer: false, })) { - rows.push([k.toString(), v]); + rows.push([kP.join(), v]); } expect(rows).toStrictEqual([ ['1', '1'], @@ -197,29 +202,49 @@ describe(DBTransaction.name, () => { ]); }); rows = []; - for await (const [k, v] of tran1.iterator({ + for await (const [kP, v] of tran1.iterator([], { keyAsBuffer: false, valueAsBuffer: false, })) { - rows.push([k.toString(), v]); + rows.push([kP.toString(), v]); } + // This is the same as repeatable read + // but this applied to different key-values expect(rows).toStrictEqual([ + ['1', '1'], ['2', '2'], ['3', '3'], - ['4', '4'], ]); }); + // Starting a new iterator, see the new results + rows = []; + for await (const [kP, v] of db.iterator([], { + keyAsBuffer: false, + valueAsBuffer: false, + })) { + rows.push([kP.toString(), v]); + } + expect(rows).toStrictEqual([ + ['2', '2'], + ['3', '3'], + ['4', '4'], + ]); }); - test('lost updates', async () => { - await withF([db.transaction()], async ([tran1]) => { + test('no lost updates', async () => { + const p = withF([db.transaction()], async ([tran1]) => { await tran1.put('hello', 'foo'); await withF([db.transaction()], async ([tran2]) => { await tran2.put('hello', 'bar'); }); + // `tran1` sees `foo` expect(await tran1.get('hello')).toBe('foo'); + // However `db` sees `bar` as that's what is committed + expect(await db.get('hello')).toBe('bar'); }); - // `tran2` write is lost because `tran1` committed last - expect(await db.get('hello')).toBe('foo'); + // Even though `tran1` committed last, the `tran2` write is not lost, + // instead `tran1` results in a conflict + await expect(p).rejects.toThrow(errors.ErrorDBTransactionConflict); + expect(await db.get('hello')).toBe('bar'); 
}); test('get after delete consistency', async () => { await db.put('hello', 'world'); @@ -228,7 +253,6 @@ describe(DBTransaction.name, () => { await tran.put('hello', 'another'); expect(await tran.get('hello')).toBe('another'); await tran.del('hello'); - expect(await tran.dump()).toStrictEqual([[['tombstone', 'hello'], true]]); expect(await tran.get('hello')).toBeUndefined(); expect(await db.get('hello')).toBe('world'); }); @@ -266,10 +290,10 @@ describe(DBTransaction.name, () => { const results: Array<[string, string]> = []; await withF([db.transaction()], async ([tran]) => { await tran.del(['a', 'b']); - for await (const [kP, v] of tran.iterator( - { keyAsBuffer: false, valueAsBuffer: false }, - ['a'], - )) { + for await (const [kP, v] of tran.iterator(['a'], { + keyAsBuffer: false, + valueAsBuffer: false, + })) { results.push([kP[0] as string, v]); } }); @@ -277,7 +301,7 @@ describe(DBTransaction.name, () => { }); test('iterator with multiple entombed keys', async () => { /* - | KEYS | DB | SNAPSHOT | RESULT | + | KEYS | DB | TRAN | RESULT | |------|-------|----------|--------| | a | a = a | X | | | b | b = b | | b = b | @@ -308,7 +332,7 @@ describe(DBTransaction.name, () => { await tran.del('h'); await tran.put('j', '10'); await tran.del('k'); - for await (const [kP, v] of tran.iterator({ + for await (const [kP, v] of tran.iterator([], { keyAsBuffer: false, valueAsBuffer: false, })) { @@ -322,7 +346,7 @@ describe(DBTransaction.name, () => { ['j', '10'], ]); results = []; - for await (const [kP, v] of tran.iterator({ + for await (const [kP, v] of tran.iterator([], { keyAsBuffer: false, valueAsBuffer: false, reverse: true, @@ -340,7 +364,7 @@ describe(DBTransaction.name, () => { }); test('iterator with same largest key', async () => { /* - | KEYS | DB | SNAPSHOT | RESULT | + | KEYS | DB | TRAN | RESULT | |------|-------|----------|--------| | a | a = a | a = 1 | a = 1 | | b | b = b | | b = b | @@ -368,7 +392,7 @@ describe(DBTransaction.name, () => { await tran.put('f', '6'); await tran.put('j', '10'); await tran.put('k', '11'); - for await (const [kP, v] of tran.iterator({ + for await (const [kP, v] of tran.iterator([], { keyAsBuffer: false, valueAsBuffer: false, })) { @@ -387,9 +411,9 @@ describe(DBTransaction.name, () => { ['k', '11'], ]); }); - test('iterator with same largest key in reverse', async () => { + test('iterator with same largest key reversed', async () => { /* - | KEYS | DB | SNAPSHOT | RESULT | + | KEYS | DB | TRAN | RESULT | |------|-------|----------|--------| | a | a = a | a = 1 | a = 1 | | b | b = b | | b = b | @@ -417,7 +441,7 @@ describe(DBTransaction.name, () => { await tran.put('f', '6'); await tran.put('j', '10'); await tran.put('k', '11'); - for await (const [kP, v] of tran.iterator({ + for await (const [kP, v] of tran.iterator([], { keyAsBuffer: false, valueAsBuffer: false, reverse: true, @@ -439,9 +463,9 @@ describe(DBTransaction.name, () => { ].reverse(), ); }); - test('iterator with snapshot largest key', async () => { + test('iterator with largest key in transaction', async () => { /* - | KEYS | DB | SNAPSHOT | RESULT | + | KEYS | DB | TRAN | RESULT | |------|-------|----------|--------| | a | a = a | a = 1 | a = 1 | | b | b = b | | b = b | @@ -466,7 +490,7 @@ describe(DBTransaction.name, () => { await tran.put('e', '5'); await tran.put('f', '6'); await tran.put('j', '10'); - for await (const [kP, v] of tran.iterator({ + for await (const [kP, v] of tran.iterator([], { keyAsBuffer: false, valueAsBuffer: false, })) { @@ -484,9 +508,9 @@ 
describe(DBTransaction.name, () => { ['j', '10'], ]); }); - test('iterator with snapshot largest key in reverse', async () => { + test('iterator with largest key in transaction reversed', async () => { /* - | KEYS | DB | SNAPSHOT | RESULT | + | KEYS | DB | TRAN | RESULT | |------|-------|----------|--------| | a | a = a | a = 1 | a = 1 | | b | b = b | | b = b | @@ -511,7 +535,7 @@ describe(DBTransaction.name, () => { await tran.put('e', '5'); await tran.put('f', '6'); await tran.put('j', '10'); - for await (const [kP, v] of tran.iterator({ + for await (const [kP, v] of tran.iterator([], { keyAsBuffer: false, valueAsBuffer: false, reverse: true, @@ -532,9 +556,9 @@ describe(DBTransaction.name, () => { ].reverse(), ); }); - test('iterator with db largest key', async () => { + test('iterator with largest key in db', async () => { /* - | KEYS | DB | SNAPSHOT | RESULT | + | KEYS | DB | TRAN | RESULT | |------|-------|----------|--------| | a | a = a | a = 1 | a = 1 | | b | b = b | | b = b | @@ -556,7 +580,7 @@ describe(DBTransaction.name, () => { await tran.put('c', '3'); await tran.put('e', '5'); await tran.put('f', '6'); - for await (const [kP, v] of tran.iterator({ + for await (const [kP, v] of tran.iterator([], { keyAsBuffer: false, valueAsBuffer: false, })) { @@ -573,7 +597,7 @@ describe(DBTransaction.name, () => { ['h', 'h'], ]); }); - test('iterator with db largest key in reverse', async () => { + test('iterator with largest key in db reversed', async () => { /* | KEYS | DB | SNAPSHOT | RESULT | |------|-------|----------|--------| @@ -597,7 +621,7 @@ describe(DBTransaction.name, () => { await tran.put('c', '3'); await tran.put('e', '5'); await tran.put('f', '6'); - for await (const [kP, v] of tran.iterator({ + for await (const [kP, v] of tran.iterator([], { keyAsBuffer: false, valueAsBuffer: false, reverse: true, @@ -619,7 +643,7 @@ describe(DBTransaction.name, () => { }); test('iterator with undefined values', async () => { /* - | KEYS | DB | SNAPSHOT | RESULT | + | KEYS | DB | TRAN | RESULT | |------|-------|----------|--------| | a | a = a | a = 1 | a = 1 | | b | b = b | | b = b | @@ -647,7 +671,7 @@ describe(DBTransaction.name, () => { await tran.put('f', '6'); await tran.put('j', '10'); await tran.put('k', '11'); - for await (const [kP, v] of tran.iterator({ + for await (const [kP, v] of tran.iterator([], { keyAsBuffer: false, values: false, })) { @@ -668,7 +692,7 @@ describe(DBTransaction.name, () => { }); test('iterator using seek and next', async () => { /* - | KEYS | DB | SNAPSHOT | RESULT | + | KEYS | DB | TRAN | RESULT | |------|-------|----------|--------| | a | a = a | a = 1 | a = 1 | | b | b = b | | b = b | @@ -724,7 +748,7 @@ describe(DBTransaction.name, () => { [Buffer.from('j')], Buffer.from('"10"'), ]); - await iterator.end(); + await iterator.destroy(); }); }); test('iterator with async generator yield', async () => { @@ -733,7 +757,7 @@ describe(DBTransaction.name, () => { const g = db.withTransactionG(async function* ( tran: DBTransaction, ): AsyncGenerator<[string, string]> { - for await (const [kP, v] of tran.iterator({ + for await (const [kP, v] of tran.iterator([], { keyAsBuffer: false, valueAsBuffer: false, })) { @@ -856,15 +880,19 @@ describe(DBTransaction.name, () => { await db.put('1', 'a'); await db.put('2', 'b'); const mockFailure = jest.fn(); + const mockFinally = jest.fn(); + const e = new Error('Oh no!'); await expect( db.withTransactionF(async (tran) => { await tran.put('1', '1'); await tran.put('2', '2'); tran.queueFailure(mockFailure); - throw new 
Error('Oh no!'); + tran.queueFinally(mockFinally); + throw e; }), - ).rejects.toThrow('Oh no!'); - expect(mockFailure).toBeCalled(); + ).rejects.toThrow(e); + expect(mockFailure).toBeCalledWith(e); + expect(mockFinally).toBeCalledWith(e); expect(await db.get('1')).toBe('a'); expect(await db.get('2')).toBe('b'); }); diff --git a/tests/rocksdb/rocksdbP.test.ts b/tests/rocksdb/rocksdbP.test.ts index 3d2780d0..a82d8af8 100644 --- a/tests/rocksdb/rocksdbP.test.ts +++ b/tests/rocksdb/rocksdbP.test.ts @@ -18,10 +18,12 @@ describe('rocksdbP', () => { test('dbOpen invalid log level option', async () => { const dbPath = `${dataDir}/db`; const db = rocksdbP.dbInit(); - await expect(rocksdbP.dbOpen(db, dbPath, { - // @ts-ignore - infoLogLevel: 'incorrect' - })).rejects.toHaveProperty('code', 'DB_OPEN'); + await expect( + rocksdbP.dbOpen(db, dbPath, { + // @ts-ignore use incorrect value + infoLogLevel: 'incorrect', + }), + ).rejects.toHaveProperty('code', 'DB_OPEN'); }); test('dbClose is idempotent', async () => { const dbPath = `${dataDir}/db`; @@ -58,7 +60,11 @@ describe('rocksdbP', () => { test('dbMultiGet', async () => { await rocksdbP.dbPut(db, 'foo', 'bar', {}); await rocksdbP.dbPut(db, 'bar', 'foo', {}); - expect(await rocksdbP.dbMultiGet(db, ['foo', 'bar', 'abc'], {})).toEqual(['bar', 'foo', undefined]); + expect(await rocksdbP.dbMultiGet(db, ['foo', 'bar', 'abc'], {})).toEqual([ + 'bar', + 'foo', + undefined, + ]); }); test('dbGet and dbMultiget with snapshots', async () => { await rocksdbP.dbPut(db, 'K1', '100', {}); @@ -68,12 +74,17 @@ describe('rocksdbP', () => { await rocksdbP.dbPut(db, 'K2', '200', {}); expect(await rocksdbP.dbGet(db, 'K1', { snapshot: snap })).toBe('100'); expect(await rocksdbP.dbGet(db, 'K2', { snapshot: snap })).toBe('100'); - expect(await rocksdbP.dbMultiGet(db, ['K1', 'K2'], { - snapshot: snap - })).toEqual(['100', '100']); + expect( + await rocksdbP.dbMultiGet(db, ['K1', 'K2'], { + snapshot: snap, + }), + ).toEqual(['100', '100']); expect(await rocksdbP.dbGet(db, 'K1', {})).toBe('200'); expect(await rocksdbP.dbGet(db, 'K2', {})).toBe('200'); - expect(await rocksdbP.dbMultiGet(db, ['K1', 'K2'], {})).toEqual(['200', '200']); + expect(await rocksdbP.dbMultiGet(db, ['K1', 'K2'], {})).toEqual([ + '200', + '200', + ]); await rocksdbP.snapshotRelease(snap); }); describe('iterators', () => { @@ -86,22 +97,32 @@ describe('rocksdbP', () => { await rocksdbP.dbPut(db, 'K1', '100', {}); await rocksdbP.dbPut(db, 'K2', '100', {}); const iter1 = rocksdbP.iteratorInit(db, {}); - expect(await rocksdbP.iteratorNextv(iter1, 2)).toEqual( - [[['K1', '100'], ['K2', '100']], false] - ); + expect(await rocksdbP.iteratorNextv(iter1, 2)).toEqual([ + [ + ['K1', '100'], + ['K2', '100'], + ], + false, + ]); await rocksdbP.iteratorClose(iter1); const iter2 = rocksdbP.iteratorInit(db, {}); - expect(await rocksdbP.iteratorNextv(iter2, 3)).toEqual( - [[['K1', '100'], ['K2', '100']], true] - ); + expect(await rocksdbP.iteratorNextv(iter2, 3)).toEqual([ + [ + ['K1', '100'], + ['K2', '100'], + ], + true, + ]); await rocksdbP.iteratorClose(iter2); const iter3 = rocksdbP.iteratorInit(db, {}); - expect(await rocksdbP.iteratorNextv(iter3, 2)).toEqual( - [[['K1', '100'], ['K2', '100']], false] - ); - expect(await rocksdbP.iteratorNextv(iter3, 1)).toEqual( - [[], true] - ); + expect(await rocksdbP.iteratorNextv(iter3, 2)).toEqual([ + [ + ['K1', '100'], + ['K2', '100'], + ], + false, + ]); + expect(await rocksdbP.iteratorNextv(iter3, 1)).toEqual([[], true]); await rocksdbP.iteratorClose(iter3); }); 
test('iteratorInit with implicit snapshot', async () => { @@ -110,9 +131,13 @@ describe('rocksdbP', () => { const iter = rocksdbP.iteratorInit(db, {}); await rocksdbP.dbPut(db, 'K1', '200', {}); await rocksdbP.dbPut(db, 'K2', '200', {}); - expect(await rocksdbP.iteratorNextv(iter, 2)).toEqual( - [[['K1', '100'], ['K2', '100']], false] - ); + expect(await rocksdbP.iteratorNextv(iter, 2)).toEqual([ + [ + ['K1', '100'], + ['K2', '100'], + ], + false, + ]); await rocksdbP.iteratorClose(iter); }); test('iteratorInit with explicit snapshot', async () => { @@ -122,11 +147,15 @@ describe('rocksdbP', () => { await rocksdbP.dbPut(db, 'K1', '200', {}); await rocksdbP.dbPut(db, 'K2', '200', {}); const iter = rocksdbP.iteratorInit(db, { - snapshot: snap + snapshot: snap, }); - expect(await rocksdbP.iteratorNextv(iter, 2)).toEqual( - [[['K1', '100'], ['K2', '100']], false] - ); + expect(await rocksdbP.iteratorNextv(iter, 2)).toEqual([ + [ + ['K1', '100'], + ['K2', '100'], + ], + false, + ]); await rocksdbP.iteratorClose(iter); await rocksdbP.snapshotRelease(snap); }); @@ -134,21 +163,29 @@ describe('rocksdbP', () => { await rocksdbP.dbPut(db, 'K1', '100', {}); await rocksdbP.dbPut(db, 'K2', '100', {}); const iter = rocksdbP.iteratorInit(db, {}); - expect(await rocksdbP.iteratorNextv(iter, 1)).toEqual( - [[['K1', '100']], false] - ); + expect(await rocksdbP.iteratorNextv(iter, 1)).toEqual([ + [['K1', '100']], + false, + ]); await rocksdbP.dbPut(db, 'K2', '200', {}); - expect(await rocksdbP.iteratorNextv(iter, 1)).toEqual( - [[['K2', '100']], false] - ); + expect(await rocksdbP.iteratorNextv(iter, 1)).toEqual([ + [['K2', '100']], + false, + ]); await rocksdbP.iteratorClose(iter); }); test('dbClear with implicit snapshot', async () => { await rocksdbP.dbPut(db, 'K1', '100', {}); await rocksdbP.dbPut(db, 'K2', '100', {}); await rocksdbP.dbClear(db, {}); - await expect(rocksdbP.dbGet(db, 'K1', {})).rejects.toHaveProperty('code', 'NOT_FOUND'); - await expect(rocksdbP.dbGet(db, 'K2', {})).rejects.toHaveProperty('code', 'NOT_FOUND'); + await expect(rocksdbP.dbGet(db, 'K1', {})).rejects.toHaveProperty( + 'code', + 'NOT_FOUND', + ); + await expect(rocksdbP.dbGet(db, 'K2', {})).rejects.toHaveProperty( + 'code', + 'NOT_FOUND', + ); }); test('dbClear with explicit snapshot', async () => { await rocksdbP.dbPut(db, 'K1', '100', {}); @@ -159,11 +196,17 @@ describe('rocksdbP', () => { await rocksdbP.dbPut(db, 'K3', '200', {}); await rocksdbP.dbPut(db, 'K4', '200', {}); await rocksdbP.dbClear(db, { - snapshot: snap + snapshot: snap, }); await rocksdbP.snapshotRelease(snap); - await expect(rocksdbP.dbGet(db, 'K1', {})).rejects.toHaveProperty('code', 'NOT_FOUND'); - await expect(rocksdbP.dbGet(db, 'K2', {})).rejects.toHaveProperty('code', 'NOT_FOUND'); + await expect(rocksdbP.dbGet(db, 'K1', {})).rejects.toHaveProperty( + 'code', + 'NOT_FOUND', + ); + await expect(rocksdbP.dbGet(db, 'K2', {})).rejects.toHaveProperty( + 'code', + 'NOT_FOUND', + ); expect(await rocksdbP.dbGet(db, 'K3', {})).toBe('200'); expect(await rocksdbP.dbGet(db, 'K4', {})).toBe('200'); }); @@ -176,8 +219,12 @@ describe('rocksdbP', () => { }); test('transactionRollback is idempotent', async () => { const tran = rocksdbP.transactionInit(db, {}); - await expect(rocksdbP.transactionRollback(tran)).resolves.toBeUndefined(); - await expect(rocksdbP.transactionRollback(tran)).resolves.toBeUndefined(); + await expect( + rocksdbP.transactionRollback(tran), + ).resolves.toBeUndefined(); + await expect( + rocksdbP.transactionRollback(tran), + 
).resolves.toBeUndefined(); }); test('transactionGet, transactionPut, transactionDel', async () => { const tran = rocksdbP.transactionInit(db, {}); @@ -187,7 +234,10 @@ describe('rocksdbP', () => { await rocksdbP.transactionDel(tran, 'bar'); await rocksdbP.transactionCommit(tran); expect(await rocksdbP.dbGet(db, 'foo', {})).toBe('bar'); - await expect(rocksdbP.dbGet(db, 'bar', {})).rejects.toHaveProperty('code', 'NOT_FOUND'); + await expect(rocksdbP.dbGet(db, 'bar', {})).rejects.toHaveProperty( + 'code', + 'NOT_FOUND', + ); }); test('transactionGetForUpdate addresses write skew by promoting gets into same-value puts', async () => { // Snapshot isolation allows write skew anomalies to occur @@ -201,8 +251,12 @@ describe('rocksdbP', () => { await rocksdbP.dbPut(db, 'balance2', '100', {}); const t1 = async () => { const tran1 = rocksdbP.transactionInit(db, {}); - let balance1 = parseInt(await rocksdbP.transactionGetForUpdate(tran1, 'balance1', {})); - const balance2 = parseInt(await rocksdbP.transactionGetForUpdate(tran1, 'balance2', {})); + let balance1 = parseInt( + await rocksdbP.transactionGetForUpdate(tran1, 'balance1', {}), + ); + const balance2 = parseInt( + await rocksdbP.transactionGetForUpdate(tran1, 'balance2', {}), + ); balance1 -= 100; expect(balance1 + balance2).toBeGreaterThanOrEqual(0); await rocksdbP.transactionPut(tran1, 'balance1', balance1.toString()); @@ -210,8 +264,12 @@ describe('rocksdbP', () => { }; const t2 = async () => { const tran2 = rocksdbP.transactionInit(db, {}); - const balance1 = parseInt(await rocksdbP.transactionGetForUpdate(tran2, 'balance1', {})); - let balance2 = parseInt(await rocksdbP.transactionGetForUpdate(tran2, 'balance2', {})); + const balance1 = parseInt( + await rocksdbP.transactionGetForUpdate(tran2, 'balance1', {}), + ); + let balance2 = parseInt( + await rocksdbP.transactionGetForUpdate(tran2, 'balance2', {}), + ); balance2 -= 100; expect(balance1 + balance2).toBeGreaterThanOrEqual(0); await rocksdbP.transactionPut(tran2, 'balance2', balance2.toString()); @@ -221,10 +279,17 @@ describe('rocksdbP', () => { // this causes a write-write conflict const results = await Promise.allSettled([t1(), t2()]); // One will succeed, one will fail - expect(results.some((result) => result.status === 'fulfilled')).toBe(true); - expect(results.some((result) => { - return result.status === 'rejected' && result.reason.code === 'TRANSACTION_CONFLICT'; - })).toBe(true); + expect(results.some((result) => result.status === 'fulfilled')).toBe( + true, + ); + expect( + results.some((result) => { + return ( + result.status === 'rejected' && + result.reason.code === 'TRANSACTION_CONFLICT' + ); + }), + ).toBe(true); }); test('transactionMultiGetForUpdate addresses write skew by promoting gets into same-value puts', async () => { // Snapshot isolation allows write skew anomalies to occur @@ -238,8 +303,24 @@ describe('rocksdbP', () => { await rocksdbP.dbPut(db, 'balance2', '100', {}); const t1 = async () => { const tran1 = rocksdbP.transactionInit(db, {}); - let balance1 = parseInt((await rocksdbP.transactionMultiGetForUpdate(tran1, ['balance1'], {}))[0]); - const balance2 = parseInt((await rocksdbP.transactionMultiGetForUpdate(tran1, ['balance2'], {}))[0]); + let balance1 = parseInt( + ( + await rocksdbP.transactionMultiGetForUpdate( + tran1, + ['balance1'], + {}, + ) + )[0], + ); + const balance2 = parseInt( + ( + await rocksdbP.transactionMultiGetForUpdate( + tran1, + ['balance2'], + {}, + ) + )[0], + ); balance1 -= 100; expect(balance1 + 
balance2).toBeGreaterThanOrEqual(0);
         await rocksdbP.transactionPut(tran1, 'balance1', balance1.toString());
@@ -247,8 +328,24 @@ describe('rocksdbP', () => {
       };
       const t2 = async () => {
         const tran2 = rocksdbP.transactionInit(db, {});
-        const balance1 = parseInt((await rocksdbP.transactionMultiGetForUpdate(tran2, ['balance1'], {}))[0]);
-        let balance2 = parseInt((await rocksdbP.transactionMultiGetForUpdate(tran2, ['balance2'], {}))[0]);
+        const balance1 = parseInt(
+          (
+            await rocksdbP.transactionMultiGetForUpdate(
+              tran2,
+              ['balance1'],
+              {},
+            )
+          )[0],
+        );
+        let balance2 = parseInt(
+          (
+            await rocksdbP.transactionMultiGetForUpdate(
+              tran2,
+              ['balance2'],
+              {},
+            )
+          )[0],
+        );
         balance2 -= 100;
         expect(balance1 + balance2).toBeGreaterThanOrEqual(0);
         await rocksdbP.transactionPut(tran2, 'balance2', balance2.toString());
@@ -258,10 +355,17 @@ describe('rocksdbP', () => {
       // this causes a write-write conflict
       const results = await Promise.allSettled([t1(), t2()]);
       // One will succeed, one will fail
-      expect(results.some((result) => result.status === 'fulfilled')).toBe(true);
-      expect(results.some((result) => {
-        return result.status === 'rejected' && result.reason.code === 'TRANSACTION_CONFLICT';
-      })).toBe(true);
+      expect(results.some((result) => result.status === 'fulfilled')).toBe(
+        true,
+      );
+      expect(
+        results.some((result) => {
+          return (
+            result.status === 'rejected' &&
+            result.reason.code === 'TRANSACTION_CONFLICT'
+          );
+        }),
+      ).toBe(true);
     });
     test('transactionIteratorInit iterates over overlay and defaults to underlay', async () => {
       await rocksdbP.dbPut(db, 'K1', '100', {});
@@ -272,13 +376,14 @@ describe('rocksdbP', () => {
       await rocksdbP.transactionDel(tran, 'K3');
       await rocksdbP.transactionPut(tran, 'K4', '200');
       const iter = rocksdbP.transactionIteratorInit(tran, {});
-      expect(await rocksdbP.iteratorNextv(iter, 3)).toEqual(
-        [[
+      expect(await rocksdbP.iteratorNextv(iter, 3)).toEqual([
+        [
           ['K1', '100'],
           ['K2', '200'],
           ['K4', '200'],
-        ], false]
-      );
+        ],
+        false,
+      ]);
       await rocksdbP.iteratorClose(iter);
       await rocksdbP.transactionRollback(tran);
     });
@@ -300,7 +405,10 @@ describe('rocksdbP', () => {
       // So T2 commits, but T1 will have a conflict exception
       // And therefore the `exclusive` option is not relevant
       // to optimistic transactions
-      await expect(rocksdbP.transactionCommit(tran1)).rejects.toHaveProperty('code', 'TRANSACTION_CONFLICT');
+      await expect(rocksdbP.transactionCommit(tran1)).rejects.toHaveProperty(
+        'code',
+        'TRANSACTION_CONFLICT',
+      );
     });
     test('transactionMultiGetForUpdate does not block transactions', async () => {
       await rocksdbP.dbPut(db, 'K1', '100', {});
@@ -320,7 +428,10 @@ describe('rocksdbP', () => {
       // So T2 commits, but T1 will have a conflict exception
       // And therefore the `exclusive` option is not relevant
       // to optimistic transactions
-      await expect(rocksdbP.transactionCommit(tran1)).rejects.toHaveProperty('code', 'TRANSACTION_CONFLICT');
+      await expect(rocksdbP.transactionCommit(tran1)).rejects.toHaveProperty(
+        'code',
+        'TRANSACTION_CONFLICT',
+      );
     });
     describe('transaction without snapshot', () => {
       test('no conflict when db write occurs before transaction write', async () => {
@@ -336,7 +447,10 @@ describe('rocksdbP', () => {
         const tran = rocksdbP.transactionInit(db, {});
         await rocksdbP.transactionPut(tran, 'K1', '200');
         await rocksdbP.dbPut(db, 'K1', '100', {});
-        await expect(rocksdbP.transactionCommit(tran)).rejects.toHaveProperty('code', 'TRANSACTION_CONFLICT');
+        await expect(rocksdbP.transactionCommit(tran)).rejects.toHaveProperty(
+ 'code', + 'TRANSACTION_CONFLICT', + ); }); test('transactionGet non-repeatable reads', async () => { await rocksdbP.dbPut(db, 'K1', '100', {}); @@ -349,10 +463,17 @@ describe('rocksdbP', () => { test('transactionGetForUpdate non-repeatable reads', async () => { await rocksdbP.dbPut(db, 'K1', '100', {}); const tran = rocksdbP.transactionInit(db, {}); - expect(await rocksdbP.transactionGetForUpdate(tran, 'K1', {})).toBe('100'); + expect(await rocksdbP.transactionGetForUpdate(tran, 'K1', {})).toBe( + '100', + ); await rocksdbP.dbPut(db, 'K1', '200', {}); - expect(await rocksdbP.transactionGetForUpdate(tran, 'K1', {})).toBe('200'); - expect(rocksdbP.transactionCommit(tran)).rejects.toHaveProperty('code', 'TRANSACTION_CONFLICT'); + expect(await rocksdbP.transactionGetForUpdate(tran, 'K1', {})).toBe( + '200', + ); + await expect(rocksdbP.transactionCommit(tran)).rejects.toHaveProperty( + 'code', + 'TRANSACTION_CONFLICT', + ); }); test('iterator non-repeatable reads', async () => { await rocksdbP.dbPut(db, 'K1', '100', {}); @@ -361,22 +482,24 @@ describe('rocksdbP', () => { await rocksdbP.dbPut(db, 'K1', '200', {}); await rocksdbP.dbPut(db, 'K2', '200', {}); const iter1 = rocksdbP.transactionIteratorInit(tran, {}); - expect(await rocksdbP.iteratorNextv(iter1, 2)).toEqual( - [[ + expect(await rocksdbP.iteratorNextv(iter1, 2)).toEqual([ + [ ['K1', '200'], ['K2', '200'], - ], false] - ); + ], + false, + ]); await rocksdbP.iteratorClose(iter1); await rocksdbP.dbPut(db, 'K1', '300', {}); await rocksdbP.dbPut(db, 'K2', '300', {}); const iter2 = rocksdbP.transactionIteratorInit(tran, {}); - expect(await rocksdbP.iteratorNextv(iter2, 2)).toEqual( - [[ + expect(await rocksdbP.iteratorNextv(iter2, 2)).toEqual([ + [ ['K1', '300'], ['K2', '300'], - ], false] - ); + ], + false, + ]); await rocksdbP.iteratorClose(iter2); await rocksdbP.transactionRollback(tran); }); @@ -390,10 +513,22 @@ describe('rocksdbP', () => { // This will delete K1, K2, K3, K4 await rocksdbP.transactionClear(tran, {}); await rocksdbP.transactionCommit(tran); - await expect(rocksdbP.dbGet(db, 'K1', {})).rejects.toHaveProperty('code', 'NOT_FOUND'); - await expect(rocksdbP.dbGet(db, 'K2', {})).rejects.toHaveProperty('code', 'NOT_FOUND'); - await expect(rocksdbP.dbGet(db, 'K3', {})).rejects.toHaveProperty('code', 'NOT_FOUND'); - await expect(rocksdbP.dbGet(db, 'K4', {})).rejects.toHaveProperty('code', 'NOT_FOUND'); + await expect(rocksdbP.dbGet(db, 'K1', {})).rejects.toHaveProperty( + 'code', + 'NOT_FOUND', + ); + await expect(rocksdbP.dbGet(db, 'K2', {})).rejects.toHaveProperty( + 'code', + 'NOT_FOUND', + ); + await expect(rocksdbP.dbGet(db, 'K3', {})).rejects.toHaveProperty( + 'code', + 'NOT_FOUND', + ); + await expect(rocksdbP.dbGet(db, 'K4', {})).rejects.toHaveProperty( + 'code', + 'NOT_FOUND', + ); }); test('transactionMultiGet with non-repeatable read', async () => { await rocksdbP.dbPut(db, 'K1', '100', {}); @@ -402,11 +537,13 @@ describe('rocksdbP', () => { await rocksdbP.transactionPut(tran, 'K2', '200'); await rocksdbP.transactionPut(tran, 'K3', '200'); await rocksdbP.dbPut(db, 'K4', '200', {}); - expect(await rocksdbP.transactionMultiGet( - tran, ['K1', 'K2', 'K3', 'K4', 'K5'], {} - )).toEqual( - ['100', '200', '200', '200', undefined] - ); + expect( + await rocksdbP.transactionMultiGet( + tran, + ['K1', 'K2', 'K3', 'K4', 'K5'], + {}, + ), + ).toEqual(['100', '200', '200', '200', undefined]); await rocksdbP.transactionCommit(tran); }); test('transactionMultiGetForUpdate with non-repeatable read', async () => { @@ -416,11 
+553,13 @@ describe('rocksdbP', () => {
         await rocksdbP.transactionPut(tran, 'K2', '200');
         await rocksdbP.transactionPut(tran, 'K3', '200');
         await rocksdbP.dbPut(db, 'K4', '200', {});
-        expect(await rocksdbP.transactionMultiGetForUpdate(
-          tran, ['K1', 'K2', 'K3', 'K4', 'K5'], {}
-        )).toEqual(
-          ['100', '200', '200', '200', undefined]
-        );
+        expect(
+          await rocksdbP.transactionMultiGetForUpdate(
+            tran,
+            ['K1', 'K2', 'K3', 'K4', 'K5'],
+            {},
+          ),
+        ).toEqual(['100', '200', '200', '200', undefined]);
         // No conflict because the K4 write was done prior to `transactionMultiGetForUpdate`
         await rocksdbP.transactionCommit(tran);
       });
@@ -432,37 +571,59 @@ describe('rocksdbP', () => {
         // Conflict because the snapshot was set at the beginning of the transaction
         await rocksdbP.dbPut(db, 'K1', '100', {});
         await rocksdbP.transactionPut(tran, 'K1', '200');
-        await expect(rocksdbP.transactionCommit(tran)).rejects.toHaveProperty('code', 'TRANSACTION_CONFLICT');
+        await expect(rocksdbP.transactionCommit(tran)).rejects.toHaveProperty(
+          'code',
+          'TRANSACTION_CONFLICT',
+        );
       });
       test('transactionGet repeatable reads', async () => {
         await rocksdbP.dbPut(db, 'K1', '100', {});
         const tran = rocksdbP.transactionInit(db, {});
         const tranSnap = rocksdbP.transactionSnapshot(tran);
-        expect(await rocksdbP.transactionGet(tran, 'K1', { snapshot: tranSnap })).toBe('100');
+        expect(
+          await rocksdbP.transactionGet(tran, 'K1', { snapshot: tranSnap }),
+        ).toBe('100');
         await rocksdbP.dbPut(db, 'K1', '200', {});
-        expect(await rocksdbP.transactionGet(tran, 'K1', { snapshot: tranSnap })).toBe('100');
+        expect(
+          await rocksdbP.transactionGet(tran, 'K1', { snapshot: tranSnap }),
+        ).toBe('100');
         await rocksdbP.transactionRollback(tran);
       });
       test('transactionGet repeatable reads use write overlay', async () => {
         await rocksdbP.dbPut(db, 'K1', '100', {});
         const tran = rocksdbP.transactionInit(db, {});
         const tranSnap = rocksdbP.transactionSnapshot(tran);
-        expect(await rocksdbP.transactionGet(tran, 'K1', { snapshot: tranSnap })).toBe('100');
+        expect(
+          await rocksdbP.transactionGet(tran, 'K1', { snapshot: tranSnap }),
+        ).toBe('100');
         await rocksdbP.transactionPut(tran, 'K1', '300');
         await rocksdbP.dbPut(db, 'K1', '200', {});
         // Here, even though we're using the snapshot, because the transaction has 300 written
         // it ends up using 300 and ignores the 200 that's written directly to the DB
-        expect(await rocksdbP.transactionGet(tran, 'K1', { snapshot: tranSnap })).toBe('300');
+        expect(
+          await rocksdbP.transactionGet(tran, 'K1', { snapshot: tranSnap }),
+        ).toBe('300');
         await rocksdbP.transactionRollback(tran);
       });
       test('transactionGetForUpdate repeatable reads', async () => {
         await rocksdbP.dbPut(db, 'K1', '100', {});
         const tran = rocksdbP.transactionInit(db, {});
         const tranSnap = rocksdbP.transactionSnapshot(tran);
-        expect(await rocksdbP.transactionGetForUpdate(tran, 'K1', { snapshot: tranSnap })).toBe('100');
+        expect(
+          await rocksdbP.transactionGetForUpdate(tran, 'K1', {
+            snapshot: tranSnap,
+          }),
+        ).toBe('100');
         await rocksdbP.dbPut(db, 'K1', '200', {});
-        expect(await rocksdbP.transactionGetForUpdate(tran, 'K1', { snapshot: tranSnap })).toBe('100');
-        expect(rocksdbP.transactionCommit(tran)).rejects.toHaveProperty('code', 'TRANSACTION_CONFLICT');
+        expect(
+          await rocksdbP.transactionGetForUpdate(tran, 'K1', {
+            snapshot: tranSnap,
+          }),
+        ).toBe('100');
+        await expect(rocksdbP.transactionCommit(tran)).rejects.toHaveProperty(
+          'code',
+          'TRANSACTION_CONFLICT',
+        );
       });
       test('iterator repeatable reads', async () => {
        await 
rocksdbP.dbPut(db, 'K1', '100', {});
@@ -471,33 +632,35 @@ describe('rocksdbP', () => {
         await rocksdbP.transactionPut(tran, 'K3', '100');
         const tranSnap1 = rocksdbP.transactionSnapshot(tran);
         const iter1 = rocksdbP.transactionIteratorInit(tran, {
-          snapshot: tranSnap1
+          snapshot: tranSnap1,
         });
-        expect(await rocksdbP.iteratorNextv(iter1, 3)).toEqual(
-          [[
+        expect(await rocksdbP.iteratorNextv(iter1, 3)).toEqual([
+          [
            ['K1', '100'],
            ['K2', '100'],
            ['K3', '100'],
-          ], false]
-        );
+          ],
+          false,
+        ]);
         await rocksdbP.iteratorClose(iter1);
         await rocksdbP.transactionPut(tran, 'K2', '200');
         await rocksdbP.transactionPut(tran, 'K3', '200');
         await rocksdbP.dbPut(db, 'K1', '200', {});
         const iter2 = rocksdbP.transactionIteratorInit(tran, {
-          snapshot: tranSnap1
+          snapshot: tranSnap1,
         });
         // Notice that this iteration uses the new values written
         // in this transaction; this means the snapshot only applies
         // to the underlying database, not to the transaction's own
         // writes
-        expect(await rocksdbP.iteratorNextv(iter2, 3)).toEqual(
-          [[
+        expect(await rocksdbP.iteratorNextv(iter2, 3)).toEqual([
+          [
            ['K1', '100'],
            ['K2', '200'],
            ['K3', '200'],
-          ], false]
-        );
+          ],
+          false,
+        ]);
         await rocksdbP.iteratorClose(iter2);
         // Resetting the snapshot for the transaction
         // Now the snapshot takes the current state of the DB,
@@ -505,15 +668,16 @@ describe('rocksdbP', () => {
         const tranSnap2 = rocksdbP.transactionSnapshot(tran);
         await rocksdbP.dbPut(db, 'K2', '300', {});
         const iter3 = rocksdbP.transactionIteratorInit(tran, {
-          snapshot: tranSnap2
+          snapshot: tranSnap2,
         });
-        expect(await rocksdbP.iteratorNextv(iter3, 3)).toEqual(
-          [[
+        expect(await rocksdbP.iteratorNextv(iter3, 3)).toEqual([
+          [
            ['K1', '200'],
            ['K2', '200'],
            ['K3', '200'],
-          ], false]
-        );
+          ],
+          false,
+        ]);
         await rocksdbP.iteratorClose(iter3);
         // Therefore iterators should always use the snapshot taken
         // at the beginning of the transaction
@@ -530,9 +694,18 @@ describe('rocksdbP', () => {
         // This will delete K1, K2, K3
         await rocksdbP.transactionClear(tran, { snapshot: tranSnap });
         await rocksdbP.transactionCommit(tran);
-        await expect(rocksdbP.dbGet(db, 'K1', {})).rejects.toHaveProperty('code', 'NOT_FOUND');
-        await expect(rocksdbP.dbGet(db, 'K2', {})).rejects.toHaveProperty('code', 'NOT_FOUND');
-        await expect(rocksdbP.dbGet(db, 'K3', {})).rejects.toHaveProperty('code', 'NOT_FOUND');
+        await expect(rocksdbP.dbGet(db, 'K1', {})).rejects.toHaveProperty(
+          'code',
+          'NOT_FOUND',
+        );
+        await expect(rocksdbP.dbGet(db, 'K2', {})).rejects.toHaveProperty(
+          'code',
+          'NOT_FOUND',
+        );
+        await expect(rocksdbP.dbGet(db, 'K3', {})).rejects.toHaveProperty(
+          'code',
+          'NOT_FOUND',
+        );
         expect(await rocksdbP.dbGet(db, 'K4', {})).toBe('200');
       });
       test('transactionMultiGet with repeatable read', async () => {
@@ -543,13 +716,15 @@ describe('rocksdbP', () => {
         await rocksdbP.transactionPut(tran, 'K2', '200');
         await rocksdbP.transactionPut(tran, 'K3', '200');
         await rocksdbP.dbPut(db, 'K4', '200', {});
-        expect(await rocksdbP.transactionMultiGet(
-          tran, ['K1', 'K2', 'K3', 'K4', 'K5'], {
-            snapshot: tranSnap
-          }
-        )).toEqual(
-          ['100', '200', '200', undefined, undefined]
-        );
+        expect(
+          await rocksdbP.transactionMultiGet(
+            tran,
+            ['K1', 'K2', 'K3', 'K4', 'K5'],
+            {
+              snapshot: tranSnap,
+            },
+          ),
+        ).toEqual(['100', '200', '200', undefined, undefined]);
         await rocksdbP.transactionCommit(tran);
       });
       test('transactionMultiGetForUpdate with repeatable read', async () => {
@@ -560,15 +735,20 @@ describe('rocksdbP', () => {
        await 
rocksdbP.transactionPut(tran, 'K2', '200');
         await rocksdbP.transactionPut(tran, 'K3', '200');
         await rocksdbP.dbPut(db, 'K4', '200', {});
-        expect(await rocksdbP.transactionMultiGetForUpdate(
-          tran, ['K1', 'K2', 'K3', 'K4', 'K5'], {
-            snapshot: tranSnap
-          }
-        )).toEqual(
-          ['100', '200', '200', undefined, undefined]
-        );
+        expect(
+          await rocksdbP.transactionMultiGetForUpdate(
+            tran,
+            ['K1', 'K2', 'K3', 'K4', 'K5'],
+            {
+              snapshot: tranSnap,
+            },
+          ),
+        ).toEqual(['100', '200', '200', undefined, undefined]);
         // Conflict because the K4 write was done after the snapshot
-        await expect(rocksdbP.transactionCommit(tran)).rejects.toHaveProperty('code', 'TRANSACTION_CONFLICT');
+        await expect(rocksdbP.transactionCommit(tran)).rejects.toHaveProperty(
+          'code',
+          'TRANSACTION_CONFLICT',
+        );
       });
     });
   });
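
For reference, the write-skew tests above all reduce to one pattern: promote every read that guards a subsequent write into `transactionGetForUpdate` (or `transactionMultiGetForUpdate`), so the read joins the transaction's conflict set and a concurrent writer forces `TRANSACTION_CONFLICT` at commit time. A minimal sketch of that pattern, assuming the same promise-based `rocksdbP` bindings and an open `db` handle as in the tests above; `withdraw` is a hypothetical helper, not part of the library:

  const withdraw = async (db: any, key: string, amount: number) => {
    const tran = rocksdbP.transactionInit(db, {});
    try {
      // Promoting the read into the conflict set means a concurrent
      // commit that writes `key` invalidates this transaction
      const balance = parseInt(
        await rocksdbP.transactionGetForUpdate(tran, key, {}),
      );
      if (balance - amount < 0) throw new Error('insufficient funds');
      await rocksdbP.transactionPut(tran, key, (balance - amount).toString());
      // Rejects with code 'TRANSACTION_CONFLICT' on a write-write conflict
      await rocksdbP.transactionCommit(tran);
    } catch (e) {
      // transactionRollback is idempotent per the tests above, so this
      // is assumed safe even after a failed commit
      await rocksdbP.transactionRollback(tran);
      throw e;
    }
  };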
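
The repeatable-read tests likewise reduce to one rule: take `transactionSnapshot` once, immediately after `transactionInit`, and pass it to every read and iterator in that transaction; the snapshot pins reads of the underlying database while the transaction's own puts still overlay it. A short usage sketch under the same assumptions:

  const tran = rocksdbP.transactionInit(db, {});
  // Taken up front, so concurrent db writes stay invisible to these reads,
  // while this transaction's own puts are still visible through the overlay
  const snap = rocksdbP.transactionSnapshot(tran);
  const v1 = await rocksdbP.transactionGet(tran, 'K1', { snapshot: snap });
  const iter = rocksdbP.transactionIteratorInit(tran, { snapshot: snap });
  const [entries] = await rocksdbP.iteratorNextv(iter, 10);
  await rocksdbP.iteratorClose(iter);
  await rocksdbP.transactionRollback(tran);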