Skip to content

Commit

Permalink
exp/lighthorizon/cmd: fix S3 sub-paths for the index batch reduce job (#4552)
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland authored Aug 31, 2022
1 parent f8fcb05 commit a8f9750
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 27 deletions.
11 changes: 6 additions & 5 deletions .github/workflows/horizon.yml
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ jobs:
- if: github.ref == 'refs/heads/master'
name: Push to DockerHub
run: docker push stellar/horizon-verify-range:latest

horizon-light:
name: Test and Push the horizon light images
runs-on: ubuntu-latest
Expand All @@ -141,18 +142,18 @@ jobs:
docker run -e ARCHIVE_TARGET=file:///ledgerexport\
-e START=5\
-e END=150\
-e NETWORK_PASSPHRASE="Test SDF Network ; September 2015"\
-e CAPTIVE_CORE_CONFIG="/captive-core-testnet.cfg"\
-e HISTORY_ARCHIVE_URLS="https://history.stellar.org/prd/core-testnet/core_testnet_001,https://history.stellar.org/prd/core-testnet/core_testnet_002"\
-e NETWORK_PASSPHRASE="Public Global Stellar Network ; September 2015"\
-e CAPTIVE_CORE_CONFIG="/captive-core-pubnet.cfg"\
-e HISTORY_ARCHIVE_URLS="https://history.stellar.org/prd/core-live/core_live_001"\
-v $PWD/ledgerexport:/ledgerexport\
stellar/lighthorizon-ledgerexporter
# run map job
docker run -e NETWORK_PASSPHRASE='testnet' -e JOB_INDEX_ENV=AWS_BATCH_JOB_ARRAY_INDEX -e AWS_BATCH_JOB_ARRAY_INDEX=0 -e BATCH_SIZE=64 -e FIRST_CHECKPOINT=64 \
docker run -e NETWORK_PASSPHRASE='pubnet' -e JOB_INDEX_ENV=AWS_BATCH_JOB_ARRAY_INDEX -e AWS_BATCH_JOB_ARRAY_INDEX=0 -e BATCH_SIZE=64 -e FIRST_CHECKPOINT=64 \
-e WORKER_COUNT=1 -e RUN_MODE=map -v $PWD/ledgerexport:/ledgermeta -e TXMETA_SOURCE=file:///ledgermeta -v $PWD/index:/index -e INDEX_TARGET=file:///index stellar/lighthorizon-index-batch
# run reduce job
docker run -e NETWORK_PASSPHRASE='testnet' -e JOB_INDEX_ENV=AWS_BATCH_JOB_ARRAY_INDEX -e AWS_BATCH_JOB_ARRAY_INDEX=0 -e MAP_JOB_COUNT=1 -e REDUCE_JOB_COUNT=1 \
docker run -e NETWORK_PASSPHRASE='pubnet' -e JOB_INDEX_ENV=AWS_BATCH_JOB_ARRAY_INDEX -e AWS_BATCH_JOB_ARRAY_INDEX=0 -e MAP_JOB_COUNT=1 -e REDUCE_JOB_COUNT=1 \
-e WORKER_COUNT=1 -e RUN_MODE=reduce -v $PWD/index:/index -e INDEX_SOURCE_ROOT=file:///index -e INDEX_TARGET=file:///index stellar/lighthorizon-index-batch
# Push images
Expand Down
2 changes: 1 addition & 1 deletion exp/lighthorizon/build/index-batch/README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# `stellar/horizon-indexer`
# `stellar/lighthorizon-index-batch`

This Docker image contains the ledger/checkpoint indexing executables. It supports running multiple instances of `map`/`reduce` on a single machine, or running them in [AWS Batch](https://aws.amazon.com/batch/).

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ spec:
- name: RUN_MODE
value: "reduce"
- name: MAP_JOB_COUNT
value: 52
value: "52"
- name: REDUCE_JOB_COUNT
value: 52
value: "52"
- name: WORKER_COUNT
value: 8
value: "8"
- name: INDEX_SOURCE_ROOT
value: "<url of index location>"
- name: JOB_INDEX_ENV
Expand Down
1 change: 0 additions & 1 deletion exp/lighthorizon/build/k8s/lighthorizon_index.yml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ spec:
fluxcd.io/ignore: "true"
prometheus.io/port: "6060"
prometheus.io/scrape: "false"
creationTimestamp: null
labels:
app: lighthorizon-pubnet-index
spec:
Expand Down
2 changes: 1 addition & 1 deletion exp/lighthorizon/index/backend/file_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func TestSimpleFileStore(t *testing.T) {
// Create a large (beyond a single chunk) list of arbitrary accounts, some
// regular and some muxed.
accountList := make([]string, 123)
for i, _ := range accountList {
for i := range accountList {
var err error
var muxed xdr.MuxedAccount
address := keypair.MustRandom().Address()
Expand Down
36 changes: 22 additions & 14 deletions exp/lighthorizon/index/cmd/batch/reduce/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@ package main

import (
"encoding/hex"
"fmt"
"hash/fnv"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -42,7 +40,7 @@ func ReduceConfigFromEnvironment() (*ReduceConfig, error) {
indexTargetEnv = "INDEX_TARGET"
)

jobIndexEnv := os.Getenv(jobIndexEnvName)
jobIndexEnv := strings.TrimSpace(os.Getenv(jobIndexEnvName))
if jobIndexEnv == "" {
return nil, errors.New("env variable can't be empty " + jobIndexEnvName)
}
Expand Down Expand Up @@ -114,12 +112,15 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error {
for i := uint32(0); i < config.MapJobCount; i++ {
jobLogger := log.WithField("job", i)

url := filepath.Join(config.IndexRootSource, "job_"+strconv.FormatUint(uint64(i), 10))
jobLogger.Infof("Connecting to %s", url)
jobSubPath := "job_" + strconv.FormatUint(uint64(i), 10)
jobLogger.Infof("Connecting to url %s, sub-path %s", config.IndexRootSource, jobSubPath)
outerJobStore, err := index.ConnectWithConfig(index.StoreConfig{
URL: config.IndexRootSource,
URLSubPath: jobSubPath,
})

outerJobStore, err := index.Connect(url)
if err != nil {
return errors.Wrapf(err, "failed to connect to indices at %s", url)
return errors.Wrapf(err, "failed to connect to indices at %s, sub-path %s", config.IndexRootSource, jobSubPath)
}

accounts, err := outerJobStore.ReadAccounts()
Expand Down Expand Up @@ -201,16 +202,20 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error {
// indices from all jobs that touched this account.
for k := uint32(0); k < config.MapJobCount; k++ {
var jobErr error
url := filepath.Join(config.IndexRootSource, fmt.Sprintf("job_%d", k))

// FIXME: This could probably come from a pool. Every
// worker needs to have a connection to every index
// store, so there's no reason to re-open these for each
// inner loop.
innerJobStore, jobErr := index.Connect(url)
innerJobSubPath := "job_" + strconv.FormatUint(uint64(k), 10)
innerJobStore, jobErr := index.ConnectWithConfig(index.StoreConfig{
URL: config.IndexRootSource,
URLSubPath: innerJobSubPath,
})

if jobErr != nil {
accountLog.WithError(jobErr).
Errorf("Failed to open index at %s", url)
Errorf("Failed to open index at %s, sub-path %s", config.IndexRootSource, innerJobSubPath)
panic(jobErr)
}

Expand All @@ -227,7 +232,7 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error {

if jobErr = mergeIndices(mergedIndices, jobIndices); jobErr != nil {
accountLog.WithError(jobErr).
Errorf("Merge failure for index at %s", url)
Errorf("Merge failure for index at %s, sub-path %s", config.IndexRootSource, innerJobSubPath)
panic(jobErr)
}
}
Expand Down Expand Up @@ -281,12 +286,15 @@ func mergeAllIndices(finalIndexStore index.Store, config *ReduceConfig) error {

prefix := hex.EncodeToString([]byte{b})
for k := uint32(0); k < config.MapJobCount; k++ {
url := filepath.Join(config.IndexRootSource, fmt.Sprintf("job_%d", k))
var innerErr error
innerJobSubPath := "job_" + strconv.FormatUint(uint64(k), 10)
innerJobStore, innerErr := index.ConnectWithConfig(index.StoreConfig{
URL: config.IndexRootSource,
URLSubPath: innerJobSubPath,
})

innerJobStore, innerErr := index.Connect(url)
if innerErr != nil {
txLog.WithError(innerErr).Errorf("Failed to open index at %s", url)
txLog.WithError(innerErr).Errorf("Failed to open index at %s, sub-path %s", config.IndexRootSource, innerJobSubPath)
panic(innerErr)
}

Expand Down
10 changes: 10 additions & 0 deletions exp/lighthorizon/index/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ func ConnectWithConfig(config StoreConfig) (Store, error) {
}
switch parsed.Scheme {
case "s3":
s3Url := fmt.Sprintf("%s/%s", config.URL, config.URLSubPath)
parsed, err = url.Parse(s3Url)
if err != nil {
return nil, err
}
awsConfig := &aws.Config{}
query := parsed.Query()
if region := query.Get("region"); region != "" {
Expand All @@ -33,6 +38,11 @@ func ConnectWithConfig(config StoreConfig) (Store, error) {
return NewS3Store(awsConfig, parsed.Host, parsed.Path, config)

case "file":
fileUrl := filepath.Join(config.URL, config.URLSubPath)
parsed, err = url.Parse(fileUrl)
if err != nil {
return nil, err
}
return NewFileStore(filepath.Join(parsed.Host, parsed.Path), config)

default:
Expand Down
7 changes: 5 additions & 2 deletions exp/lighthorizon/index/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,11 @@ type Store interface {

type StoreConfig struct {
// init time config
URL string
Workers uint32
// the base url for the store resource
URL string
// optional url path to append to the base url to realize the complete url
URLSubPath string
Workers uint32

// runtime config
ClearMemoryOnFlush bool
Expand Down

0 comments on commit a8f9750

Please sign in to comment.