Skip to content

Commit

Permalink
stellar#4222: ported filtering-poc and started real asset filter feat…
Browse files Browse the repository at this point in the history
…ure dev
  • Loading branch information
sreuland committed Feb 22, 2022
1 parent 465741b commit 3f99e8b
Show file tree
Hide file tree
Showing 16 changed files with 687 additions and 46 deletions.
108 changes: 108 additions & 0 deletions services/horizon/internal/actions/filter_rules_asset.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package actions

import (
"encoding/json"
"fmt"
"net/http"

horizonContext "github.com/stellar/go/services/horizon/internal/context"
"github.com/stellar/go/services/horizon/internal/db2/history"
"github.com/stellar/go/services/horizon/internal/ingest/filters"
"github.com/stellar/go/support/render/problem"
)

type assetFilterResource struct {
Rules filters.AssetFilterRules `json:"rules"`
Enabled bool `json:"enabled"`
Name string `json:"name"`
}

func (afr assetFilterResource) Validate() error {
for _, asset := range afr.Rules.CanonicalWhitelist {
if !isAsset(asset) {
return fmt.Errorf("%q is not a valid asset issuer:code", asset)
}
}
return nil
}

func (afr *assetFilterResource) UnmarshalJSON(b []byte) error {
if err := json.Unmarshal(b, afr); err != nil {
return err
}
return afr.Validate()
}

type AssetFilterRuleHandler struct{}

func (handler AssetFilterRuleHandler) Get(w http.ResponseWriter, r *http.Request) {
historyQ, err := horizonContext.HistoryQFromRequest(r)
if err != nil {
problem.Render(r.Context(), w, err)
return
}
filter, err := historyQ.GetFilterByName(r.Context(), history.FilterAssetFilterName)
if err != nil {
problem.Render(r.Context(), w, err)
return
}

var assetFilterRules = filters.AssetFilterRules{}
if err = json.Unmarshal([]byte(filter.Rules), &assetFilterRules); err != nil {
p := problem.ServerError
p.Extras = map[string]interface{}{
"reason": "invalid asset filter rule json in db",
}
problem.Render(r.Context(), w, err)
return
}

assetFilterResource := &assetFilterResource{
Rules: assetFilterRules,
Enabled: filter.Enabled,
Name: filter.Name,
}

enc := json.NewEncoder(w)
if err = enc.Encode(assetFilterResource); err != nil {
problem.Render(r.Context(), w, err)
}
}

func (handler AssetFilterRuleHandler) Set(w http.ResponseWriter, r *http.Request) {
historyQ, err := horizonContext.HistoryQFromRequest(r)
if err != nil {
problem.Render(r.Context(), w, err)
return
}
var assetFilterRequest assetFilterResource
dec := json.NewDecoder(r.Body)
if err = dec.Decode(&assetFilterRequest); err != nil {
p := problem.BadRequest
p.Extras = map[string]interface{}{
"reason": err.Error(),
}
problem.Render(r.Context(), w, err)
return
}

var filterConfig history.FilterConfig
var assetFilterRules []byte
filterConfig.Enabled = assetFilterRequest.Enabled
filterConfig.Name = history.FilterAssetFilterName

if assetFilterRules, err = json.Marshal(assetFilterRequest.Rules); err != nil {
p := problem.ServerError
p.Extras = map[string]interface{}{
"reason": "unable to serialize asset filter rules resource from json",
}
problem.Render(r.Context(), w, err)
return
}
filterConfig.Rules = string(assetFilterRules)

if err = historyQ.SetFilterConfig(r.Context(), filterConfig); err != nil {
problem.Render(r.Context(), w, err)
return
}
}
83 changes: 83 additions & 0 deletions services/horizon/internal/db2/history/filter_rules.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package history

import (
"context"

sq "github.com/Masterminds/squirrel"
"github.com/stellar/go/support/errors"
)

const (
filterRulesTableName = "ingest_filter_rules"
filterRulesTypeColumnName = "name"
filterRulesColumnName = "rules"
filterRulesEnabledColumnName = "enabled"
filterRulesLastModifiedColumnName = "last_modified"
FilterAssetFilterName = "asset"
)

type FilterConfig struct {
Enabled bool `db:"enabled"`
Rules string `db:"rules"`
Name string `db:"name"`
LastModified int64 `db:"last_modified"`
}

type QFilter interface {
GetAllFilters(ctx context.Context) ([]FilterConfig, error)
GetFilterByName(ctx context.Context, name string) (FilterConfig, error)
SetFilterConfig(ctx context.Context, config FilterConfig) error
}

func (q *Q) GetAllFilters(ctx context.Context) ([]FilterConfig, error) {
var filterConfigs []FilterConfig
sql := sq.Select().From(filterRulesTableName)
err := q.Select(ctx, filterConfigs, sql)

return filterConfigs, err
}

func (q *Q) GetFilterByName(ctx context.Context, name string) (FilterConfig, error) {
var filterConfig FilterConfig
sql := sq.Select().From(filterRulesTableName).Where(sq.Eq{filterRulesTypeColumnName: name})
err := q.Select(ctx, filterConfig, sql)

return filterConfig, err
}

func (q *Q) SetFilterConfig(ctx context.Context, config FilterConfig) error {
updateCols := map[string]interface{}{
filterRulesLastModifiedColumnName: sq.Expr("extract(epoch from now() at time zone 'utc')"),
filterRulesEnabledColumnName: config.Enabled,
filterRulesColumnName: config.Rules,
filterRulesTypeColumnName: config.Name,
}

sqlUpdate := sq.Update(filterRulesTableName).SetMap(updateCols).Where(
sq.Eq{filterRulesTypeColumnName: config.Name})

rowCnt, err := q.checkForError(sqlUpdate, ctx)
if err != nil {
return err
}

if rowCnt < 1 {
sqlInsert := sq.Insert(filterRulesTableName).SetMap(updateCols)
rowCnt, err = q.checkForError(sqlInsert, ctx)
if err != nil {
return err
}
if rowCnt < 1 {
return errors.Errorf("insertion of filter rule did not result in new row created in db")
}
}
return nil
}

func (q *Q) checkForError(builder sq.Sqlizer, ctx context.Context) (int64, error) {
result, err := q.Exec(ctx, builder)
if err != nil {
return 0, err
}
return result.RowsAffected()
}
1 change: 1 addition & 0 deletions services/horizon/internal/db2/history/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,7 @@ type AccountEntry struct {

type IngestionQ interface {
QAccounts
QFilter
QAssetStats
QClaimableBalances
QHistoryClaimableBalances
Expand Down
27 changes: 27 additions & 0 deletions services/horizon/internal/db2/history/mock_q_filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package history

import (
"context"

"github.com/stretchr/testify/mock"
)

// MockQAccountFilterWhitelist is a mock implementation of the QAccountFilterWhitelist interface
type MockQFilter struct {
mock.Mock
}

func (m *MockQFilter) GetAllFilters(ctx context.Context) ([]FilterConfig, error) {
a := m.Called(ctx)
return a.Get(0).([]FilterConfig), a.Error(1)
}

func (m *MockQFilter) GetFilterByName(ctx context.Context, name string) (FilterConfig, error) {
a := m.Called(ctx, name)
return a.Get(0).(FilterConfig), a.Error(1)
}

func (m *MockQFilter) SetFilterConfig(ctx context.Context, config FilterConfig) error {
a := m.Called(ctx, config)
return a.Error(0)
}
23 changes: 23 additions & 0 deletions services/horizon/internal/db2/schema/bindata.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
-- +migrate Up

CREATE TABLE ingest_filter_rules (
name character varying(256) NOT NULL UNIQUE,
enabled bool NOT NULL default false,
rules jsonb NOT NULL,
last_modified bigint NOT NULL
);

-- +migrate Down

DROP TABLE ingest_filter_rules cascade;
7 changes: 7 additions & 0 deletions services/horizon/internal/httpx/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,4 +340,11 @@ func (r *Router) addRoutes(config *RouterConfig, rateLimiter *throttled.HTTPRate
r.Internal.Get("/metrics", promhttp.HandlerFor(config.PrometheusRegistry, promhttp.HandlerOpts{}).ServeHTTP)
r.Internal.Get("/debug/pprof/heap", pprof.Index)
r.Internal.Get("/debug/pprof/profile", pprof.Profile)
r.Internal.Route("/ingestion/filter/rules", func(r chi.Router) {
r.Route("/asset", func(r chi.Router) {
handler := actions.AssetFilterRuleHandler{}
r.With(historyMiddleware).Put("/", handler.Set)
r.With(historyMiddleware).Get("/", handler.Get)
})
})
}
Loading

0 comments on commit 3f99e8b

Please sign in to comment.