Skip to content

Commit

Permalink
init location indexer:
Browse files Browse the repository at this point in the history
- remove the region column and triggers from the cache table
- add the indexer with the schema changes and generator
- make the removeDuplicates function generic to be used with all indexers
  • Loading branch information
Omarabdul3ziz committed Nov 13, 2024
1 parent 90d3591 commit 0c0246a
Show file tree
Hide file tree
Showing 17 changed files with 236 additions and 161 deletions.
17 changes: 17 additions & 0 deletions grid-proxy/cmds/proxy_server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ type flags struct {
workloadsIndexerIntervalMins uint
featuresIndexerNumWorkers uint
featuresIndexerIntervalMins uint
locationIndexerNumWorkers uint
locationIndexerIntervalMins uint
}

func main() {
Expand Down Expand Up @@ -111,6 +113,9 @@ func main() {
flag.UintVar(&f.workloadsIndexerNumWorkers, "workloads-indexer-workers", 10, "number of workers checking on node workloads number")
flag.UintVar(&f.featuresIndexerIntervalMins, "features-indexer-interval", 60*24, "node features check interval in min")
flag.UintVar(&f.featuresIndexerNumWorkers, "features-indexer-workers", 10, "number of workers checking on node supported features")
flag.UintVar(&f.locationIndexerIntervalMins, "location-indexer-interval", 60*6, "node location check interval in min")
flag.UintVar(&f.locationIndexerNumWorkers, "location-indexer-workers", 100, "number of workers checking on node location")

flag.Parse()

// shows version and exit
Expand Down Expand Up @@ -153,13 +158,16 @@ func main() {
indexerIntervals := make(map[string]uint)
if !f.noIndexer {
startIndexers(ctx, f, &db, rpcRmbClient)

// for the health endpoint
indexerIntervals["gpu"] = f.gpuIndexerIntervalMins
indexerIntervals["health"] = f.healthIndexerIntervalMins
indexerIntervals["dmi"] = f.dmiIndexerIntervalMins
indexerIntervals["workloads"] = f.workloadsIndexerIntervalMins
indexerIntervals["ipv6"] = f.ipv6IndexerIntervalMins
indexerIntervals["speed"] = f.speedIndexerIntervalMins
indexerIntervals["features"] = f.featuresIndexerIntervalMins
indexerIntervals["location"] = f.locationIndexerIntervalMins
} else {
log.Info().Msg("Indexers did not start")
}
Expand Down Expand Up @@ -238,6 +246,15 @@ func startIndexers(ctx context.Context, f flags, db db.Database, rpcRmbClient *p
f.featuresIndexerNumWorkers,
)
featIdx.Start(ctx)

locationIdx := indexer.NewIndexer[types.NodeLocation](
indexer.NewLocationWork(f.locationIndexerIntervalMins),
"location",
db,
rpcRmbClient,
f.locationIndexerNumWorkers,
)
locationIdx.Start(ctx)
}

func app(s *http.Server, f flags) error {
Expand Down
8 changes: 8 additions & 0 deletions grid-proxy/internal/explorer/db/indexer_calls.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,11 @@ func (p *PostgresDatabase) UpsertNodeFeatures(ctx context.Context, features []ty
}
return p.gormDB.WithContext(ctx).Table("node_features").Clauses(conflictClause).Create(&features).Error
}

func (p *PostgresDatabase) UpsertNodeLocation(ctx context.Context, locations []types.NodeLocation) error {
conflictClause := clause.OnConflict{
Columns: []clause.Column{{Name: "country"}},
DoUpdates: clause.AssignmentColumns([]string{"continent", "updated_at"}),
}
return p.gormDB.WithContext(ctx).Table("node_location").Clauses(conflictClause).Create(&locations).Error
}
14 changes: 11 additions & 3 deletions grid-proxy/internal/explorer/db/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ func (d *PostgresDatabase) GetLastUpsertsTimestamp() (types.IndexersState, error
if res := d.gormDB.Table("node_features").Select("updated_at").Where("updated_at IS NOT NULL").Order("updated_at DESC").Limit(1).Scan(&report.Features.UpdatedAt); res.Error != nil {
return report, errors.Wrap(res.Error, "couldn't get features last updated_at")
}
if res := d.gormDB.Table("node_location").Select("updated_at").Where("updated_at IS NOT NULL").Order("updated_at DESC").Limit(1).Scan(&report.Features.UpdatedAt); res.Error != nil {
return report, errors.Wrap(res.Error, "couldn't get features last updated_at")
}
return report, nil
}

Expand All @@ -148,6 +151,7 @@ func (d *PostgresDatabase) Initialize() error {
&types.HasIpv6{},
&types.NodesWorkloads{},
&types.NodeFeatures{},
&types.NodeLocation{},
); err != nil {
return errors.Wrap(err, "failed to migrate indexer tables")
}
Expand Down Expand Up @@ -387,6 +391,7 @@ func (d *PostgresDatabase) nodeTableQuery(ctx context.Context, filter types.Node
LEFT JOIN health_report ON node.twin_id = health_report.node_twin_id
LEFT JOIN node_ipv6 ON node.twin_id = node_ipv6.node_twin_id
LEFT JOIN node_features ON node.twin_id = node_features.node_twin_id
LEFT JOIN node_location ON node.country = node_location.country
`)

if filter.IsGpuFilterRequested() {
Expand Down Expand Up @@ -440,7 +445,10 @@ func (d *PostgresDatabase) farmTableQuery(ctx context.Context, filter types.Farm
func (d *PostgresDatabase) GetFarms(ctx context.Context, filter types.FarmFilter, limit types.Limit) ([]Farm, uint, error) {
nodeQuery := d.gormDB.Table("resources_cache").
Select("resources_cache.farm_id", "renter", "resources_cache.extra_fee").
Joins("LEFT JOIN node ON node.node_id = resources_cache.node_id").
Joins(`
LEFT JOIN node ON node.node_id = resources_cache.node_id
LEFT JOIN node_location ON node_location.country = resources_cache.country
`).
Group(`resources_cache.farm_id, renter, resources_cache.extra_fee`)

if filter.NodeFreeMRU != nil {
Expand Down Expand Up @@ -469,7 +477,7 @@ func (d *PostgresDatabase) GetFarms(ctx context.Context, filter types.FarmFilter
}

if filter.Region != nil {
nodeQuery = nodeQuery.Where("LOWER(resources_cache.region) = LOWER(?)", *filter.Region)
nodeQuery = nodeQuery.Where("LOWER(node_location.continent) = LOWER(?)", *filter.Region)
}

if len(filter.NodeStatus) != 0 {
Expand Down Expand Up @@ -673,7 +681,7 @@ func (d *PostgresDatabase) GetNodes(ctx context.Context, filter types.NodeFilter
q = q.Where("node.city ILIKE '%' || ? || '%'", *filter.CityContains)
}
if filter.Region != nil {
q = q.Where("LOWER(resources_cache.region) = LOWER(?)", *filter.Region)
q = q.Where("LOWER(node_location.continent) = LOWER(?)", *filter.Region)
}
if filter.NodeID != nil {
q = q.Where("node.node_id = ?", *filter.NodeID)
Expand Down
27 changes: 3 additions & 24 deletions grid-proxy/internal/explorer/db/setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,6 @@ SELECT
rent_contract.contract_id as rent_contract_id,
count(node_contract.contract_id) as node_contracts_count,
node.country as country,
country.region as region,
COALESCE(dmi.bios, '{}') as bios,
COALESCE(dmi.baseboard, '{}') as baseboard,
COALESCE(dmi.processor, '[]') as processor,
Expand All @@ -136,7 +135,6 @@ FROM node
LEFT JOIN contract_resources ON node_contract.resources_used_id = contract_resources.id
LEFT JOIN node_resources_total AS node_resources_total ON node_resources_total.node_id = node.id
LEFT JOIN rent_contract on node.node_id = rent_contract.node_id AND rent_contract.state IN ('Created', 'GracePeriod')
LEFT JOIN country ON LOWER(node.country) = LOWER(country.name)
LEFT JOIN speed ON node.twin_id = speed.node_twin_id
LEFT JOIN dmi ON node.twin_id = dmi.node_twin_id
LEFT JOIN farm ON farm.farm_id = node.farm_id
Expand All @@ -163,7 +161,6 @@ GROUP BY
COALESCE(node_gpu_agg.gpus, '[]'),
COALESCE(node_gpu_agg.gpu_count, 0),
node.country,
country.region,
COALESCE(dmi.bios, '{}'),
COALESCE(dmi.baseboard, '{}'),
COALESCE(dmi.processor, '[]'),
Expand All @@ -172,8 +169,7 @@ GROUP BY
COALESCE(speed.download, 0),
node.certification,
node.extra_fee,
farm.pricing_policy_id,
country.region;
farm.pricing_policy_id;

DROP TABLE IF EXISTS resources_cache;
CREATE TABLE IF NOT EXISTS resources_cache(
Expand All @@ -194,7 +190,6 @@ CREATE TABLE IF NOT EXISTS resources_cache(
rent_contract_id INTEGER,
node_contracts_count INTEGER NOT NULL,
country TEXT,
region TEXT,
bios jsonb,
baseboard jsonb,
processor jsonb,
Expand Down Expand Up @@ -283,27 +278,11 @@ CREATE INDEX IF NOT EXISTS idx_public_config_node_id ON public_config USING gin(
/*
Node Trigger:
- Insert node record > Insert new resources_cache record
- Update node country > update resources_cache country/region
*/
CREATE OR REPLACE FUNCTION reflect_node_changes() RETURNS TRIGGER AS
$$
BEGIN
IF (TG_OP = 'UPDATE') THEN
BEGIN
UPDATE resources_cache
SET
country = NEW.country,
region = (
SELECT region FROM country WHERE LOWER(country.name) = LOWER(NEW.country)
)
WHERE
resources_cache.node_id = NEW.node_id;
EXCEPTION
WHEN OTHERS THEN
RAISE NOTICE 'Error updating resources_cache: %', SQLERRM;
END;

ELSIF (TG_OP = 'INSERT') THEN
IF (TG_OP = 'INSERT') THEN
BEGIN
INSERT INTO resources_cache
SELECT *
Expand All @@ -328,7 +307,7 @@ END;
$$ LANGUAGE plpgsql;

CREATE OR REPLACE TRIGGER tg_node
AFTER INSERT OR DELETE OR UPDATE OF country
AFTER INSERT OR DELETE
ON node
FOR EACH ROW EXECUTE PROCEDURE reflect_node_changes();

Expand Down
1 change: 1 addition & 0 deletions grid-proxy/internal/explorer/db/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ type Database interface {
UpsertNodeIpv6Report(ctx context.Context, ips []types.HasIpv6) error
UpsertNodeWorkloads(ctx context.Context, workloads []types.NodesWorkloads) error
UpsertNodeFeatures(ctx context.Context, features []types.NodeFeatures) error
UpsertNodeLocation(ctx context.Context, locations []types.NodeLocation) error
}

type ContractBilling types.ContractBilling
Expand Down
19 changes: 4 additions & 15 deletions grid-proxy/internal/indexer/health.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,10 @@ func (w *HealthWork) Get(ctx context.Context, rmb *peer.RpcClient, twinId uint32

func (w *HealthWork) Upsert(ctx context.Context, db db.Database, batch []types.HealthReport) error {
// to prevent having multiple data for the same twin from different finders
batch = removeDuplicates(batch)
return db.UpsertNodeHealth(ctx, batch)
unique := removeDuplicates(batch, func(n types.HealthReport) uint32 {
return n.NodeTwinId
})
return db.UpsertNodeHealth(ctx, unique)
}

func getHealthReport(response diagnostics.Diagnostics, twinId uint32) types.HealthReport {
Expand All @@ -53,16 +55,3 @@ func getHealthReport(response diagnostics.Diagnostics, twinId uint32) types.Heal

return report
}

func removeDuplicates(reports []types.HealthReport) []types.HealthReport {
seen := make(map[uint32]bool)
result := []types.HealthReport{}
for _, report := range reports {
if _, ok := seen[report.NodeTwinId]; !ok {
seen[report.NodeTwinId] = true
result = append(result, report)
}
}

return result
}
54 changes: 54 additions & 0 deletions grid-proxy/internal/indexer/location.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
package indexer

import (
"context"
"time"

"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/internal/explorer/db"
"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/pkg/types"
"github.com/threefoldtech/tfgrid-sdk-go/rmb-sdk-go/peer"
"github.com/threefoldtech/zos/pkg/geoip"
)

const locationCmd = "zos.location.get"

var _ Work[types.NodeLocation] = (*LocationWork)(nil)

type LocationWork struct {
finders map[string]time.Duration
}

func NewLocationWork(interval uint) *LocationWork {
return &LocationWork{
finders: map[string]time.Duration{
"up": time.Duration(interval) * time.Minute,
},
}
}

func (w *LocationWork) Finders() map[string]time.Duration {
return w.finders
}

func (w *LocationWork) Get(ctx context.Context, rmb *peer.RpcClient, id uint32) ([]types.NodeLocation, error) {
var loc geoip.Location
if err := callNode(ctx, rmb, locationCmd, nil, id, &loc); err != nil {
return []types.NodeLocation{}, nil
}

return []types.NodeLocation{
{
Country: loc.Country,
Continent: loc.Continent,
UpdatedAt: time.Now().Unix(),
},
}, nil
}

func (w *LocationWork) Upsert(ctx context.Context, db db.Database, batch []types.NodeLocation) error {
unique := removeDuplicates(batch, func(n types.NodeLocation) string {
return n.Country
})

return db.UpsertNodeLocation(ctx, unique)
}
12 changes: 12 additions & 0 deletions grid-proxy/internal/indexer/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,3 +50,15 @@ func callNode(ctx context.Context, rmbClient *peer.RpcClient, cmd string, payloa

return rmbClient.Call(subCtx, twinId, cmd, payload, result)
}

func removeDuplicates[T any, K comparable](items []T, keyFunc func(T) K) (result []T) {
seen := make(map[K]bool)
for _, item := range items {
key := keyFunc(item)
if _, ok := seen[key]; !ok {
seen[key] = true
result = append(result, item)
}
}
return
}
52 changes: 52 additions & 0 deletions grid-proxy/internal/indexer/utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package indexer

import (
"reflect"
"testing"

"github.com/threefoldtech/tfgrid-sdk-go/grid-proxy/pkg/types"
)

func TestRemoveDuplicates(t *testing.T) {
t.Run("remove duplicate countries", func(t *testing.T) {
locations := []types.NodeLocation{
{Country: "Egypt", Continent: "Africa"},
{Country: "Egypt", Continent: "Africa"},
{Country: "Belgium", Continent: "Europe"},
}

uniqueLocations := []types.NodeLocation{
{Country: "Egypt", Continent: "Africa"},
{Country: "Belgium", Continent: "Europe"},
}

gotLocations := removeDuplicates(locations, func(n types.NodeLocation) string {
return n.Country
})

if !reflect.DeepEqual(uniqueLocations, gotLocations) {
t.Errorf("expected %v, but got %v", uniqueLocations, gotLocations)
}
})

t.Run("remove duplicate health reports", func(t *testing.T) {
healthReports := []types.HealthReport{
{NodeTwinId: 1, Healthy: true},
{NodeTwinId: 1, Healthy: true},
{NodeTwinId: 2, Healthy: true},
}

uniqueReports := []types.HealthReport{
{NodeTwinId: 1, Healthy: true},
{NodeTwinId: 2, Healthy: true},
}

gotReports := removeDuplicates(healthReports, func(h types.HealthReport) uint32 {
return h.NodeTwinId
})

if !reflect.DeepEqual(gotReports, uniqueReports) {
t.Errorf("expected %v, but got %v", uniqueReports, gotReports)
}
})
}
10 changes: 10 additions & 0 deletions grid-proxy/pkg/types/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,3 +107,13 @@ type NodeFeatures struct {
func (NodeFeatures) TableName() string {
return "node_features"
}

type NodeLocation struct {
Country string `json:"country" gorm:"unique;not null"`
Continent string `json:"continent"`
UpdatedAt int64 `json:"updated_at"`
}

func (NodeLocation) TableName() string {
return "node_location"
}
2 changes: 1 addition & 1 deletion grid-proxy/tests/queries/mock_client/farms.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ func (f *Farm) satisfyFarmNodesFilter(data *DBData, filter types.FarmFilter) boo
continue
}

if filter.Region != nil && !strings.EqualFold(*filter.Region, data.Regions[strings.ToLower(node.Country)]) {
if filter.Region != nil && !strings.EqualFold(*filter.Region, data.Regions[node.Country]) {
continue
}

Expand Down
Loading

0 comments on commit 0c0246a

Please sign in to comment.