Skip to content

Commit

Permalink
changes based on bucks review
Browse files Browse the repository at this point in the history
  • Loading branch information
ukane-philemon committed Jun 28, 2022
1 parent 03fabed commit 2d94c2f
Show file tree
Hide file tree
Showing 31 changed files with 348 additions and 370 deletions.
215 changes: 111 additions & 104 deletions client/core/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -1134,8 +1134,11 @@ type Core struct {
sentCommitsMtx sync.Mutex
sentCommits map[order.Commitment]chan struct{}

ratesMtx sync.Mutex
exchangeRateSources map[string]*commonSource
ratesMtx sync.Mutex
fiatRateSources map[string]*commonRateSource
// stopFiatRateFetching will be used to shutdown fetchFiatExchangeRates
// goroutine when all rate sources have been disabled.
stopFiatRateFetching context.CancelFunc
}

// New is the constructor for a new Core.
Expand Down Expand Up @@ -1229,7 +1232,7 @@ func New(cfg *Config) (*Core, error) {
localePrinter: message.NewPrinter(lang),
seedGenerationTime: seedGenerationTime,

exchangeRateSources: make(map[string]*commonSource),
fiatRateSources: make(map[string]*commonRateSource),
}

// Populate the initial user data. User won't include any DEX info yet, as
Expand Down Expand Up @@ -1262,14 +1265,14 @@ func (c *Core) Run(ctx context.Context) {
c.latencyQ.Run(ctx)
}()

// Retrieve disabled exchange rate sources from database.
// Retrieve disabled fiat rate sources from database.
disabledSources, err := c.db.DisabledRateSources()
if err != nil {
c.log.Errorf("Unable to retrieve disabled exchange rate source: %v", err)
c.log.Errorf("Unable to retrieve disabled fiat rate source: %v", err)
}

// Construct an exchange rate source.
for token, rateFetcher := range exchangeRateFetchers {
// Construct enabled fiat rate sources.
for token, rateFetcher := range fiatRateFetchers {
var isDisabled bool
for _, v := range disabledSources {
if token == v {
Expand All @@ -1278,16 +1281,16 @@ func (c *Core) Run(ctx context.Context) {
}
}
if !isDisabled {
c.exchangeRateSources[token] = newcommonSource(c.log, rateFetcher)
c.fiatRateSources[token] = newCommonRateSource(c.log, rateFetcher)
}
}

// Start goroutine for exchange rate fetcher's if we have at least one
// exchange rate source enabled.
if len(c.exchangeRateSources) > 0 {
c.fetchExchangeRates()
// Start goroutine for fiat rate fetcher's if we have at least one
// source.
if len(c.fiatRateSources) != 0 {
c.fetchFiatExchangeRates()
} else {
c.log.Debugf("no exchange rate source was initialized")
c.log.Debug("no fiat rate source initialized")
}

c.wg.Wait() // block here until all goroutines except DB complete
Expand Down Expand Up @@ -7482,40 +7485,65 @@ func (c *Core) findActiveOrder(oid order.OrderID) (*trackedTrade, error) {
return nil, fmt.Errorf("could not find active order with order id: %s", oid)
}

// fetchExchangeRates starts the exchange rate source goroutine and schedules
// fetchFiatExchangeRates starts the fiat rate fetcher goroutine and schedules
// refresh cycles.
func (core *Core) fetchExchangeRates() {
func (core *Core) fetchFiatExchangeRates() {
ctx, cancel := context.WithCancel(core.ctx)
core.stopFiatRateFetching = cancel

core.wg.Add(1)
go func() {
defer core.wg.Done()
tick := time.NewTimer(0) // starts rate fetching immediately.
for {
select {
case <-tick.C:
if core.isConversionDisabled() {
tick.Stop()
return
}
core.refreshExchangeRate()
case <-core.ctx.Done():
case <-ctx.Done():
tick.Stop()
return
case <-tick.C:
core.refreshFiatRates()
}
tick = core.nextRateSourceTick()
}
}()
}

// nextRateSourceTick checks the rate source' last request, and
// calculates when the next cycle should run.
// refreshFiatRates refreshes the fiat rates for rate sources whose values have
// not been updated since fiatRateRequestInterval.
func (core *Core) refreshFiatRates() {
ctx, cancel := context.WithTimeout(core.ctx, 4*time.Second)
defer cancel()

var wg sync.WaitGroup
core.ratesMtx.Lock()
for _, source := range core.fiatRateSources {
if time.Since(source.lastTry()) > fiatRateRequestInterval {
wg.Add(1)
go func(source *commonRateSource) {
defer wg.Done()
source.refreshRates(ctx, core.assetMap())
}(source)
}
}
core.ratesMtx.Unlock()
wg.Wait()

fiatRatesMap := core.fetchAllFiatRates()
if len(fiatRatesMap) != 0 {
core.notify(newFiatRatesUpdate(fiatRatesMap))
}
}

// nextRateSourceTick checks the fiat rate source's last request, and calculates
// when the next cycle should run.
func (core *Core) nextRateSourceTick() *time.Timer {
core.ratesMtx.Lock()
defer core.ratesMtx.Unlock()

minTick := 10 * time.Second
tOldest := time.Now()
for _, rateSource := range core.exchangeRateSources {
t := rateSource.lastTry()
for _, source := range core.fiatRateSources {
t := source.lastTry()
if t.Before(tOldest) {
tOldest = t
}
Expand All @@ -7527,38 +7555,36 @@ func (core *Core) nextRateSourceTick() *time.Timer {
return time.NewTimer(tilNext)
}

// ExchangeRateSources returns a list of exchange rate sources and their
// individual status.
func (core *Core) ExchangeRateSources() map[string]bool {
// FiatRateSources returns a list of fiat rate sources and their individual
// status.
func (core *Core) FiatRateSources() map[string]bool {
core.ratesMtx.Lock()
defer core.ratesMtx.Unlock()

rateSources := make(map[string]bool, len(exchangeRateFetchers))
for token := range exchangeRateFetchers {
rateSources[token] = core.exchangeRateSources[token] != nil
rateSources := make(map[string]bool, len(fiatRateFetchers))
for token := range fiatRateFetchers {
rateSources[token] = core.fiatRateSources[token] != nil
}
return rateSources
}

// fetchAllFiatRates returns exchange rate information for all supported assets
// that have a wallet. It also checks if exchange rates are expired and does
// some clean-up.
// fetchAllFiatRates returns fiat rate for all supported assets that have a
// wallet. It also checks if fiat rates are expired and does some clean-up.
func (core *Core) fetchAllFiatRates() map[uint32]float64 {
core.ratesMtx.Lock()
defer core.ratesMtx.Unlock()
supportedAssets := core.SupportedAssets()

core.ratesMtx.Lock()
fiatRatesMap := make(map[uint32]float64, len(supportedAssets))
for assetID := range supportedAssets {
var rateSum float64
var sources int32
for token, source := range core.exchangeRateSources {
// Remove exchange rate source with expired exchange rate data.
for token, source := range core.fiatRateSources {
// Remove fiat rate source with expired exchange rate data.
if source.isExpired(fiatRateDataExpiry) {
delete(core.exchangeRateSources, token)
delete(core.fiatRateSources, token)
continue
}
rateInfo, found := source.assetRate(assetID)
if found && time.Since(rateInfo.lastUpdate) < fiatRateDataExpiry {
rateInfo := source.assetRate(assetID)
if rateInfo != nil && time.Since(rateInfo.lastUpdate) < fiatRateDataExpiry {
sources++
rateSum += rateInfo.rate
}
Expand All @@ -7568,25 +7594,25 @@ func (core *Core) fetchAllFiatRates() map[uint32]float64 {
}
}

// Save disabled exchange rate source to database.
// Ensure disabled fiat rate fetchers are saved to database.
core.saveDisabledRateSources()
core.ratesMtx.Unlock()

return fiatRatesMap
}

// isConversionDisabled checks if fiat rate fetch for assets is disabled.
// This is when either no exchange rate source is enabled or rates are
// not up-to-date. Individual exchange rate source can be disabled without
// disabling rate fetching, as long as there is at least one exchange
// rate source.
// isConversionDisabled checks if fiat rate fetch for assets is disabled. This
// is when either no fiat rate source is enabled or rates are not up-to-date.
// Individual fiat rate source can be disabled without disabling rate fetching,
// as long as there is at least one exchange rate source.
func (core *Core) isConversionDisabled() bool {
core.ratesMtx.Lock()
defer core.ratesMtx.Unlock()
return len(core.exchangeRateSources) == 0
return len(core.fiatRateSources) == 0
}

// ToggleRateSourceStatus toggles an exchange rate source status. If
// disable is true, the exchange rate source is disabled, otherwise the rate
// ToggleRateSourceStatus toggles a fiat rate source status. If
// disable is true, the fiat rate source is disabled, otherwise the rate
// source is enabled.
func (core *Core) ToggleRateSourceStatus(source string, disable bool) error {
if disable {
Expand All @@ -7595,101 +7621,82 @@ func (core *Core) ToggleRateSourceStatus(source string, disable bool) error {
return core.enableRateSource(source)
}

// enableRateSource enables an exchange rate source.
// enableRateSource enables a fiat rate source.
func (core *Core) enableRateSource(source string) error {
core.ratesMtx.Lock()
defer core.ratesMtx.Unlock()

// Check if it's an invalid rate source or it is already enabled.
rateFetcher, found := exchangeRateFetchers[source]
rateFetcher, found := fiatRateFetchers[source]
if !found {
return errors.New("cannot enable unknown exchange rate source")
} else if core.exchangeRateSources[source] != nil {
return errors.New("cannot enable unknown fiat rate source")
} else if core.fiatRateSources[source] != nil {
return nil // already enabled.
}

// Build exchange rate source.
core.exchangeRateSources[source] = newcommonSource(core.log, rateFetcher)
// Build fiat rate source.
core.fiatRateSources[source] = newCommonRateSource(core.log, rateFetcher)

// If this is our first exchange rate source, start exchange rate fetcher
// If this is our first fiat rate source, start fiat rate fetcher
// gorountine, else fetch rates.
if len(core.exchangeRateSources) == 1 {
core.fetchExchangeRates()
if len(core.fiatRateSources) == 1 {
core.fetchFiatExchangeRates()
} else {
go func() {
ctx, cancel := context.WithTimeout(core.ctx, 4*time.Second)
defer cancel()
core.exchangeRateSources[source].refreshRates(ctx, core.assetMap())
core.fiatRateSources[source].refreshRates(ctx, core.assetMap())
}()
}

// Update disabled exchange rate source.
// Update disabled fiat rate source.
core.saveDisabledRateSources()

core.log.Infof("Enabled %s to fetch exchange rates.", source)
core.log.Infof("Enabled %s to fetch fiat rates.", source)
return nil
}

// disableRateSource disables an exchange rate source.
// disableRateSource disables an fiat rate source.
func (core *Core) disableRateSource(source string) error {
core.ratesMtx.Lock()
defer core.ratesMtx.Unlock()

// Check if it's an invalid exchange rate source or it is already disabled.
_, found := exchangeRateFetchers[source]
// Check if it's an invalid fiat rate source or it is already
// disabled.
_, found := fiatRateFetchers[source]
if !found {
return errors.New("cannot disable unknown exchange rate source")
} else if core.exchangeRateSources[source] == nil {
return errors.New("cannot disable unknown fiat rate source")
} else if core.fiatRateSources[source] == nil {
return nil // already disabled.
}

// Remove exchange rate source.
delete(core.exchangeRateSources, source)
// Remove fiat rate source.
delete(core.fiatRateSources, source)

// Save disabled exchange rate sources to database.
// Shutdown rate fetching if there are no exchange rate source.
if len(core.fiatRateSources) == 0 {
core.stopFiatRateFetching()
}

// Save disabled fiat rate sources to database.
core.saveDisabledRateSources()

core.log.Infof("Disabled %s from fetching exchange rates.", source)
core.log.Infof("Disabled %s from fetching fiat rates.", source)
return nil
}

// saveDisabledRateSources save disabled exchange rate sources to database.
// use under ratesMtx lock.
// saveDisabledRateSources save disabled fiat rate sources to database.
// Use under ratesMtx lock.
func (core *Core) saveDisabledRateSources() {
var disabled []string
for token := range exchangeRateFetchers {
if core.exchangeRateSources[token] == nil {
for token := range fiatRateFetchers {
if core.fiatRateSources[token] == nil {
disabled = append(disabled, token)
}
}

err := core.db.SaveDisabledRateSources(disabled)
if err != nil {
core.log.Errorf("Unable to save disabled exchange rate source to database: %v", err)
}
}

// refreshExchangeRate refreshes the exchange rates for exchange rate sources
// whose values have not been updated since fiatRateRequestInterval.
func (core *Core) refreshExchangeRate() {
ctx, cancel := context.WithTimeout(core.ctx, 4*time.Second)
defer cancel()

var wg sync.WaitGroup
core.ratesMtx.Lock()
for _, source := range core.exchangeRateSources {
if time.Since(source.lastTry()) > fiatRateRequestInterval {
wg.Add(1)
go func(source *commonSource) {
defer wg.Done()
source.refreshRates(ctx, core.assetMap())
}(source)
}
}
core.ratesMtx.Unlock()
wg.Wait()

fiatRatesMap := core.fetchAllFiatRates()
if len(fiatRatesMap) != 0 {
core.notify(newFiatRatesUpdate(fiatRatesMap))
core.log.Errorf("Unable to save disabled fiat rate source to database: %v", err)
}
}
Loading

0 comments on commit 2d94c2f

Please sign in to comment.