Skip to content

Commit

Permalink
Remove Support for data-transfer/graphsync (#143)
Browse files Browse the repository at this point in the history
* Remove Support for data-transfer/graphsync

Support for sync over datatransfer/graphsync has been removed.

* Remove unused options
* NewSubscriber does not need datastore and topic args
* Update comments and dagsync README
  • Loading branch information
gammazero authored Jan 31, 2024
1 parent e98f3fd commit 7441bee
Show file tree
Hide file tree
Showing 27 changed files with 153 additions and 2,127 deletions.
2 changes: 1 addition & 1 deletion announce/p2psender/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
)

// config contains all options for configuring dtsync.publisher.
// config contains all options for configuring ipnisync.publisher.
type config struct {
topic *pubsub.Topic
extraData []byte
Expand Down
69 changes: 44 additions & 25 deletions dagsync/README.md
Original file line number Diff line number Diff line change
@@ -1,38 +1,60 @@
## dagsync

dagsync is an interface for [go-data-transfer](https://github.com/filecoin-project/go-data-transfer),
providing a 1:1 mechanism for maintaining a synchronized [IPLD dag](https://docs.ipld.io/) of data between
a publisher and a subscriber's current state for that publisher.
dagsync is an interface for maintaining a synchronized [IPLD dag](https://docs.ipld.io/) of IPNI advertisements between a publisher and a subscriber's current state for that publisher.

## Usage

Typically an application will be either a provider or a subscriber, but may be both.

### Publisher

Create a dagsync publisher. Update its root to cause it to publish.
Create a dagsync publisher. Update its root to publish a new advertisement. Send announcement messages to inform indexers a new advertisement is available.

```golang
pub, err := NewPublisher(host, dsstore, lsys, "/dagsync/topic")
publisher, err := ipnisync.NewPublisher(linkSys, privKey,
ipnisync.WithHTTPListenAddrs("http://127.0.0.1:0"),
ipnisync.WithStreamHost(publisherStreamHost),
)
if err != nil {
panic(err)
}
...

// Create announcement senders to send advertisement announcements to indexers.
var senders []announce.Sender
httpSender, err := httpsender.New(announceURLs, id)
if err != nil {
panic(err)
}
senders = append(senders, httpSender)
p2pSender, err := p2psender.New(publisherStreamHost, pubTopicName)
if err != nil {
panic(err)
}
senders = append(senders, p2pSender)

// ...

// Publish updated root.
err = publisher.UpdateRoot(ctx, lnk.(cidlink.Link).Cid)
adCid := lnk.(cidlink.Link).Cid
err = publisher.SetRoot(adCid)
if err != nil {
panic(err)
}
// Announce new advertisement.
err := announce.Send(ctx, adCid, adsPublishedHereAddrs, senders...)
if err != nil {
panic(err)
}
```

### Subscriber

The `Subscriber` handles subscribing to a topic, reading messages from the topic and tracking the state of each publisher.
The `Subscriber` reads advertisement chains from index-providers. Its announcement receiver receives libp2p pubsub messages from a topic and direct HTTP announcements. The Subscriber reads advertisements is response to announcement messages.

Create a `Subscriber`:

```golang
sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, "/dagsync/topic", nil)
sub, err := dagsync.NewSubscriber(dstHost, dstLinkSys, dagsync.RecvAnnounce(pubTopicName))
if err != nil {
panic(err)
}
Expand All @@ -46,34 +68,31 @@ defer cancelWatcher()
go watch(watcher)

func watch(notifications <-chan dagsync.SyncFinished) {
for {
syncFinished := <-notifications
// newHead is now available in the local dataStore
}
for {
syncFinished := <-notifications
// newHead is now available in the local dataStore
}
}
```

To shutdown a `Subscriber`, call its `Close()` method.

A `Subscriber` can be created with a function that determines if the `Subscriber` accepts or rejects messages from a publisher. Use the `AllowPeer` option to specify the function.
A `Subscriber` can be created with announce receive options that include a function that determines if the `Subscriber` accepts or rejects announcements from a publisher.
```golang
sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, "/dagsync/topic", nil, dagsync.AllowPeer(allowPeer))
sub, err := dagsync.NewSubscriber(dstHost, dstLinkSys,
dagsync.RecvAnnounce(pubTopicName, announce.WithALlowPeer(allowPeer)),
)

```

The `Subscriber` keeps track of the latest head for each publisher that it has synced. This avoids exchanging the whole DAG from scratch in every update and instead downloads only the part that has not been synced. This value is not persisted as part of the library. If you want to start a `Subscriber` which has already partially synced with a provider you can use the `SetLatestSync` method:
```golang
sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, "/dagsync/topic", nil)
sub, err := dagsync.NewSubscriber(dstHost, dstLinkSys)
if err != nil {
panic(err)
}
// Set up partially synced publishers
if err = sub.SetLatestSync(peerID1, lastSync1) ; err != nil {
panic(err)
}
if err = sub.SetLatestSync(peerID2, lastSync2) ; err != nil {
panic(err)
}
if err = sub.SetLatestSync(peerID3, lastSync3) ; err != nil {
panic(err)
}
sub.SetLatestSync(peerID1, lastSync1)
sub.SetLatestSync(peerID2, lastSync2)
sub.SetLatestSync(peerID3, lastSync3)
```
96 changes: 8 additions & 88 deletions dagsync/announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package dagsync_test

import (
"context"
"sync"
"testing"
"time"

Expand All @@ -17,7 +16,6 @@ import (
"github.com/ipni/go-libipni/announce"
"github.com/ipni/go-libipni/announce/p2psender"
"github.com/ipni/go-libipni/dagsync"
"github.com/ipni/go-libipni/dagsync/dtsync"
"github.com/ipni/go-libipni/dagsync/ipnisync"
"github.com/ipni/go-libipni/dagsync/test"
"github.com/libp2p/go-libp2p/core/host"
Expand All @@ -35,7 +33,7 @@ func TestAnnounceReplace(t *testing.T) {
blocksSeenByHook[c] = struct{}{}
}

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(),
sub, err := dagsync.NewSubscriber(dstHost, dstLnkS, dagsync.RecvAnnounce(testTopic),
dagsync.BlockHook(blockHook))
require.NoError(t, err)
defer sub.Close()
Expand Down Expand Up @@ -172,7 +170,7 @@ func TestAnnounce_LearnsHttpPublisherAddr(t *testing.T) {
subh := test.MkTestHost(t)
subds := dssync.MutexWrap(datastore.NewMapDatastore())
subls := test.MkLinkSystem(subds)
sub, err := dagsync.NewSubscriber(subh, subds, subls, testTopic, dagsync.RecvAnnounce(), dagsync.StrictAdsSelector(false))
sub, err := dagsync.NewSubscriber(subh, subls, dagsync.RecvAnnounce(testTopic), dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub.Close()

Expand Down Expand Up @@ -226,13 +224,13 @@ func TestAnnounceRepublish(t *testing.T) {

topics := test.WaitForMeshWithMessage(t, testTopic, dstHost, dstHost2)

sub2, err := dagsync.NewSubscriber(dstHost2, dstStore2, dstLnkS2, testTopic,
dagsync.RecvAnnounce(announce.WithTopic(topics[1])), dagsync.StrictAdsSelector(false))
sub2, err := dagsync.NewSubscriber(dstHost2, dstLnkS2,
dagsync.RecvAnnounce("", announce.WithTopic(topics[1])), dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub2.Close()

sub1, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(announce.WithTopic(topics[0]), announce.WithResend(true)),
sub1, err := dagsync.NewSubscriber(dstHost, dstLnkS,
dagsync.RecvAnnounce("", announce.WithTopic(topics[0]), announce.WithResend(true)),
dagsync.StrictAdsSelector(false))
require.NoError(t, err)
defer sub1.Close()
Expand Down Expand Up @@ -344,84 +342,6 @@ func TestAllowPeerAllows(t *testing.T) {
}
}

func TestPublisherRejectsPeer(t *testing.T) {
t.Parallel()
// Init dagsync publisher and subscriber
srcStore := dssync.MutexWrap(datastore.NewMapDatastore())
dstStore := dssync.MutexWrap(datastore.NewMapDatastore())

srcHost := test.MkTestHost(t)
dstHost := test.MkTestHost(t)

topics := test.WaitForMeshWithMessage(t, testTopic, srcHost, dstHost)

srcLnkS := test.MkLinkSystem(srcStore)

blockID := dstHost.ID()
var blockMutex sync.Mutex

allowPeer := func(peerID peer.ID) bool {
blockMutex.Lock()
defer blockMutex.Unlock()
return peerID != blockID
}

p2pSender, err := p2psender.New(nil, "", p2psender.WithTopic(topics[0]))
require.NoError(t, err)

pub, err := dtsync.NewPublisher(srcHost, srcStore, srcLnkS, testTopic, dtsync.WithAllowPeer(allowPeer))
require.NoError(t, err)
defer pub.Close()

srcHost.Peerstore().AddAddrs(dstHost.ID(), dstHost.Addrs(), time.Hour)
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)
dstLnkS := test.MkLinkSystem(dstStore)

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic, dagsync.RecvAnnounce(announce.WithTopic(topics[1])))
require.NoError(t, err)
defer sub.Close()

err = srcHost.Connect(context.Background(), dstHost.Peerstore().PeerInfo(dstHost.ID()))
require.NoError(t, err)

require.NoError(t, test.WaitForP2PPublisher(pub, dstHost, testTopic))

watcher, cncl := sub.OnSyncFinished()
defer cncl()

c := mkLnk(t, srcStore)

// Update root with item
pub.SetRoot(c)
err = announce.Send(context.Background(), c, pub.Addrs(), p2pSender)
require.NoError(t, err)

select {
case <-time.After(updateTimeout):
t.Log("publisher blocked")
case <-watcher:
t.Fatal("sync should not have happened with blocked ID")
}

blockMutex.Lock()
blockID = peer.ID("")
blockMutex.Unlock()

c = mkLnk(t, srcStore)

// Update root with item
pub.SetRoot(c)
err = announce.Send(context.Background(), c, pub.Addrs(), p2pSender)
require.NoError(t, err)

select {
case <-time.After(updateTimeout):
t.Fatal("timed out waiting for SyncFinished")
case <-watcher:
t.Log("synced with allowed ID")
}
}

func mkLnk(t *testing.T, srcStore datastore.Batching) cid.Cid {
// Update root with item
np := basicnode.Prototype__Any{}
Expand Down Expand Up @@ -457,8 +377,8 @@ func initPubSub(t *testing.T, srcStore, dstStore datastore.Batching, allowPeer f
dstHost.Peerstore().AddAddrs(srcHost.ID(), srcHost.Addrs(), time.Hour)
dstLnkS := test.MkLinkSystem(dstStore)

sub, err := dagsync.NewSubscriber(dstHost, dstStore, dstLnkS, testTopic,
dagsync.RecvAnnounce(announce.WithTopic(topics[1]), announce.WithAllowPeer(allowPeer)))
sub, err := dagsync.NewSubscriber(dstHost, dstLnkS,
dagsync.RecvAnnounce(testTopic, announce.WithTopic(topics[1]), announce.WithAllowPeer(allowPeer)))
require.NoError(t, err)

err = srcHost.Connect(context.Background(), dstHost.Peerstore().PeerInfo(dstHost.ID()))
Expand Down
Loading

0 comments on commit 7441bee

Please sign in to comment.