Skip to content

Commit

Permalink
stellar#4222: used upsert function, unix seconds instead of millis fo…
Browse files Browse the repository at this point in the history
…r last modified
  • Loading branch information
sreuland committed Feb 22, 2022
1 parent 4d015a5 commit 5e5747d
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 19 deletions.
18 changes: 8 additions & 10 deletions services/horizon/internal/db2/history/filter_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package history

import (
"context"
"time"

sq "github.com/Masterminds/squirrel"
)
Expand All @@ -19,7 +20,7 @@ type FilterConfig struct {
Enabled bool `db:"enabled"`
Rules string `db:"rules"`
Name string `db:"name"`
LastModified uint64 `db:"last_modified"`
LastModified int64 `db:"last_modified"`
}

type QFilter interface {
Expand All @@ -45,15 +46,12 @@ func (q *Q) GetFilterByName(ctx context.Context, name string) (FilterConfig, err
}

func (q *Q) SetFilterConfig(ctx context.Context, config FilterConfig) error {
updateCols := map[string]interface{}{
filterRulesLastModifiedColumnName: sq.Expr("extract(epoch from now() at time zone 'utc')"),
filterRulesEnabledColumnName: config.Enabled,
filterRulesColumnName: config.Rules,
upsertFields := []upsertField{
{filterRulesLastModifiedColumnName, "bigint", []interface{}{time.Now().Unix()}},
{filterRulesEnabledColumnName, "bool", []interface{}{config.Enabled}},
{filterRulesColumnName, "jsonb", []interface{}{config.Rules}},
{filterRulesTypeColumnName, "text", []interface{}{config.Name}},
}

sql := sq.Update(filterRulesTableName).SetMap(updateCols).Where(
sq.Eq{filterRulesTypeColumnName: config.Name})

_, err := q.Exec(ctx, sql)
return err
return q.upsertRows(ctx, filterRulesTableName, filterRulesTypeColumnName, upsertFields)
}
4 changes: 2 additions & 2 deletions services/horizon/internal/ingest/filters/asset_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ type AssetFilterRules struct {

type AssetFilter struct {
canonicalAssetsLookup map[string]bool
lastModified uint64
lastModified int64
}

func GetAssetFilter(filterConfig *history.FilterConfig) (*AssetFilter, error) {
Expand Down Expand Up @@ -107,7 +107,7 @@ func (f *AssetFilter) FilterTransaction(ctx context.Context, transaction ingest.
allowed = true
}
}

if allowed {
return true, nil
}
Expand Down
6 changes: 3 additions & 3 deletions services/horizon/internal/ingest/group_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ func (g groupTransactionProcessors) Commit(ctx context.Context) error {
type groupTransactionFilterers struct {
filterers []processors.LedgerTransactionFilterer
processorsRunDurations
lastFilterConfigCheckUnixMS int64
lastFilterConfigCheckUnixEpoch int64
}

func newGroupTransactionFilterers(filterers []processors.LedgerTransactionFilterer, lastFilterConfigCheckUnixMS int64) *groupTransactionFilterers {
func newGroupTransactionFilterers(filterers []processors.LedgerTransactionFilterer, lastFilterConfigCheckUnixEpoch int64) *groupTransactionFilterers {
return &groupTransactionFilterers{
filterers: filterers,
processorsRunDurations: make(map[string]time.Duration),
lastFilterConfigCheckUnixMS: lastFilterConfigCheckUnixMS,
lastFilterConfigCheckUnixEpoch: lastFilterConfigCheckUnixEpoch,
}
}

Expand Down
8 changes: 4 additions & 4 deletions services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ var (
"processor": "filters",
})
// the filter config cache will be checked against latest from db at most once per each of this interval,
filterConfigCheckIntervalMS int64 = 10000
filterConfigCheckIntervalSeconds int64 = 10
)

type ProcessorRunner struct {
Expand Down Expand Up @@ -162,7 +162,7 @@ func (s *ProcessorRunner) buildTransactionProcessor(
func (s *ProcessorRunner) buildTransactionFilterer() *groupTransactionFilterers {

// only attempt to refresh filter config cache state at configured interval limit
if time.Now().UnixMilli() < (groupFilterers.lastFilterConfigCheckUnixMS + filterConfigCheckIntervalMS) {
if time.Now().Unix() < (groupFilterers.lastFilterConfigCheckUnixEpoch + filterConfigCheckIntervalSeconds) {
return groupFilterers
}

Expand All @@ -171,7 +171,7 @@ func (s *ProcessorRunner) buildTransactionFilterer() *groupTransactionFilterers
if err != nil {
LOG.Errorf("unable to query filter configs, %v", err)
// reset the cache time regardless, so next attempt is at next interval
groupFilterers.lastFilterConfigCheckUnixMS = time.Now().UnixMilli()
groupFilterers.lastFilterConfigCheckUnixEpoch = time.Now().Unix()
return groupFilterers
}

Expand All @@ -189,7 +189,7 @@ func (s *ProcessorRunner) buildTransactionFilterer() *groupTransactionFilterers
}
}
}
groupFilterers = newGroupTransactionFilterers(newFilters, time.Now().UnixMilli())
groupFilterers = newGroupTransactionFilterers(newFilters, time.Now().Unix())
return groupFilterers
}

Expand Down

0 comments on commit 5e5747d

Please sign in to comment.