Skip to content

Commit

Permalink
feat: abort controller support for NodeManager.refreshBucket
Browse files Browse the repository at this point in the history
Added support for cancelling a `refreshBucket` operation. This allows the `NodeManager` to stop faster by aborting out of a slow `refreshBucket` operation. It is implemented with the `AbortController`/`AbortSignal` API. That API is not fully supported by Node14, so we're using the `node-abort-controller` package to provide the functionality for now.

#345
  • Loading branch information
tegefaulkes authored and emmacasolin committed Jun 14, 2022
1 parent ee53cfa commit 6302b0d
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 7 deletions.
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
"jose": "^4.3.6",
"lexicographic-integer": "^1.1.0",
"multiformats": "^9.4.8",
"node-abort-controller": "^3.0.1",
"node-forge": "^0.10.0",
"pako": "^1.0.11",
"prompts": "^2.4.1",
Expand Down
19 changes: 15 additions & 4 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import type {
} from './types';
import { withF } from '@matrixai/resources';
import type NodeManager from './NodeManager';
import type { AbortSignal } from 'node-abort-controller';
import Logger from '@matrixai/logger';
import { ready, StartStop } from '@matrixai/async-init/dist/StartStop';
import { IdInternal } from '@matrixai/id';
Expand Down Expand Up @@ -383,16 +384,21 @@ class NodeConnectionManager {
* Retrieves the node address. If an entry doesn't exist in the db, then
* proceeds to locate it using Kademlia.
* @param targetNodeId Id of the node we are trying to find
* @param tran
* @param options
*/
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
public async findNode(targetNodeId: NodeId): Promise<NodeAddress> {
public async findNode(
targetNodeId: NodeId,
options: { signal?: AbortSignal } = {},
): Promise<NodeAddress> {
const { signal } = { ...options };
// First check if we already have an existing ID -> address record

let address = (await this.nodeGraph.getNode(targetNodeId))?.address;
// Otherwise, attempt to locate it by contacting network
if (address == null) {
address = await this.getClosestGlobalNodes(targetNodeId);
address = await this.getClosestGlobalNodes(targetNodeId, undefined, {
signal,
});
// TODO: This currently just does one iteration
// If not found in this single iteration, we throw an exception
if (address == null) {
Expand All @@ -418,13 +424,16 @@ class NodeConnectionManager {
* @param targetNodeId ID of the node attempting to be found (i.e. attempting
* to find its IP address and port)
* @param timer Connection timeout timer
* @param options
* @returns whether the target node was located in the process
*/
@ready(new nodesErrors.ErrorNodeConnectionManagerNotRunning())
public async getClosestGlobalNodes(
targetNodeId: NodeId,
timer?: Timer,
options: { signal?: AbortSignal } = {},
): Promise<NodeAddress | undefined> {
const { signal } = { ...options };
// Let foundTarget: boolean = false;
let foundAddress: NodeAddress | undefined = undefined;
// Get the closest alpha nodes to the target node (set as shortlist)
Expand All @@ -445,6 +454,7 @@ class NodeConnectionManager {
const contacted: { [nodeId: string]: boolean } = {};
// Iterate until we've found and contacted k nodes
while (Object.keys(contacted).length <= this.nodeGraph.nodeBucketLimit) {
if (signal?.aborted) throw new nodesErrors.ErrorNodeAborted();
// While (!foundTarget) {
// Remove the node from the front of the array
const nextNode = shortlist.shift();
Expand Down Expand Up @@ -479,6 +489,7 @@ class NodeConnectionManager {
// Check to see if any of these are the target node. At the same time, add
// them to the shortlist
for (const [nodeId, nodeData] of foundClosest) {
if (signal?.aborted) throw new nodesErrors.ErrorNodeAborted();
// Ignore any nodes that have been contacted
if (contacted[nodeId]) {
continue;
Expand Down
23 changes: 20 additions & 3 deletions src/nodes/NodeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import type {
import type { ClaimEncoded } from '../claims/types';
import type { Timer } from '../types';
import type { PromiseType } from '../utils/utils';
import type { AbortSignal } from 'node-abort-controller';
import Logger from '@matrixai/logger';
import { StartStop, ready } from '@matrixai/async-init/dist/StartStop';
import { AbortController } from 'node-abort-controller';
import * as nodesErrors from './errors';
import * as nodesUtils from './utils';
import * as networkUtils from '../network/utils';
Expand Down Expand Up @@ -57,6 +59,7 @@ class NodeManager {
protected refreshBucketQueueRunner: Promise<void>;
protected refreshBucketQueuePlug_: PromiseType<void>;
protected refreshBucketQueueDrained_: PromiseType<void>;
protected refreshBucketQueueAbortController: AbortController;

constructor({
db,
Expand Down Expand Up @@ -647,16 +650,21 @@ class NodeManager {
* Connections during the search will share node information with other
* nodes.
* @param bucketIndex
* @param options
*/
public async refreshBucket(bucketIndex: NodeBucketIndex) {
public async refreshBucket(
bucketIndex: NodeBucketIndex,
options: { signal?: AbortSignal } = {},
) {
const { signal } = { ...options };
// We need to generate a random nodeId for this bucket
const nodeId = this.keyManager.getNodeId();
const bucketRandomNodeId = nodesUtils.generateRandomNodeIdForBucket(
nodeId,
bucketIndex,
);
// We then need to start a findNode procedure
await this.nodeConnectionManager.findNode(bucketRandomNodeId);
await this.nodeConnectionManager.findNode(bucketRandomNodeId, { signal });
}

// Refresh bucket activity timer methods
Expand Down Expand Up @@ -752,6 +760,7 @@ class NodeManager {
this.refreshBucketQueueRunning = true;
this.refreshBucketQueuePlug();
let iterator: IterableIterator<NodeBucketIndex> | undefined;
this.refreshBucketQueueAbortController = new AbortController();
const pace = async () => {
// Wait for plug
await this.refreshBucketQueuePlug_.p;
Expand All @@ -772,7 +781,14 @@ class NodeManager {
this.logger.debug(
`processing refreshBucket for bucket ${bucketIndex}, ${this.refreshBucketQueue.size} left in queue`,
);
await this.refreshBucket(bucketIndex);
try {
await this.refreshBucket(bucketIndex, {
signal: this.refreshBucketQueueAbortController.signal,
});
} catch (e) {
if (e instanceof nodesErrors.ErrorNodeAborted) break;
throw e;
}
// Remove from queue and update bucket deadline
this.refreshBucketQueue.delete(bucketIndex);
this.refreshBucketUpdateDeadline(bucketIndex);
Expand All @@ -782,6 +798,7 @@ class NodeManager {

/**
 * Signals the refresh bucket queue to shut down.
 * Aborts the queue's abort controller, clears the running flag and unplugs
 * the queue. Nothing is awaited inside this method itself.
 */
private async stopRefreshBucketQueue(): Promise<void> {
// Abort first: the signal of this controller is the one handed to
// `refreshBucket`, so an in-flight refresh can bail out early
this.refreshBucketQueueAbortController.abort();
// Flag the queue as no longer running
this.refreshBucketQueueRunning = false;
// NOTE(review): presumably releases the plug the queue loop waits on so it
// can observe the cleared flag and exit - confirm against refreshBucketQueueUnplug
this.refreshBucketQueueUnplug();
}
Expand Down
6 changes: 6 additions & 0 deletions src/nodes/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@ import { ErrorPolykey, sysexits } from '../errors';

class ErrorNodes<T> extends ErrorPolykey<T> {}

/**
 * Thrown when a node operation observes that its `AbortSignal` has been
 * aborted and stops before completing.
 */
class ErrorNodeAborted<T> extends ErrorNodes<T> {
  // Declared `static` to match the sibling error classes, which expose
  // their description on the class rather than on each instance
  static description = 'Operation was aborted';
  exitCode = sysexits.USAGE;
}

class ErrorNodeManagerNotRunning extends ErrorNodes {
static description = 'NodeManager is not running';
exitCode = sysexits.USAGE;
Expand Down Expand Up @@ -79,6 +84,7 @@ class ErrorNodeConnectionHostWildcard<T> extends ErrorNodes<T> {

export {
ErrorNodes,
ErrorNodeAborted,
ErrorNodeManagerNotRunning,
ErrorNodeGraphRunning,
ErrorNodeGraphNotRunning,
Expand Down
40 changes: 40 additions & 0 deletions tests/nodes/NodeManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import * as claimsUtils from '@/claims/utils';
import { promise, promisify, sleep } from '@/utils';
import * as nodesUtils from '@/nodes/utils';
import * as utilsPB from '@/proto/js/polykey/v1/utils/utils_pb';
import * as nodesErrors from '@/nodes/errors';
import * as nodesTestUtils from './utils';
import { generateNodeIdForBucket } from './utils';

Expand Down Expand Up @@ -969,4 +970,43 @@ describe(`${NodeManager.name} test`, () => {
await nodeManager.stop();
}
});
// Verifies that stopping the NodeManager aborts a refreshBucket call that
// would otherwise never finish. There is no explicit assertion: the test
// passes when `nodeManager.stop()` resolves and nothing throws.
test('should abort refreshBucket queue when stopping', async () => {
// NOTE(review): large default timer, presumably so that buckets are not
// refreshed by their own deadlines during the test - confirm
const refreshBucketTimeout = 1000000;
const nodeManager = new NodeManager({
db,
sigchain: {} as Sigchain,
keyManager,
nodeGraph,
nodeConnectionManager: dummyNodeConnectionManager,
refreshBucketTimerDefault: refreshBucketTimeout,
logger,
});
// Spy on the prototype so the queue calls the mocked implementation below
const mockRefreshBucket = jest.spyOn(
NodeManager.prototype,
'refreshBucket',
);
try {
await nodeManager.start();
await nodeConnectionManager.start({ nodeManager });
// The mocked refreshBucket never resolves; it only rejects with
// ErrorNodeAborted once the supplied signal emits 'abort'
mockRefreshBucket.mockImplementation(
async (bucket, options: { signal?: AbortSignal } = {}) => {
const { signal } = { ...options };
const prom = promise<void>();
signal?.addEventListener('abort', () =>
prom.rejectP(new nodesErrors.ErrorNodeAborted()),
);
await prom.p;
},
);
// Queue several buckets so there is pending work when stop is requested
nodeManager.refreshBucketQueueAdd(1);
nodeManager.refreshBucketQueueAdd(2);
nodeManager.refreshBucketQueueAdd(3);
nodeManager.refreshBucketQueueAdd(4);
nodeManager.refreshBucketQueueAdd(5);
// Must resolve even though the mocked refreshBucket blocks until aborted
await nodeManager.stop();
} finally {
// Restore the real refreshBucket and make sure the manager is stopped
// even if the body above threw
mockRefreshBucket.mockRestore();
await nodeManager.stop();
}
});
});

0 comments on commit 6302b0d

Please sign in to comment.