Skip to content

Commit

Permalink
init node location updater:
Browse files Browse the repository at this point in the history
- add updater package
- expose GetState from registerer
- fix longitude spelling
  • Loading branch information
Omarabdul3ziz committed Sep 22, 2024
1 parent ec5a85f commit 435ca91
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 26 deletions.
10 changes: 10 additions & 0 deletions cmds/modules/noded/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/threefoldtech/zos/pkg/perf/publicip"
"github.com/threefoldtech/zos/pkg/registrar"
"github.com/threefoldtech/zos/pkg/stubs"
"github.com/threefoldtech/zos/pkg/updater"
"github.com/threefoldtech/zos/pkg/utils"

"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -161,6 +162,15 @@ func action(cli *cli.Context) error {
WithVirtualized(len(hypervisor) != 0)

go registerationServer(ctx, msgBrokerCon, env, info)

log.Info().Msg("start node updater")
nodeUpdater := updater.NewUpdater(redis)
if err != nil {
return errors.Wrap(err, "failed to create new node updater")
}

go nodeUpdater.Start(ctx)

log.Info().Msg("start perf scheduler")

perfMon, err := perf.NewPerformanceMonitor(msgBrokerCon)
Expand Down
4 changes: 2 additions & 2 deletions pkg/geoip/geoip.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

// Location holds the result of a geoip request
type Location struct {
Longitute float64 `json:"longitude"`
Longitude float64 `json:"longitude"`
Latitude float64 `json:"latitude"`
Continent string `json:"continent"`
Country string `json:"country_name"`
Expand Down Expand Up @@ -45,7 +45,7 @@ func Fetch() (Location, error) {

func getLocation(geoIPService string) (Location, error) {
l := Location{
Longitute: 0.0,
Longitude: 0.0,
Latitude: 0.0,
Continent: "Unknown",
Country: "Unknown",
Expand Down
10 changes: 10 additions & 0 deletions pkg/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,17 @@ package pkg

//go:generate zbusc -module registrar -version 0.0.1 -name registrar -package stubs github.com/threefoldtech/zos/pkg+Registrar stubs/registrar_stub.go

type RegistrationState string

type State struct {
NodeID uint32
TwinID uint32
State RegistrationState
Msg string
}

type Registrar interface {
NodeID() (uint32, error)
TwinID() (uint32, error)
GetState() State
}
2 changes: 1 addition & 1 deletion pkg/registrar/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func registerNode(
}

location := substrate.Location{
Longitude: fmt.Sprint(info.Location.Longitute),
Longitude: fmt.Sprint(info.Location.Longitude),
Latitude: fmt.Sprint(info.Location.Latitude),
Country: info.Location.Country,
City: info.Location.City,
Expand Down
39 changes: 16 additions & 23 deletions pkg/registrar/registrar.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,18 @@ import (
"github.com/pkg/errors"
"github.com/rs/zerolog/log"
"github.com/threefoldtech/zbus"
"github.com/threefoldtech/zos/pkg"
"github.com/threefoldtech/zos/pkg/app"
"github.com/threefoldtech/zos/pkg/environment"
"github.com/threefoldtech/zos/pkg/stubs"
)

// should any of this be moved to pkg?
type RegistrationState string

const (
Failed RegistrationState = "Failed"
InProgress RegistrationState = "InProgress"
Done RegistrationState = "Done"
Failed pkg.RegistrationState = "Failed"
InProgress pkg.RegistrationState = "InProgress"
Done pkg.RegistrationState = "Done"

monitorAccountEvery = 30 * time.Minute
)
Expand All @@ -31,33 +31,26 @@ var (
ErrFailed = errors.New("registration failed")
)

type State struct {
NodeID uint32
TwinID uint32
State RegistrationState
Msg string
}

func FailedState(err error) State {
return State{
func FailedState(err error) pkg.State {
return pkg.State{
0,
0,
Failed,
err.Error(),
}
}

func InProgressState() State {
return State{
func InProgressState() pkg.State {
return pkg.State{
0,
0,
InProgress,
"",
}
}

func DoneState(nodeID uint32, twinID uint32) State {
return State{
func DoneState(nodeID uint32, twinID uint32) pkg.State {
return pkg.State{
nodeID,
twinID,
Done,
Expand All @@ -66,13 +59,13 @@ func DoneState(nodeID uint32, twinID uint32) State {
}

type Registrar struct {
state State
state pkg.State
mutex sync.RWMutex
}

func NewRegistrar(ctx context.Context, cl zbus.Client, env environment.Environment, info RegistrationInfo) *Registrar {
r := Registrar{
State{
pkg.State{
0,
0,
InProgress,
Expand All @@ -84,13 +77,13 @@ func NewRegistrar(ctx context.Context, cl zbus.Client, env environment.Environme
return &r
}

func (r *Registrar) setState(s State) {
func (r *Registrar) setState(s pkg.State) {
r.mutex.Lock()
defer r.mutex.Unlock()
r.state = s
}

func (r *Registrar) getState() State {
func (r *Registrar) GetState() pkg.State {
r.mutex.RLock()
defer r.mutex.RUnlock()
return r.state
Expand Down Expand Up @@ -162,11 +155,11 @@ func (r *Registrar) reActivate(ctx context.Context, cl zbus.Client, env environm
}

func (r *Registrar) NodeID() (uint32, error) {
return r.returnIfDone(r.getState().NodeID)
return r.returnIfDone(r.GetState().NodeID)
}

func (r *Registrar) TwinID() (uint32, error) {
return r.returnIfDone(r.getState().TwinID)
return r.returnIfDone(r.GetState().TwinID)
}

func (r *Registrar) returnIfDone(v uint32) (uint32, error) {
Expand Down
17 changes: 17 additions & 0 deletions pkg/stubs/registrar_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package stubs
import (
"context"
zbus "github.com/threefoldtech/zbus"
pkg "github.com/threefoldtech/zos/pkg"
)

type RegistrarStub struct {
Expand All @@ -26,6 +27,22 @@ func NewRegistrarStub(client zbus.Client) *RegistrarStub {
}
}

func (s *RegistrarStub) GetState(ctx context.Context) (ret0 pkg.State) {
args := []interface{}{}
result, err := s.client.RequestContext(ctx, s.module, s.object, "GetState", args...)
if err != nil {
panic(err)
}
result.PanicOnError()
loader := zbus.Loader{
&ret0,
}
if err := result.Unmarshal(&loader); err != nil {
panic(err)
}
return
}

func (s *RegistrarStub) NodeID(ctx context.Context) (ret0 uint32, ret1 error) {
args := []interface{}{}
result, err := s.client.RequestContext(ctx, s.module, s.object, "NodeID", args...)
Expand Down
84 changes: 84 additions & 0 deletions pkg/updater/updater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package updater

import (
"context"
"fmt"
"reflect"
"time"

"github.com/rs/zerolog/log"
substrate "github.com/threefoldtech/tfchain/clients/tfchain-client-go"
"github.com/threefoldtech/zbus"

"github.com/threefoldtech/zos/pkg/geoip"
"github.com/threefoldtech/zos/pkg/registrar"
"github.com/threefoldtech/zos/pkg/stubs"
)

const (
updateInterval = 24 * time.Hour
)

type Updater struct {
substrateGateway *stubs.SubstrateGatewayStub
registrar *stubs.RegistrarStub
}

func NewUpdater(bus zbus.Client) *Updater {
return &Updater{
substrateGateway: stubs.NewSubstrateGatewayStub(bus),
registrar: stubs.NewRegistrarStub(bus),
}
}

func (u *Updater) Start(ctx context.Context) {
for {
if u.registrar.GetState(ctx).State == registrar.Done {
if err := u.updateLocation(); err != nil {
log.Error().Err(err).Msg("updating location failed")
}
log.Info().Msg("node location updated")
}

select {
case <-ctx.Done():
log.Info().Msg("stop node updater. context cancelled")
return
case <-time.After(updateInterval):
continue
}
}
}

func (u *Updater) updateLocation() error {
nodeId, err := u.registrar.NodeID(context.Background())
if err != nil {
return fmt.Errorf("failed to get node id: %w", err)
}

node, err := u.substrateGateway.GetNode(context.Background(), nodeId)
if err != nil {
return fmt.Errorf("failed to get node from chain: %w", err)
}

loc, err := geoip.Fetch()
if err != nil {
return fmt.Errorf("failed to fetch location info: %w", err)
}

newLoc := substrate.Location{
City: loc.City,
Country: loc.Country,
Latitude: fmt.Sprintf("%f", loc.Latitude),
Longitude: fmt.Sprintf("%f", loc.Longitude),
}

if !reflect.DeepEqual(newLoc, node.Location) {
node.Location = newLoc
if _, err := u.substrateGateway.UpdateNode(context.Background(), node); err != nil {
return fmt.Errorf("failed to update node on chain: %w", err)
}
}

return nil
}

0 comments on commit 435ca91

Please sign in to comment.