Added experimental index provider feature to go-ipfs #8771

Closed
wants to merge 1 commit
4 changes: 4 additions & 0 deletions .gitignore
@@ -33,3 +33,7 @@ docs/examples/go-ipfs-as-a-library/example-folder/Qm*
/parts
/stage
/prime

# ignore vscode config
.vscode
.idea
36 changes: 36 additions & 0 deletions cmd/ipfs/daemon.go
@@ -68,6 +68,9 @@ const (
enableIPNSPubSubKwd = "enable-namesys-pubsub"
enableMultiplexKwd = "enable-mplex-experiment"
agentVersionSuffix = "agent-version-suffix"
enableIndexerProviderKwd = "index-provider-experiment"
indexerProviderFullKwd = "index-provider-full-import-experiment"

// apiAddrKwd = "address-api"
// swarmAddrKwd = "address-swarm"
)
@@ -181,6 +184,8 @@ Headers.
cmds.BoolOption(enablePubSubKwd, "Enable experimental pubsub feature. Overrides Pubsub.Enabled config."),
cmds.BoolOption(enableIPNSPubSubKwd, "Enable IPNS over pubsub. Implicitly enables pubsub, overrides Ipns.UsePubsub config."),
cmds.BoolOption(enableMultiplexKwd, "DEPRECATED"),
cmds.StringOption(enableIndexerProviderKwd, "Enable experimental indexer provider feature and provide new Cids that are added, with indexer address"),
cmds.StringOption(indexerProviderFullKwd, "Enable experimental indexer provider feature and provide all locally stored Cids, with indexer address"),
Comment on lines +187 to +188
Going to use this area to talk about some potential design issues because I think a thread will be useful here and GitHub doesn't provide them unless you anchor to some code.

My understanding of the design here is that you're planning to run this fork by:

  1. Running ipfs daemon --index-provider-full-import-experiment once, waiting for everything to finish by watching some logs
  2. Kill the node and from then on run ipfs daemon --index-provider-experiment which will periodically notice any new blocks you have added locally and advertise them

Some things that this misses, some of which you might be ok with and some not. (Note: most if not all of these are likely blockers to merging the code upstream, but we can go a step at a time here)

  • If the node crashes before I've advertised some new data, I will never advertise it unless I run a full import again, which is expensive for both the client and the server due to how the data is batched up (i.e. there is unlikely to be any overlap with the previous advertisements)
  • If I delete data I don't "unadvertise" it. This is an easy enough fix, although the issue above remains
  • If I change my identifier (whether just multiaddrs because of dynamic DNS, or peerIDs due to rotation) then I lose all my advertisements and I have to rely on the indexer figuring out to drop them
  • If I use the node without running the daemon (e.g. ipfs add while the daemon isn't running) the advertisement will get dropped. Potentially fixable, but at the cost of adding tiny advertisements which IIUC perform poorly with indexer syncing
  • I can only provide everything, not some subset (e.g. pins, roots, etc.)

IIUC a pattern which might be more useful for your use case (a pinning service with lots of data it wants to advertise to the indexers) might look like this:

  • Only advertise pinned data
  • On startup grab the list of pins and diff them against the list of pins you've already advertised
  • Advertise any new ones, de-advertise any missing ones, update the list of pins that have been advertised as you go
    • Note: advertise new ones means walking the DAG, collecting all the CIDs and advertising them
  • Hook into the pinner/put a wrapper around it and whenever a pin is successfully added or removed modify the advertisement list

Some advantages:

  • It maps nicely to what the indexer is trying to do which is track groups of objects that want to be advertised/de-advertised together
  • It doesn't leave your state inconsistent on-crash or require you to put together something like a journaling recovery system to deal with mismatched state on a per-block level (e.g. new block added/removed but not advertised/de-advertised)
  • It's a less costly diff than diffing your whole blockstore

Some disadvantages:

  • Might stress out the indexer system since you may have millions of pins that are relatively small and IIUC the indexer ingestion throughput is limited by needing to walk the linear advertisement list. These millions of small pins are a different workload than, say, a Filecoin storage deal for 32 GB of data
    • Could potentially be worked around by batching up the pins into groups for the indexer, but it means tracking more locally and needing to update these larger advertisements whenever a pin is added/removed from the batch. The updates might not mean transferring so much data though depending on how the advertisement DAG is structured
  • You still sort of end up with a journaling system on the pin level
    • Requires storing the list of advertised pins, which is itself a bit like a journaling system
    • Startup costs involve doing a diff on the pinsets, which could potentially be expensive; this could happen in the background if you're ok waiting a little while for the indexer subsystem to become functional
  • Doesn't handle advertising unpinned blocks on your node (including MFS, although that seems handlable separately if needed)

The above are just some suggestions though; some of the error cases this design leaves unhandled might not be a problem at the moment to start with.
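
To make the startup-diff step of the pin-based pattern above concrete, here is a minimal Go sketch. It is a sketch only: `pinLister`, `advertiser` and `syncPins` are hypothetical names, not go-ipfs or index-provider APIs, and it leaves out persisting the advertised set and walking each pin's DAG to collect the CIDs for its advertisement.

```go
package indexsync

import (
	"context"

	"github.com/ipfs/go-cid"
)

// pinLister and advertiser are hypothetical stand-ins for the real pinner and
// indexer-provider engine; the concrete go-ipfs / index-provider types differ.
type pinLister interface {
	PinnedRoots(ctx context.Context) ([]cid.Cid, error)
}

type advertiser interface {
	Advertise(ctx context.Context, root cid.Cid) error
	Unadvertise(ctx context.Context, root cid.Cid) error
}

// syncPins diffs the current pin roots against the set advertised on the last
// run, advertises new ones, and de-advertises removed ones. A real
// implementation would persist the returned set (e.g. in the datastore) so the
// diff survives restarts.
func syncPins(ctx context.Context, pins pinLister, adv advertiser, previous map[cid.Cid]struct{}) (map[cid.Cid]struct{}, error) {
	roots, err := pins.PinnedRoots(ctx)
	if err != nil {
		return nil, err
	}
	current := make(map[cid.Cid]struct{}, len(roots))
	for _, c := range roots {
		current[c] = struct{}{}
		if _, seen := previous[c]; !seen {
			// New pin: this is where the DAG walk and batching into one
			// advertisement would happen.
			if err := adv.Advertise(ctx, c); err != nil {
				return nil, err
			}
		}
	}
	for c := range previous {
		if _, still := current[c]; !still {
			if err := adv.Unadvertise(ctx, c); err != nil {
				return nil, err
			}
		}
	}
	return current, nil
}
```

Hooking into the pinner would then just call `Advertise`/`Unadvertise` and update the persisted set as pins are added or removed, so the expensive diff only runs at startup.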

cmds.StringOption(agentVersionSuffix, "Optional suffix to the AgentVersion presented by `ipfs id` and also advertised through BitSwap."),

// TODO: add way to override addresses. tricky part: updating the config if also --init.
@@ -541,6 +546,37 @@ func daemonFunc(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment
fmt.Println("(Hit ctrl-c again to force-shutdown the daemon.)")
}()

idxProviderAddress, idxProviderSet := req.Options[enableIndexerProviderKwd].(string)
idxProviderFullAddress, idxProviderFullSet := req.Options[indexerProviderFullKwd].(string)

// if idxProviderFull is set, we will provide all existing blocks we have. This
// is meant to be a one-time operation, and not meant to be used at the same time
// as the idxProviderSet - which advertises newly added blocks on a timer
idxProviderNode := &ipfsIdxProviderNode{node}
if idxProviderFullSet {
Comment on lines +552 to +556
Instantiating new components, whether configurable or not, in daemon.go is inconsistent with how we normally initialize things and may lead to issues, as with the handling of node.BaseBlocks below.

Instead, this logic should live in core/node with the other components. If you want this to be configurable with a flag rather than just a config file option, you can use the IPNS over PubSub example (search for ipnsps) for illustration, since it also has a CLI flag. You can also add an indexer provider field to IpfsNode in case you want to write extra commands that introspect on it, or just want to work with it once it's running.

Happy to give more guidance here if needed.
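
For illustration, a rough sketch of what moving this into core/node might look like, using the fx dependency-injection style go-ipfs already uses there. All names here (IndexProvider, NewIndexProvider, IndexProviderOption) are assumptions for the sketch, not existing go-ipfs or index-provider code; the real wiring would follow the ipnsps example mentioned above.

```go
package node

import (
	"context"

	blockstore "github.com/ipfs/go-ipfs-blockstore"
	"github.com/libp2p/go-libp2p-core/host"
	"go.uber.org/fx"
)

// IndexProvider is a placeholder for whatever type ends up talking to the
// indexer; NewIndexProvider, Start and Close are assumed, not real APIs.
type IndexProvider struct{}

func NewIndexProvider(h host.Host, bs blockstore.Blockstore, indexerAddr string) *IndexProvider {
	return &IndexProvider{}
}

func (p *IndexProvider) Start(ctx context.Context) error { return nil } // begin advertising
func (p *IndexProvider) Close() error                    { return nil } // flush and stop

// IndexProviderOption gates the component behind the experiment flag, much
// like other experimental features are gated in core/node.
func IndexProviderOption(enabled bool, indexerAddr string) fx.Option {
	if !enabled {
		return fx.Options() // no-op when the experiment is off
	}
	return fx.Provide(func(lc fx.Lifecycle, h host.Host, bs blockstore.Blockstore) *IndexProvider {
		p := NewIndexProvider(h, bs, indexerAddr)
		lc.Append(fx.Hook{
			OnStart: p.Start,
			OnStop:  func(context.Context) error { return p.Close() },
		})
		return p
	})
}
```

Note that fx only runs a provided constructor when something depends on its output, so the *IndexProvider would still need to be requested somewhere, e.g. as a field on IpfsNode as suggested above, or via fx.Invoke.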

err := setupIdxProvider(cctx, idxProviderNode, idxProviderFullAddress)
if err != nil {
log.Errorf("error setting up idx provider: %s", err)
return err
}
err = advertiseAllCids(cctx.Context(), idxProviderNode)
if err != nil {
log.Errorf("error advertising local cids: %s", err)
return err
}
}
if idxProviderSet {
err = setupIdxProvider(cctx, &ipfsIdxProviderNode{node}, idxProviderAddress)
if err != nil {
log.Errorf("error setting up idx provider: %s", err)
return err
}
// wrap the blockstore in a providerBlockstore so we can watch for puts/deletes
pbs := NewProviderBlockstore(node.BaseBlocks)
pbs.startBlockstoreProvider(cctx, idxProviderNode)
node.BaseBlocks = pbs
Doing this is generally unsafe and likely won't help you, since at this point any other parts of the program that already hold this pointer have moved on and won't notice the change. This should be done within the dependency injection/node creation.


🐉 🐉 🐉
If you're wondering why your e2e test works at all given that I'm saying it shouldn't, see below:
🐉 🐉 🐉

It's because ipfs add ends up using the base blockstore when doing adds:

https://github.com/ipfs/go-ipfs/blob/6c6a55056db47c9982d9d75e86272aba33b453bb/core/coreapi/coreapi.go#L169

https://github.com/ipfs/go-ipfs/blob/6c6a55056db47c9982d9d75e86272aba33b453bb/core/coreapi/unixfs.go#L82

However, other functionality such as ipfs pin does not keep referring back to BaseBlocks but instead to the pinning object:
https://github.com/ipfs/go-ipfs/blob/6c6a55056db47c9982d9d75e86272aba33b453bb/core/coreapi/coreapi.go#L170

which leverages an existing dagservice (there's a blockstore underneath all that)

https://github.com/ipfs/go-ipfs/blob/6c6a55056db47c9982d9d75e86272aba33b453bb/core/node/core.go#L55

Offhand, I'm not sure if ipfs add using BaseBlocks is a bug or not, but either way setting BaseBlocks isn't what you want to do here.
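
To illustrate the alternative being pointed at, here is a rough sketch of a notifying wrapper that would be installed where the blockstore is constructed (inside the core/node DI setup), so every consumer sees the wrapped store. The `store` interface, `notifyingStore`, and callback names are assumptions for the sketch; the real Blockstore interface has more methods to forward and its exact signatures depend on the go-ipfs-blockstore version.

```go
package providerbs

import (
	"context"

	blocks "github.com/ipfs/go-block-format"
	"github.com/ipfs/go-cid"
)

// store is a cut-down stand-in for the real Blockstore interface, used so the
// sketch doesn't depend on a particular go-ipfs-blockstore version.
type store interface {
	Put(ctx context.Context, b blocks.Block) error
	DeleteBlock(ctx context.Context, c cid.Cid) error
}

// notifyingStore forwards calls to the wrapped store and reports successful
// puts/deletes, so nothing gets advertised that never actually landed on disk.
type notifyingStore struct {
	store
	onPut    func(cid.Cid)
	onDelete func(cid.Cid)
}

func (n *notifyingStore) Put(ctx context.Context, b blocks.Block) error {
	if err := n.store.Put(ctx, b); err != nil {
		return err
	}
	n.onPut(b.Cid())
	return nil
}

func (n *notifyingStore) DeleteBlock(ctx context.Context, c cid.Cid) error {
	if err := n.store.DeleteBlock(ctx, c); err != nil {
		return err
	}
	n.onDelete(c)
	return nil
}
```

The key point remains where the wrapper is installed: it has to be the value handed out during node construction, not a reassignment of node.BaseBlocks after other components have already captured the original pointer.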

}

// Give the user heads up if daemon running in online mode has no peers after 1 minute
if !offline {
time.AfterFunc(1*time.Minute, func() {