Skip to content

Commit

Permalink
fix: update interfaces (#140)
Browse files Browse the repository at this point in the history
Updates to latest code from libp2p/js-libp2p-interfaces#180
  • Loading branch information
achingbrain authored Mar 16, 2022
1 parent 89ae826 commit c6f622c
Show file tree
Hide file tree
Showing 8 changed files with 2,673 additions and 33 deletions.
28 changes: 18 additions & 10 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,10 @@
"extends": "ipfs",
"parserOptions": {
"sourceType": "module"
}
},
"ignorePatterns": [
"*.d.ts"
]
},
"release": {
"branches": [
Expand Down Expand Up @@ -126,7 +129,11 @@
"scripts": {
"lint": "aegir lint",
"dep-check": "aegir dep-check dist/src/**/*.js dist/test/**/*.js",
"build": "tsc",
"build": "tsc && npm run build:copy-proto-files",
"build:copy-proto-files": "mkdirp dist/src/message && cp src/message/*.js src/message/*.d.ts dist/src/message",
"generate": "npm run generate:proto && npm run generate:proto-types",
"generate:proto": "pbjs -t static-module -w es6 -r libp2p-floodsub --force-number --no-verify --no-delimited --no-create --no-beautify --no-defaults --lint eslint-disable -o src/message/rpc.js ./src/message/rpc.proto",
"generate:proto-types": "pbts -o src/message/rpc.d.ts src/message/rpc.js",
"pretest": "npm run build",
"test": "aegir test -f dist/test",
"test:chrome": "npm run test -- -t browser --cov",
Expand All @@ -138,17 +145,18 @@
"release": "semantic-release"
},
"dependencies": {
"@libp2p/interfaces": "^1.3.6",
"@libp2p/logger": "^1.0.3",
"@libp2p/pubsub": "^1.2.4",
"@libp2p/interfaces": "^1.3.14",
"@libp2p/logger": "^1.1.2",
"@libp2p/pubsub": "^1.2.10",
"protobufjs": "^6.11.2",
"uint8arrays": "^3.0.0"
},
"devDependencies": {
"@libp2p/interface-compliance-tests": "^1.0.8",
"@libp2p/peer-id": "^1.1.3",
"@libp2p/peer-id-factory": "^1.0.5",
"@multiformats/multiaddr": "^10.1.5",
"aegir": "^36.1.1",
"@libp2p/interface-compliance-tests": "^1.1.16",
"@libp2p/peer-id": "^1.1.8",
"@libp2p/peer-id-factory": "^1.0.8",
"@multiformats/multiaddr": "^10.1.7",
"aegir": "^36.1.3",
"multiformats": "^9.4.5",
"p-wait-for": "^4.1.0",
"sinon": "^13.0.1",
Expand Down
51 changes: 37 additions & 14 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
import { toString } from 'uint8arrays/to-string'
import { PubsubBaseProtocol } from '@libp2p/pubsub'
import { PubSubBaseProtocol } from '@libp2p/pubsub'
import { multicodec } from './config.js'
import { SimpleTimeCache } from './cache.js'
import type { PubSub, PubSubEvents, PubSubOptions, Message } from '@libp2p/interfaces/pubsub'
import type { PubSub, PubSubEvents, PubSubInit, Message, PubSubRPC, PubSubRPCMessage } from '@libp2p/interfaces/pubsub'
import type { PeerId } from '@libp2p/interfaces/peer-id'
import { logger } from '@libp2p/logger'
import { RPC } from './message/rpc.js'

const debugName = 'libp2p:floodsub'
const log = logger('libp2p:floodsub')

export { multicodec }

export interface FloodSubOptions extends PubSubOptions {
export interface FloodSubInit extends PubSubInit {
seenTTL?: number
}

Expand All @@ -18,13 +20,12 @@ export interface FloodSubOptions extends PubSubOptions {
* delivering an API for Publish/Subscribe, but with no CastTree Forming
* (it just floods the network).
*/
export class FloodSub <EventMap extends PubSubEvents = PubSubEvents> extends PubsubBaseProtocol<EventMap> implements PubSub<EventMap & PubSubEvents> {
export class FloodSub <EventMap extends PubSubEvents = PubSubEvents> extends PubSubBaseProtocol<EventMap> implements PubSub<EventMap & PubSubEvents> {
public seenCache: SimpleTimeCache<boolean>

constructor (options: FloodSubOptions) {
constructor (init?: FloodSubInit) {
super({
...options,
debugName: debugName,
...init,
canRelayMessage: true,
multicodecs: [multicodec]
})
Expand All @@ -35,10 +36,32 @@ export class FloodSub <EventMap extends PubSubEvents = PubSubEvents> extends Pub
* @type {TimeCache}
*/
this.seenCache = new SimpleTimeCache<boolean>({
validityMs: options.seenTTL ?? 30000
validityMs: init?.seenTTL ?? 30000
})
}

/**
* Decode a Uint8Array into an RPC object
*/
decodeRpc (bytes: Uint8Array): PubSubRPC {
return RPC.decode(bytes)
}

/**
* Encode an RPC object into a Uint8Array
*/
encodeRpc (rpc: PubSubRPC): Uint8Array {
return RPC.encode(rpc).finish()
}

decodeMessage (bytes: Uint8Array): PubSubRPCMessage {
return RPC.Message.decode(bytes)
}

encodeMessage (rpc: PubSubRPCMessage): Uint8Array {
return RPC.Message.encode(rpc).finish()
}

/**
* Process incoming message
* Extends base implementation to check router cache.
Expand All @@ -64,22 +87,22 @@ export class FloodSub <EventMap extends PubSubEvents = PubSubEvents> extends Pub
const peers = this.getSubscribers(message.topic)

if (peers == null || peers.length === 0) {
this.log('no peers are subscribed to topic %s', message.topic)
log('no peers are subscribed to topic %s', message.topic)
return
}

peers.forEach(id => {
if (this.peerId.equals(id)) {
this.log('not sending message on topic %s to myself', message.topic)
if (this.components.getPeerId().equals(id)) {
log('not sending message on topic %s to myself', message.topic)
return
}

if (id.equals(from)) {
this.log('not sending message on topic %s to sender %p', message.topic, id)
log('not sending message on topic %s to sender %p', message.topic, id)
return
}

this.log('publish msgs on topics %s %p', message.topic, id)
log('publish msgs on topics %s %p', message.topic, id)

this.send(id, { messages: [message] })
})
Expand Down
Loading

0 comments on commit c6f622c

Please sign in to comment.