Skip to content

Commit

Permalink
Adding transactionClear
Browse files Browse the repository at this point in the history
  • Loading branch information
CMCDragonkai committed Jun 19, 2022
1 parent 434ef4a commit 23d3405
Show file tree
Hide file tree
Showing 9 changed files with 183 additions and 69 deletions.
25 changes: 24 additions & 1 deletion src/rocksdb/napi/index.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,8 +469,10 @@ NAPI_METHOD(dbClear) {
std::string* lte = RangeOption(env, options, "lte");
std::string* gt = RangeOption(env, options, "gt");
std::string* gte = RangeOption(env, options, "gte");
const Snapshot* snapshot = SnapshotProperty(env, options, "snapshot");
const bool sync = BooleanProperty(env, options, "sync", false);
ClearWorker* worker = new ClearWorker(env, database, callback, reverse, limit,
lt, lte, gt, gte);
lt, lte, gt, gte, sync, snapshot);
worker->Queue(env);
NAPI_RETURN_UNDEFINED();
}
Expand Down Expand Up @@ -959,6 +961,26 @@ NAPI_METHOD(transactionIteratorInit) {
return iterator_ref;
}

NAPI_METHOD(transactionClear) {
NAPI_ARGV(3);
NAPI_TRANSACTION_CONTEXT();
ASSERT_TRANSACTION_READY(env, transaction);
napi_value options = argv[1];
napi_value callback = argv[2];
const bool reverse = BooleanProperty(env, options, "reverse", false);
const int limit = Int32Property(env, options, "limit", -1);
std::string* lt = RangeOption(env, options, "lt");
std::string* lte = RangeOption(env, options, "lte");
std::string* gt = RangeOption(env, options, "gt");
std::string* gte = RangeOption(env, options, "gte");
const TransactionSnapshot* snapshot =
TransactionSnapshotProperty(env, options, "snapshot");
ClearWorker* worker = new ClearWorker(env, transaction, callback, reverse,
limit, lt, lte, gt, gte, snapshot);
worker->Queue(env);
NAPI_RETURN_UNDEFINED();
}

/**
* All exported functions.
*/
Expand Down Expand Up @@ -1005,4 +1027,5 @@ NAPI_INIT() {
NAPI_EXPORT_FUNCTION(transactionDel);
NAPI_EXPORT_FUNCTION(transactionSnapshot);
NAPI_EXPORT_FUNCTION(transactionIteratorInit);
NAPI_EXPORT_FUNCTION(transactionClear);
}
48 changes: 1 addition & 47 deletions src/rocksdb/napi/workers/database_workers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@

#include "../worker.h"
#include "../database.h"
#include "../iterator.h"
#include "../snapshot.h"
#include "../utils.h"

OpenWorker::OpenWorker(napi_env env, Database* database, napi_value callback,
Expand Down Expand Up @@ -186,52 +186,6 @@ DelWorker::~DelWorker() { DisposeSliceBuffer(key_); }

void DelWorker::DoExecute() { SetStatus(database_->Del(options_, key_)); }

ClearWorker::ClearWorker(napi_env env, Database* database, napi_value callback,
const bool reverse, const int limit, std::string* lt,
std::string* lte, std::string* gt, std::string* gte)
: PriorityWorker(env, database, callback, "rocksdb.db.clear") {
iterator_ =
new BaseIterator(database, reverse, lt, lte, gt, gte, limit, false);
writeOptions_ = new rocksdb::WriteOptions();
writeOptions_->sync = false;
}

ClearWorker::~ClearWorker() {
delete iterator_;
delete writeOptions_;
}

void ClearWorker::DoExecute() {
iterator_->SeekToRange();

// TODO: add option
uint32_t hwm = 16 * 1024;
rocksdb::WriteBatch batch;

while (true) {
size_t bytesRead = 0;

while (bytesRead <= hwm && iterator_->Valid() && iterator_->Increment()) {
rocksdb::Slice key = iterator_->CurrentKey();
batch.Delete(key);
bytesRead += key.size();
iterator_->Next();
}

if (!SetStatus(iterator_->Status()) || bytesRead == 0) {
break;
}

if (!SetStatus(database_->WriteBatch(*writeOptions_, &batch))) {
break;
}

batch.Clear();
}

iterator_->Close();
}

ApproximateSizeWorker::ApproximateSizeWorker(napi_env env, Database* database,
napi_value callback,
rocksdb::Slice start,
Expand Down
18 changes: 0 additions & 18 deletions src/rocksdb/napi/workers/database_workers.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include <rocksdb/slice.h>

#include "../worker.h"
#include "../iterator.h"
#include "../database.h"
#include "../snapshot.h"

Expand Down Expand Up @@ -125,23 +124,6 @@ struct DelWorker final : public PriorityWorker {
rocksdb::Slice key_;
};

/**
* Worker class for deleting a range from a database.
*/
struct ClearWorker final : public PriorityWorker {
ClearWorker(napi_env env, Database* database, napi_value callback,
const bool reverse, const int limit, std::string* lt,
std::string* lte, std::string* gt, std::string* gte);

~ClearWorker();

void DoExecute() override;

private:
BaseIterator* iterator_;
rocksdb::WriteOptions* writeOptions_;
};

/**
* Worker class for calculating the size of a range.
*/
Expand Down
69 changes: 69 additions & 0 deletions src/rocksdb/napi/workers/iterator_workers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

#include <cstddef>
#include <cstdint>
#include <cassert>

#include <node/node_api.h>

Expand Down Expand Up @@ -79,3 +80,71 @@ void NextWorker::DoFinally(napi_env env) {

BaseWorker::DoFinally(env);
}

ClearWorker::ClearWorker(napi_env env, Database* database, napi_value callback,
const bool reverse, const int limit, std::string* lt,
std::string* lte, std::string* gt, std::string* gte,
const bool sync, const Snapshot* snapshot)
: PriorityWorker(env, database, callback, "rocksdb.db.clear") {
iterator_ = new BaseIterator(database, reverse, lt, lte, gt, gte, limit,
false, snapshot);
writeOptions_ = new rocksdb::WriteOptions();
writeOptions_->sync = sync;
}

ClearWorker::ClearWorker(napi_env env, Transaction* transaction,
napi_value callback, const bool reverse,
const int limit, std::string* lt, std::string* lte,
std::string* gt, std::string* gte,
const TransactionSnapshot* snapshot)
: PriorityWorker(env, transaction, callback, "rocksdb.db.clear") {
iterator_ = new BaseIterator(transaction, reverse, lt, lte, gt, gte, limit,
false, snapshot);
writeOptions_ = nullptr;
}

ClearWorker::~ClearWorker() {
delete iterator_;
delete writeOptions_;
}

void ClearWorker::DoExecute() {
assert(database_ != nullptr || transaction_ != nullptr);
iterator_->SeekToRange();
uint32_t hwm = 16 * 1024;
if (database_ != nullptr) {
rocksdb::WriteBatch batch;
while (true) {
size_t bytesRead = 0;
while (bytesRead <= hwm && iterator_->Valid() && iterator_->Increment()) {
rocksdb::Slice key = iterator_->CurrentKey();
// If this fails, we return
if (!SetStatus(batch.Delete(key))) return;
bytesRead += key.size();
iterator_->Next();
}
if (!SetStatus(iterator_->Status()) || bytesRead == 0) {
break;
}
if (!SetStatus(database_->WriteBatch(*writeOptions_, &batch))) {
break;
}
batch.Clear();
}
} else if (transaction_ != nullptr) {
while (true) {
size_t bytesRead = 0;
while (bytesRead <= hwm && iterator_->Valid() && iterator_->Increment()) {
rocksdb::Slice key = iterator_->CurrentKey();
// If this fails, we return
if (!SetStatus(transaction_->Del(key))) return;
bytesRead += key.size();
iterator_->Next();
}
if (!SetStatus(iterator_->Status()) || bytesRead == 0) {
break;
}
}
}
iterator_->Close();
}
28 changes: 28 additions & 0 deletions src/rocksdb/napi/workers/iterator_workers.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,16 @@
#endif

#include <cstdint>
#include <string>

#include <node/node_api.h>
#include <rocksdb/options.h>

#include "../worker.h"
#include "../database.h"
#include "../iterator.h"
#include "../transaction.h"
#include "../snapshot.h"

/**
* Worker class for closing an iterator
Expand Down Expand Up @@ -47,3 +52,26 @@ struct NextWorker final : public BaseWorker {
uint32_t size_;
bool ok_;
};

/**
* Worker class for deleting a range from a database.
*/
struct ClearWorker final : public PriorityWorker {
ClearWorker(napi_env env, Database* database, napi_value callback,
const bool reverse, const int limit, std::string* lt,
std::string* lte, std::string* gt, std::string* gte,
const bool sync, const Snapshot* snapshot = nullptr);

ClearWorker(napi_env env, Transaction* transaction, napi_value callback,
const bool reverse, const int limit, std::string* lt,
std::string* lte, std::string* gt, std::string* gte,
const TransactionSnapshot* snapshot = nullptr);

~ClearWorker();

void DoExecute() override;

private:
BaseIterator* iterator_;
rocksdb::WriteOptions* writeOptions_;
};
7 changes: 6 additions & 1 deletion src/rocksdb/rocksdb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,14 @@ interface RocksDB {
options: RocksDBIteratorOptions<RocksDBTransactionSnapshot> & { valueEncoding: 'buffer' },
): RocksDBIterator<string, Buffer>;
transactionIteratorInit(
database: RocksDBDatabase,
transaction: RocksDBTransaction,
options: RocksDBIteratorOptions<RocksDBTransactionSnapshot>,
): RocksDBIterator<string, string>;
transactionClear(
transaction: RocksDBTransaction,
options: RocksDBClearOptions<RocksDBTransactionSnapshot>,
callback: Callback<[], void>,
): void;
}

const rocksdb: RocksDB = nodeGypBuild(path.join(__dirname, '../../'));
Expand Down
5 changes: 5 additions & 0 deletions src/rocksdb/rocksdbP.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ interface RocksDBP {
database: RocksDBTransaction,
options: RocksDBIteratorOptions<RocksDBTransactionSnapshot>,
): RocksDBIterator<string, string>;
transactionClear(
transaction: RocksDBTransaction,
options: RocksDBClearOptions<RocksDBTransactionSnapshot>,
): Promise<void>;
}

/**
Expand Down Expand Up @@ -217,6 +221,7 @@ const rocksdbP: RocksDBP = {
transactionDel: utils.promisify(rocksdb.transactionDel).bind(rocksdb),
transactionSnapshot: rocksdb.transactionSnapshot.bind(rocksdb),
transactionIteratorInit: rocksdb.transactionIteratorInit.bind(rocksdb),
transactionClear: rocksdb.transactionClear.bind(rocksdb),
};

export default rocksdbP;
11 changes: 9 additions & 2 deletions src/rocksdb/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,14 +132,21 @@ type RocksDBRangeOptions = {
* Note that `undefined` is not a valid value for these options
* If properties exist, they must have the correct type
*/
type RocksDBClearOptions = RocksDBRangeOptions;
type RocksDBClearOptions
<S extends RocksDBSnapshot | RocksDBTransactionSnapshot = RocksDBSnapshot>
= RocksDBRangeOptions & {
snapshot?: S;
sync?: S extends RocksDBSnapshot ? boolean : void; // Default false
};

/**
* Iterator options
* Note that `undefined` is not a valid value for these options
* If properties exist, they must have the correct type
*/
type RocksDBIteratorOptions<S extends RocksDBSnapshot | RocksDBTransactionSnapshot = RocksDBSnapshot> = RocksDBGetOptions<S> &
type RocksDBIteratorOptions
<S extends RocksDBSnapshot | RocksDBTransactionSnapshot = RocksDBSnapshot>
= RocksDBGetOptions<S> &
RocksDBRangeOptions & {
keys?: boolean;
values?: boolean;
Expand Down
41 changes: 41 additions & 0 deletions tests/rocksdb/rocksdbP.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,30 @@ describe('rocksdbP', () => {
);
await rocksdbP.iteratorClose(iter);
});
test('dbClear with implicit snapshot', async () => {
await rocksdbP.dbPut(db, 'K1', '100', {});
await rocksdbP.dbPut(db, 'K2', '100', {});
await rocksdbP.dbClear(db, {});
await expect(rocksdbP.dbGet(db, 'K1', {})).rejects.toHaveProperty('code', 'NOT_FOUND');
await expect(rocksdbP.dbGet(db, 'K2', {})).rejects.toHaveProperty('code', 'NOT_FOUND');
});
test('dbClear with explicit snapshot', async () => {
await rocksdbP.dbPut(db, 'K1', '100', {});
await rocksdbP.dbPut(db, 'K2', '100', {});
const snap = rocksdbP.snapshotInit(db);
await rocksdbP.dbPut(db, 'K1', '200', {});
await rocksdbP.dbPut(db, 'K2', '200', {});
await rocksdbP.dbPut(db, 'K3', '200', {});
await rocksdbP.dbPut(db, 'K4', '200', {});
await rocksdbP.dbClear(db, {
snapshot: snap
});
await rocksdbP.snapshotRelease(snap);
await expect(rocksdbP.dbGet(db, 'K1', {})).rejects.toHaveProperty('code', 'NOT_FOUND');
await expect(rocksdbP.dbGet(db, 'K2', {})).rejects.toHaveProperty('code', 'NOT_FOUND');
expect(await rocksdbP.dbGet(db, 'K3', {})).toBe('200');
expect(await rocksdbP.dbGet(db, 'K4', {})).toBe('200');
});
});
describe('transactions', () => {
test('transactionCommit is idempotent', async () => {
Expand Down Expand Up @@ -357,6 +381,23 @@ describe('rocksdbP', () => {
// at the beginning of the transaction
await rocksdbP.transactionRollback(tran);
});
test.only('clear with repeatable read', async () => {
await rocksdbP.dbPut(db, 'K1', '100', {});
await rocksdbP.dbPut(db, 'K2', '100', {});
const tran = rocksdbP.transactionInit(db, {});
const tranSnap = rocksdbP.transactionSnapshot(tran);
await rocksdbP.transactionPut(tran, 'K2', '200');
await rocksdbP.transactionPut(tran, 'K3', '200');
await rocksdbP.dbPut(db, 'K4', '200', {});
console.log('OH NO');
await rocksdbP.transactionClear(tran, { snapshot: tranSnap });
console.log('????');
await rocksdbP.transactionCommit(tran);
await expect(rocksdbP.dbGet(db, 'K1', {})).rejects.toHaveProperty('code', 'NOT_FOUND');
await expect(rocksdbP.dbGet(db, 'K2', {})).rejects.toHaveProperty('code', 'NOT_FOUND');
await expect(rocksdbP.dbGet(db, 'K3', {})).rejects.toHaveProperty('code', 'NOT_FOUND');
expect(await rocksdbP.dbGet(db, 'K4', {})).toBe('200');
});
});
});
});
Expand Down

0 comments on commit 23d3405

Please sign in to comment.