Skip to content

Commit

Permalink
stellar#4475: fixed s3 url path building for job index
Browse files: browse the repository at this point in the history
  • Loading branch information
sreuland committed Aug 28, 2022
1 parent 4838973 commit b4a5a68
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 17 deletions.
1 change: 0 additions & 1 deletion exp/lighthorizon/build/k8s/lighthorizon_index.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ spec:
fluxcd.io/ignore: "true"
prometheus.io/port: "6060"
prometheus.io/scrape: "false"
creationTimestamp: null
labels:
app: lighthorizon-pubnet-index
spec:
Expand Down
2 changes: 1 addition & 1 deletion exp/lighthorizon/index/backend/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestSimpleFileStore(t *testing.T) {
// Create a large (beyond a single chunk) list of arbitrary accounts, some
// regular and some muxed.
accountList := make([]string, 123)
for i, _ := range accountList {
for i := range accountList {
var err error
var muxed xdr.MuxedAccount
address := keypair.MustRandom().Address()
Expand Down
34 changes: 21 additions & 13 deletions exp/lighthorizon/index/cmd/batch/reduce/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package main

import (
"encoding/hex"
"fmt"
"hash/fnv"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -114,12 +112,15 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error {
for i := uint32(0); i < config.MapJobCount; i++ {
jobLogger := log.WithField("job", i)

url := filepath.Join(config.IndexRootSource, "job_"+strconv.FormatUint(uint64(i), 10))
jobLogger.Infof("Connecting to %s", url)
jobSubPath := "job_" + strconv.FormatUint(uint64(i), 10)
jobLogger.Infof("Connecting to url %s, sub-path %s", config.IndexRootSource, jobSubPath)
outerJobStore, err := index.ConnectWithConfig(index.StoreConfig{
URL: config.IndexRootSource,
URLSubPath: jobSubPath,
})

outerJobStore, err := index.Connect(url)
if err != nil {
return errors.Wrapf(err, "failed to connect to indices at %s", url)
return errors.Wrapf(err, "failed to connect to indices at %s, sub-path %s", config.IndexRootSource, jobSubPath)
}

accounts, err := outerJobStore.ReadAccounts()
Expand Down Expand Up @@ -201,16 +202,20 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error {
// indices from all jobs that touched this account.
for k := uint32(0); k < config.MapJobCount; k++ {
var jobErr error
url := filepath.Join(config.IndexRootSource, fmt.Sprintf("job_%d", k))

// FIXME: This could probably come from a pool. Every
// worker needs to have a connection to every index
// store, so there's no reason to re-open these for each
// inner loop.
innerJobStore, jobErr := index.Connect(url)
innerJobSubPath := "job_" + strconv.FormatUint(uint64(k), 10)
innerJobStore, jobErr := index.ConnectWithConfig(index.StoreConfig{
URL: config.IndexRootSource,
URLSubPath: innerJobSubPath,
})

if jobErr != nil {
accountLog.WithError(jobErr).
Errorf("Failed to open index at %s", url)
Errorf("Failed to open index at %s, sub-path %s", config.IndexRootSource, innerJobSubPath)
panic(jobErr)
}

Expand All @@ -227,7 +232,7 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error {

if jobErr = mergeIndices(mergedIndices, jobIndices); jobErr != nil {
accountLog.WithError(jobErr).
Errorf("Merge failure for index at %s", url)
Errorf("Merge failure for index at %s, sub-path %s", config.IndexRootSource, innerJobSubPath)
panic(jobErr)
}
}
Expand Down Expand Up @@ -281,12 +286,15 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error {

prefix := hex.EncodeToString([]byte{b})
for k := uint32(0); k < config.MapJobCount; k++ {
url := filepath.Join(config.IndexRootSource, fmt.Sprintf("job_%d", k))
var innerErr error
innerJobSubPath := "job_" + strconv.FormatUint(uint64(k), 10)
innerJobStore, innerErr := index.ConnectWithConfig(index.StoreConfig{
URL: config.IndexRootSource,
URLSubPath: innerJobSubPath,
})

innerJobStore, innerErr := index.Connect(url)
if innerErr != nil {
txLog.WithError(innerErr).Errorf("Failed to open index at %s", url)
txLog.WithError(innerErr).Errorf("Failed to open index at %s, sub-path %s", config.IndexRootSource, innerJobSubPath)
panic(innerErr)
}

Expand Down
10 changes: 10 additions & 0 deletions exp/lighthorizon/index/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ func ConnectWithConfig(config StoreConfig) (Store, error) {
}
switch parsed.Scheme {
case "s3":
s3Url := fmt.Sprintf("%s/%s", config.URL, config.URLSubPath)
parsed, err = url.Parse(s3Url)
if err != nil {
return nil, err
}
awsConfig := &aws.Config{}
query := parsed.Query()
if region := query.Get("region"); region != "" {
Expand All @@ -33,6 +38,11 @@ func ConnectWithConfig(config StoreConfig) (Store, error) {
return NewS3Store(awsConfig, parsed.Host, parsed.Path, config)

case "file":
fileUrl := filepath.Join(config.URL, config.URLSubPath)
parsed, err = url.Parse(fileUrl)
if err != nil {
return nil, err
}
return NewFileStore(filepath.Join(parsed.Host, parsed.Path), config)

default:
Expand Down
7 changes: 5 additions & 2 deletions exp/lighthorizon/index/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ type Store interface {

type StoreConfig struct {
// init time config
URL string
Workers uint32
// the base URL of the store resource
URL string
// optional sub-path that is appended to the base URL to form the complete URL
URLSubPath string
Workers uint32

// runtime config
ClearMemoryOnFlush bool
Expand Down

0 comments on commit b4a5a68

Please sign in to comment.