Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add iwant request tracking #107

Merged
merged 3 commits into from
Jul 6, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 2 additions & 9 deletions test/peerScore.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,13 @@ const { expect } = require('chai')
const PeerId = require('peer-id')
const { utils } = require('libp2p-pubsub')
const delay = require('delay')

const { PeerScore, createPeerScoreParams, createTopicScoreParams } = require('../src/score')
const { makeTestMessage } = require('./utils')

const connectionManager = new Map()
connectionManager.getAll = () => ([])

const makeTestMessage = (i, topicIDs = []) => {
return {
seqno: Buffer.alloc(8, i),
data: Buffer.from([i]),
from: "test",
topicIDs
}
}

describe('PeerScore', () => {
it('should score based on time in mesh', async () => {
// Create parameters with reasonable default values
Expand Down
68 changes: 68 additions & 0 deletions test/tracer.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
const { expect } = require('chai')
const delay = require('delay')
const { utils } = require('libp2p-pubsub')

const { IWantTracer } = require('../src/tracer')
const constants = require('../src/constants')
const { makeTestMessage } = require('./utils')

const getMsgId = (msg) => utils.msgId(msg.from, msg.seqno)

describe('IWantTracer', () => {
it('should track broken promises', async function () {
// tests that unfullfilled promises are tracked correctly
this.timeout(6000)
const t = new IWantTracer(getMsgId)
const peerA = 'A'
const peerB = 'B'

const msgIds = []
for (let i = 0; i < 100; i++) {
const m = makeTestMessage(i)
m.from = Buffer.from(peerA)
msgIds.push(getMsgId(m))
}

t.addPromise(peerA, msgIds)
t.addPromise(peerB, msgIds)

// no broken promises yet
let brokenPromises = t.getBrokenPromises()
expect(brokenPromises.size).to.be.equal(0)

// make promises break
await delay(constants.GossipsubIWantFollowupTime + 10)

brokenPromises = t.getBrokenPromises()
expect(brokenPromises.size).to.be.equal(2)
expect(brokenPromises.get(peerA)).to.be.equal(1)
expect(brokenPromises.get(peerB)).to.be.equal(1)
})
it('should track unbroken promises', async function () {
// like above, but this time we deliver messages to fullfil the promises
this.timeout(6000)
const t = new IWantTracer(getMsgId)
const peerA = 'A'
const peerB = 'B'

const msgs = []
const msgIds = []
for (let i = 0; i < 100; i++) {
const m = makeTestMessage(i)
m.from = Buffer.from(peerA)
msgs.push(m)
msgIds.push(getMsgId(m))
}

t.addPromise(peerA, msgIds)
t.addPromise(peerB, msgIds)

msgs.forEach(msg => t.deliverMessage(peerA, msg))

await delay(constants.GossipsubIWantFollowupTime + 10)

// there should be no broken promises
const brokenPromises = t.getBrokenPromises()
expect(brokenPromises.size).to.be.equal(0)
})
})
3 changes: 2 additions & 1 deletion test/utils/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ exports.createFloodsubNode = createFloodsubNode

for (const [k, v] of Object.entries({
...require('./createPeer'),
...require('./createGossipsub')
...require('./createGossipsub'),
...require('./makeTestMessage')
})) {
exports[k] = v
}
10 changes: 10 additions & 0 deletions test/utils/makeTestMessage.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
const makeTestMessage = (i, topicIDs = []) => {
return {
seqno: Buffer.alloc(8, i),
data: Buffer.from([i]),
from: "test",
topicIDs
}
}

module.exports.makeTestMessage = makeTestMessage
3 changes: 3 additions & 0 deletions ts/heartbeat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,9 @@ export class Heartbeat {
this.gossipsub.peerhave.clear()
this.gossipsub.iasked.clear()

// apply IWANT request penalties
this.gossipsub._applyIwantPenalties()

// ensure direct peers are connected
this.gossipsub._directConnect()

Expand Down
32 changes: 29 additions & 3 deletions ts/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import { getGossipPeers } from './getGossipPeers'
import { createGossipRpc, shuffle, hasGossipProtocol } from './utils'
import { Peer } from './peer'
import { PeerScore, PeerScoreParams, PeerScoreThresholds, createPeerScoreParams, createPeerScoreThresholds } from './score'
import { IWantTracer } from './tracer'
import { AddrInfo, Libp2p } from './interfaces'
// @ts-ignore
import TimeCache = require('time-cache')
Expand Down Expand Up @@ -52,6 +53,7 @@ class Gossipsub extends BasicPubsub {
outbound: Map<Peer, boolean>
score: PeerScore
heartbeatTicks: number
gossipTracer: IWantTracer
_libp2p: Libp2p
_options: GossipOptions

Expand Down Expand Up @@ -197,6 +199,11 @@ class Gossipsub extends BasicPubsub {
*/
this.heartbeatTicks = 0

/**
* Tracks IHAVE/IWANT promises broken by peers
*/
this.gossipTracer = new IWantTracer(this._msgIdFn)

/**
* libp2p
*/
Expand Down Expand Up @@ -341,7 +348,9 @@ class Gossipsub extends BasicPubsub {
* @returns {void}
*/
_publishFrom (peer: Peer, msg: InMessage): void {
this.score.deliverMessage(peer.id.toB58String(), msg as Message)
const id = peer.id.toB58String()
this.score.deliverMessage(id, msg as Message)
this.gossipTracer.deliverMessage(id, msg as Message)
const topics = msg.topicIDs

// If options.gossipIncoming is false, do NOT emit incoming messages to peers
Expand Down Expand Up @@ -401,15 +410,18 @@ class Gossipsub extends BasicPubsub {
*/
_processTopicValidatorResult (topic: string, peer: Peer, message: Message, result: unknown): boolean {
if (typeof result === 'string') {
const id = peer.id.toB58String()
// assume an extended topic validator result
switch (result) {
case ExtendedValidatorResult.accept:
return true
case ExtendedValidatorResult.reject:
this.score.rejectMessage(peer.id.toB58String(), message)
this.score.rejectMessage(id, message)
this.gossipTracer.rejectMessage(id, message)
return false
case ExtendedValidatorResult.ignore:
this.score.ignoreMessage(peer.id.toB58String(), message)
this.score.ignoreMessage(id, message)
this.gossipTracer.rejectMessage(id, message)
return false
}
}
Expand Down Expand Up @@ -492,6 +504,8 @@ class Gossipsub extends BasicPubsub {
iwantList = iwantList.slice(0, iask)
this.iasked.set(id, iasked + iask)

this.gossipTracer.addPromise(id, iwantList)

return {
messageIDs: iwantList
}
Expand Down Expand Up @@ -671,6 +685,17 @@ class Gossipsub extends BasicPubsub {
}
}

/**
* Apply penalties from broken IHAVE/IWANT promises
* @returns {void}
*/
_applyIwantPenalties (): void {
wemeetagain marked this conversation as resolved.
Show resolved Hide resolved
this.gossipTracer.getBrokenPromises().forEach((count, p) => {
this.log('peer %s didn\'t follow up in %d IWANT requests; adding penalty', p, count)
this.score.addPenalty(p, count)
})
}

/**
* Clear expired backoff expiries
* @returns {void}
Expand Down Expand Up @@ -756,6 +781,7 @@ class Gossipsub extends BasicPubsub {
this.iasked = new Map()
this.backoff = new Map()
this.outbound = new Map()
this.gossipTracer.clear()
clearTimeout(this._directPeerInitial)
}

Expand Down
99 changes: 99 additions & 0 deletions ts/tracer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { Message } from './message'
import { GossipsubIWantFollowupTime } from './constants'

/**
* IWantTracer is an internal tracer that tracks IWANT requests in order to penalize
* peers who don't follow up on IWANT requests after an IHAVE advertisement.
* The tracking of promises is probabilistic to avoid using too much memory.
*
* Note: Do not confuse these 'promises' with JS Promise objects.
* These 'promises' are merely expectations of a peer's behavior.
*/
export class IWantTracer {
getMsgId: (msg: Message) => string
/**
* Promises to deliver a message
* Map per message id, per peer, promise expiration time
*/
promises: Map<string, Map<string, number>>
constructor (getMsgId: (msg: Message) => string) {
this.getMsgId = getMsgId
this.promises = new Map()
}

/**
* Track a promise to deliver a message from a list of msgIDs we are requesting
* @param {string} p peer id
* @param {string[]} msgIds
* @returns {void}
*/
addPromise (p: string, msgIds: string[]): void {
// pick msgId randomly from the list
const ix = Math.floor(Math.random() * msgIds.length)
const msgId = msgIds[ix]

let peers = this.promises.get(msgId)
if (!peers) {
peers = new Map()
this.promises.set(msgId, peers)
}

if (!peers.has(p)) {
peers.set(p, Date.now() + GossipsubIWantFollowupTime)
}
}

/**
* Returns the number of broken promises for each peer who didn't follow up on an IWANT request.
* @returns {Map<string, number>}
*/
getBrokenPromises (): Map<string, number> {
const now = Date.now()
const result = new Map<string, number>()

this.promises.forEach((peers, msgId) => {
peers.forEach((expire, p) => {
// the promise has been broken
if (expire < now) {
// add 1 to result
result.set(p, (result.get(p) || 0) + 1)
// delete from tracked promises
peers.delete(p)
}
})
// clean up empty promises for a msgId
if (!peers.size) {
this.promises.delete(msgId)
}
})

return result
}

/**
* Someone delivered a message, stop tracking promises for it
* @param {string} p peer id
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that we are not using the peerID

Copy link
Member Author

@wemeetagain wemeetagain Jul 6, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The idea is to have the same function signature as PeerScore#deliverMessage.
So that we might in the future abstract the different 'tracers' as in go-libp2p-pubsub
See https://github.com/libp2p/go-libp2p-pubsub/blob/master/pubsub.go#L972
Eg: they don't call each one individually:

this.score.deliverMessage(p, msg)
this.gossipTracer.deliverMessage(p, msg)
this.tagTracer.deliverMessage(p, msg)

They have some sort of registration for each of their tracers, then all tracers get called on a single call to deliverMessage.
Note the go-libp2p-pubsub tracer functionality is happening on the pubsub layer, not the gossipsub layer. Aggregating our "tracers" together would be a first step towards moving in that direction. Not saying thats what we should do, but aligning function signatures is a prereq.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, sounds good

* @param {Message} msg
* @returns {void}
*/
deliverMessage (p: string, msg: Message): void {
const msgId = this.getMsgId(msg)
this.promises.delete(msgId)
}

/**
* A message got rejected, so we can stop tracking promises and let the score penalty apply from invalid message delivery,
* unless its an obviously invalid message.
* @param {string} p peer id
* @param {Message} msg
* @returns {void}
*/
rejectMessage (p: string, msg: Message): void {
const msgId = this.getMsgId(msg)
this.promises.delete(msgId)
}

clear (): void {
this.promises.clear()
}
}