Skip to content

Commit

Permalink
Updated async-mutex and use write-preferring RWLock for Schema
Browse files Browse the repository at this point in the history
  • Loading branch information
CMCDragonkai committed Jan 20, 2022
1 parent e141383 commit 2862e01
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 25 deletions.
8 changes: 4 additions & 4 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@
"@matrixai/logger": "^2.1.0",
"@matrixai/workers": "^1.2.5",
"ajv": "^7.0.4",
"async-mutex": "^0.2.4",
"async-mutex": "^0.3.2",
"bip39": "^3.0.3",
"canonicalize": "^1.0.5",
"cheerio": "^1.0.0-rc.5",
Expand Down
6 changes: 3 additions & 3 deletions src/schema/Schema.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ class Schema {
}

public async readVersion(): Promise<StateVersion | undefined> {
return await this.lock.read(async () => {
return await this.lock.withRead(async () => {
let stateVersionData: string;
try {
stateVersionData = await this.fs.promises.readFile(
Expand All @@ -169,7 +169,7 @@ class Schema {
}

protected async writeVersion(stateVersion: StateVersion): Promise<void> {
return await this.lock.write(async () => {
return await this.lock.withWrite(async () => {
try {
await this.fs.promises.writeFile(
this.stateVersionPath,
Expand All @@ -191,7 +191,7 @@ class Schema {
* This is only called when the version is older.
*/
protected async upgradeVersion(_stateVersion: StateVersion): Promise<void> {
return await this.lock.write(async () => {
return await this.lock.withWrite(async () => {
// TODO: to be implemented
throw new schemaErrors.ErrorSchemaVersionTooOld();
});
Expand Down
83 changes: 66 additions & 17 deletions src/utils/locks.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,84 @@
import type { MutexInterface } from 'async-mutex';
import { Mutex } from 'async-mutex';

/**
* Single threaded write-preferring read write lock
*/
class RWLock {
protected readerCount: number = 0;
protected lock: Mutex = new Mutex();
protected release: MutexInterface.Releaser;
protected readersLock: Mutex = new Mutex();
protected writersLock: Mutex = new Mutex();
protected readersRelease: MutexInterface.Releaser;
protected readerCountBlocked: number = 0;
protected _readerCount: number = 0;
protected _writerCount: number = 0;

public async read<T>(f: () => Promise<T>): Promise<T> {
let readerCount = ++this.readerCount;
// The first reader locks
if (readerCount === 1) {
this.release = await this.lock.acquire();
}
public get readerCount(): number {
return this._readerCount + this.readerCountBlocked;
}

public get writerCount(): number {
return this._writerCount;
}

public async withRead<T>(f: () => Promise<T>): Promise<T> {
const release = await this.acquireRead();
try {
return await f();
} finally {
readerCount = --this.readerCount;
// The last reader unlocks
if (readerCount === 0) {
this.release();
}
release();
}
}

public async write<T>(f: () => Promise<T>): Promise<T> {
this.release = await this.lock.acquire();
public async withWrite<T>(f: () => Promise<T>): Promise<T> {
const release = await this.acquireWrite();
try {
return await f();
} finally {
this.release();
release();
}
}

public async acquireRead(): Promise<() => void> {
if (this._writerCount > 0) {
++this.readerCountBlocked;
await this.writersLock.waitForUnlock();
--this.readerCountBlocked;
}
const readerCount = ++this._readerCount;
// The first reader locks
if (readerCount === 1) {
this.readersRelease = await this.readersLock.acquire();
}
return () => {
const readerCount = --this._readerCount;
// The last reader unlocks
if (readerCount === 0) {
this.readersRelease();
}
};
}

public async acquireWrite(): Promise<() => void> {
++this._writerCount;
const writersRelease = await this.writersLock.acquire();
this.readersRelease = await this.readersLock.acquire();
return () => {
this.readersRelease();
writersRelease();
--this._writerCount;
};
}

public isLocked(): boolean {
return this.readersLock.isLocked() || this.writersLock.isLocked();
}

public async waitForUnlock(): Promise<void> {
await Promise.all([
this.readersLock.waitForUnlock(),
this.writersLock.waitForUnlock(),
]);
return;
}
}

Expand Down

0 comments on commit 2862e01

Please sign in to comment.