diff --git a/README.md b/README.md index de62b49..05b75b5 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,9 @@ # @libp2p/floodsub [![libp2p.io](https://img.shields.io/badge/project-libp2p-yellow.svg?style=flat-square)](http://libp2p.io/) -[![IRC](https://img.shields.io/badge/freenode-%23libp2p-yellow.svg?style=flat-square)](http://webchat.freenode.net/?channels=%23libp2p) [![Discuss](https://img.shields.io/discourse/https/discuss.libp2p.io/posts.svg?style=flat-square)](https://discuss.libp2p.io) [![codecov](https://img.shields.io/codecov/c/github/libp2p/js-libp2p-floodsub.svg?style=flat-square)](https://codecov.io/gh/libp2p/js-libp2p-floodsub) -[![CI](https://img.shields.io/github/workflow/status/libp2p/js-libp2p-interfaces/test%20&%20maybe%20release/master?style=flat-square)](https://github.com/libp2p/js-libp2p-floodsub/actions/workflows/js-test-and-release.yml) +[![CI](https://img.shields.io/github/workflow/status/libp2p/js-libp2p-floodsub/test%20&%20maybe%20release/master?style=flat-square)](https://github.com/libp2p/js-libp2p-floodsub/actions/workflows/js-test-and-release.yml) > libp2p-floodsub, also known as pubsub-flood or just dumbsub, this implementation of pubsub focused on delivering an API for Publish/Subscribe, but with no CastTree Forming (it just floods the network). @@ -33,18 +32,21 @@ Instead please use [gossipsub](https://www.npmjs.com/package/@chainsafe/libp2p-g ## Usage ```JavaScript -import { FloodSub } from '@libp2p/floodsub' +import { createLibp2pNode } from 'libp2p' +import { floodsub } from '@libp2p/floodsub' -const fsub = new FloodSub() - -await fsub.start() +const node = await createLibp2pNode({ + pubsub: floodsub() + //... other options +}) +await node.start() -fsub.addEventListener('message', (data) => { - console.log(data) +node.pubsub.subscribe('fruit') +node.pubsub.addEventListener('message', (evt) => { + console.log(evt) }) -fsub.subscribe('fruit') -fsub.publish('fruit', new TextEncoder().encode('banana')) +node.pubsub.publish('fruit', new TextEncoder().encode('banana')) ``` ## License diff --git a/package.json b/package.json index 4f60aed..7b08200 100644 --- a/package.json +++ b/package.json @@ -150,23 +150,22 @@ "@libp2p/interface-peer-id": "^1.0.2", "@libp2p/interface-pubsub": "^3.0.0", "@libp2p/logger": "^2.0.0", - "@libp2p/pubsub": "^4.0.0", - "protons-runtime": "^3.1.0", + "@libp2p/pubsub": "^5.0.0", + "protons-runtime": "^4.0.1", "uint8arraylist": "^2.1.1", - "uint8arrays": "^3.0.0" + "uint8arrays": "^4.0.2" }, "devDependencies": { - "@libp2p/components": "^3.0.1", - "@libp2p/interface-mocks": "^6.0.1", - "@libp2p/interface-pubsub-compliance-tests": "^2.0.1", + "@libp2p/interface-mocks": "^7.0.1", + "@libp2p/interface-pubsub-compliance-tests": "^4.0.0", "@libp2p/peer-collections": "^2.0.0", "@libp2p/peer-id": "^1.1.10", "@libp2p/peer-id-factory": "^1.0.9", "@multiformats/multiaddr": "^11.0.3", "aegir": "^37.2.0", - "multiformats": "^9.4.5", + "multiformats": "^10.0.0", "p-wait-for": "^5.0.0", - "protons": "^5.1.0", + "protons": "^6.0.0", "sinon": "^14.0.0", "wherearewe": "^2.0.1" } diff --git a/src/index.ts b/src/index.ts index 9247bb3..3e4986b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,8 +1,8 @@ import { toString } from 'uint8arrays/to-string' -import { PubSubBaseProtocol } from '@libp2p/pubsub' +import { PubSubBaseProtocol, PubSubComponents } from '@libp2p/pubsub' import { multicodec } from './config.js' import { SimpleTimeCache } from './cache.js' -import type { PubSubInit, Message, PubSubRPC, PubSubRPCMessage, PublishResult } from '@libp2p/interface-pubsub' +import type { PubSubInit, Message, PubSubRPC, PubSubRPCMessage, PublishResult, PubSub } from '@libp2p/interface-pubsub' import type { PeerId } from '@libp2p/interface-peer-id' import { logger } from '@libp2p/logger' import { RPC } from './message/rpc.js' @@ -16,6 +16,10 @@ export interface FloodSubInit extends PubSubInit { seenTTL?: number } +export interface FloodSubComponents extends PubSubComponents { + +} + /** * FloodSub (aka dumbsub is an implementation of pubsub focused on * delivering an API for Publish/Subscribe, but with no CastTree Forming @@ -24,8 +28,8 @@ export interface FloodSubInit extends PubSubInit { export class FloodSub extends PubSubBaseProtocol { public seenCache: SimpleTimeCache - constructor (init?: FloodSubInit) { - super({ + constructor (components: FloodSubComponents, init?: FloodSubInit) { + super(components, { ...init, canRelayMessage: true, multicodecs: [multicodec] @@ -94,7 +98,7 @@ export class FloodSub extends PubSubBaseProtocol { } peers.forEach(id => { - if (this.components.getPeerId().equals(id)) { + if (this.components.peerId.equals(id)) { log('not sending message on topic %s to myself', message.topic) return } @@ -113,3 +117,7 @@ export class FloodSub extends PubSubBaseProtocol { return { recipients } } } + +export function floodsub (init: FloodSubInit = {}): (components: FloodSubComponents) => PubSub { + return (components: FloodSubComponents) => new FloodSub(components, init) +} diff --git a/src/message/rpc.ts b/src/message/rpc.ts index 8e0bec6..7cd3862 100644 --- a/src/message/rpc.ts +++ b/src/message/rpc.ts @@ -1,5 +1,7 @@ /* eslint-disable import/export */ +/* eslint-disable complexity */ /* eslint-disable @typescript-eslint/no-namespace */ +/* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */ import { encodeMessage, decodeMessage, message } from 'protons-runtime' import type { Uint8ArrayList } from 'uint8arraylist' @@ -22,23 +24,23 @@ export namespace RPC { export const codec = (): Codec => { if (_codec == null) { - _codec = message((obj, writer, opts = {}) => { + _codec = message((obj, w, opts = {}) => { if (opts.lengthDelimited !== false) { - writer.fork() + w.fork() } if (obj.subscribe != null) { - writer.uint32(8) - writer.bool(obj.subscribe) + w.uint32(8) + w.bool(obj.subscribe) } if (obj.topic != null) { - writer.uint32(18) - writer.string(obj.topic) + w.uint32(18) + w.string(obj.topic) } if (opts.lengthDelimited !== false) { - writer.ldelim() + w.ldelim() } }, (reader, length) => { const obj: any = {} @@ -91,43 +93,43 @@ export namespace RPC { export const codec = (): Codec => { if (_codec == null) { - _codec = message((obj, writer, opts = {}) => { + _codec = message((obj, w, opts = {}) => { if (opts.lengthDelimited !== false) { - writer.fork() + w.fork() } if (obj.from != null) { - writer.uint32(10) - writer.bytes(obj.from) + w.uint32(10) + w.bytes(obj.from) } if (obj.data != null) { - writer.uint32(18) - writer.bytes(obj.data) + w.uint32(18) + w.bytes(obj.data) } if (obj.sequenceNumber != null) { - writer.uint32(26) - writer.bytes(obj.sequenceNumber) + w.uint32(26) + w.bytes(obj.sequenceNumber) } if (obj.topic != null) { - writer.uint32(34) - writer.string(obj.topic) + w.uint32(34) + w.string(obj.topic) } if (obj.signature != null) { - writer.uint32(42) - writer.bytes(obj.signature) + w.uint32(42) + w.bytes(obj.signature) } if (obj.key != null) { - writer.uint32(50) - writer.bytes(obj.key) + w.uint32(50) + w.bytes(obj.key) } if (opts.lengthDelimited !== false) { - writer.ldelim() + w.ldelim() } }, (reader, length) => { const obj: any = {} @@ -182,36 +184,38 @@ export namespace RPC { export const codec = (): Codec => { if (_codec == null) { - _codec = message((obj, writer, opts = {}) => { + _codec = message((obj, w, opts = {}) => { if (opts.lengthDelimited !== false) { - writer.fork() + w.fork() } if (obj.subscriptions != null) { for (const value of obj.subscriptions) { - writer.uint32(10) - RPC.SubOpts.codec().encode(value, writer) + w.uint32(10) + RPC.SubOpts.codec().encode(value, w, { + writeDefaults: true + }) } - } else { - throw new Error('Protocol error: required field "subscriptions" was not found in object') } if (obj.messages != null) { for (const value of obj.messages) { - writer.uint32(18) - RPC.Message.codec().encode(value, writer) + w.uint32(18) + RPC.Message.codec().encode(value, w, { + writeDefaults: true + }) } - } else { - throw new Error('Protocol error: required field "messages" was not found in object') } if (obj.control != null) { - writer.uint32(26) - ControlMessage.codec().encode(obj.control, writer) + w.uint32(26) + ControlMessage.codec().encode(obj.control, w, { + writeDefaults: false + }) } if (opts.lengthDelimited !== false) { - writer.ldelim() + w.ldelim() } }, (reader, length) => { const obj: any = { @@ -268,49 +272,49 @@ export namespace ControlMessage { export const codec = (): Codec => { if (_codec == null) { - _codec = message((obj, writer, opts = {}) => { + _codec = message((obj, w, opts = {}) => { if (opts.lengthDelimited !== false) { - writer.fork() + w.fork() } if (obj.ihave != null) { for (const value of obj.ihave) { - writer.uint32(10) - ControlIHave.codec().encode(value, writer) + w.uint32(10) + ControlIHave.codec().encode(value, w, { + writeDefaults: true + }) } - } else { - throw new Error('Protocol error: required field "ihave" was not found in object') } if (obj.iwant != null) { for (const value of obj.iwant) { - writer.uint32(18) - ControlIWant.codec().encode(value, writer) + w.uint32(18) + ControlIWant.codec().encode(value, w, { + writeDefaults: true + }) } - } else { - throw new Error('Protocol error: required field "iwant" was not found in object') } if (obj.graft != null) { for (const value of obj.graft) { - writer.uint32(26) - ControlGraft.codec().encode(value, writer) + w.uint32(26) + ControlGraft.codec().encode(value, w, { + writeDefaults: true + }) } - } else { - throw new Error('Protocol error: required field "graft" was not found in object') } if (obj.prune != null) { for (const value of obj.prune) { - writer.uint32(34) - ControlPrune.codec().encode(value, writer) + w.uint32(34) + ControlPrune.codec().encode(value, w, { + writeDefaults: true + }) } - } else { - throw new Error('Protocol error: required field "prune" was not found in object') } if (opts.lengthDelimited !== false) { - writer.ldelim() + w.ldelim() } }, (reader, length) => { const obj: any = { @@ -370,27 +374,25 @@ export namespace ControlIHave { export const codec = (): Codec => { if (_codec == null) { - _codec = message((obj, writer, opts = {}) => { + _codec = message((obj, w, opts = {}) => { if (opts.lengthDelimited !== false) { - writer.fork() + w.fork() } if (obj.topic != null) { - writer.uint32(10) - writer.string(obj.topic) + w.uint32(10) + w.string(obj.topic) } if (obj.messageIDs != null) { for (const value of obj.messageIDs) { - writer.uint32(18) - writer.bytes(value) + w.uint32(18) + w.bytes(value) } - } else { - throw new Error('Protocol error: required field "messageIDs" was not found in object') } if (opts.lengthDelimited !== false) { - writer.ldelim() + w.ldelim() } }, (reader, length) => { const obj: any = { @@ -440,22 +442,20 @@ export namespace ControlIWant { export const codec = (): Codec => { if (_codec == null) { - _codec = message((obj, writer, opts = {}) => { + _codec = message((obj, w, opts = {}) => { if (opts.lengthDelimited !== false) { - writer.fork() + w.fork() } if (obj.messageIDs != null) { for (const value of obj.messageIDs) { - writer.uint32(10) - writer.bytes(value) + w.uint32(10) + w.bytes(value) } - } else { - throw new Error('Protocol error: required field "messageIDs" was not found in object') } if (opts.lengthDelimited !== false) { - writer.ldelim() + w.ldelim() } }, (reader, length) => { const obj: any = { @@ -502,18 +502,18 @@ export namespace ControlGraft { export const codec = (): Codec => { if (_codec == null) { - _codec = message((obj, writer, opts = {}) => { + _codec = message((obj, w, opts = {}) => { if (opts.lengthDelimited !== false) { - writer.fork() + w.fork() } if (obj.topic != null) { - writer.uint32(10) - writer.string(obj.topic) + w.uint32(10) + w.string(obj.topic) } if (opts.lengthDelimited !== false) { - writer.ldelim() + w.ldelim() } }, (reader, length) => { const obj: any = {} @@ -560,32 +560,32 @@ export namespace ControlPrune { export const codec = (): Codec => { if (_codec == null) { - _codec = message((obj, writer, opts = {}) => { + _codec = message((obj, w, opts = {}) => { if (opts.lengthDelimited !== false) { - writer.fork() + w.fork() } if (obj.topic != null) { - writer.uint32(10) - writer.string(obj.topic) + w.uint32(10) + w.string(obj.topic) } if (obj.peers != null) { for (const value of obj.peers) { - writer.uint32(18) - PeerInfo.codec().encode(value, writer) + w.uint32(18) + PeerInfo.codec().encode(value, w, { + writeDefaults: true + }) } - } else { - throw new Error('Protocol error: required field "peers" was not found in object') } if (obj.backoff != null) { - writer.uint32(24) - writer.uint64(obj.backoff) + w.uint32(24) + w.uint64(obj.backoff) } if (opts.lengthDelimited !== false) { - writer.ldelim() + w.ldelim() } }, (reader, length) => { const obj: any = { @@ -639,23 +639,23 @@ export namespace PeerInfo { export const codec = (): Codec => { if (_codec == null) { - _codec = message((obj, writer, opts = {}) => { + _codec = message((obj, w, opts = {}) => { if (opts.lengthDelimited !== false) { - writer.fork() + w.fork() } if (obj.peerID != null) { - writer.uint32(10) - writer.bytes(obj.peerID) + w.uint32(10) + w.bytes(obj.peerID) } if (obj.signedPeerRecord != null) { - writer.uint32(18) - writer.bytes(obj.signedPeerRecord) + w.uint32(18) + w.bytes(obj.signedPeerRecord) } if (opts.lengthDelimited !== false) { - writer.ldelim() + w.ldelim() } }, (reader, length) => { const obj: any = {} diff --git a/test/compliance.spec.ts b/test/compliance.spec.ts index 6e1cecd..ded20c1 100644 --- a/test/compliance.spec.ts +++ b/test/compliance.spec.ts @@ -10,8 +10,7 @@ describe('interface compliance', () => { throw new Error('PubSubOptions is required') } - const pubsub = new FloodSub(args.init) - pubsub.init(args.components) + const pubsub = new FloodSub(args.components, args.init) return pubsub }, diff --git a/test/floodsub.spec.ts b/test/floodsub.spec.ts index ffd577c..4b7aae0 100644 --- a/test/floodsub.spec.ts +++ b/test/floodsub.spec.ts @@ -11,7 +11,6 @@ import { FloodSub, multicodec } from '../src/index.js' import { createEd25519PeerId } from '@libp2p/peer-id-factory' import { mockRegistrar } from '@libp2p/interface-mocks' import pWaitFor from 'p-wait-for' -import { Components } from '@libp2p/components' import { PeerSet } from '@libp2p/peer-collections' const topic = 'my-topic' @@ -24,13 +23,12 @@ describe('floodsub', () => { expect(multicodec).to.exist() floodsub = new FloodSub({ + peerId: await createEd25519PeerId(), + registrar: mockRegistrar() + }, { emitSelf: true, globalSignaturePolicy: StrictNoSign }) - floodsub.init(new Components({ - peerId: await createEd25519PeerId(), - registrar: mockRegistrar() - })) }) beforeEach(async () => {