Skip to content

Commit

Permalink
#4222: refactored filter list acquistion into filters package
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland committed Feb 25, 2022
1 parent b2deb81 commit 410876e
Show file tree
Hide file tree
Showing 8 changed files with 106 additions and 92 deletions.
9 changes: 9 additions & 0 deletions services/horizon/internal/actions/filter_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

horizonContext "github.com/stellar/go/services/horizon/internal/context"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/ingest/filters"
"github.com/stellar/go/support/render/problem"
)

Expand Down Expand Up @@ -177,6 +178,14 @@ func (handler FilterRuleHandler) upsert(filterRequest *filterResource, historyQ
filterConfig.Enabled = filterRequest.Enabled
filterConfig.Name = filterRequest.Name

if !filters.SupportedFilterNames(filterRequest.Name) {
p := problem.ServerError
p.Extras = map[string]interface{}{
"reason": fmt.Sprintf("invalid filter name, %v, no implementation for this exists", filterRequest.Name),
}
return p
}

filterRules, err := json.Marshal(filterRequest.Rules)
if err != nil {
p := problem.ServerError
Expand Down
25 changes: 0 additions & 25 deletions services/horizon/internal/db2/history/filter_rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package history

import (
"context"
"fmt"

sq "github.com/Masterminds/squirrel"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/render/problem"
)

const (
Expand All @@ -15,12 +13,6 @@ const (
filterRulesColumnName = "rules"
filterRulesEnabledColumnName = "enabled"
filterRulesLastModifiedColumnName = "last_modified"
FilterAssetFilterName = "asset"
FilterAccountFilterName = "account"
)

var (
supportedNames = []string{FilterAssetFilterName, FilterAccountFilterName}
)

type FilterConfig struct {
Expand Down Expand Up @@ -75,14 +67,6 @@ func (q *Q) UpsertFilterConfig(ctx context.Context, config FilterConfig) error {
filterRulesTypeColumnName: config.Name,
}

if !q.supportedFilterNames(config.Name) {
p := problem.ServerError
p.Extras = map[string]interface{}{
"reason": fmt.Sprintf("invalid filter name, %v, no implementation for this exists", config.Name),
}
return p
}

sqlUpdate := sq.Update(filterRulesTableName).SetMap(updateCols).Where(
sq.Eq{filterRulesTypeColumnName: config.Name})

Expand Down Expand Up @@ -111,12 +95,3 @@ func (q *Q) checkForError(builder sq.Sqlizer, ctx context.Context) (int64, error
}
return result.RowsAffected()
}

func (q *Q) supportedFilterNames(name string) bool {
for _, supportedName := range supportedNames {
if name == supportedName {
return true
}
}
return false
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

// TODO:(fons) I don't think we should be using a singleton
// (we should just create an instance which lives in the processor)
var accountSingleton = &AccountFilter{
var accountFilter = &AccountFilter{
whitelistedAccountsSet: map[string]struct{}{},
lastModified: 0,
}
Expand All @@ -30,18 +30,18 @@ type AccountFilter struct {
func GetAccountFilter(filterConfig *history.FilterConfig) (*AccountFilter, error) {
// only need to re-initialize the filter config state(rules) if it's cached version(in memory)
// is older than the incoming config version based on lastModified epoch timestamp
if filterConfig.LastModified > accountSingleton.lastModified {
if filterConfig.LastModified > accountFilter.lastModified {
var assetFilterRules AssetFilterRules
if err := json.Unmarshal([]byte(filterConfig.Rules), &assetFilterRules); err != nil {
return nil, errors.Wrap(err, "unable to serialize asset filter rules")
}
accountSingleton = &AccountFilter{
accountFilter = &AccountFilter{
whitelistedAccountsSet: listToMap(assetFilterRules.CanonicalWhitelist),
lastModified: filterConfig.LastModified,
}
}

return accountSingleton, nil
return accountFilter, nil
}

func (f *AccountFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ var (
})
// TODO:(fons) I don't think we should be using a singleton
// (we should just create an instance which lives in the processor)
assetSingleton = &AssetFilter{
assetFilter = &AssetFilter{
canonicalAssetsLookup: map[string]struct{}{},
}
)
Expand All @@ -35,18 +35,18 @@ type AssetFilter struct {
func GetAssetFilter(filterConfig *history.FilterConfig) (*AssetFilter, error) {
// only need to re-initialize the filter config state(rules) if it's cached version(in memory)
// is older than the incoming config version based on lastModified epoch timestamp
if filterConfig.LastModified > assetSingleton.lastModified {
if filterConfig.LastModified > assetFilter.lastModified {
var assetFilterRules AssetFilterRules
if err := json.Unmarshal([]byte(filterConfig.Rules), &assetFilterRules); err != nil {
return nil, errors.Wrap(err, "unable to serialize asset filter rules")
}
assetSingleton = &AssetFilter{
assetFilter = &AssetFilter{
canonicalAssetsLookup: listToMap(assetFilterRules.CanonicalWhitelist),
lastModified: filterConfig.LastModified,
}
}

return assetSingleton, nil
return assetFilter, nil
}

func (f *AssetFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestFilterHasMatch(t *testing.T) {
}`,
Enabled: true,
LastModified: 1,
Name: history.FilterAssetFilterName,
Name: FilterAssetFilterName,
}
filter, err := GetAssetFilter(filterConfig)
tt.NoError(err)
Expand Down Expand Up @@ -86,7 +86,7 @@ func TestFilterHasNoMatch(t *testing.T) {

Enabled: true,
LastModified: 1,
Name: history.FilterAssetFilterName,
Name: FilterAssetFilterName,
}

filter, err := GetAssetFilter(filterConfig)
Expand Down
82 changes: 82 additions & 0 deletions services/horizon/internal/ingest/filters/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package filters

import (
"context"
"sync"
"time"

"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/ingest/processors"
"github.com/stellar/go/support/log"
)

const (
FilterAssetFilterName = "asset"
FilterAccountFilterName = "account"
// the filter config cache will be checked against latest from db at most once per each of this interval,
filterConfigCheckIntervalSeconds int64 = 10
)

var (
supportedFilterNames = []string{FilterAssetFilterName, FilterAccountFilterName}
loadedFilters []processors.LedgerTransactionFilterer
lastFilterConfigCheckUnixEpoch int64
LOG = log.WithFields(log.F{
"filters": "load",
})
lock sync.Mutex
)

// Provide list of the active filters. Optimize performance by caching the list, only
// rebuild the list on expiration time interval. Method is thread-safe.
func GetFilters(filterQ history.QFilter, ctx context.Context) []processors.LedgerTransactionFilterer {
lock.Lock()
defer lock.Unlock()
// only attempt to refresh filter config cache state at configured interval limit
if time.Now().Unix() < (lastFilterConfigCheckUnixEpoch + filterConfigCheckIntervalSeconds) {
return append([]processors.LedgerTransactionFilterer{}, loadedFilters...)
}

loadedFilters = []processors.LedgerTransactionFilterer{}
LOG.Info("expired filter config cache, refresh from db")
filterConfigs, err := filterQ.GetAllFilters(ctx)
if err != nil {
LOG.Errorf("unable to query filter configs, %v", err)
// reset the cache time regardless, so next attempt is at next interval
lastFilterConfigCheckUnixEpoch = time.Now().Unix()
return append([]processors.LedgerTransactionFilterer{}, loadedFilters...)
}

loadedFilters := []processors.LedgerTransactionFilterer{}
for _, filterConfig := range filterConfigs {
if filterConfig.Enabled {
switch filterConfig.Name {
case FilterAssetFilterName:
assetFilter, err := GetAssetFilter(&filterConfig)
if err != nil {
LOG.Errorf("unable to create asset filter %v", err)
continue
}
loadedFilters = append(loadedFilters, assetFilter)
case FilterAccountFilterName:
accountFilter, err := GetAccountFilter(&filterConfig)
if err != nil {
LOG.Errorf("unable to create asset filter %v", err)
continue
}
loadedFilters = append(loadedFilters, accountFilter)
}

}
}
return append([]processors.LedgerTransactionFilterer{}, loadedFilters...)
}

func SupportedFilterNames(name string) bool {
for _, supportedName := range supportedFilterNames {
if name == supportedName {
return true
}
}
return false
}
8 changes: 3 additions & 5 deletions services/horizon/internal/ingest/group_processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,12 @@ func (g groupTransactionProcessors) Commit(ctx context.Context) error {
type groupTransactionFilterers struct {
filterers []processors.LedgerTransactionFilterer
processorsRunDurations
lastFilterConfigCheckUnixEpoch int64
}

func newGroupTransactionFilterers(filterers []processors.LedgerTransactionFilterer, lastFilterConfigCheckUnixEpoch int64) *groupTransactionFilterers {
func newGroupTransactionFilterers(filterers []processors.LedgerTransactionFilterer) *groupTransactionFilterers {
return &groupTransactionFilterers{
filterers: filterers,
processorsRunDurations: make(map[string]time.Duration),
lastFilterConfigCheckUnixEpoch: lastFilterConfigCheckUnixEpoch,
filterers: filterers,
processorsRunDurations: make(map[string]time.Duration),
}
}

Expand Down
54 changes: 2 additions & 52 deletions services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ import (
"bytes"
"context"
"fmt"
"time"

logger "github.com/stellar/go/support/log"

"github.com/stellar/go/ingest"
"github.com/stellar/go/services/horizon/internal/db2/history"
Expand Down Expand Up @@ -82,15 +79,6 @@ type ProcessorRunnerInterface interface {
}

var _ ProcessorRunnerInterface = (*ProcessorRunner)(nil)
var (
// default empty filters, this will get populated on first processor invocation
groupFilterers = newGroupTransactionFilterers([]processors.LedgerTransactionFilterer{}, 0)
LOG = log.WithFields(logger.F{
"processor": "filters",
})
// the filter config cache will be checked against latest from db at most once per each of this interval,
filterConfigCheckIntervalSeconds int64 = 10
)

type ProcessorRunner struct {
config Config
Expand Down Expand Up @@ -162,48 +150,10 @@ func (s *ProcessorRunner) buildTransactionProcessor(

func (s *ProcessorRunner) buildTransactionFilterer() *groupTransactionFilterers {
if !s.config.EnableIngestionFiltering {
return newGroupTransactionFilterers(nil, time.Now().Unix())
return newGroupTransactionFilterers(nil)
}
// TODO(fons): I think this caching mechanism should probably live in the filters package

// only attempt to refresh filter config cache state at configured interval limit
if time.Now().Unix() < (groupFilterers.lastFilterConfigCheckUnixEpoch + filterConfigCheckIntervalSeconds) {
return groupFilterers
}

LOG.Info("expired filter config cache, refresh from db")
filterConfigs, err := s.historyQ.GetAllFilters(s.ctx)
if err != nil {
LOG.Errorf("unable to query filter configs, %v", err)
// reset the cache time regardless, so next attempt is at next interval
groupFilterers.lastFilterConfigCheckUnixEpoch = time.Now().Unix()
return groupFilterers
}

newFilters := []processors.LedgerTransactionFilterer{}
for _, filterConfig := range filterConfigs {
if filterConfig.Enabled {
switch filterConfig.Name {
case history.FilterAssetFilterName:
assetFilter, err := filters.GetAssetFilter(&filterConfig)
if err != nil {
LOG.Errorf("unable to create asset filter %v", err)
continue
}
newFilters = append(newFilters, assetFilter)
case history.FilterAccountFilterName:
accountFilter, err := filters.GetAccountFilter(&filterConfig)
if err != nil {
LOG.Errorf("unable to create asset filter %v", err)
continue
}
newFilters = append(newFilters, accountFilter)
}

}
}
groupFilterers = newGroupTransactionFilterers(newFilters, time.Now().Unix())
return groupFilterers
return newGroupTransactionFilterers(filters.GetFilters(s.historyQ, s.ctx))
}

// checkIfProtocolVersionSupported checks if this Horizon version supports the
Expand Down

0 comments on commit 410876e

Please sign in to comment.