diff --git a/src/connectors/connectorRIS.js b/src/connectors/connectorRIS.js index 998d808b..081e5d64 100644 --- a/src/connectors/connectorRIS.js +++ b/src/connectors/connectorRIS.js @@ -35,6 +35,7 @@ import Connector from "./connector"; import { AS, Path } from "../model"; import brembo from "brembo"; import ipUtils from "ip-sub"; +import batchPromises from 'batch-promises'; const beacons = { v4: ["84.205.64.0/24", "84.205.65.0/24", "84.205.67.0/24", "84.205.68.0/24", "84.205.69.0/24", @@ -71,6 +72,7 @@ export default class ConnectorRIS extends Connector { this.canaryBeacons = {}; this.clientId = env.clientId; this.instanceId = env.instanceId; + this.risSubscriptionDelay = 200; this.url = brembo.build(this.params.url, { params: { @@ -193,9 +195,18 @@ export default class ConnectorRIS extends Connector { }; _subscribeToPrefixes = (input) => { - const monitoredPrefixes = input.getMonitoredLessSpecifics(); + let monitoredPrefixes = input.getMonitoredLessSpecifics(); + const risLimitPrefixes = 10000; const params = JSON.parse(JSON.stringify(this.params.subscription)); + if (monitoredPrefixes.length > risLimitPrefixes) { + this.logger.log({ + level: 'error', + message: "Prefix list of abnormal length, truncated to 10000 to prevent RIS overload" + }); + monitoredPrefixes = monitoredPrefixes.slice(0, risLimitPrefixes); + } + if (monitoredPrefixes.filter(i => (ipUtils.isEqualPrefix(i.prefix, '0:0:0:0:0:0:0:0/0') || ipUtils.isEqualPrefix(i.prefix,'0.0.0.0/0'))).length === 2) { delete params.prefix; @@ -214,33 +225,47 @@ export default class ConnectorRIS extends Connector { } else { - return Promise.all(monitoredPrefixes.map(p => { - if (!this.subscribed[p.prefix]) { - console.log("Monitoring", p.prefix); - this.subscribed[p.prefix] = true; - } + return batchPromises(1, monitoredPrefixes, p => { + return new Promise((resolve, reject) => { + if (!this.subscribed[p.prefix]) { + console.log("Monitoring", p.prefix); + this.subscribed[p.prefix] = true; + } - params.prefix = p.prefix; + params.prefix = p.prefix; - filteredBeacons = filteredBeacons.filter(prefix => { - return !ipUtils.isEqualPrefix(p.prefix, prefix) && !ipUtils.isSubnet(p.prefix, prefix); - }); + filteredBeacons = filteredBeacons.filter(prefix => { + return !ipUtils.isEqualPrefix(p.prefix, prefix) && !ipUtils.isSubnet(p.prefix, prefix); + }); - return this.ws.send(JSON.stringify({ - type: "ris_subscribe", - data: params - })); - })); + this.ws.send(JSON.stringify({ + type: "ris_subscribe", + data: params + })); + + setTimeout(() => resolve(true), this.risSubscriptionDelay); // Slow down subscriptions to avoid RIS drop/ban + }); + }); } }; _subscribeToASns = (input) => { - const monitoredASns = input.getMonitoredASns(); + let monitoredASns = input.getMonitoredASns(); + const risLimitAses = 10; const params = JSON.parse(JSON.stringify(this.params.subscription)); - return Promise.all(monitoredASns - .map(asn => { + if (monitoredASns.length > risLimitAses) { + this.logger.log({ + level: 'error', + message: "AS list of abnormal length, truncated to 10 to prevent RIS overload" + }); + monitoredASns = monitoredASns.slice(0, risLimitAses); + } + + return batchPromises(1, monitoredASns, asn => { + return new Promise((resolve, reject) => { + const asnString = asn.asn.getValue(); if (!this.subscribed[asnString]) { @@ -250,20 +275,23 @@ export default class ConnectorRIS extends Connector { params.path = `${asnString}\$`; - return this.ws.send(JSON.stringify({ + this.ws.send(JSON.stringify({ type: "ris_subscribe", data: params })); - })); + + setTimeout(() => resolve(true), this.risSubscriptionDelay); // Slow down subscriptions to avoid RIS drop/ban + }); + }); }; _startCanary = () => { if (this.connected) { - Promise.all(selectedBeacons - .map(prefix => { + return batchPromises(1, selectedBeacons, prefix => { + return new Promise((resolve, reject) => { this.canaryBeacons[prefix] = true; - return this.ws.send(JSON.stringify({ + this.ws.send(JSON.stringify({ type: "ris_subscribe", data: { moreSpecific: false, @@ -276,7 +304,10 @@ export default class ConnectorRIS extends Connector { } } })); - })) + + setTimeout(() => resolve(true), this.risSubscriptionDelay); // Slow down subscriptions to avoid RIS drop/ban + }); + }) .then(() => { this._checkCanary(); this.logger.log({