Skip to content

Commit

Permalink
feat: refreshing buckets when entering network
Browse files Browse the repository at this point in the history
`nodeConnectionManager.syncNodeGraph` now refreshes all buckets above the closest node as per the kademlia spec. This means adding a lot of buckets to the refresh bucket queue when an agent is started.

#345
  • Loading branch information
tegefaulkes authored and emmacasolin committed Jun 14, 2022
1 parent 4dec90b commit ee53cfa
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 12 deletions.
18 changes: 15 additions & 3 deletions src/nodes/NodeConnectionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -590,9 +590,21 @@ class NodeConnectionManager {
timer,
);
for (const [nodeId, nodeData] of nodes) {
// FIXME: this should be the `nodeManager.setNode`
// FIXME: no tran needed
await this.nodeGraph.setNode(nodeId, nodeData.address);
// FIXME: needs to ping the node right? we want to be non-blocking
try {
// FIXME: no tran needed
await this.nodeManager?.setNode(nodeId, nodeData.address);
} catch (e) {
if (!(e instanceof nodesErrors.ErrorNodeGraphSameNodeId)) throw e;
}
}
// Refreshing every bucket above the closest node
const [closestNode] = (
await this.nodeGraph.getClosestNodes(this.keyManager.getNodeId(), 1)
).pop()!;
const [bucketIndex] = this.nodeGraph.bucketIndex(closestNode);
for (let i = bucketIndex; i < this.nodeGraph.nodeIdBits; i++) {
this.nodeManager?.refreshBucketQueueAdd(i);
}
}
}
Expand Down
8 changes: 4 additions & 4 deletions src/nodes/NodeGraph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -697,10 +697,10 @@ class NodeGraph {
// 2. iterate over 0 ---> T-1
// 3. iterate over T+1 ---> K
// Need to work out the relevant bucket to start from
const startingBucket = nodesUtils.bucketIndex(
this.keyManager.getNodeId(),
nodeId,
);
const localNodeId = this.keyManager.getNodeId();
const startingBucket = localNodeId.equals(nodeId)
? 0
: nodesUtils.bucketIndex(this.keyManager.getNodeId(), nodeId);
// Getting the whole target's bucket first
const nodeIds: NodeBucket = await this.getBucket(
startingBucket,
Expand Down
2 changes: 1 addition & 1 deletion src/nodes/NodeManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class NodeManager {
// We need to attempt a connection using the proxies
// For now we will just do a forward connect + relay message
const targetAddress =
address ?? (await this.nodeConnectionManager.findNode(nodeId));
address ?? (await this.nodeConnectionManager.findNode(nodeId))!;
const targetHost = await networkUtils.resolveHost(targetAddress.host);
return await this.nodeConnectionManager.pingNode(
nodeId,
Expand Down
98 changes: 94 additions & 4 deletions tests/nodes/NodeConnectionManager.seednodes.test.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import type { NodeId, SeedNodes } from '@/nodes/types';
import type { Host, Port } from '@/network/types';
import type NodeManager from 'nodes/NodeManager';
import type { Sigchain } from '@/sigchain';
import fs from 'fs';
import path from 'path';
import os from 'os';
import { DB } from '@matrixai/db';
import Logger, { LogLevel, StreamHandler } from '@matrixai/logger';
import { IdInternal } from '@matrixai/id';
import NodeManager from '@/nodes/NodeManager';
import PolykeyAgent from '@/PolykeyAgent';
import KeyManager from '@/keys/KeyManager';
import NodeGraph from '@/nodes/NodeGraph';
Expand Down Expand Up @@ -78,7 +79,10 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => {
keysUtils,
'generateDeterministicKeyPair',
);
const dummyNodeManager = { setNode: jest.fn() } as unknown as NodeManager;
const dummyNodeManager = {
setNode: jest.fn(),
refreshBucketQueueAdd: jest.fn(),
} as unknown as NodeManager;

beforeAll(async () => {
mockedGenerateDeterministicKeyPair.mockImplementation((bits, _) => {
Expand Down Expand Up @@ -225,6 +229,12 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => {
});
test('should synchronise nodeGraph', async () => {
let nodeConnectionManager: NodeConnectionManager | undefined;
let nodeManager: NodeManager | undefined;
const mockedRefreshBucket = jest.spyOn(
NodeManager.prototype,
'refreshBucket',
);
mockedRefreshBucket.mockImplementation(async () => {});
try {
const seedNodes: SeedNodes = {};
seedNodes[nodesUtils.encodeNodeId(remoteNodeId1)] = {
Expand All @@ -242,6 +252,15 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => {
seedNodes,
logger: logger,
});
nodeManager = new NodeManager({
db,
keyManager,
logger,
nodeConnectionManager,
nodeGraph,
sigchain: {} as Sigchain,
});
await nodeManager.start();
await remoteNode1.nodeGraph.setNode(nodeId1, {
host: serverHost,
port: serverPort,
Expand All @@ -250,17 +269,77 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => {
host: serverHost,
port: serverPort,
});
await nodeConnectionManager.start({ nodeManager: dummyNodeManager });
await nodeConnectionManager.start({ nodeManager });
await nodeConnectionManager.syncNodeGraph();
expect(await nodeGraph.getNode(nodeId1)).toBeDefined();
expect(await nodeGraph.getNode(nodeId2)).toBeDefined();
expect(await nodeGraph.getNode(dummyNodeId)).toBeUndefined();
} finally {
mockedRefreshBucket.mockRestore();
await nodeManager?.stop();
await nodeConnectionManager?.stop();
}
});
test('should call refreshBucket when syncing nodeGraph', async () => {
let nodeConnectionManager: NodeConnectionManager | undefined;
let nodeManager: NodeManager | undefined;
const mockedRefreshBucket = jest.spyOn(
NodeManager.prototype,
'refreshBucket',
);
mockedRefreshBucket.mockImplementation(async () => {});
try {
const seedNodes: SeedNodes = {};
seedNodes[nodesUtils.encodeNodeId(remoteNodeId1)] = {
host: remoteNode1.proxy.getProxyHost(),
port: remoteNode1.proxy.getProxyPort(),
};
seedNodes[nodesUtils.encodeNodeId(remoteNodeId2)] = {
host: remoteNode2.proxy.getProxyHost(),
port: remoteNode2.proxy.getProxyPort(),
};
nodeConnectionManager = new NodeConnectionManager({
keyManager,
nodeGraph,
proxy,
seedNodes,
logger: logger,
});
nodeManager = new NodeManager({
db,
keyManager,
logger,
nodeConnectionManager,
nodeGraph,
sigchain: {} as Sigchain,
});
await nodeManager.start();
await remoteNode1.nodeGraph.setNode(nodeId1, {
host: serverHost,
port: serverPort,
});
await remoteNode2.nodeGraph.setNode(nodeId2, {
host: serverHost,
port: serverPort,
});
await nodeConnectionManager.start({ nodeManager });
await nodeConnectionManager.syncNodeGraph();
await nodeManager.refreshBucketQueueDrained();
expect(mockedRefreshBucket).toHaveBeenCalled();
} finally {
mockedRefreshBucket.mockRestore();
await nodeManager?.stop();
await nodeConnectionManager?.stop();
}
});
test('should handle an offline seed node when synchronising nodeGraph', async () => {
let nodeConnectionManager: NodeConnectionManager | undefined;
let nodeManager: NodeManager | undefined;
const mockedRefreshBucket = jest.spyOn(
NodeManager.prototype,
'refreshBucket',
);
mockedRefreshBucket.mockImplementation(async () => {});
try {
const seedNodes: SeedNodes = {};
seedNodes[nodesUtils.encodeNodeId(remoteNodeId1)] = {
Expand Down Expand Up @@ -292,14 +371,25 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => {
connConnectTime: 500,
logger: logger,
});
await nodeConnectionManager.start({ nodeManager: dummyNodeManager });
nodeManager = new NodeManager({
db,
keyManager,
logger,
nodeConnectionManager,
nodeGraph,
sigchain: {} as Sigchain,
});
await nodeManager.start();
await nodeConnectionManager.start({ nodeManager });
// This should complete without error
await nodeConnectionManager.syncNodeGraph();
// Information on remotes are found
expect(await nodeGraph.getNode(nodeId1)).toBeDefined();
expect(await nodeGraph.getNode(nodeId2)).toBeDefined();
} finally {
mockedRefreshBucket.mockRestore();
await nodeConnectionManager?.stop();
await nodeManager?.stop();
}
});
});

0 comments on commit ee53cfa

Please sign in to comment.