Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon/internal/db2/history: Optimize query for reaping lookup tables #5393

Merged
merged 10 commits into from
Jul 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions services/horizon/internal/db2/history/key_value.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package history
import (
"context"
"database/sql"
"fmt"
"strconv"
"strings"

sq "github.com/Masterminds/squirrel"

"github.com/stellar/go/support/errors"
)

Expand All @@ -18,6 +21,7 @@ const (
stateInvalid = "exp_state_invalid"
offerCompactionSequence = "offer_compaction_sequence"
liquidityPoolCompactionSequence = "liquidity_pool_compaction_sequence"
lookupTableReapOffsetSuffix = "_reap_offset"
)

// GetLastLedgerIngestNonBlocking works like GetLastLedgerIngest but
Expand Down Expand Up @@ -203,6 +207,47 @@ func (q *Q) getValueFromStore(ctx context.Context, key string, forUpdate bool) (
return value, nil
}

// KeyValuePair holds one key/value entry scanned from a query that selects
// the "key" and "value" columns (see the db struct tags).
type KeyValuePair struct {
// Key is populated from the "key" column.
Key string `db:"key"`
// Value is populated from the "value" column; it is kept as a raw string
// and parsed by the caller.
Value string `db:"value"`
}

// getLookupTableReapOffsets loads the stored reap offsets for every table
// listed in historyLookupTables.
//
// Each offset is stored in key_value_store under the key
// "<table>" + lookupTableReapOffsetSuffix as a base-10 integer. The returned
// map is keyed by the lookup table name (suffix removed). Tables that have no
// stored offset are simply absent from the map, so indexing the map for such a
// table yields 0.
//
// An error is returned if the query fails, if a returned key does not map back
// to a known lookup table, or if a stored value is not a valid int64.
func (q *Q) getLookupTableReapOffsets(ctx context.Context) (map[string]int64, error) {
	keys := make([]string, 0, len(historyLookupTables))
	for table := range historyLookupTables {
		keys = append(keys, table+lookupTableReapOffsetSuffix)
	}

	var pairs []KeyValuePair
	query := sq.Select("key", "value").
		From("key_value_store").
		Where(map[string]interface{}{
			"key": keys,
		})
	if err := q.Select(ctx, &pairs, query); err != nil {
		return nil, err
	}

	offsets := make(map[string]int64, len(pairs))
	for _, pair := range pairs {
		table := strings.TrimSuffix(pair.Key, lookupTableReapOffsetSuffix)
		if _, ok := historyLookupTables[table]; !ok {
			return nil, fmt.Errorf("invalid key: %s", pair.Key)
		}

		offset, err := strconv.ParseInt(pair.Value, 10, 64)
		if err != nil {
			// Keep the underlying parse error in the chain so callers can
			// tell a syntax error from an out-of-range value.
			return nil, fmt.Errorf("invalid offset: %s: %w", pair.Value, err)
		}
		offsets[table] = offset
	}
	// Every failure path above returns early, so success is reported with an
	// explicit nil rather than a leftover error variable.
	return offsets, nil
}

// updateLookupTableReapOffset persists offset as the reap offset of the given
// lookup table. The value is written through updateValueInStore under the key
// "<table>" + lookupTableReapOffsetSuffix, encoded as a base-10 string.
func (q *Q) updateLookupTableReapOffset(ctx context.Context, table string, offset int64) error {
	storeKey := table + lookupTableReapOffsetSuffix
	encoded := strconv.FormatInt(offset, 10)
	return q.updateValueInStore(ctx, storeKey, encoded)
}

// updateValueInStore updates a value for a given key in KV store
func (q *Q) updateValueInStore(ctx context.Context, key, value string) error {
query := sq.Insert("key_value_store").
Expand Down
225 changes: 108 additions & 117 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ type IngestionQ interface {
NewTradeBatchInsertBuilder() TradeBatchInsertBuilder
RebuildTradeAggregationTimes(ctx context.Context, from, to strtime.Millis, roundingSlippageFilter int) error
RebuildTradeAggregationBuckets(ctx context.Context, fromLedger, toLedger uint32, roundingSlippageFilter int) error
ReapLookupTables(ctx context.Context, offsets map[string]int64) (map[string]LookupTableReapResult, error)
ReapLookupTables(ctx context.Context, batchSize int) (map[string]LookupTableReapResult, error)
CreateAssets(ctx context.Context, assets []xdr.Asset, batchSize int) (map[string]Asset, error)
QTransactions
QTrustLines
Expand Down Expand Up @@ -981,88 +981,27 @@ type LookupTableReapResult struct {
// which aren't used (orphaned), i.e. history entries for them were reaped.
// This method must be executed inside ingestion transaction. Otherwise it may
// create invalid state in lookup and history tables.
func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
func (q *Q) ReapLookupTables(ctx context.Context, batchSize int) (
map[string]LookupTableReapResult,
error,
) {
if q.GetTx() == nil {
return nil, errors.New("cannot be called outside of an ingestion transaction")
}

const batchSize = 1000
offsets, err := q.getLookupTableReapOffsets(ctx)
if err != nil {
return nil, fmt.Errorf("could not obtain offsets: %w", err)
}

results := map[string]LookupTableReapResult{}
for table, historyTables := range map[string][]tableObjectFieldPair{
"history_accounts": {
{
name: "history_effects",
objectField: "history_account_id",
},
{
name: "history_operation_participants",
objectField: "history_account_id",
},
{
name: "history_trades",
objectField: "base_account_id",
},
{
name: "history_trades",
objectField: "counter_account_id",
},
{
name: "history_transaction_participants",
objectField: "history_account_id",
},
},
"history_assets": {
{
name: "history_trades",
objectField: "base_asset_id",
},
{
name: "history_trades",
objectField: "counter_asset_id",
},
{
name: "history_trades_60000",
objectField: "base_asset_id",
},
{
name: "history_trades_60000",
objectField: "counter_asset_id",
},
},
"history_claimable_balances": {
{
name: "history_operation_claimable_balances",
objectField: "history_claimable_balance_id",
},
{
name: "history_transaction_claimable_balances",
objectField: "history_claimable_balance_id",
},
},
"history_liquidity_pools": {
{
name: "history_operation_liquidity_pools",
objectField: "history_liquidity_pool_id",
},
{
name: "history_transaction_liquidity_pools",
objectField: "history_liquidity_pool_id",
},
},
} {
for table, historyTables := range historyLookupTables {
startTime := time.Now()
query, err := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table])
if err != nil {
return nil, errors.Wrap(err, "error constructing a query")
}
query := constructReapLookupTablesQuery(table, historyTables, batchSize, offsets[table])

// Find new offset before removing the rows
var newOffset int64
err = q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offsets[table], batchSize))
err := q.GetRaw(ctx, &newOffset, fmt.Sprintf("SELECT id FROM %s where id >= %d limit 1 offset %d", table, offsets[table], batchSize))
if err != nil {
if q.NoRows(err) {
newOffset = 0
Expand All @@ -1079,6 +1018,10 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
return nil, errors.Wrapf(err, "error running query: %s", query)
}

if err = q.updateLookupTableReapOffset(ctx, table, newOffset); err != nil {
return nil, fmt.Errorf("error updating offset: %w", err)
}

rows, err := res.RowsAffected()
if err != nil {
return nil, errors.Wrapf(err, "error running RowsAffected after query: %s", query)
Expand All @@ -1093,22 +1036,86 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
return results, nil
}

// historyLookupTables maps the name of each history lookup table to the
// history tables, and the column within each, that reference the lookup
// table's id.
//
// The map keys are the tables processed by ReapLookupTables and are also the
// prefixes of the reap-offset keys (table name + lookupTableReapOffsetSuffix).
// For each key, every listed table/column pair becomes one NOT EXISTS
// condition in the reap query, emitted in the order the pairs appear here, so
// a lookup row is deleted only when none of the listed columns reference it.
var historyLookupTables = map[string][]tableObjectFieldPair{
"history_accounts": {
{
name: "history_transaction_participants",
objectField: "history_account_id",
},

{
name: "history_effects",
objectField: "history_account_id",
},
{
name: "history_operation_participants",
objectField: "history_account_id",
},
{
name: "history_trades",
objectField: "base_account_id",
},
{
name: "history_trades",
objectField: "counter_account_id",
},
},
"history_assets": {
{
name: "history_trades",
objectField: "base_asset_id",
},
{
name: "history_trades",
objectField: "counter_asset_id",
},
{
name: "history_trades_60000",
objectField: "base_asset_id",
},
{
name: "history_trades_60000",
objectField: "counter_asset_id",
},
},
"history_claimable_balances": {
{
name: "history_transaction_claimable_balances",
objectField: "history_claimable_balance_id",
},
{
name: "history_operation_claimable_balances",
objectField: "history_claimable_balance_id",
},
},
"history_liquidity_pools": {
{
name: "history_transaction_liquidity_pools",
objectField: "history_liquidity_pool_id",
},
{
name: "history_operation_liquidity_pools",
objectField: "history_liquidity_pool_id",
},
},
}

// constructReapLookupTablesQuery creates a query like (using history_claimable_balances
// as an example):
//
// delete from history_claimable_balances where id in
// delete from history_claimable_balances where id in (
//
// (select id from
// (select id,
// (select 1 from history_operation_claimable_balances
// where history_claimable_balance_id = hcb.id limit 1) as c1,
// (select 1 from history_transaction_claimable_balances
// where history_claimable_balance_id = hcb.id limit 1) as c2,
// 1 as cx,
// from history_claimable_balances hcb where id > 1000 order by id limit 100)
// as sub where c1 IS NULL and c2 IS NULL and 1=1);
// WITH ha_batch AS (
// SELECT id
// FROM history_claimable_balances
// WHERE id >= 1000
// ORDER BY id limit 1000
// ) SELECT e1.id as id FROM ha_batch e1
// WHERE NOT EXISTS (SELECT 1 FROM history_transaction_claimable_balances WHERE history_transaction_claimable_balances.history_claimable_balance_id = id limit 1)
// AND NOT EXISTS (SELECT 1 FROM history_operation_claimable_balances WHERE history_operation_claimable_balances.history_claimable_balance_id = id limit 1)
// )
//
// In short it checks the 100 rows omitting 1000 row of history_claimable_balances
// In short, it takes a batch of up to 1000 rows of history_claimable_balances
// with id >= 1000 and checks whether each row is still referenced in the
// corresponding history tables. If there are no history rows for a given id,
// the row in history_claimable_balances is removed.
Expand All @@ -1118,45 +1125,29 @@ func (q Q) ReapLookupTables(ctx context.Context, offsets map[string]int64) (
// possible that rows will be skipped from deletion. But offset is reset
// when it reaches the table size so eventually all orphaned rows are
// deleted.
func constructReapLookupTablesQuery(table string, historyTables []tableObjectFieldPair, batchSize, offset int64) (string, error) {
var sb strings.Builder
var err error
_, err = fmt.Fprintf(&sb, "delete from %s where id IN (select id from (select id, ", table)
if err != nil {
return "", err
}

for i, historyTable := range historyTables {
_, err = fmt.Fprintf(
&sb,
`(select 1 from %s where %s = hcb.id limit 1) as c%d, `,
historyTable.name,
historyTable.objectField,
i,
func constructReapLookupTablesQuery(table string, historyTables []tableObjectFieldPair, batchSize int, offset int64) string {
var conditions []string

for _, historyTable := range historyTables {
conditions = append(
conditions,
fmt.Sprintf(
"NOT EXISTS ( SELECT 1 as row FROM %s WHERE %s.%s = id LIMIT 1)",
historyTable.name,
historyTable.name, historyTable.objectField,
),
)
if err != nil {
return "", err
}
}

_, err = fmt.Fprintf(&sb, "1 as cx from %s hcb where id >= %d order by id limit %d) as sub where ", table, offset, batchSize)
if err != nil {
return "", err
}

for i := range historyTables {
_, err = fmt.Fprintf(&sb, "c%d IS NULL and ", i)
if err != nil {
return "", err
}
}

_, err = sb.WriteString("1=1);")
if err != nil {
return "", err
}

return sb.String(), nil
return fmt.Sprintf(
"DELETE FROM %s WHERE id IN ("+
"WITH ha_batch AS (SELECT id FROM %s WHERE id >= %d ORDER BY id limit %d) "+
"SELECT e1.id as id FROM ha_batch e1 WHERE ",
table,
table,
offset,
batchSize,
) + strings.Join(conditions, " AND ") + ")"
}

// DeleteRangeAll deletes a range of rows from all history tables between
Expand Down
Loading
Loading