Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include dhstore into e2e test #1248

Merged
merged 1 commit into from
Feb 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions api/v0/finder/client/http/dhash_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,21 +26,28 @@ const (
type DHashClient struct {
Client

metadataUrl string
pcache *providerCache
dhstoreUrl string
dhFinderUrl string
dhMetadataUrl string
pcache *providerCache
}

// NewDHashClient instantiates a new client that uses Reader Privacy API for querying data.
// It requires more roundtrips to fullfill one query however it also protects the user from a passive observer.
func NewDHashClient(baseURL string, options ...httpclient.Option) (*DHashClient, error) {
c, err := New(baseURL, options...)
// dhstoreUrl specifies the URL of the double hashed store that can respond to find encrypted multihash and find encrypted metadata requests.
// stiUrl specifies the URL of storetheindex that can respond to find provider requests.
// dhstoreUrl and stiUrl are expected to be the same when these services are deployed behing a proxy - indexstar.
func NewDHashClient(dhstoreUrl string, stiUrl string, options ...httpclient.Option) (*DHashClient, error) {
c, err := New(stiUrl, options...)
if err != nil {
return nil, err
}

return &DHashClient{
Client: *c,
metadataUrl: baseURL + metadataPath,
Client: *c,
dhstoreUrl: dhstoreUrl,
dhFinderUrl: dhstoreUrl + finderPath,
dhMetadataUrl: dhstoreUrl + metadataPath,
pcache: &providerCache{
ttl: pcacheTtl,
pinfos: make(map[peer.ID]*pinfoWrapper),
Expand All @@ -57,7 +64,7 @@ func (c *DHashClient) Find(ctx context.Context, mh multihash.Multihash) (*model.
if err != nil {
return nil, err
}
u := fmt.Sprint(c.finderURL, "/", smh.B58String())
u := fmt.Sprint(c.dhFinderUrl, "/", smh.B58String())
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -141,7 +148,7 @@ func (c *DHashClient) decryptFindResponse(ctx context.Context, resp *model.FindR
}

func (c *DHashClient) fetchMetadata(ctx context.Context, vk []byte) ([]byte, error) {
u := fmt.Sprint(c.metadataUrl, "/", b58.Encode(dhash.SHA256(vk, nil)))
u := fmt.Sprint(c.dhMetadataUrl, "/", b58.Encode(dhash.SHA256(vk, nil)))
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
if err != nil {
return nil, err
Expand Down Expand Up @@ -208,8 +215,10 @@ func (pc *providerCache) getResults(ctx context.Context, pid peer.ID, ctxID []by
cxps: make(map[string]*model.ContextualExtendedProviders),
}
pc.pinfos[pinfo.AddrInfo.ID] = wrapper
for _, cxp := range pinfo.ExtendedProviders.Contextual {
wrapper.cxps[cxp.ContextID] = &cxp
if pinfo.ExtendedProviders != nil {
for _, cxp := range pinfo.ExtendedProviders.Contextual {
wrapper.cxps[cxp.ContextID] = &cxp
}
}
}

Expand All @@ -221,6 +230,11 @@ func (pc *providerCache) getResults(ctx context.Context, pid peer.ID, ctxID []by
Provider: wrapper.pinfo.AddrInfo,
})

// return results if there are no further extended providers to unpack
if wrapper.pinfo.ExtendedProviders == nil {
return results, nil
}

// If override is set to true at the context level then the chain
// level EPs should be ignored for this context ID
override := false
Expand Down
5 changes: 5 additions & 0 deletions command/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ var initFlags = []cli.Flag{
Usage: "Configure the indexer to work with an assigner service",
Required: false,
},
&cli.StringFlag{
Name: "dhstore",
Usage: "Url of DHStore for double hashed index",
Required: false,
},
}

var providersGetFlags = []cli.Flag{
Expand Down
7 changes: 7 additions & 0 deletions command/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,5 +139,12 @@ func initCommand(cctx *cli.Context) error {
cfg.Discovery.UseAssigner = true
}

dhstoreUrl := cctx.String("dhstore")
if dhstoreUrl != "" {
log.Infow("dhstoreUrl", dhstoreUrl)
cfg.Indexer.DHStoreURL = dhstoreUrl
cfg.Indexer.DHBatchSize = -1
}

return cfg.Save(configFile)
}
52 changes: 51 additions & 1 deletion e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"testing"
"time"

finderhttpclient "github.com/ipni/storetheindex/api/v0/finder/client/http"
"github.com/ipni/storetheindex/config"
"github.com/multiformats/go-multihash"
"github.com/stretchr/testify/require"
)

Expand All @@ -36,6 +38,7 @@ type e2eTestRunner struct {

indexerReady chan struct{}
providerReady chan struct{}
dhstoreReady chan struct{}
providerHasPeer chan struct{}
}

Expand Down Expand Up @@ -86,6 +89,10 @@ func (e *e2eTestRunner) start(prog string, args ...string) *exec.Cmd {
} else if strings.Contains(line, "admin http server listening") {
e.providerReady <- struct{}{}
}
case "dhstore":
if strings.Contains(line, "Store opened.") {
e.dhstoreReady <- struct{}{}
}
}
}
}()
Expand Down Expand Up @@ -132,6 +139,7 @@ func TestEndToEndWithReferenceProvider(t *testing.T) {

indexerReady: make(chan struct{}, 1),
providerReady: make(chan struct{}, 1),
dhstoreReady: make(chan struct{}, 1),
providerHasPeer: make(chan struct{}, 1),
}

Expand Down Expand Up @@ -175,47 +183,73 @@ func TestEndToEndWithReferenceProvider(t *testing.T) {
e.env = append(e.env, fmt.Sprintf("%s=%s", name, out))
}

// install storetheindex
indexer := filepath.Join(e.dir, "storetheindex")
e.run("go", "install", ".")

provider := filepath.Join(e.dir, "provider")
dhstore := filepath.Join(e.dir, "dhstore")

cwd, err := os.Getwd()
require.NoError(t, err)

// install index-provider
err = os.Chdir(e.dir)
require.NoError(t, err)
e.run("git", "clone", "https://github.com/ipni/index-provider.git")
err = os.Chdir("index-provider/cmd/provider")
require.NoError(t, err)
e.run("go", "install")

// install dhstore
err = os.Chdir(e.dir)
require.NoError(t, err)
e.run("git", "clone", "https://github.com/ipni/dhstore.git", "dhstore_repo")
err = os.Chdir("dhstore_repo/cmd/dhstore")
require.NoError(t, err)
e.run("go", "install")

err = os.Chdir(cwd)
require.NoError(t, err)

// initialize index-provider
e.run(provider, "init")
cfg, err := config.Load(filepath.Join(e.dir, ".index-provider", "config"))
require.NoError(t, err)
providerID := cfg.Identity.PeerID
t.Logf("Initialized provider ID: %s", providerID)

e.run(indexer, "init", "--store", "sth", "--pubsub-topic", "/indexer/ingest/mainnet", "--no-bootstrap")
// initialize indexer
e.run(indexer, "init", "--store", "sth", "--pubsub-topic", "/indexer/ingest/mainnet", "--no-bootstrap", "--dhstore", "http://127.0.0.1:40080")
cfg, err = config.Load(filepath.Join(e.dir, ".storetheindex", "config"))
require.NoError(t, err)
indexerID := cfg.Identity.PeerID

// start provider
cmdProvider := e.start(provider, "daemon")
select {
case <-e.providerReady:
case <-ctx.Done():
t.Fatal("timed out waiting for provider to start")
}

// start dhstore
cmdDhstore := e.start(dhstore, "--storePath", e.dir)
select {
case <-e.dhstoreReady:
case <-ctx.Done():
t.Fatal("timed out waiting for dhstore to start")
}

// start indexer
cmdIndexer := e.start(indexer, "daemon")
select {
case <-e.indexerReady:
case <-ctx.Done():
t.Fatal("timed out waiting for indexer to start")
}

// connect provider to the indexer
e.run(provider, "connect",
"--imaddr", fmt.Sprintf("/dns/localhost/tcp/3003/p2p/%s", indexerID),
"--listen-admin", "http://localhost:3102",
Expand Down Expand Up @@ -262,6 +296,21 @@ func TestEndToEndWithReferenceProvider(t *testing.T) {
// Check that IndexCount with correct value appears in providers output.
require.Contains(t, string(outProvider), "IndexCount: 1043")

// Create double hashed client and verify that the multihashes ended up in dhstore
client, err := finderhttpclient.NewDHashClient("http://127.0.0.1:40080", "http://127.0.0.1:3000")
require.NoError(t, err)

mh, err := multihash.FromB58String("2DrjgbFdhNiSJghFWcQbzw6E8y4jU1Z7ZsWo3dJbYxwGTNFmAj")
require.NoError(t, err)

dhResp, err := client.Find(e.ctx, mh)
require.NoError(t, err)

require.Equal(t, 1, len(dhResp.MultihashResults))
require.Equal(t, dhResp.MultihashResults[0].Multihash, mh)
require.Equal(t, 1, len(dhResp.MultihashResults[0].ProviderResults))
require.Equal(t, providerID, dhResp.MultihashResults[0].ProviderResults[0].Provider.ID.String())

// Remove a car file from the provider. This will cause the provider to
// publish an advertisement that tells the indexer to remove the car file
// content by contextID. The indexer will then import the advertisement
Expand Down Expand Up @@ -336,6 +385,7 @@ func TestEndToEndWithReferenceProvider(t *testing.T) {

e.stop(cmdIndexer, time.Second)
e.stop(cmdProvider, time.Second)
e.stop(cmdDhstore, time.Second)
}

func retryWithTimeout(t *testing.T, timeout time.Duration, timeBetweenRetries time.Duration, retryableFn func() error) {
Expand Down