Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: switch back to protobufjs #310

Merged
merged 4 commits into from
Aug 2, 2022
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
"lint": "eslint --ext .ts src test",
"release": "aegir release --no-types",
"build": "aegir build",
"generate": "protons ./src/message/rpc.proto",
"prepare": "npm run build",
"pretest": "npm run build",
"pretest:e2e": "npm run build",
Expand Down Expand Up @@ -85,7 +84,7 @@
"it-pipe": "^2.0.3",
"it-pushable": "^3.0.0",
"multiformats": "^9.6.4",
"protons-runtime": "^1.0.4",
"protobufjs": "^6.11.2",
"uint8arrays": "^3.0.0"
},
"devDependencies": {
Expand Down Expand Up @@ -122,7 +121,6 @@
"p-wait-for": "^3.2.0",
"prettier": "^2.0.5",
"promisify-es6": "^1.0.3",
"protons": "^3.0.4",
"sinon": "^11.1.1",
"time-cache": "^0.3.0",
"ts-node": "^10.7.0",
Expand Down
121 changes: 64 additions & 57 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { CustomEvent, EventEmitter } from '@libp2p/interfaces/events'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'

import { MessageCache } from './message-cache.js'
import { RPC } from './message/rpc.js'
import { RPC, IRPC } from './message/rpc.js'
import * as constants from './constants.js'
import { createGossipRpc, shuffle, messageIdToString } from './utils/index.js'
import {
Expand Down Expand Up @@ -263,13 +263,13 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
* Map of pending messages to gossip
* peer id => control messages
*/
public readonly gossip = new Map<PeerIdStr, RPC.ControlIHave[]>()
public readonly gossip = new Map<PeerIdStr, RPC.IControlIHave[]>()

/**
* Map of control messages
* peer id => control message
*/
public readonly control = new Map<PeerIdStr, RPC.ControlMessage>()
public readonly control = new Map<PeerIdStr, RPC.IControlMessage>()

/**
* Number of IHAVEs received from peer in the last heartbeat
Expand Down Expand Up @@ -910,7 +910,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
/**
* Handles an rpc request from a peer
*/
public async handleReceivedRpc(from: PeerId, rpc: RPC): Promise<void> {
public async handleReceivedRpc(from: PeerId, rpc: IRPC): Promise<void> {
// Check if peer is graylisted in which case we ignore the event
if (!this.acceptFrom(from.toString())) {
this.log('received message from unacceptable peer %p', from)
Expand All @@ -921,7 +921,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
this.log('rpc from %p', from)

// Handle received subscriptions
if (rpc.subscriptions.length > 0) {
if (rpc.subscriptions && rpc.subscriptions.length > 0) {
// update peer subscriptions
rpc.subscriptions.forEach((subOpt) => {
this.handleReceivedSubscription(from, subOpt)
Expand All @@ -946,13 +946,15 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali

// Handle messages
// TODO: (up to limit)
for (const message of rpc.messages) {
const handleReceivedMessagePromise = this.handleReceivedMessage(from, message)
// Should never throw, but handle just in case
.catch((err) => this.log(err))

if (this.opts.awaitRpcMessageHandler) {
await handleReceivedMessagePromise
if (rpc.messages) {
for (const message of rpc.messages) {
const handleReceivedMessagePromise = this.handleReceivedMessage(from, message)
// Should never throw, but handle just in case
.catch((err) => this.log(err))

if (this.opts.awaitRpcMessageHandler) {
await handleReceivedMessagePromise
}
}
}

Expand All @@ -965,7 +967,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
/**
* Handles a subscription change from a peer
*/
private handleReceivedSubscription(from: PeerId, subOpt: RPC.SubOpts): void {
private handleReceivedSubscription(from: PeerId, subOpt: RPC.ISubOpts): void {
if (subOpt.topic == null) {
return
}
Expand Down Expand Up @@ -993,7 +995,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
* Handles a newly received message from an RPC.
* May forward to all peers in the mesh.
*/
private async handleReceivedMessage(from: PeerId, rpcMsg: RPC.Message): Promise<void> {
private async handleReceivedMessage(from: PeerId, rpcMsg: RPC.IMessage): Promise<void> {
this.metrics?.onMsgRecvPreValidation(rpcMsg.topic)

const validationResult = await this.validateReceivedMessage(from, rpcMsg)
Expand Down Expand Up @@ -1068,7 +1070,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
*/
private async validateReceivedMessage(
propagationSource: PeerId,
rpcMsg: RPC.Message
rpcMsg: RPC.IMessage
): Promise<ReceivedMessageResult> {
// Fast message ID stuff
const fastMsgIdStr = this.fastMsgIdFn?.(rpcMsg)
Expand Down Expand Up @@ -1167,15 +1169,15 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
/**
* Handles an rpc control message from a peer
*/
private async handleControlMessage(id: PeerIdStr, controlMsg: RPC.ControlMessage): Promise<void> {
private async handleControlMessage(id: PeerIdStr, controlMsg: RPC.IControlMessage): Promise<void> {
if (controlMsg === undefined) {
return
}

const iwant = this.handleIHave(id, controlMsg.ihave)
const ihave = this.handleIWant(id, controlMsg.iwant)
const prune = await this.handleGraft(id, controlMsg.graft)
await this.handlePrune(id, controlMsg.prune)
const iwant = controlMsg.ihave ? this.handleIHave(id, controlMsg.ihave) : []
const ihave = controlMsg.iwant ? this.handleIWant(id, controlMsg.iwant) : []
const prune = controlMsg.graft ? await this.handleGraft(id, controlMsg.graft) : []
controlMsg.prune && (await this.handlePrune(id, controlMsg.prune))

if (!iwant.length && !ihave.length && !prune.length) {
return
Expand Down Expand Up @@ -1218,7 +1220,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
/**
* Handles IHAVE messages
*/
private handleIHave(id: PeerIdStr, ihave: RPC.ControlIHave[]): RPC.ControlIWant[] {
private handleIHave(id: PeerIdStr, ihave: RPC.IControlIHave[]): RPC.IControlIWant[] {
if (!ihave.length) {
return []
}
Expand Down Expand Up @@ -1255,7 +1257,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
const iwant = new Map<MsgIdStr, Uint8Array>()

ihave.forEach(({ topicID, messageIDs }) => {
if (!topicID || !this.mesh.has(topicID)) {
if (!topicID || !messageIDs || !this.mesh.has(topicID)) {
return
}

Expand Down Expand Up @@ -1304,7 +1306,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
* Handles IWANT messages
* Returns messages to send back to peer
*/
private handleIWant(id: PeerIdStr, iwant: RPC.ControlIWant[]): RPC.Message[] {
private handleIWant(id: PeerIdStr, iwant: RPC.IControlIWant[]): RPC.IMessage[] {
if (!iwant.length) {
return []
}
Expand All @@ -1316,28 +1318,29 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
return []
}

const ihave = new Map<MsgIdStr, RPC.Message>()
const ihave = new Map<MsgIdStr, RPC.IMessage>()
const iwantByTopic = new Map<TopicStr, number>()
let iwantDonthave = 0

iwant.forEach(({ messageIDs }) => {
messageIDs.forEach((msgId) => {
const msgIdStr = this.msgIdToStrFn(msgId)
const entry = this.mcache.getWithIWantCount(msgIdStr, id)
if (entry == null) {
iwantDonthave++
return
}
messageIDs &&
messageIDs.forEach((msgId) => {
const msgIdStr = this.msgIdToStrFn(msgId)
const entry = this.mcache.getWithIWantCount(msgIdStr, id)
if (entry == null) {
iwantDonthave++
return
}

iwantByTopic.set(entry.msg.topic, 1 + (iwantByTopic.get(entry.msg.topic) ?? 0))
iwantByTopic.set(entry.msg.topic, 1 + (iwantByTopic.get(entry.msg.topic) ?? 0))

if (entry.count > constants.GossipsubGossipRetransmission) {
this.log('IWANT: Peer %s has asked for message %s too many times: ignoring request', id, msgId)
return
}
if (entry.count > constants.GossipsubGossipRetransmission) {
this.log('IWANT: Peer %s has asked for message %s too many times: ignoring request', id, msgId)
return
}

ihave.set(msgIdStr, entry.msg)
})
ihave.set(msgIdStr, entry.msg)
})
})

this.metrics?.onIwantRcv(iwantByTopic, iwantDonthave)
Expand All @@ -1355,7 +1358,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
/**
* Handles Graft messages
*/
private async handleGraft(id: PeerIdStr, graft: RPC.ControlGraft[]): Promise<RPC.ControlPrune[]> {
private async handleGraft(id: PeerIdStr, graft: RPC.IControlGraft[]): Promise<RPC.IControlPrune[]> {
const prune: TopicStr[] = []
const score = this.score.score(id)
const now = Date.now()
Expand Down Expand Up @@ -1447,7 +1450,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
/**
* Handles Prune messages
*/
private async handlePrune(id: PeerIdStr, prune: RPC.ControlPrune[]): Promise<void> {
private async handlePrune(id: PeerIdStr, prune: RPC.IControlPrune[]): Promise<void> {
const score = this.score.score(id)

for (const { topicID, backoff, peers } of prune) {
Expand Down Expand Up @@ -1475,7 +1478,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
}

// PX
if (peers.length) {
if (peers && peers.length) {
// we ignore PX from peers with insufficient scores
if (score < this.opts.scoreThresholds.acceptPXThreshold) {
this.log(
Expand Down Expand Up @@ -1567,7 +1570,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
/**
* Maybe attempt connection given signed peer records
*/
private async pxConnect(peers: RPC.PeerInfo[]): Promise<void> {
private async pxConnect(peers: RPC.IPeerInfo[]): Promise<void> {
if (peers.length > this.opts.prunePeers) {
shuffle(peers)
peers = peers.slice(0, this.opts.prunePeers)
Expand Down Expand Up @@ -1909,7 +1912,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
*/
private forwardMessage(
msgIdStr: string,
rawMsg: RPC.Message,
rawMsg: RPC.IMessage,
propagationSource?: PeerIdStr,
excludePeers?: Set<PeerIdStr>
): void {
Expand Down Expand Up @@ -1955,8 +1958,8 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
data, // the uncompressed form
sequenceNumber: rawMsg.seqno == null ? undefined : BigInt(`0x${uint8ArrayToString(rawMsg.seqno, 'base16')}`),
topic,
signature: rawMsg.signature,
key: rawMsg.key
signature: rawMsg.signature ?? undefined,
key: rawMsg.key ?? undefined
}
const msgId = await this.msgIdFn(msg)
const msgIdStr = this.msgIdToStrFn(msgId)
Expand Down Expand Up @@ -2106,7 +2109,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
/**
* Send an rpc object to a peer
*/
private sendRpc(id: PeerIdStr, rpc: RPC): boolean {
private sendRpc(id: PeerIdStr, rpc: IRPC): boolean {
const outboundStream = this.streamsOutbound.get(id)
if (!outboundStream) {
this.log(`Cannot send RPC to ${id} as there is no open stream to it available`)
Expand All @@ -2127,31 +2130,35 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
this.gossip.delete(id)
}

const rpcBytes = RPC.encode(rpc)
const rpcBytes = RPC.encode(rpc).finish()
outboundStream.push(rpcBytes)

this.metrics?.onRpcSent(rpc, rpcBytes.length)

return true
}

public piggybackControl(id: PeerIdStr, outRpc: RPC, ctrl: RPC.ControlMessage): void {
const tograft = ctrl.graft.filter(({ topicID }) => ((topicID && this.mesh.get(topicID)) || new Set()).has(id))
const toprune = ctrl.prune.filter(({ topicID }) => !((topicID && this.mesh.get(topicID)) || new Set()).has(id))
public piggybackControl(id: PeerIdStr, outRpc: IRPC, ctrl: RPC.IControlMessage): void {
const tograft = (ctrl.graft || []).filter(({ topicID }) =>
((topicID && this.mesh.get(topicID)) || new Set()).has(id)
)
const toprune = (ctrl.prune || []).filter(
({ topicID }) => !((topicID && this.mesh.get(topicID)) || new Set()).has(id)
)

if (!tograft.length && !toprune.length) {
return
}

if (outRpc.control) {
outRpc.control.graft = outRpc.control.graft.concat(tograft)
outRpc.control.prune = outRpc.control.prune.concat(toprune)
outRpc.control.graft = outRpc.control.graft && outRpc.control.graft.concat(tograft)
outRpc.control.prune = outRpc.control.prune && outRpc.control.prune.concat(toprune)
} else {
outRpc.control = { graft: tograft, prune: toprune, ihave: [], iwant: [] }
}
}

private piggybackGossip(id: PeerIdStr, outRpc: RPC, ihave: RPC.ControlIHave[]): void {
private piggybackGossip(id: PeerIdStr, outRpc: IRPC, ihave: RPC.IControlIHave[]): void {
if (!outRpc.control) {
outRpc.control = { ihave: [], iwant: [], graft: [], prune: [] }
}
Expand All @@ -2172,7 +2179,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
const doPX = this.opts.doPX
for (const [id, topics] of tograft) {
const graft = topics.map((topicID) => ({ topicID }))
let prune: RPC.ControlPrune[] = []
let prune: RPC.IControlPrune[] = []
// If a peer also has prunes, process them now
const pruning = toprune.get(id)
if (pruning) {
Expand Down Expand Up @@ -2275,7 +2282,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
/**
* Adds new IHAVE messages to pending gossip
*/
private pushGossip(id: PeerIdStr, controlIHaveMsgs: RPC.ControlIHave): void {
private pushGossip(id: PeerIdStr, controlIHaveMsgs: RPC.IControlIHave): void {
this.log('Add gossip to %s', id)
const gossip = this.gossip.get(id) || []
this.gossip.set(id, gossip.concat(controlIHaveMsgs))
Expand All @@ -2284,7 +2291,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
/**
* Make a PRUNE control message for a peer in a topic
*/
private async makePrune(id: PeerIdStr, topic: string, doPX: boolean): Promise<RPC.ControlPrune> {
private async makePrune(id: PeerIdStr, topic: string, doPX: boolean): Promise<RPC.IControlPrune> {
this.score.prune(id, topic)
if (this.streamsOutbound.get(id)!.protocol === constants.GossipsubIDv10) {
// Gossipsub v1.0 -- no backoff, the peer won't be able to parse it anyway
Expand All @@ -2296,7 +2303,7 @@ export class GossipSub extends EventEmitter<GossipsubEvents> implements Initiali
// backoff is measured in seconds
// GossipsubPruneBackoff is measured in milliseconds
// The protobuf has it as a uint64
const backoff = BigInt(this.opts.pruneBackoff / 1000)
const backoff = this.opts.pruneBackoff / 1000
if (!doPX) {
return {
topicID: topic,
Expand Down
10 changes: 5 additions & 5 deletions src/message-cache.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ export type CacheEntry = MessageId & {
}

interface MessageCacheEntry {
message: RPC.Message
message: RPC.IMessage
/**
* Tracks if the message has been validated by the app layer and thus forwarded
*/
Expand Down Expand Up @@ -55,7 +55,7 @@ export class MessageCache {
* Adds a message to the current window and the cache
* Returns true if the message is not known and is inserted in the cache
*/
put(messageId: MessageId, msg: RPC.Message, validated = false): boolean {
put(messageId: MessageId, msg: RPC.IMessage, validated = false): boolean {
const { msgIdStr } = messageId
// Don't add duplicate entries to the cache.
if (this.msgs.has(msgIdStr)) {
Expand Down Expand Up @@ -90,15 +90,15 @@ export class MessageCache {
/**
* Retrieves a message from the cache by its ID, if it is still present
*/
get(msgId: Uint8Array): RPC.Message | undefined {
get(msgId: Uint8Array): RPC.IMessage | undefined {
return this.msgs.get(this.msgIdToStrFn(msgId))?.message
}

/**
* Increases the iwant count for the given message by one and returns the message together
* with the iwant if the message exists.
*/
getWithIWantCount(msgIdStr: string, p: string): { msg: RPC.Message; count: number } | null {
getWithIWantCount(msgIdStr: string, p: string): { msg: RPC.IMessage; count: number } | null {
const msg = this.msgs.get(msgIdStr)
if (!msg) {
return null
Expand Down Expand Up @@ -137,7 +137,7 @@ export class MessageCache {
* This function also returns the known peers that have sent us this message. This is used to
* prevent us sending redundant messages to peers who have already propagated it.
*/
validate(msgId: MsgIdStr): { message: RPC.Message; originatingPeers: Set<PeerIdStr> } | null {
validate(msgId: MsgIdStr): { message: RPC.IMessage; originatingPeers: Set<PeerIdStr> } | null {
const entry = this.msgs.get(msgId)
if (!entry) {
return null
Expand Down
Loading