Refactor index builder:
 - allow the worker count to be a command-line parameter
 - split work by checkpoints rather than individual ledgers (see the sketch below)
 - move the actual index insertion work into helpers
 - move the progress bar into helpers
 - simplify the participants code (payments vs. all operations)
Shaptic committed May 25, 2022
1 parent 4eeba9d commit 90d8ffa
Showing 2 changed files with 195 additions and 97 deletions.
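To make the checkpoint-splitting idea in the commit message concrete, here is a minimal sketch of the pattern: the ledger range is pushed onto a channel one 64-ledger checkpoint at a time and drained by a configurable number of errgroup workers. It is simplified from the real main.go below; run and processCheckpoint are hypothetical names used only for illustration, not code from the commit.

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

// processCheckpoint stands in for the real per-checkpoint indexing work
// (reading each ledger in the chunk, indexing it, then flushing once).
func processCheckpoint(firstLedger, count uint32) error {
	fmt.Printf("indexing ledgers %d..%d\n", firstLedger, firstLedger+count-1)
	return nil
}

func run(startLedger, endLedger uint32, workers int) error {
	wg := new(errgroup.Group)
	ch := make(chan uint32, workers)

	// Feed the channel one checkpoint (64 ledgers) at a time rather than one
	// ledger at a time, so each worker flushes at most once per chunk.
	go func() {
		for i := startLedger; i <= endLedger; i += 64 {
			ch <- i
		}
		close(ch)
	}()

	for i := 0; i < workers; i++ {
		wg.Go(func() error {
			for first := range ch {
				// The last chunk may cover fewer than 64 ledgers; the +1 is
				// because endLedger is inclusive.
				count := uint32(64)
				if remaining := endLedger - first + 1; remaining < count {
					count = remaining
				}
				if err := processCheckpoint(first, count); err != nil {
					return err
				}
			}
			return nil
		})
	}
	return wg.Wait()
}

func main() {
	if err := run(2, 100, 3); err != nil { // made-up range and worker count
		panic(err)
	}
}

With the made-up range 2 through 100, this hands out one full 64-ledger chunk starting at 2 and one capped 35-ledger chunk starting at 66, which mirrors the top := min(64, int(1+endLedger-ledgerStartSeq)) logic in the diff below.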
289 changes: 192 additions & 97 deletions exp/lighthorizon/index/cmd/single/main.go
@@ -5,6 +5,8 @@ import (
"flag"
"fmt"
"io"
"math"
"runtime"
"strings"
"sync/atomic"
"time"
@@ -20,22 +22,20 @@ import (
"golang.org/x/sync/errgroup"
)

var (
// Should we use runtime.NumCPU() for a reasonable default?
parallel = uint32(20)
)

func main() {
sourceUrl := flag.String("source", "gcs://horizon-archive-poc", "history archive url to read txmeta files")
targetUrl := flag.String("target", "file://indexes", "where to write indexes")
networkPassphrase := flag.String("network-passphrase", network.TestNetworkPassphrase, "network passphrase")
start := flag.Int("start", -1, "ledger to start at (default: earliest)")
end := flag.Int("end", -1, "ledger to end at (default: latest)")
start := flag.Int("start", -1, "ledger to start at (inclusive, default: earliest)")
end := flag.Int("end", -1, "ledger to end at (inclusive, default: latest)")
modules := flag.String("modules", "accounts,transactions", "comma-separated list of modules to index (default: all)")
flag.Parse()

log.SetLevel(log.InfoLevel)
// Should we use runtime.NumCPU() for a reasonable default?
// Yes, but leave a CPU open so I can actually use my PC while this runs.
workerCount := flag.Int("workers", runtime.NumCPU()-1, "number of workers (default: # of CPUs - 1)")

flag.Parse()
log.SetLevel(log.InfoLevel)
ctx := context.Background()

indexStore, err := index.Connect(*targetUrl)
@@ -59,140 +59,218 @@ func main() {

startTime := time.Now()

if *start < 2 {
*start = 2
}
if *end == -1 {
startLedger := uint32(max(*start, 2))
endLedger := uint32(*end)
if endLedger < 0 {
latest, err := ledgerBackend.GetLatestLedgerSequence(ctx)
if err != nil {
panic(err)
}
*end = int(latest)
endLedger = latest
}
startLedger := uint32(*start) //uint32((39680056) / 64)
endLedger := uint32(*end)
all := endLedger - startLedger
ledgerCount := 1 + (endLedger - startLedger) // +1 because endLedger is inclusive
parallel := max(1, *workerCount)

wg, ctx := errgroup.WithContext(ctx)
log.Infof("Creating indices for ledger range: %d through %d (%d ledgers)",
startLedger, endLedger, ledgerCount)
log.Infof("Using %d workers", parallel)

// Create a bunch of workers that process ledgers a checkpoint range at a
// time (better than a ledger at a time to minimize flushes).
wg, ctx := errgroup.WithContext(ctx)
ch := make(chan uint32, parallel)

// Submit the work to the channels, breaking up the range into checkpoints.
go func() {
for i := startLedger; i <= endLedger; i++ {
for i := startLedger; i <= endLedger; i += 64 {
ch <- i
}
close(ch)
}()

processed := uint64(0)
for i := uint32(0); i < parallel; i++ {
for i := 0; i < parallel; i++ {
wg.Go(func() error {
for ledgerSeq := range ch {
fmt.Println("Processing ledger", ledgerSeq)
ledger, err := ledgerBackend.GetLedger(ctx, ledgerSeq)
if err != nil {
log.WithField("error", err).Error("error getting ledgers")
ch <- ledgerSeq
continue
}
for ledgerStartSeq := range ch {
top := min(
64,
// If this is the last checkpoint range, we might not have
// requested a full 64 ledgers, so make sure we cap the
// worker appropriately. We also do +1 here because the
// `endLedger` in the range is inclusive.
int(1+endLedger-ledgerStartSeq),
)

checkpoint := ledgerSeq / 64
for i := 0; i < top; i++ {
ledgerSeq := ledgerStartSeq + uint32(i)

reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(*networkPassphrase, ledger)
if err != nil {
return err
}

for {
tx, err := reader.Read()
ledger, err := ledgerBackend.GetLedger(ctx, ledgerSeq)
if err != nil {
if err == io.EOF {
break
}
return err
}

if strings.Contains(*modules, "transactions") {
indexStore.AddTransactionToIndexes(
toid.New(int32(ledger.LedgerSequence()), int32(tx.Index), 0).ToInt64(),
tx.Result.TransactionHash,
)
log.WithField("error", err).Errorf("error getting ledger %d", ledgerSeq)
ch <- ledgerSeq
continue
}

if strings.Contains(*modules, "accounts") {
allParticipants, err := participantsForOperations(tx, false)
if err != nil {
return err
}
checkpoint := ledgerSeq / 64

err = indexStore.AddParticipantsToIndexes(checkpoint, "all_all", allParticipants)
if err != nil {
return err
}
reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(*networkPassphrase, ledger)
if err != nil {
return err
}

paymentsParticipants, err := participantsForOperations(tx, true)
if err != nil {
for {
tx, err := reader.Read()
if err == io.EOF {
break
} else if err != nil {
return err
}

err = indexStore.AddParticipantsToIndexes(checkpoint, "all_payments", paymentsParticipants)
if err != nil {
return err
}
for _, part := range strings.Split(*modules, ",") {
var err error

if tx.Result.Successful() {
allParticipants, err := participantsForOperations(tx, false)
if err != nil {
return err
switch part {
case "transactions":
err = processTransactionModule(indexStore, ledger, tx)
case "accounts":
err = processAccountsModule(indexStore, checkpoint, tx)
default:
err = fmt.Errorf("unknown module: %s", part)
}

err = indexStore.AddParticipantsToIndexes(checkpoint, "successful_all", allParticipants)
if err != nil {
return err
}

paymentsParticipants, err := participantsForOperations(tx, true)
if err != nil {
return err
}

err = indexStore.AddParticipantsToIndexes(checkpoint, "successful_payments", paymentsParticipants)
if err != nil {
return err
}
}
}
}
}

nprocessed := atomic.AddUint64(&processed, 1)
nprocessed := atomic.AddUint64(&processed, 1)

if nprocessed%100 == 0 {
log.Infof(
"Reading checkpoints... - %.2f%% - elapsed: %s, remaining: %s",
(float64(nprocessed)/float64(all))*100,
time.Since(startTime).Round(1*time.Second),
(time.Duration(int64(time.Since(startTime))*int64(all)/int64(nprocessed)) - time.Since(startTime)).Round(1*time.Second),
)
// 42 is an arbitrary number so we don't get boring
// multiples of 10 as % updates.
if nprocessed%42 == 0 {
postProgress("Reading checkpoints",
nprocessed, uint64(ledgerCount), startTime)
}
}

// Clear indexes to save memory
// Upload indices once per checkpoint to save memory
if err := indexStore.Flush(); err != nil {
return err
}
}

return nil
})
}

if err := wg.Wait(); err != nil {
panic(err)
}
log.Infof("Uploading indexes")

postProgress("Reading checkpoints",
uint64(ledgerCount), uint64(ledgerCount), startTime)

if processed != uint64(ledgerCount) {
panic(fmt.Errorf("wtf? processed %d but expected %d", processed, ledgerCount))
}

log.Infof("Processed %d ledgers via %d workers", processed, parallel)
log.Infof("Uploading indices to %s", *targetUrl)
if err := indexStore.Flush(); err != nil {
panic(err)
}
}

func postProgress(prefix string, done, total uint64, startTime time.Time) {
// This should never happen, more of a runtime assertion for now.
// We can remove it when production-ready.
if done > total {
panic(fmt.Errorf("error for %s: done > total (%d > %d)",
prefix, done, total))
}

progress := float64(done) / float64(total)
elapsed := time.Since(startTime)

// Approximate based on how many ledgers are left and how long this much
// progress took, e.g. if 4/10 took 2s then 6/10 will "take" 3s (though this
// assumes consistent ledger load).
remaining := (float64(elapsed) / float64(done)) * float64(total-done)

var remainingStr string
if math.IsInf(remaining, 0) || math.IsNaN(remaining) {
remainingStr = "unknown"
} else {
remainingStr = time.Duration(remaining).Round(time.Millisecond).String()
}

log.Infof("%s - %.1f%% (%d/%d) - elapsed: %s, remaining: ~%s", prefix,
100*progress, done, total,
elapsed.Round(time.Millisecond),
remainingStr,
)
}

func processTransactionModule(
indexStore index.Store,
ledger xdr.LedgerCloseMeta,
tx ingest.LedgerTransaction,
) error {
return indexStore.AddTransactionToIndexes(
toid.New(int32(ledger.LedgerSequence()), int32(tx.Index), 0).ToInt64(),
tx.Result.TransactionHash,
)
}

func processAccountsModule(
indexStore index.Store,
checkpoint uint32,
tx ingest.LedgerTransaction,
) error {
allParticipants, err := getParticipants(tx)
if err != nil {
return err
}

err = indexStore.AddParticipantsToIndexes(checkpoint, "all_all", allParticipants)
if err != nil {
return err
}

paymentsParticipants, err := getPaymentParticipants(tx)
if err != nil {
return err
}

err = indexStore.AddParticipantsToIndexes(checkpoint, "all_payments", paymentsParticipants)
if err != nil {
return err
}

if tx.Result.Successful() {
err = indexStore.AddParticipantsToIndexes(checkpoint, "successful_all", allParticipants)
if err != nil {
return err
}

err = indexStore.AddParticipantsToIndexes(checkpoint, "successful_payments", paymentsParticipants)
if err != nil {
return err
}
}

return nil
}

func getPaymentParticipants(transaction ingest.LedgerTransaction) ([]string, error) {
return participantsForOperations(transaction, true)
}

func getParticipants(transaction ingest.LedgerTransaction) ([]string, error) {
return participantsForOperations(transaction, false)
}

func participantsForOperations(transaction ingest.LedgerTransaction, onlyPayments bool) ([]string, error) {
var participants []string

@@ -310,19 +388,36 @@ func participantsForOperations(transaction ingest.LedgerTransaction, onlyPayments bool) ([]string, error) {
 	return participants, nil
 }
 
+// getLedgerKeyParticipants returns a list of accounts that are considered
+// "participants" in a particular ledger entry.
+//
+// This list will have zero or one element, making it easy to expand via `...`.
 func getLedgerKeyParticipants(ledgerKey xdr.LedgerKey) []string {
-	var result []string
 	switch ledgerKey.Type {
 	case xdr.LedgerEntryTypeAccount:
-		result = append(result, ledgerKey.Account.AccountId.Address())
-	case xdr.LedgerEntryTypeClaimableBalance:
-		// nothing to do
+		return []string{ledgerKey.Account.AccountId.Address()}
 	case xdr.LedgerEntryTypeData:
-		result = append(result, ledgerKey.Data.AccountId.Address())
+		return []string{ledgerKey.Data.AccountId.Address()}
 	case xdr.LedgerEntryTypeOffer:
-		result = append(result, ledgerKey.Offer.SellerId.Address())
+		return []string{ledgerKey.Offer.SellerId.Address()}
 	case xdr.LedgerEntryTypeTrustline:
-		result = append(result, ledgerKey.TrustLine.AccountId.Address())
+		return []string{ledgerKey.TrustLine.AccountId.Address()}
+	case xdr.LedgerEntryTypeClaimableBalance:
+		// nothing to do
 	}
-	return result
+	return []string{}
+}
+
+func max(a, b int) int {
+	if a > b {
+		return a
+	}
+	return b
+}
+
+func min(a, b int) int {
+	if a < b {
+		return a
+	}
+	return b
 }
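As an aside, the remaining-time estimate in postProgress is a straight linear extrapolation. Here is a quick standalone check of that arithmetic with made-up numbers; it is not part of the commit.

package main

import (
	"fmt"
	"time"
)

func main() {
	// Hypothetical progress: 4 of 10 checkpoints done after 2 seconds.
	done, total := uint64(4), uint64(10)
	elapsed := 2 * time.Second

	// Same extrapolation as postProgress: if `done` items took `elapsed`,
	// the remaining `total-done` items should take proportionally as long
	// (assuming roughly uniform ledger load).
	remaining := time.Duration(float64(elapsed) / float64(done) * float64(total-done))

	fmt.Println(remaining) // 3s
}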
3 changes: 3 additions & 0 deletions toid/main.go
@@ -127,6 +127,9 @@ func (id *ID) IncOperationOrder() {
}

// New creates a new total order ID
//
// FIXME: I feel like since ledger sequences are uint32s, TOIDs should
// take that into account for the ledger parameter...
func New(ledger int32, tx int32, op int32) *ID {
return &ID{
LedgerSequence: ledger,
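The FIXME above is about how total order IDs pack the ledger sequence. As a rough illustration only: the 32/20/12 bit split below is my reading of the toid package and is not shown in this diff, and the real ToInt64 also masks each field before shifting.

package main

import "fmt"

// pack sketches the general shape of a total order ID: ledger sequence in the
// high bits, then transaction order, then operation order at the bottom.
func pack(ledger, tx, op int32) int64 {
	return int64(ledger)<<32 | int64(tx)<<12 | int64(op)
}

func main() {
	fmt.Println(pack(39680056, 1, 0))

	// The FIXME's point: ledger sequences are uint32, so a sequence above
	// math.MaxInt32 cannot be passed here at all.
	// pack(3_000_000_000, 1, 0) // would not compile: constant overflows int32
}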
