diff --git a/src/nodes/NodeConnectionManager.ts b/src/nodes/NodeConnectionManager.ts index 544fd5687..fb4f04e79 100644 --- a/src/nodes/NodeConnectionManager.ts +++ b/src/nodes/NodeConnectionManager.ts @@ -590,9 +590,21 @@ class NodeConnectionManager { timer, ); for (const [nodeId, nodeData] of nodes) { - // FIXME: this should be the `nodeManager.setNode` - // FIXME: no tran needed - await this.nodeGraph.setNode(nodeId, nodeData.address); + // FIXME: needs to ping the node right? we want to be non-blocking + try { + // FIXME: no tran needed + await this.nodeManager?.setNode(nodeId, nodeData.address); + } catch (e) { + if (!(e instanceof nodesErrors.ErrorNodeGraphSameNodeId)) throw e; + } + } + // Refreshing every bucket above the closest node + const [closestNode] = ( + await this.nodeGraph.getClosestNodes(this.keyManager.getNodeId(), 1) + ).pop()!; + const [bucketIndex] = this.nodeGraph.bucketIndex(closestNode); + for (let i = bucketIndex; i < this.nodeGraph.nodeIdBits; i++) { + this.nodeManager?.refreshBucketQueueAdd(i); } } } diff --git a/src/nodes/NodeGraph.ts b/src/nodes/NodeGraph.ts index d7437b389..3baf60299 100644 --- a/src/nodes/NodeGraph.ts +++ b/src/nodes/NodeGraph.ts @@ -697,10 +697,10 @@ class NodeGraph { // 2. iterate over 0 ---> T-1 // 3. iterate over T+1 ---> K // Need to work out the relevant bucket to start from - const startingBucket = nodesUtils.bucketIndex( - this.keyManager.getNodeId(), - nodeId, - ); + const localNodeId = this.keyManager.getNodeId(); + const startingBucket = localNodeId.equals(nodeId) + ? 0 + : nodesUtils.bucketIndex(this.keyManager.getNodeId(), nodeId); // Getting the whole target's bucket first const nodeIds: NodeBucket = await this.getBucket( startingBucket, diff --git a/src/nodes/NodeManager.ts b/src/nodes/NodeManager.ts index 233e1b316..37fdf3cf0 100644 --- a/src/nodes/NodeManager.ts +++ b/src/nodes/NodeManager.ts @@ -115,7 +115,7 @@ class NodeManager { // We need to attempt a connection using the proxies // For now we will just do a forward connect + relay message const targetAddress = - address ?? (await this.nodeConnectionManager.findNode(nodeId)); + address ?? (await this.nodeConnectionManager.findNode(nodeId))!; const targetHost = await networkUtils.resolveHost(targetAddress.host); return await this.nodeConnectionManager.pingNode( nodeId, diff --git a/tests/nodes/NodeConnectionManager.seednodes.test.ts b/tests/nodes/NodeConnectionManager.seednodes.test.ts index ec7d6ee44..4d47afb0c 100644 --- a/tests/nodes/NodeConnectionManager.seednodes.test.ts +++ b/tests/nodes/NodeConnectionManager.seednodes.test.ts @@ -1,12 +1,13 @@ import type { NodeId, SeedNodes } from '@/nodes/types'; import type { Host, Port } from '@/network/types'; -import type NodeManager from 'nodes/NodeManager'; +import type { Sigchain } from '@/sigchain'; import fs from 'fs'; import path from 'path'; import os from 'os'; import { DB } from '@matrixai/db'; import Logger, { LogLevel, StreamHandler } from '@matrixai/logger'; import { IdInternal } from '@matrixai/id'; +import NodeManager from '@/nodes/NodeManager'; import PolykeyAgent from '@/PolykeyAgent'; import KeyManager from '@/keys/KeyManager'; import NodeGraph from '@/nodes/NodeGraph'; @@ -78,7 +79,10 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { keysUtils, 'generateDeterministicKeyPair', ); - const dummyNodeManager = { setNode: jest.fn() } as unknown as NodeManager; + const dummyNodeManager = { + setNode: jest.fn(), + refreshBucketQueueAdd: jest.fn(), + } as unknown as NodeManager; beforeAll(async () => { mockedGenerateDeterministicKeyPair.mockImplementation((bits, _) => { @@ -225,6 +229,12 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { }); test('should synchronise nodeGraph', async () => { let nodeConnectionManager: NodeConnectionManager | undefined; + let nodeManager: NodeManager | undefined; + const mockedRefreshBucket = jest.spyOn( + NodeManager.prototype, + 'refreshBucket', + ); + mockedRefreshBucket.mockImplementation(async () => {}); try { const seedNodes: SeedNodes = {}; seedNodes[nodesUtils.encodeNodeId(remoteNodeId1)] = { @@ -242,6 +252,15 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { seedNodes, logger: logger, }); + nodeManager = new NodeManager({ + db, + keyManager, + logger, + nodeConnectionManager, + nodeGraph, + sigchain: {} as Sigchain, + }); + await nodeManager.start(); await remoteNode1.nodeGraph.setNode(nodeId1, { host: serverHost, port: serverPort, @@ -250,17 +269,77 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { host: serverHost, port: serverPort, }); - await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); + await nodeConnectionManager.start({ nodeManager }); await nodeConnectionManager.syncNodeGraph(); expect(await nodeGraph.getNode(nodeId1)).toBeDefined(); expect(await nodeGraph.getNode(nodeId2)).toBeDefined(); expect(await nodeGraph.getNode(dummyNodeId)).toBeUndefined(); } finally { + mockedRefreshBucket.mockRestore(); + await nodeManager?.stop(); + await nodeConnectionManager?.stop(); + } + }); + test('should call refreshBucket when syncing nodeGraph', async () => { + let nodeConnectionManager: NodeConnectionManager | undefined; + let nodeManager: NodeManager | undefined; + const mockedRefreshBucket = jest.spyOn( + NodeManager.prototype, + 'refreshBucket', + ); + mockedRefreshBucket.mockImplementation(async () => {}); + try { + const seedNodes: SeedNodes = {}; + seedNodes[nodesUtils.encodeNodeId(remoteNodeId1)] = { + host: remoteNode1.proxy.getProxyHost(), + port: remoteNode1.proxy.getProxyPort(), + }; + seedNodes[nodesUtils.encodeNodeId(remoteNodeId2)] = { + host: remoteNode2.proxy.getProxyHost(), + port: remoteNode2.proxy.getProxyPort(), + }; + nodeConnectionManager = new NodeConnectionManager({ + keyManager, + nodeGraph, + proxy, + seedNodes, + logger: logger, + }); + nodeManager = new NodeManager({ + db, + keyManager, + logger, + nodeConnectionManager, + nodeGraph, + sigchain: {} as Sigchain, + }); + await nodeManager.start(); + await remoteNode1.nodeGraph.setNode(nodeId1, { + host: serverHost, + port: serverPort, + }); + await remoteNode2.nodeGraph.setNode(nodeId2, { + host: serverHost, + port: serverPort, + }); + await nodeConnectionManager.start({ nodeManager }); + await nodeConnectionManager.syncNodeGraph(); + await nodeManager.refreshBucketQueueDrained(); + expect(mockedRefreshBucket).toHaveBeenCalled(); + } finally { + mockedRefreshBucket.mockRestore(); + await nodeManager?.stop(); await nodeConnectionManager?.stop(); } }); test('should handle an offline seed node when synchronising nodeGraph', async () => { let nodeConnectionManager: NodeConnectionManager | undefined; + let nodeManager: NodeManager | undefined; + const mockedRefreshBucket = jest.spyOn( + NodeManager.prototype, + 'refreshBucket', + ); + mockedRefreshBucket.mockImplementation(async () => {}); try { const seedNodes: SeedNodes = {}; seedNodes[nodesUtils.encodeNodeId(remoteNodeId1)] = { @@ -292,14 +371,25 @@ describe(`${NodeConnectionManager.name} seed nodes test`, () => { connConnectTime: 500, logger: logger, }); - await nodeConnectionManager.start({ nodeManager: dummyNodeManager }); + nodeManager = new NodeManager({ + db, + keyManager, + logger, + nodeConnectionManager, + nodeGraph, + sigchain: {} as Sigchain, + }); + await nodeManager.start(); + await nodeConnectionManager.start({ nodeManager }); // This should complete without error await nodeConnectionManager.syncNodeGraph(); // Information on remotes are found expect(await nodeGraph.getNode(nodeId1)).toBeDefined(); expect(await nodeGraph.getNode(nodeId2)).toBeDefined(); } finally { + mockedRefreshBucket.mockRestore(); await nodeConnectionManager?.stop(); + await nodeManager?.stop(); } }); });