Skip to content

Commit

Permalink
#4222: removed singleton pattern of filters
Browse files Browse the repository at this point in the history
  • Loading branch information
sreuland committed Feb 25, 2022
1 parent 0151a47 commit 6978425
Show file tree
Hide file tree
Showing 8 changed files with 115 additions and 56 deletions.
38 changes: 21 additions & 17 deletions services/horizon/internal/ingest/filters/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,45 @@ import (
"github.com/stellar/go/support/errors"
)

// TODO:(fons) I don't think we should be using a singleton
// (we should just create an instance which lives in the processor)
var accountFilter = &AccountFilter{
whitelistedAccountsSet: map[string]struct{}{},
lastModified: 0,
}

type AccountFilterRules struct {
CanonicalWhitelist []string `json:"account_whitelist"`
}

type AccountFilter struct {
type accountFilter struct {
whitelistedAccountsSet map[string]struct{}
lastModified int64
}

type AccountFilter interface {
processors.LedgerTransactionFilterer
RefreshAccountFilter(filterConfig *history.FilterConfig) (error)
}

func NewAccountFilter() AccountFilter {
return &accountFilter{
whitelistedAccountsSet: map[string]struct{}{},
}
}

// TODO:(fons) this code should probably be generic for all filters
func GetAccountFilter(filterConfig *history.FilterConfig) (*AccountFilter, error) {
func (filter *accountFilter) RefreshAccountFilter(filterConfig *history.FilterConfig) (error) {
// only need to re-initialize the filter config state(rules) if it's cached version(in memory)
// is older than the incoming config version based on lastModified epoch timestamp
if filterConfig.LastModified > accountFilter.lastModified {
if filterConfig.LastModified > filter.lastModified {
var assetFilterRules AssetFilterRules
if err := json.Unmarshal([]byte(filterConfig.Rules), &assetFilterRules); err != nil {
return nil, errors.Wrap(err, "unable to serialize asset filter rules")
}
accountFilter = &AccountFilter{
whitelistedAccountsSet: listToMap(assetFilterRules.CanonicalWhitelist),
lastModified: filterConfig.LastModified,
return errors.Wrap(err, "unable to serialize asset filter rules")
}

filter.whitelistedAccountsSet = listToMap(assetFilterRules.CanonicalWhitelist)
filter.lastModified = filterConfig.LastModified

}

return accountFilter, nil
return nil
}

func (f *AccountFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) {
func (f *accountFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) {
// Whitelisting is disabled if the whitelist is empty
if len(f.whitelistedAccountsSet) == 0 {
return true, nil
Expand Down
38 changes: 22 additions & 16 deletions services/horizon/internal/ingest/filters/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"

"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/ingest/processors"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
Expand All @@ -16,40 +17,45 @@ var (
logger = log.WithFields(log.F{
"ingest filter": "asset",
})
// TODO:(fons) I don't think we should be using a singleton
// (we should just create an instance which lives in the processor)
assetFilter = &AssetFilter{
canonicalAssetsLookup: map[string]struct{}{},
}
)

type AssetFilterRules struct {
CanonicalWhitelist []string `json:"canonical_asset_whitelist"`
}

type AssetFilter struct {
type assetFilter struct {
canonicalAssetsLookup map[string]struct{}
lastModified int64
}

func GetAssetFilter(filterConfig *history.FilterConfig) (*AssetFilter, error) {
type AssetFilter interface {
processors.LedgerTransactionFilterer
RefreshAssetFilter(filterConfig *history.FilterConfig) (error)
}

func NewAssetFilter() AssetFilter {
return &assetFilter{
canonicalAssetsLookup: map[string]struct{}{},
}
}

func (filter *assetFilter) RefreshAssetFilter(filterConfig *history.FilterConfig) (error) {
// only need to re-initialize the filter config state(rules) if it's cached version(in memory)
// is older than the incoming config version based on lastModified epoch timestamp
if filterConfig.LastModified > assetFilter.lastModified {
if filterConfig.LastModified > filter.lastModified {
var assetFilterRules AssetFilterRules
if err := json.Unmarshal([]byte(filterConfig.Rules), &assetFilterRules); err != nil {
return nil, errors.Wrap(err, "unable to serialize asset filter rules")
}
assetFilter = &AssetFilter{
canonicalAssetsLookup: listToMap(assetFilterRules.CanonicalWhitelist),
lastModified: filterConfig.LastModified,
return errors.Wrap(err, "unable to serialize asset filter rules")
}

filter.canonicalAssetsLookup = listToMap(assetFilterRules.CanonicalWhitelist)
filter.lastModified = filterConfig.LastModified
}

return assetFilter, nil
return nil
}

func (f *AssetFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) {
func (f *assetFilter) FilterTransaction(ctx context.Context, transaction ingest.LedgerTransaction) (bool, error) {

tx, v1Exists := transaction.Envelope.GetV1()
if !v1Exists {
Expand Down Expand Up @@ -115,7 +121,7 @@ func (f *AssetFilter) FilterTransaction(ctx context.Context, transaction ingest.
return false, nil
}

func (f *AssetFilter) assetMatchedFilter(asset *xdr.Asset) bool {
func (f *assetFilter) assetMatchedFilter(asset *xdr.Asset) bool {
var matched = false
if _, found := f.canonicalAssetsLookup[asset.StringCanonical()]; found {
matched = true
Expand Down
6 changes: 4 additions & 2 deletions services/horizon/internal/ingest/filters/asset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,8 @@ func TestFilterHasMatch(t *testing.T) {
LastModified: 1,
Name: FilterAssetFilterName,
}
filter, err := GetAssetFilter(filterConfig)
filter := NewAssetFilter()
err := filter.RefreshAssetFilter(filterConfig)
tt.NoError(err)

var xdrAssetCode [12]byte
Expand Down Expand Up @@ -89,7 +90,8 @@ func TestFilterHasNoMatch(t *testing.T) {
Name: FilterAssetFilterName,
}

filter, err := GetAssetFilter(filterConfig)
filter := NewAssetFilter()
err := filter.RefreshAssetFilter(filterConfig)
tt.NoError(err)

var xdrAssetCode [12]byte
Expand Down
57 changes: 38 additions & 19 deletions services/horizon/internal/ingest/filters/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package filters

import (
"context"
"sync"
"time"

"github.com/stellar/go/services/horizon/internal/db2/history"
Expand All @@ -19,56 +18,68 @@ const (

var (
supportedFilterNames = []string{FilterAssetFilterName, FilterAccountFilterName}
cachedFilters []processors.LedgerTransactionFilterer
lastFilterConfigCheckUnixEpoch int64
LOG = log.WithFields(log.F{
"filters": "load",
})
lock sync.Mutex
)

type filtersCache struct {
cachedFilters map[string]processors.LedgerTransactionFilterer
lastFilterConfigCheckUnixEpoch int64
}

type Filters interface {
GetFilters(filterQ history.QFilter, ctx context.Context) []processors.LedgerTransactionFilterer
}

func NewFilters() Filters {
return &filtersCache{
cachedFilters: map[string]processors.LedgerTransactionFilterer{
FilterAssetFilterName: NewAccountFilter(),
FilterAccountFilterName: NewAssetFilter(),
},
}
}

// Provide list of the active filters. Optimize performance by caching the list, only
// rebuild the list on expiration time interval. Method is thread-safe.
func GetFilters(filterQ history.QFilter, ctx context.Context) []processors.LedgerTransactionFilterer {
lock.Lock()
defer lock.Unlock()
func (f *filtersCache) GetFilters(filterQ history.QFilter, ctx context.Context) []processors.LedgerTransactionFilterer {
// only attempt to refresh filter config cache state at configured interval limit
if time.Now().Unix() < (lastFilterConfigCheckUnixEpoch + filterConfigCheckIntervalSeconds) {
return append([]processors.LedgerTransactionFilterer{}, cachedFilters...)
if time.Now().Unix() < (f.lastFilterConfigCheckUnixEpoch + filterConfigCheckIntervalSeconds) {
return f.convertCacheToList()
}

LOG.Info("expired filter config cache, refresh from db")
filterConfigs, err := filterQ.GetAllFilters(ctx)
if err != nil {
LOG.Errorf("unable to query filter configs, %v", err)
// reset the cache time regardless, so next attempt is at next interval
lastFilterConfigCheckUnixEpoch = time.Now().Unix()
return append([]processors.LedgerTransactionFilterer{}, cachedFilters...)
f.lastFilterConfigCheckUnixEpoch = time.Now().Unix()
return f.convertCacheToList()
}

cachedFilters = []processors.LedgerTransactionFilterer{}
for _, filterConfig := range filterConfigs {
if filterConfig.Enabled {
switch filterConfig.Name {
case FilterAssetFilterName:
assetFilter, err := GetAssetFilter(&filterConfig)
assetFilter := f.cachedFilters[FilterAssetFilterName].(AssetFilter)
err := assetFilter.RefreshAssetFilter(&filterConfig)
if err != nil {
LOG.Errorf("unable to create asset filter %v", err)
LOG.Errorf("unable to refresh asset filter config %v", err)
continue
}
cachedFilters = append(cachedFilters, assetFilter)
case FilterAccountFilterName:
accountFilter, err := GetAccountFilter(&filterConfig)
accountFilter := f.cachedFilters[FilterAccountFilterName].(AccountFilter)
err := accountFilter.RefreshAccountFilter(&filterConfig)
if err != nil {
LOG.Errorf("unable to create asset filter %v", err)
LOG.Errorf("unable to refresh account filter config %v", err)
continue
}
cachedFilters = append(cachedFilters, accountFilter)
}

}
}
return append([]processors.LedgerTransactionFilterer{}, cachedFilters...)
return f.convertCacheToList()
}

func SupportedFilterNames(name string) bool {
Expand All @@ -79,3 +90,11 @@ func SupportedFilterNames(name string) bool {
}
return false
}

func (f *filtersCache) convertCacheToList() []processors.LedgerTransactionFilterer {
filters := []processors.LedgerTransactionFilterer{}
for _, filter := range f.cachedFilters {
filters = append(filters, filter)
}
return filters
}
4 changes: 3 additions & 1 deletion services/horizon/internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/ingest/filters"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
logpkg "github.com/stellar/go/support/log"
Expand Down Expand Up @@ -253,8 +254,8 @@ func NewSystem(config Config) (System, error) {
}

historyQ := &history.Q{config.HistorySession.Clone()}

historyAdapter := newHistoryArchiveAdapter(archive)
filters := filters.NewFilters()

system := &system{
cancel: cancel,
Expand All @@ -274,6 +275,7 @@ func NewSystem(config Config) (System, error) {
config: config,
historyQ: historyQ,
historyAdapter: historyAdapter,
filters: filters,
},
checkpointManager: historyarchive.NewCheckpointManager(config.CheckpointFrequency),
}
Expand Down
18 changes: 18 additions & 0 deletions services/horizon/internal/ingest/mock_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package ingest

import (
"context"

"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/ingest/processors"
"github.com/stretchr/testify/mock"
)

type mockFilters struct {
mock.Mock
}

func (m *mockFilters) GetFilters(filterQ history.QFilter, ctx context.Context) []processors.LedgerTransactionFilterer {
a := m.Called(filterQ, ctx)
return a.Get(0).([]processors.LedgerTransactionFilterer)
}
3 changes: 2 additions & 1 deletion services/horizon/internal/ingest/processor_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type ProcessorRunner struct {
historyQ history.IngestionQ
historyAdapter historyArchiveAdapterInterface
logMemoryStats bool
filters filters.Filters
}

func (s *ProcessorRunner) SetHistoryAdapter(historyAdapter historyArchiveAdapterInterface) {
Expand Down Expand Up @@ -153,7 +154,7 @@ func (s *ProcessorRunner) buildTransactionFilterer() *groupTransactionFilterers
return newGroupTransactionFilterers(nil)
}

return newGroupTransactionFilterers(filters.GetFilters(s.historyQ, s.ctx))
return newGroupTransactionFilterers(s.filters.GetFilters(s.historyQ, s.ctx))
}

// checkIfProtocolVersionSupported checks if this Horizon version supports the
Expand Down
7 changes: 7 additions & 0 deletions services/horizon/internal/ingest/processor_runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ func TestProcessorRunnerRunHistoryArchiveIngestionGenesis(t *testing.T) {
NetworkPassphrase: network.PublicNetworkPassphrase,
},
historyQ: q,
filters: &mockFilters{},
}

_, err := runner.RunGenesisStateIngestion()
Expand Down Expand Up @@ -117,6 +118,7 @@ func TestProcessorRunnerRunHistoryArchiveIngestionHistoryArchive(t *testing.T) {
config: config,
historyQ: q,
historyAdapter: historyAdapter,
filters: &mockFilters{},
}

_, err := runner.RunHistoryArchiveIngestion(63, MaxSupportedProtocolVersion, bucketListHash)
Expand Down Expand Up @@ -151,6 +153,7 @@ func TestProcessorRunnerRunHistoryArchiveIngestionProtocolVersionNotSupported(t
config: config,
historyQ: q,
historyAdapter: historyAdapter,
filters: &mockFilters{},
}

_, err := runner.RunHistoryArchiveIngestion(100, 200, xdr.Hash{})
Expand All @@ -170,6 +173,7 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) {
runner := ProcessorRunner{
ctx: ctx,
historyQ: q,
filters: &mockFilters{},
}

stats := &ingest.StatsChangeProcessor{}
Expand All @@ -191,6 +195,7 @@ func TestProcessorRunnerBuildChangeProcessor(t *testing.T) {
runner = ProcessorRunner{
ctx: ctx,
historyQ: q,
filters: &mockFilters{},
}

processor = buildChangeProcessor(runner.historyQ, stats, historyArchiveSource, 456)
Expand Down Expand Up @@ -288,6 +293,7 @@ func TestProcessorRunnerRunAllProcessorsOnLedger(t *testing.T) {
ctx: ctx,
config: config,
historyQ: q,
filters: &mockFilters{},
}

_, err := runner.RunAllProcessorsOnLedger(ledger)
Expand Down Expand Up @@ -336,6 +342,7 @@ func TestProcessorRunnerRunAllProcessorsOnLedgerProtocolVersionNotSupported(t *t
ctx: ctx,
config: config,
historyQ: q,
filters: &mockFilters{},
}

_, err := runner.RunAllProcessorsOnLedger(ledger)
Expand Down

0 comments on commit 6978425

Please sign in to comment.